File tree: 3 files changed, +28 -8 lines changed
Original file line number | Diff line number | Diff line change
@@ -517,9 +517,14 @@ def _materialize_local(
517
517
) -> Tuple [pd .DataFrame , bigquery .QueryJob ]:
518
518
"""Run query and download results as a pandas DataFrame. Return the total number of results as well."""
519
519
# TODO(swast): Allow for dry run and timeout.
520
- results_iterator , query_job = self .session ._execute (
521
- self .expr , sorted = materialize_options .ordered
520
+ _ , query_job = self .session ._query_to_destination (
521
+ self .session ._to_sql (self .expr , sorted = True ),
522
+ list (self .index_columns ),
523
+ api_name = "cached" ,
524
+ do_clustering = False ,
522
525
)
526
+ results_iterator = query_job .result ()
527
+
523
528
table_size = (
524
529
self .session ._get_table_size (query_job .destination ) / _BYTES_TO_MEGABYTES
525
530
)
Original file line number | Diff line number | Diff line change
@@ -430,7 +430,8 @@ def _query_to_destination(
430
430
index_cols : List [str ],
431
431
api_name : str ,
432
432
configuration : dict = {"query" : {"useQueryCache" : True }},
433
- ) -> Tuple [Optional [bigquery .TableReference ], Optional [bigquery .QueryJob ]]:
433
+ do_clustering = True ,
434
+ ) -> Tuple [Optional [bigquery .TableReference ], bigquery .QueryJob ]:
434
435
# If a dry_run indicates this is not a query type job, then don't
435
436
# bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
436
437
dry_run_config = bigquery .QueryJobConfig ()
@@ -444,11 +445,14 @@ def _query_to_destination(
444
445
# internal issue 303057336.
445
446
# Since we have a `statement_type == 'SELECT'`, schema should be populated.
446
447
schema = typing .cast (Iterable [bigquery .SchemaField ], dry_run_job .schema )
447
- cluster_cols = [
448
- item .name
449
- for item in schema
450
- if (item .name in index_cols ) and _can_cluster_bq (item )
451
- ][:_MAX_CLUSTER_COLUMNS ]
448
+ if do_clustering :
449
+ cluster_cols = [
450
+ item .name
451
+ for item in schema
452
+ if (item .name in index_cols ) and _can_cluster_bq (item )
453
+ ][:_MAX_CLUSTER_COLUMNS ]
454
+ else :
455
+ cluster_cols = []
452
456
temp_table = self ._create_empty_temp_table (schema , cluster_cols )
453
457
454
458
timeout_ms = configuration .get ("jobTimeoutMs" ) or configuration ["query" ].get (
Original file line number | Diff line number | Diff line change
@@ -90,3 +90,14 @@ def test_to_pandas_batches_large_table():
90
90
del df
91
91
92
92
assert row_count == expected_row_count
93
+
94
+
95
+ def test_to_pandas_large_table ():
96
+ df = bpd .read_gbq ("load_testing.scalars_10gb" )
97
+ # df will be downloaded locally
98
+ expected_row_count , expected_column_count = df .shape
99
+
100
+ df = df .to_pandas ()
101
+ row_count , column_count = df .shape
102
+ assert column_count == expected_column_count
103
+ assert row_count == expected_row_count
You can’t perform that action at this time.
0 commit comments