 # limitations under the License.

 import itertools
-import json
 import logging
 import time
 import unittest
@@ -2271,26 +2270,26 @@ def test_to_dataframe_w_bqstorage_logs_session(self):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_to_dataframe_w_bqstorage_empty_streams(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
         from google.cloud.bigquery_storage_v1beta1 import reader

+        arrow_fields = [
+            pyarrow.field("colA", pyarrow.int64()),
+            # Not alphabetical to test column order.
+            pyarrow.field("colC", pyarrow.float64()),
+            pyarrow.field("colB", pyarrow.utf8()),
+        ]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
         session = bigquery_storage_v1beta1.types.ReadSession(
-            streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
-        )
-        session.avro_schema.schema = json.dumps(
-            {
-                "fields": [
-                    {"name": "colA"},
-                    # Not alphabetical to test column order.
-                    {"name": "colC"},
-                    {"name": "colB"},
-                ]
-            }
+            streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}],
+            arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
         )
         bqstorage_client.create_read_session.return_value = session
@@ -2327,11 +2326,20 @@ def test_to_dataframe_w_bqstorage_empty_streams(self):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_to_dataframe_w_bqstorage_nonempty(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
         from google.cloud.bigquery_storage_v1beta1 import reader

+        arrow_fields = [
+            pyarrow.field("colA", pyarrow.int64()),
+            # Not alphabetical to test column order.
+            pyarrow.field("colC", pyarrow.float64()),
+            pyarrow.field("colB", pyarrow.utf8()),
+        ]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
@@ -2340,16 +2348,9 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
         ]
-        session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
-        session.avro_schema.schema = json.dumps(
-            {
-                "fields": [
-                    {"name": "colA"},
-                    # Not alphabetical to test column order.
-                    {"name": "colC"},
-                    {"name": "colB"},
-                ]
-            }
+        session = bigquery_storage_v1beta1.types.ReadSession(
+            streams=streams,
+            arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
         )
         bqstorage_client.create_read_session.return_value = session
@@ -2400,17 +2401,23 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
         from google.cloud.bigquery_storage_v1beta1 import reader

+        arrow_fields = [pyarrow.field("colA", pyarrow.int64())]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         streams = [
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
         ]
-        session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
-        session.avro_schema.schema = json.dumps({"fields": [{"name": "colA"}]})
+        session = bigquery_storage_v1beta1.types.ReadSession(
+            streams=streams,
+            arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
+        )

         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
@@ -2448,6 +2455,7 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
     @unittest.skipIf(tqdm is None, "Requires `tqdm`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     @mock.patch("tqdm.tqdm")
     def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
         from google.cloud.bigquery import schema
@@ -2457,6 +2465,9 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
         # Speed up testing.
         mut._PROGRESS_INTERVAL = 0.01

+        arrow_fields = [pyarrow.field("testcol", pyarrow.int64())]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
@@ -2466,8 +2477,10 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
         ]
-        session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
-        session.avro_schema.schema = json.dumps({"fields": [{"name": "testcol"}]})
+        session = bigquery_storage_v1beta1.types.ReadSession(
+            streams=streams,
+            arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
+        )
         bqstorage_client.create_read_session.return_value = session

         mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
@@ -2521,6 +2534,7 @@ def blocking_to_dataframe(*args, **kwargs):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
@@ -2529,6 +2543,14 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
         # Speed up testing.
         mut._PROGRESS_INTERVAL = 0.01

+        arrow_fields = [
+            pyarrow.field("colA", pyarrow.int64()),
+            # Not alphabetical to test column order.
+            pyarrow.field("colC", pyarrow.float64()),
+            pyarrow.field("colB", pyarrow.utf8()),
+        ]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
@@ -2539,10 +2561,8 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
                 # ends early.
                 {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
                 {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
-            ]
-        )
-        session.avro_schema.schema = json.dumps(
-            {"fields": [{"name": "colA"}, {"name": "colB"}, {"name": "colC"}]}
+            ],
+            arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
         )
         bqstorage_client.create_read_session.return_value = session
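
The pattern repeated across these hunks is the same in every test: build a pyarrow schema, serialize it, and hand the bytes to the mocked ReadSession's arrow_schema field, replacing the old avro_schema JSON string. A minimal standalone sketch of that pattern, assuming pyarrow and google-cloud-bigquery-storage are installed; the column names and stream path below are illustrative only, not values from this PR:

    import pyarrow
    from google.cloud import bigquery_storage_v1beta1

    # Describe the result columns with an Arrow schema instead of an Avro JSON schema.
    arrow_fields = [
        pyarrow.field("colA", pyarrow.int64()),
        pyarrow.field("colB", pyarrow.utf8()),
    ]
    arrow_schema = pyarrow.schema(arrow_fields)

    # The ReadSession message carries the schema as serialized Arrow IPC bytes.
    session = bigquery_storage_v1beta1.types.ReadSession(
        streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}],
        arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
    )

Because the schema now rides along as Arrow bytes rather than Avro JSON, the tests also gain @unittest.skipIf(pyarrow is None, ...) guards and the json import is no longer needed.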