@@ -210,6 +210,9 @@ class Client(ClientWithProject):
         default_query_job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
             Default ``QueryJobConfig``.
             Will be merged into job configs passed into the ``query`` method.
+        default_load_job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
+            Default ``LoadJobConfig``.
+            Will be merged into job configs passed into the ``load_table_*`` methods.
         client_info (Optional[google.api_core.client_info.ClientInfo]):
             The client info used to send a user-agent string along with API
             requests. If ``None``, then default info will be used. Generally,
@@ -235,6 +238,7 @@ def __init__(
         _http=None,
         location=None,
         default_query_job_config=None,
+        default_load_job_config=None,
         client_info=None,
         client_options=None,
     ) -> None:
@@ -260,6 +264,7 @@ def __init__(
         self._connection = Connection(self, **kw_args)
         self._location = location
         self._default_query_job_config = copy.deepcopy(default_query_job_config)
+        self._default_load_job_config = copy.deepcopy(default_load_job_config)

     @property
     def location(self):
@@ -277,6 +282,17 @@ def default_query_job_config(self):
     def default_query_job_config(self, value: QueryJobConfig):
         self._default_query_job_config = copy.deepcopy(value)

+    @property
+    def default_load_job_config(self):
+        """Default ``LoadJobConfig``.
+        Will be merged into job configs passed into the ``load_table_*`` methods.
+        """
+        return self._default_load_job_config
+
+    @default_load_job_config.setter
+    def default_load_job_config(self, value: LoadJobConfig):
+        self._default_load_job_config = copy.deepcopy(value)
+
     def close(self):
         """Close the underlying transport objects, releasing system resources.
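A minimal usage sketch of the new client-wide default (the project, bucket, dataset, and table names below are hypothetical): options set once on `default_load_job_config` are merged into every subsequent `load_table_*` call, with per-call values taking precedence.

    from google.cloud import bigquery

    # Hypothetical project and table names, for illustration only.
    client = bigquery.Client(project="my-project")

    # These options are merged into every load_table_* call on this client.
    client.default_load_job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    # Inherits WRITE_TRUNCATE from the client default; the per-call config
    # only needs to add what is specific to this load.
    load_job = client.load_table_from_uri(
        "gs://my-bucket/data.csv",
        "my-project.my_dataset.my_table",
        job_config=bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.CSV,
            skip_leading_rows=1,
        ),
    )
    load_job.result()  # Block until the load job completes.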
@@ -1976,15 +1992,8 @@ def create_job(
             )
             destination = _get_sub_prop(job_config, ["copy", "destinationTable"])
             destination = TableReference.from_api_repr(destination)
-            sources = []
-            source_configs = _get_sub_prop(job_config, ["copy", "sourceTables"])
-            if source_configs is None:
-                source_configs = [_get_sub_prop(job_config, ["copy", "sourceTable"])]
-            for source_config in source_configs:
-                table_ref = TableReference.from_api_repr(source_config)
-                sources.append(table_ref)
             return self.copy_table(
-                sources,
+                [],  # Source table(s) already in job_config resource.
                 destination,
                 job_config=typing.cast(CopyJobConfig, copy_job_config),
                 retry=retry,
@@ -2337,8 +2346,8 @@ def load_table_from_uri(

         Raises:
             TypeError:
-                If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
-                class.
+                If ``job_config`` is not an instance of
+                :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
         """
         job_id = _make_job_id(job_id, job_id_prefix)
@@ -2355,11 +2364,14 @@ def load_table_from_uri(

         destination = _table_arg_to_table_ref(destination, default_project=self.project)

-        if job_config:
-            job_config = copy.deepcopy(job_config)
-            _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
+        if job_config is not None:
+            _verify_job_config_type(job_config, LoadJobConfig)
+        else:
+            job_config = job.LoadJobConfig()

-        load_job = job.LoadJob(job_ref, source_uris, destination, self, job_config)
+        new_job_config = job_config._fill_from_default(self._default_load_job_config)
+
+        load_job = job.LoadJob(job_ref, source_uris, destination, self, new_job_config)
         load_job._begin(retry=retry, timeout=timeout)

         return load_job
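The merge in the rewritten body relies on the internal `_JobConfig._fill_from_default` helper. Roughly (this standalone sketch is an approximation over flat dicts, not the library's actual implementation), it starts from a copy of the client default's properties and overlays every property the caller set explicitly, so per-call values always win:

    import copy
    from typing import Optional

    def fill_from_default(job_config: dict, default: Optional[dict]) -> dict:
        """Approximate the merge: default properties first, caller's on top."""
        if default is None:
            return copy.deepcopy(job_config)
        merged = copy.deepcopy(default)
        merged.update(copy.deepcopy(job_config))  # per-call values take precedence
        return merged

    # The caller's sourceFormat wins; writeDisposition comes from the default.
    print(fill_from_default(
        {"sourceFormat": "CSV"},
        {"sourceFormat": "PARQUET", "writeDisposition": "WRITE_TRUNCATE"},
    ))
    # {'sourceFormat': 'CSV', 'writeDisposition': 'WRITE_TRUNCATE'}

Note also that the old code deep-copied `job_config` in place, while the new code leaves the caller's object untouched because `_fill_from_default` returns a fresh config.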
@@ -2431,8 +2443,8 @@ def load_table_from_file(
                 mode.

             TypeError:
-                If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
-                class.
+                If ``job_config`` is not an instance of
+                :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
         """
         job_id = _make_job_id(job_id, job_id_prefix)
@@ -2444,10 +2456,15 @@ def load_table_from_file(

         destination = _table_arg_to_table_ref(destination, default_project=self.project)
         job_ref = job._JobReference(job_id, project=project, location=location)
-        if job_config:
-            job_config = copy.deepcopy(job_config)
-            _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
-        load_job = job.LoadJob(job_ref, None, destination, self, job_config)
+
+        if job_config is not None:
+            _verify_job_config_type(job_config, LoadJobConfig)
+        else:
+            job_config = job.LoadJobConfig()
+
+        new_job_config = job_config._fill_from_default(self._default_load_job_config)
+
+        load_job = job.LoadJob(job_ref, None, destination, self, new_job_config)
         job_resource = load_job.to_api_repr()

         if rewind:
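`load_table_from_file` now follows the same pattern, so the client default also applies when loading from a local file; a hedged example (the file name and table ID are invented):

    from google.cloud import bigquery

    client = bigquery.Client()
    client.default_load_job_config = bigquery.LoadJobConfig(autodetect=True)

    # autodetect=True is merged in from the client default; the per-call
    # config contributes only the CSV-specific options.
    with open("rows.csv", "rb") as source_file:  # hypothetical local file
        load_job = client.load_table_from_file(
            source_file,
            "my-project.my_dataset.my_table",
            job_config=bigquery.LoadJobConfig(
                source_format=bigquery.SourceFormat.CSV,
                skip_leading_rows=1,
            ),
        )
    load_job.result()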
@@ -2571,43 +2588,40 @@ def load_table_from_dataframe(
                 If a usable parquet engine cannot be found. This method
                 requires :mod:`pyarrow` to be installed.
             TypeError:
-                If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
-                class.
+                If ``job_config`` is not an instance of
+                :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
         """
         job_id = _make_job_id(job_id, job_id_prefix)

-        if job_config:
-            _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
-            # Make a copy so that the job config isn't modified in-place.
-            job_config_properties = copy.deepcopy(job_config._properties)
-            job_config = job.LoadJobConfig()
-            job_config._properties = job_config_properties
-
+        if job_config is not None:
+            _verify_job_config_type(job_config, LoadJobConfig)
         else:
             job_config = job.LoadJobConfig()

+        new_job_config = job_config._fill_from_default(self._default_load_job_config)
+
         supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET}
-        if job_config.source_format is None:
+        if new_job_config.source_format is None:
             # default value
-            job_config.source_format = job.SourceFormat.PARQUET
+            new_job_config.source_format = job.SourceFormat.PARQUET

         if (
-            job_config.source_format == job.SourceFormat.PARQUET
-            and job_config.parquet_options is None
+            new_job_config.source_format == job.SourceFormat.PARQUET
+            and new_job_config.parquet_options is None
         ):
             parquet_options = ParquetOptions()
             # default value
             parquet_options.enable_list_inference = True
-            job_config.parquet_options = parquet_options
+            new_job_config.parquet_options = parquet_options

-        if job_config.source_format not in supported_formats:
+        if new_job_config.source_format not in supported_formats:
             raise ValueError(
                 "Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
-                    job_config.source_format
+                    new_job_config.source_format
                )
            )

-        if pyarrow is None and job_config.source_format == job.SourceFormat.PARQUET:
+        if pyarrow is None and new_job_config.source_format == job.SourceFormat.PARQUET:
             # pyarrow is now the only supported parquet engine.
             raise ValueError("This method requires pyarrow to be installed")
@@ -2618,8 +2632,8 @@ def load_table_from_dataframe(
         # schema, and check if dataframe schema is compatible with it - except
         # for WRITE_TRUNCATE jobs, the existing schema does not matter then.
         if (
-            not job_config.schema
-            and job_config.write_disposition != job.WriteDisposition.WRITE_TRUNCATE
+            not new_job_config.schema
+            and new_job_config.write_disposition != job.WriteDisposition.WRITE_TRUNCATE
         ):
             try:
                 table = self.get_table(destination)
@@ -2630,7 +2644,7 @@ def load_table_from_dataframe(
                     name
                     for name, _ in _pandas_helpers.list_columns_and_indexes(dataframe)
                 )
-                job_config.schema = [
+                new_job_config.schema = [
                     # Field description and policy tags are not needed to
                     # serialize a data frame.
                     SchemaField(
@@ -2644,11 +2658,11 @@ def load_table_from_dataframe(
                     if field.name in columns_and_indexes
                 ]

-        job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
-            dataframe, job_config.schema
+        new_job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
+            dataframe, new_job_config.schema
         )

-        if not job_config.schema:
+        if not new_job_config.schema:
             # the schema could not be fully detected
             warnings.warn(
                 "Schema could not be detected for all columns. Loading from a "
@@ -2659,13 +2673,13 @@ def load_table_from_dataframe(
         )

         tmpfd, tmppath = tempfile.mkstemp(
-            suffix="_job_{}.{}".format(job_id[:8], job_config.source_format.lower())
+            suffix="_job_{}.{}".format(job_id[:8], new_job_config.source_format.lower())
         )
         os.close(tmpfd)

         try:

-            if job_config.source_format == job.SourceFormat.PARQUET:
+            if new_job_config.source_format == job.SourceFormat.PARQUET:
                 if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS:
                     msg = (
                         "Loading dataframe data in PARQUET format with pyarrow "
@@ -2676,13 +2690,13 @@ def load_table_from_dataframe(
                     )
                     warnings.warn(msg, category=RuntimeWarning)

-                if job_config.schema:
+                if new_job_config.schema:
                     if parquet_compression == "snappy":  # adjust the default value
                         parquet_compression = parquet_compression.upper()

                     _pandas_helpers.dataframe_to_parquet(
                         dataframe,
-                        job_config.schema,
+                        new_job_config.schema,
                         tmppath,
                         parquet_compression=parquet_compression,
                         parquet_use_compliant_nested_type=True,
@@ -2722,7 +2736,7 @@ def load_table_from_dataframe(
                     job_id_prefix=job_id_prefix,
                     location=location,
                     project=project,
-                    job_config=job_config,
+                    job_config=new_job_config,
                     timeout=timeout,
                 )
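With the changes above, `load_table_from_dataframe` consults the client default before filling in its own PARQUET defaults; a short sketch assuming pandas and pyarrow are installed (the table ID is hypothetical):

    import pandas as pd
    from google.cloud import bigquery

    client = bigquery.Client()
    client.default_load_job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )

    df = pd.DataFrame({"name": ["a", "b"], "value": [1, 2]})

    # No per-call config: source_format falls back to PARQUET (with list
    # inference enabled) and WRITE_APPEND is merged in from the client default.
    client.load_table_from_dataframe(df, "my-project.my_dataset.my_table").result()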
@@ -2798,22 +2812,22 @@ def load_table_from_json(

         Raises:
             TypeError:
-                If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
-                class.
+                If ``job_config`` is not an instance of
+                :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
         """
         job_id = _make_job_id(job_id, job_id_prefix)

-        if job_config:
-            _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
-            # Make a copy so that the job config isn't modified in-place.
-            job_config = copy.deepcopy(job_config)
+        if job_config is not None:
+            _verify_job_config_type(job_config, LoadJobConfig)
         else:
             job_config = job.LoadJobConfig()

-        job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON
+        new_job_config = job_config._fill_from_default(self._default_load_job_config)
+
+        new_job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON

-        if job_config.schema is None:
-            job_config.autodetect = True
+        if new_job_config.schema is None:
+            new_job_config.autodetect = True

         if project is None:
             project = self.project
@@ -2835,7 +2849,7 @@ def load_table_from_json(
             job_id_prefix=job_id_prefix,
             location=location,
             project=project,
-            job_config=job_config,
+            job_config=new_job_config,
             timeout=timeout,
         )
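Finally, `load_table_from_json` applies its own fixed settings (newline-delimited JSON, plus schema autodetection when no schema is configured anywhere) on top of the merged config; a hypothetical example:

    from google.cloud import bigquery

    client = bigquery.Client()

    rows = [
        {"name": "a", "value": 1},
        {"name": "b", "value": 2},
    ]

    # Neither a per-call config nor the client default sets a schema here,
    # so the method enables schema autodetection before starting the job.
    client.load_table_from_json(rows, "my-project.my_dataset.my_table").result()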