Skip to content

Commit fe58f61

Browse files
fix: mutations batcher race condition (#896)
1 parent 4f050aa commit fe58f61

File tree

3 files changed

+104
-52
lines changed

3 files changed

+104
-52
lines changed

google/cloud/bigtable/batcher.py

Lines changed: 67 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,19 @@ def __init__(self, max_mutation_bytes=MAX_MUTATION_SIZE, flush_count=FLUSH_COUNT
5353
self.flush_count = flush_count
5454

5555
def get(self):
56-
"""Retrieve an item from the queue. Recalculate queue size."""
57-
row = self._queue.get()
58-
mutation_size = row.get_mutations_size()
59-
self.total_mutation_count -= len(row._get_mutations())
60-
self.total_size -= mutation_size
61-
return row
56+
"""
57+
Retrieve an item from the queue. Recalculate queue size.
58+
59+
If the queue is empty, return None.
60+
"""
61+
try:
62+
row = self._queue.get_nowait()
63+
mutation_size = row.get_mutations_size()
64+
self.total_mutation_count -= len(row._get_mutations())
65+
self.total_size -= mutation_size
66+
return row
67+
except queue.Empty:
68+
return None
6269

6370
def put(self, item):
6471
"""Insert an item to the queue. Recalculate queue size."""
@@ -79,9 +86,6 @@ def full(self):
7986
return True
8087
return False
8188

82-
def empty(self):
83-
return self._queue.empty()
84-
8589

8690
@dataclass
8791
class _BatchInfo:
@@ -292,8 +296,10 @@ def flush(self):
292296
* :exc:`.batcherMutationsBatchError` if there's any error in the mutations.
293297
"""
294298
rows_to_flush = []
295-
while not self._rows.empty():
296-
rows_to_flush.append(self._rows.get())
299+
row = self._rows.get()
300+
while row is not None:
301+
rows_to_flush.append(row)
302+
row = self._rows.get()
297303
response = self._flush_rows(rows_to_flush)
298304
return response
299305

@@ -303,58 +309,68 @@ def _flush_async(self):
303309
:raises:
304310
* :exc:`.batcherMutationsBatchError` if there's any error in the mutations.
305311
"""
306-
307-
rows_to_flush = []
308-
mutations_count = 0
309-
mutations_size = 0
310-
rows_count = 0
311-
batch_info = _BatchInfo()
312-
313-
while not self._rows.empty():
314-
row = self._rows.get()
315-
mutations_count += len(row._get_mutations())
316-
mutations_size += row.get_mutations_size()
317-
rows_count += 1
318-
rows_to_flush.append(row)
319-
batch_info.mutations_count = mutations_count
320-
batch_info.rows_count = rows_count
321-
batch_info.mutations_size = mutations_size
322-
323-
if (
324-
rows_count >= self.flush_count
325-
or mutations_size >= self.max_row_bytes
326-
or mutations_count >= self.flow_control.max_mutations
327-
or mutations_size >= self.flow_control.max_mutation_bytes
328-
or self._rows.empty() # submit when it reached the end of the queue
312+
next_row = self._rows.get()
313+
while next_row is not None:
314+
# start a new batch
315+
rows_to_flush = [next_row]
316+
batch_info = _BatchInfo(
317+
mutations_count=len(next_row._get_mutations()),
318+
rows_count=1,
319+
mutations_size=next_row.get_mutations_size(),
320+
)
321+
# fill up batch with rows
322+
next_row = self._rows.get()
323+
while next_row is not None and self._row_fits_in_batch(
324+
next_row, batch_info
329325
):
330-
# wait for resources to become available, before submitting any new batch
331-
self.flow_control.wait()
332-
# once unblocked, submit a batch
333-
# event flag will be set by control_flow to block subsequent thread, but not blocking this one
334-
self.flow_control.control_flow(batch_info)
335-
future = self._executor.submit(self._flush_rows, rows_to_flush)
336-
self.futures_mapping[future] = batch_info
337-
future.add_done_callback(self._batch_completed_callback)
338-
339-
# reset and start a new batch
340-
rows_to_flush = []
341-
mutations_size = 0
342-
rows_count = 0
343-
mutations_count = 0
344-
batch_info = _BatchInfo()
326+
rows_to_flush.append(next_row)
327+
batch_info.mutations_count += len(next_row._get_mutations())
328+
batch_info.rows_count += 1
329+
batch_info.mutations_size += next_row.get_mutations_size()
330+
next_row = self._rows.get()
331+
# send batch over network
332+
# wait for resources to become available
333+
self.flow_control.wait()
334+
# once unblocked, submit the batch
335+
# event flag will be set by control_flow to block subsequent thread, but not blocking this one
336+
self.flow_control.control_flow(batch_info)
337+
future = self._executor.submit(self._flush_rows, rows_to_flush)
338+
# schedule release of resources from flow control
339+
self.futures_mapping[future] = batch_info
340+
future.add_done_callback(self._batch_completed_callback)
345341

346342
def _batch_completed_callback(self, future):
347343
"""Callback for when the mutation has finished to clean up the current batch
348344
and release items from the flow controller.
349-
350345
Raise exceptions if there's any.
351346
Release the resources locked by the flow control and allow enqueued tasks to be run.
352347
"""
353-
354348
processed_rows = self.futures_mapping[future]
355349
self.flow_control.release(processed_rows)
356350
del self.futures_mapping[future]
357351

352+
def _row_fits_in_batch(self, row, batch_info):
353+
"""Checks if a row can fit in the current batch.
354+
355+
:type row: class
356+
:param row: :class:`~google.cloud.bigtable.row.DirectRow`.
357+
358+
:type batch_info: :class:`_BatchInfo`
359+
:param batch_info: Information about the current batch.
360+
361+
:rtype: bool
362+
:returns: True if the row can fit in the current batch.
363+
"""
364+
new_rows_count = batch_info.rows_count + 1
365+
new_mutations_count = batch_info.mutations_count + len(row._get_mutations())
366+
new_mutations_size = batch_info.mutations_size + row.get_mutations_size()
367+
return (
368+
new_rows_count <= self.flush_count
369+
and new_mutations_size <= self.max_row_bytes
370+
and new_mutations_count <= self.flow_control.max_mutations
371+
and new_mutations_size <= self.flow_control.max_mutation_bytes
372+
)
373+
358374
def _flush_rows(self, rows_to_flush):
359375
"""Mutate the specified rows.
360376

tests/system/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def location_id():
5858

5959
@pytest.fixture(scope="session")
6060
def serve_nodes():
61-
return 3
61+
return 1
6262

6363

6464
@pytest.fixture(scope="session")

tests/system/test_data_api.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,3 +381,39 @@ def test_access_with_non_admin_client(data_client, data_instance_id, data_table_
381381
instance = data_client.instance(data_instance_id)
382382
table = instance.table(data_table_id)
383383
assert table.read_row("nonesuch") is None # no raise
384+
385+
386+
def test_mutations_batcher_threading(data_table, rows_to_delete):
387+
"""
388+
Test the mutations batcher by sending a bunch of mutations using different
389+
flush methods
390+
"""
391+
import mock
392+
import time
393+
from google.cloud.bigtable.batcher import MutationsBatcher
394+
395+
num_sent = 20
396+
all_results = []
397+
398+
def callback(results):
399+
all_results.extend(results)
400+
401+
# override flow control max elements
402+
with mock.patch("google.cloud.bigtable.batcher.MAX_OUTSTANDING_ELEMENTS", 2):
403+
with MutationsBatcher(
404+
data_table,
405+
flush_count=5,
406+
flush_interval=0.07,
407+
batch_completed_callback=callback,
408+
) as batcher:
409+
# send mutations in a way that timed flushes and count flushes interleave
410+
for i in range(num_sent):
411+
row = data_table.direct_row("row{}".format(i))
412+
row.set_cell(
413+
COLUMN_FAMILY_ID1, COL_NAME1, "val{}".format(i).encode("utf-8")
414+
)
415+
rows_to_delete.append(row)
416+
batcher.mutate(row)
417+
time.sleep(0.01)
418+
# ensure all mutations were sent
419+
assert len(all_results) == num_sent

0 commit comments

Comments
 (0)