
Commit d461297

kien-truong and Linchin authored
feat: support setting max_stream_count when fetching query result (#2051)
* feat: support setting max_stream_count when fetching query result

  Allow users to set max_stream_count when fetching results using the BigQuery Storage API with RowIterator's incremental methods:

  * to_arrow_iterable
  * to_dataframe_iterable

* docs: update docs about max_stream_count for ordered queries
* fix: add max_stream_count params to _EmptyRowIterator's methods
* test: add tests for RowIterator's max_stream_count parameter
* docs: add notes on the valid max_stream_count range in the docstring
* use a different way to iterate results

Co-authored-by: Lingqing Gan <[email protected]>
1 parent fffe6ba commit d461297

File tree

2 files changed, +114 -0 lines

google/cloud/bigquery/table.py (+44)
@@ -1812,6 +1812,7 @@ def to_arrow_iterable(
         self,
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """[Beta] Create an iterable of :class:`pyarrow.RecordBatch`, to process the table as a stream.
 
@@ -1836,6 +1837,22 @@ def to_arrow_iterable(
                 created by the server. If ``max_queue_size`` is :data:`None`, the queue
                 size is infinite.
 
+            max_stream_count (Optional[int]):
+                The maximum number of parallel download streams when
+                using the BigQuery Storage API. Ignored if the
+                BigQuery Storage API is not used.
+
+                This setting also has no effect if the query result
+                is deterministically ordered with ORDER BY, in which
+                case the number of download streams is always 1.
+
+                If set to 0 or None (the default), the number of
+                download streams is determined by the BigQuery server.
+                However, this behaviour can require a lot of memory to
+                store temporary download results, especially with very
+                large queries. In that case, setting this parameter to
+                a value > 0 can help reduce system resource consumption.
+
         Returns:
             pyarrow.RecordBatch:
                 A generator of :class:`~pyarrow.RecordBatch`.
 
@@ -1852,6 +1869,7 @@ def to_arrow_iterable(
             preserve_order=self._preserve_order,
             selected_fields=self._selected_fields,
             max_queue_size=max_queue_size,
+            max_stream_count=max_stream_count,
         )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
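For orientation, a minimal usage sketch of the new parameter (not part of this commit; the project, dataset, and table names are placeholders). `max_stream_count` is only consulted when a BigQuery Storage API client is supplied:

    from google.cloud import bigquery
    from google.cloud import bigquery_storage

    client = bigquery.Client()
    bqstorage_client = bigquery_storage.BigQueryReadClient()

    # Placeholder query; any result large enough to fan out works.
    rows = client.query(
        "SELECT name, number FROM `my-project.my_dataset.my_table`"
    ).result()

    # Cap the Storage API fan-out at 4 parallel streams to bound memory use.
    for record_batch in rows.to_arrow_iterable(
        bqstorage_client=bqstorage_client, max_stream_count=4
    ):
        print(record_batch.num_rows)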
@@ -1978,6 +1996,7 @@ def to_dataframe_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
+        max_stream_count: Optional[int] = None,
     ) -> "pandas.DataFrame":
         """Create an iterable of pandas DataFrames, to process the table as a stream.
 
@@ -2008,6 +2027,22 @@ def to_dataframe_iterable(
 
                 .. versionadded:: 2.14.0
 
+            max_stream_count (Optional[int]):
+                The maximum number of parallel download streams when
+                using the BigQuery Storage API. Ignored if the
+                BigQuery Storage API is not used.
+
+                This setting also has no effect if the query result
+                is deterministically ordered with ORDER BY, in which
+                case the number of download streams is always 1.
+
+                If set to 0 or None (the default), the number of
+                download streams is determined by the BigQuery server.
+                However, this behaviour can require a lot of memory to
+                store temporary download results, especially with very
+                large queries. In that case, setting this parameter to
+                a value > 0 can help reduce system resource consumption.
+
         Returns:
             pandas.DataFrame:
                 A generator of :class:`~pandas.DataFrame`.
 
@@ -2034,6 +2069,7 @@ def to_dataframe_iterable(
             preserve_order=self._preserve_order,
             selected_fields=self._selected_fields,
             max_queue_size=max_queue_size,
+            max_stream_count=max_stream_count,
         )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_dataframe_row_iterator,
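The DataFrame path accepts the same knob; a short sketch under the same assumptions (placeholder table name, Storage API client available):

    from google.cloud import bigquery
    from google.cloud import bigquery_storage

    client = bigquery.Client()
    bqstorage_client = bigquery_storage.BigQueryReadClient()

    rows = client.query(
        "SELECT name, number FROM `my-project.my_dataset.my_table`"  # placeholder
    ).result()

    # Stream the result as DataFrame chunks over at most 2 parallel streams.
    for frame in rows.to_dataframe_iterable(
        bqstorage_client=bqstorage_client, max_stream_count=2
    ):
        print(len(frame.index))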
@@ -2690,6 +2726,7 @@ def to_dataframe_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: Optional[int] = None,
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pandas.DataFrame"]:
         """Create an iterable of pandas DataFrames, to process the table as a stream.
 
@@ -2705,6 +2742,9 @@ def to_dataframe_iterable(
             max_queue_size:
                 Ignored. Added for compatibility with RowIterator.
 
+            max_stream_count:
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pandas.DataFrame`.
 
@@ -2719,6 +2759,7 @@ def to_arrow_iterable(
         self,
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: Optional[int] = None,
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """Create an iterable of :class:`pyarrow.RecordBatch`, to process the table as a stream.
 
@@ -2731,6 +2772,9 @@ def to_arrow_iterable(
             max_queue_size:
                 Ignored. Added for compatibility with RowIterator.
 
+            max_stream_count:
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pyarrow.RecordBatch`.
         """

tests/unit/test_table.py (+70)
@@ -5822,3 +5822,73 @@ def test_table_reference_to_bqstorage_v1_stable(table_path):
     for klass in (mut.TableReference, mut.Table, mut.TableListItem):
         got = klass.from_string(table_path).to_bqstorage()
         assert got == expected
+
+
+@pytest.mark.parametrize("preserve_order", [True, False])
+def test_to_arrow_iterable_w_bqstorage_max_stream_count(preserve_order):
+    pytest.importorskip("pandas")
+    pytest.importorskip("google.cloud.bigquery_storage")
+    from google.cloud.bigquery import schema
+    from google.cloud.bigquery import table as mut
+    from google.cloud import bigquery_storage
+
+    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
+    session = bigquery_storage.types.ReadSession()
+    bqstorage_client.create_read_session.return_value = session
+
+    row_iterator = mut.RowIterator(
+        _mock_client(),
+        api_request=None,
+        path=None,
+        schema=[
+            schema.SchemaField("colA", "INTEGER"),
+        ],
+        table=mut.TableReference.from_string("proj.dset.tbl"),
+    )
+    row_iterator._preserve_order = preserve_order
+
+    max_stream_count = 132
+    result_iterable = row_iterator.to_arrow_iterable(
+        bqstorage_client=bqstorage_client, max_stream_count=max_stream_count
+    )
+    list(result_iterable)
+    bqstorage_client.create_read_session.assert_called_once_with(
+        parent=mock.ANY,
+        read_session=mock.ANY,
+        max_stream_count=max_stream_count if not preserve_order else 1,
+    )
+
+
+@pytest.mark.parametrize("preserve_order", [True, False])
+def test_to_dataframe_iterable_w_bqstorage_max_stream_count(preserve_order):
+    pytest.importorskip("pandas")
+    pytest.importorskip("google.cloud.bigquery_storage")
+    from google.cloud.bigquery import schema
+    from google.cloud.bigquery import table as mut
+    from google.cloud import bigquery_storage
+
+    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
+    session = bigquery_storage.types.ReadSession()
+    bqstorage_client.create_read_session.return_value = session
+
+    row_iterator = mut.RowIterator(
+        _mock_client(),
+        api_request=None,
+        path=None,
+        schema=[
+            schema.SchemaField("colA", "INTEGER"),
+        ],
+        table=mut.TableReference.from_string("proj.dset.tbl"),
+    )
+    row_iterator._preserve_order = preserve_order
+
+    max_stream_count = 132
+    result_iterable = row_iterator.to_dataframe_iterable(
+        bqstorage_client=bqstorage_client, max_stream_count=max_stream_count
+    )
+    list(result_iterable)
+    bqstorage_client.create_read_session.assert_called_once_with(
+        parent=mock.ANY,
+        read_session=mock.ANY,
+        max_stream_count=max_stream_count if not preserve_order else 1,
+    )
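Both tests assert `max_stream_count=max_stream_count if not preserve_order else 1`, i.e. ordered results are clamped to a single stream. A hypothetical, self-contained sketch of that rule (the helper name is illustrative, not the library's):

    from typing import Optional

    def effective_max_stream_count(
        preserve_order: bool, max_stream_count: Optional[int]
    ) -> int:
        """Illustrative only: mirrors the clamping asserted by the tests above."""
        if preserve_order:
            # An ORDER BY result cannot be reassembled from parallel streams,
            # so exactly one stream is requested.
            return 1
        # None is normalized to 0, which lets the server pick the stream count.
        return max_stream_count or 0

    assert effective_max_stream_count(True, 132) == 1
    assert effective_max_stream_count(False, 132) == 132
    assert effective_max_stream_count(False, None) == 0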
