Skip to content

Commit 1319771

Browse files
authored
Support results_queue_size parameter in make_batch_reader API (#783)
* Support results_queue_size parameter in make_batch_reader API
* Test that results_queue_size is propagated properly to the workers pool
* Update release notes
* Reformat unit test
1 parent 170b22a commit 1319771

File tree

3 files changed

+13
-1
lines changed

3 files changed

+13
-1
lines changed

docs/release-notes.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ Release notes
88
Release 0.12.1 (unreleased)
99
===========================
1010
- `PR 777 <https://github.com/uber/petastorm/pull/777>`_: Remove ``LocalDiskArrowTableCache`` class as it was using deprecated pyarrow serialization API. Speed up ``LocalDiskCache`` by using the highest pickle protocol in cache serialization.
11+
- `PR 783 <https://github.com/uber/petastorm/pull/783>`_: Support results_queue_size parameter in make_batch_reader API
1112

1213
Release 0.12.0
1314
===========================

petastorm/reader.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ def make_reader(dataset_url,
205205
def make_batch_reader(dataset_url_or_urls,
206206
schema_fields=None,
207207
reader_pool_type='thread', workers_count=10,
208+
results_queue_size=50,
208209
seed=None, shuffle_rows=False,
209210
shuffle_row_groups=True, shuffle_row_drop_partitions=1,
210211
predicate=None,
@@ -243,6 +244,8 @@ def make_batch_reader(dataset_url_or_urls,
243244
denoting a thread pool, process pool, or running everything in the master thread. Defaults to 'thread'
244245
:param workers_count: An int for the number of workers to use in the reader pool. This only is used for the
245246
thread or process pool. Defaults to 10
247+
:param results_queue_size: Size of the results queue to store prefetched row-groups. Currently only applicable to
248+
thread reader pool type.
246249
:param seed: Random seed specified for shuffle and sharding with reproducible outputs. Defaults to None
247250
:param shuffle_rows: Whether to shuffle inside a single row group. Defaults to False.
248251
:param shuffle_row_groups: Whether to shuffle row groups (the order in which full row groups are read)
@@ -312,7 +315,7 @@ def make_batch_reader(dataset_url_or_urls,
312315
raise ValueError('Unknown cache_type: {}'.format(cache_type))
313316

314317
if reader_pool_type == 'thread':
315-
reader_pool = ThreadPool(workers_count)
318+
reader_pool = ThreadPool(workers_count, results_queue_size)
316319
elif reader_pool_type == 'process':
317320
serializer = ArrowTableSerializer()
318321
reader_pool = ProcessPool(workers_count, serializer, zmq_copy_buffers=zmq_copy_buffers)

petastorm/tests/test_parquet_reader.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,3 +258,11 @@ def test_random_seed(scalar_dataset):
258258
results.append(actual_row_ids)
259259
# Shuffled results are expected to be same
260260
np.testing.assert_equal(results[0], results[1])
261+
262+
263+
def test_results_queue_size_propagation_in_make_batch_reader(scalar_dataset):
264+
expected_results_queue_size = 42
265+
with make_batch_reader(scalar_dataset.url, reader_pool_type='thread',
266+
results_queue_size=expected_results_queue_size) as batch_reader:
267+
actual_results_queue_size = batch_reader._workers_pool._results_queue_size
268+
assert actual_results_queue_size == expected_results_queue_size

0 commit comments

Comments
 (0)