 import logging
 import queue
 import warnings
-from typing import Any, Union
+from typing import Any, Union, Optional, Callable, Generator, List

 from google.cloud.bigquery import _pyarrow_helpers
 from google.cloud.bigquery import _versions_helpers
 from google.cloud.bigquery import schema

+
 try:
     import pandas  # type: ignore

@@ -75,7 +76,7 @@ def _to_wkb(v):
 _to_wkb = _to_wkb()

 try:
-    from google.cloud.bigquery_storage import ArrowSerializationOptions
+    from google.cloud.bigquery_storage_v1.types import ArrowSerializationOptions
 except ImportError:
     _ARROW_COMPRESSION_SUPPORT = False
 else:
@@ -821,18 +822,54 @@ def _nowait(futures):


 def _download_table_bqstorage(
-    project_id,
-    table,
-    bqstorage_client,
-    preserve_order=False,
-    selected_fields=None,
-    page_to_item=None,
-    max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
-):
-    """Use (faster, but billable) BQ Storage API to construct DataFrame."""
+    project_id: str,
+    table: Any,
+    bqstorage_client: Any,
+    preserve_order: bool = False,
+    selected_fields: Optional[List[Any]] = None,
+    page_to_item: Optional[Callable] = None,
+    max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT,
+    max_stream_count: Optional[int] = None,
+) -> Generator[Any, None, None]:
+    """Downloads a BigQuery table using the BigQuery Storage API.
+
+    This method uses the faster, but potentially more expensive, BigQuery
+    Storage API to download a table as a Pandas DataFrame. It supports
+    parallel downloads and optional data transformations.
+
+    Args:
+        project_id (str): The ID of the Google Cloud project containing
+            the table.
+        table (Any): The BigQuery table to download.
+        bqstorage_client (Any): An authenticated BigQuery Storage API client.
+        preserve_order (bool, optional): Whether to preserve the order
+            of the rows as they are read from BigQuery. If True, this limits
+            the number of streams to one and overrides `max_stream_count`.
+            Defaults to False.
+        selected_fields (Optional[List[SchemaField]]):
+            A list of BigQuery schema fields to select for download. If None,
+            all fields are downloaded. Defaults to None.
+        page_to_item (Optional[Callable]): An optional callable
+            function that takes a page of data from the BigQuery Storage API
+            and converts it into the items yielded by this generator.
+        max_stream_count (Optional[int]): The maximum number of
+            concurrent streams to use for downloading data. If `preserve_order`
+            is True, the requested streams are limited to 1 regardless of the
+            `max_stream_count` value. If 0 or None, then the number of
+            requested streams will be unbounded. Defaults to None.
+
+    Yields:
+        pandas.DataFrame: Pandas DataFrames, one for each chunk of data
+            downloaded from BigQuery.
+
+    Raises:
+        ValueError: If attempting to read from a specific partition or snapshot.
+
+    Note:
+        This method requires the `google-cloud-bigquery-storage` library
+        to be installed.
+    """

-    # Passing a BQ Storage client in implies that the BigQuery Storage library
-    # is available and can be imported.
     from google.cloud import bigquery_storage

     if "$" in table.table_id:
@@ -842,18 +879,20 @@ def _download_table_bqstorage(
     if "@" in table.table_id:
         raise ValueError("Reading from a specific snapshot is not currently supported.")

-    requested_streams = 1 if preserve_order else 0
+    requested_streams = determine_requested_streams(preserve_order, max_stream_count)

-    requested_session = bigquery_storage.types.ReadSession(
-        table=table.to_bqstorage(), data_format=bigquery_storage.types.DataFormat.ARROW
+    requested_session = bigquery_storage.types.stream.ReadSession(
+        table=table.to_bqstorage(),
+        data_format=bigquery_storage.types.stream.DataFormat.ARROW,
     )
     if selected_fields is not None:
         for field in selected_fields:
             requested_session.read_options.selected_fields.append(field.name)

     if _ARROW_COMPRESSION_SUPPORT:
         requested_session.read_options.arrow_serialization_options.buffer_compression = (
-            ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
+            # CompressionCodec(1) -> LZ4_FRAME
+            ArrowSerializationOptions.CompressionCodec(1)
         )

     session = bqstorage_client.create_read_session(
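
Note on the compression change above: per the inline comment, the removed symbolic member and the integer form now used name the same codec. A quick sketch (assuming `google-cloud-bigquery-storage` is installed) that checks the equivalence:

```python
# Sketch only: confirms the enum value relied on by CompressionCodec(1).
from google.cloud.bigquery_storage_v1.types import ArrowSerializationOptions

assert (
    ArrowSerializationOptions.CompressionCodec(1)
    == ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
)
```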
@@ -889,7 +928,7 @@ def _download_table_bqstorage(
     elif max_queue_size is None:
         max_queue_size = 0  # unbounded

-    worker_queue = queue.Queue(maxsize=max_queue_size)
+    worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size)

     with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
         try:
@@ -915,7 +954,7 @@ def _download_table_bqstorage(
                 # we want to block on the queue's get method, instead. This
                 # prevents the queue from filling up, because the main thread
                 # has smaller gaps in time between calls to the queue's get
-                # method. For a detailed explaination, see:
+                # method. For a detailed explanation, see:
                 # https://friendliness.dev/2019/06/18/python-nowait/
                 done, not_done = _nowait(not_done)
                 for future in done:
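
The comment block above describes the polling strategy: check worker futures without blocking, and block on the queue's `get` instead. A generic, hedged sketch of that pattern (not the library's actual loop; the `drain` helper is illustrative only):

```python
# Illustration of the "block on the queue, not the futures" pattern.
import queue
from typing import Iterable, Iterator
from concurrent.futures import Future


def drain(worker_queue: "queue.Queue[int]", futures: Iterable[Future]) -> Iterator[int]:
    not_done = list(futures)
    while not_done:
        done = [f for f in not_done if f.done()]          # non-blocking check
        not_done = [f for f in not_done if not f.done()]
        for future in done:
            future.result()                               # surface worker errors
        try:
            yield worker_queue.get(timeout=1.0)           # block here instead
        except queue.Empty:
            continue
    # Drain anything produced after the last worker finished.
    while True:
        try:
            yield worker_queue.get_nowait()
        except queue.Empty:
            break
```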
@@ -954,6 +993,7 @@ def download_arrow_bqstorage(
     preserve_order=False,
     selected_fields=None,
     max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
+    max_stream_count=None,
 ):
     return _download_table_bqstorage(
         project_id,
@@ -963,6 +1003,7 @@ def download_arrow_bqstorage(
         selected_fields=selected_fields,
         page_to_item=_bqstorage_page_to_arrow,
         max_queue_size=max_queue_size,
+        max_stream_count=max_stream_count,
     )

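A hedged usage sketch of the Arrow path with the new keyword; the keyword names (`project_id`, `table`, `bqstorage_client`) mirror `_download_table_bqstorage`, and the project and table ids are placeholders, so treat the exact signature as an assumption rather than a documented API:

```python
# Sketch only: exercising max_stream_count on the Arrow download path.
from google.cloud import bigquery, bigquery_storage

client = bigquery.Client()
bqstorage_client = bigquery_storage.BigQueryReadClient()
table = client.get_table("my-project.my_dataset.my_table")  # placeholder table

batches = download_arrow_bqstorage(
    project_id="my-project",       # placeholder project
    table=table,
    bqstorage_client=bqstorage_client,
    preserve_order=False,
    max_stream_count=4,            # new: cap the number of requested read streams
)
for batch in batches:
    print(batch.num_rows)
```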
@@ -975,6 +1016,7 @@ def download_dataframe_bqstorage(
     preserve_order=False,
     selected_fields=None,
     max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
+    max_stream_count=None,
 ):
     page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes)
     return _download_table_bqstorage(
@@ -985,6 +1027,7 @@ def download_dataframe_bqstorage(
         selected_fields=selected_fields,
         page_to_item=page_to_item,
         max_queue_size=max_queue_size,
+        max_stream_count=max_stream_count,
     )

@@ -1029,3 +1072,40 @@ def verify_pandas_imports():
         raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception
     if db_dtypes is None:
         raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception
+
+
+def determine_requested_streams(
+    preserve_order: bool,
+    max_stream_count: Union[int, None],
+) -> int:
+    """Determines the value of requested_streams based on the values of
+    `preserve_order` and `max_stream_count`.
+
+    Args:
+        preserve_order (bool): Whether to preserve the order of streams. If True,
+            this limits the number of streams to one. `preserve_order` takes
+            precedence over `max_stream_count`.
+        max_stream_count (Union[int, None]): The maximum number of streams
+            allowed. Must be a non-negative number or None, where None indicates
+            the value is unset. NOTE: if `preserve_order` is also set, it takes
+            precedence over `max_stream_count`, thus to ensure that `max_stream_count`
+            is used, ensure that `preserve_order` is False.
+
+    Returns:
+        (int) The appropriate value for requested_streams.
+    """
+
+    if preserve_order:
+        # If preserve_order is set, it takes precedence.
+        # Limit the requested streams to 1, to ensure that order
+        # is preserved.
+        return 1
+
+    elif max_stream_count is not None:
+        # If preserve_order is not set, only then do we consider max_stream_count.
+        if max_stream_count <= -1:
+            raise ValueError("max_stream_count must be non-negative OR None")
+        return max_stream_count
+
+    # Default to zero requested streams (unbounded).
+    return 0
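
A minimal sketch of the precedence rules the new helper encodes, following directly from the branches above:

```python
# preserve_order always wins; 0 or None means "unbounded" (0 streams requested).
assert determine_requested_streams(preserve_order=True, max_stream_count=10) == 1
assert determine_requested_streams(preserve_order=False, max_stream_count=10) == 10
assert determine_requested_streams(preserve_order=False, max_stream_count=0) == 0
assert determine_requested_streams(preserve_order=False, max_stream_count=None) == 0

# Negative values are rejected.
try:
    determine_requested_streams(preserve_order=False, max_stream_count=-1)
except ValueError:
    pass
```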