
Commit 931285f

feat: add reference_file_schema_uri to LoadJobConfig, ExternalConfig (#1399)
* feat: add 'reference_file_schema_uri' to LoadJobConfig and ExternalConfig
1 parent 5d3e5d3 commit 931285f

File tree

7 files changed: +258 -5 lines changed


google/cloud/bigquery/external_config.py (+14)
@@ -756,6 +756,20 @@ def hive_partitioning(self, value):
         prop = value.to_api_repr() if value is not None else None
         self._properties["hivePartitioningOptions"] = prop

+    @property
+    def reference_file_schema_uri(self):
+        """Optional[str]:
+        When creating an external table, the user can provide a reference file with the
+        table schema. This is enabled for the following formats:
+
+        AVRO, PARQUET, ORC
+        """
+        return self._properties.get("referenceFileSchemaUri")
+
+    @reference_file_schema_uri.setter
+    def reference_file_schema_uri(self, value):
+        self._properties["referenceFileSchemaUri"] = value
+
     @property
     def ignore_unknown_values(self):
         """bool: If :data:`True`, extra values that are not represented in the

google/cloud/bigquery/job/load.py (+21)
@@ -379,6 +379,20 @@ def range_partitioning(self, value):
         )
         self._set_sub_prop("rangePartitioning", resource)

+    @property
+    def reference_file_schema_uri(self):
+        """Optional[str]:
+        When creating an external table, the user can provide a reference file with the
+        table schema. This is enabled for the following formats:
+
+        AVRO, PARQUET, ORC
+        """
+        return self._get_sub_prop("referenceFileSchemaUri")
+
+    @reference_file_schema_uri.setter
+    def reference_file_schema_uri(self, value):
+        self._set_sub_prop("referenceFileSchemaUri", value)
+
     @property
     def schema(self):
         """Optional[Sequence[Union[ \
@@ -651,6 +665,13 @@ def quote_character(self):
         """
         return self._configuration.quote_character

+    @property
+    def reference_file_schema_uri(self):
+        """See
+        :attr:`google.cloud.bigquery.job.LoadJobConfig.reference_file_schema_uri`.
+        """
+        return self._configuration.reference_file_schema_uri
+
     @property
     def skip_leading_rows(self):
         """See

testing/constraints-3.7.txt (+1 -1)
@@ -25,4 +25,4 @@ python-dateutil==2.7.3
 requests==2.21.0
 Shapely==1.6.4.post2
 six==1.13.0
-tqdm==4.7.4
+tqdm==4.7.4

tests/system/test_client.py (+203)
@@ -97,6 +97,20 @@
     ),
 ]

+SOURCE_URIS_AVRO = [
+    "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.avro",
+    "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/b-twitter.avro",
+    "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/c-twitter.avro",
+]
+SOURCE_URIS_PARQUET = [
+    "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.parquet",
+    "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/b-twitter.parquet",
+    "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/c-twitter.parquet",
+]
+REFERENCE_FILE_SCHEMA_URI_AVRO = "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.avro"
+REFERENCE_FILE_SCHEMA_URI_PARQUET = "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.parquet"
+
+
 # The VPC-SC team maintains a mirror of the GCS bucket used for code
 # samples. The public bucket crosses the configured security boundary.
 # See: https://github.com/googleapis/google-cloud-python/issues/8550
@@ -1052,6 +1066,195 @@ def test_load_table_from_file_w_explicit_location(self):
             table_ref, "gs://{}/letters-us.csv".format(bucket_name), location="US"
         ).result()

+    def test_create_external_table_with_reference_file_schema_uri_avro(self):
+        client = Config.CLIENT
+        dataset_id = _make_dataset_id("external_reference_file_avro")
+        self.temp_dataset(dataset_id)
+        dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
+        table_id = "test_ref_file_avro"
+        table_ref = bigquery.TableReference(dataset_ref=dataset_ref, table_id=table_id)
+
+        expected_schema = [
+            bigquery.SchemaField("username", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("tweet", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("timestamp", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("likes", "INTEGER", mode="NULLABLE"),
+        ]
+
+        # By default, the table would get the c-twitter schema, because it is
+        # lexicographically last in the `SOURCE_URIS_AVRO` list:
+        # a-twitter schema: (username, tweet, timestamp, likes)
+        # b-twitter schema: (username, tweet, timestamp)
+        # c-twitter schema: (username, tweet)
+
+        # Because `referenceFileSchemaUri` points at a-twitter, the table gets
+        # the a-twitter schema instead.
+
+        # Create external data configuration
+        external_config = bigquery.ExternalConfig(bigquery.ExternalSourceFormat.AVRO)
+        external_config.source_uris = SOURCE_URIS_AVRO
+        external_config.reference_file_schema_uri = REFERENCE_FILE_SCHEMA_URI_AVRO
+
+        table = bigquery.Table(table_ref)
+        table.external_data_configuration = external_config
+
+        table = client.create_table(table)
+
+        # Get table created by the create_table API call
+        generated_table = client.get_table(table_ref)
+
+        self.assertEqual(generated_table.schema, expected_schema)
+        self.assertEqual(
+            generated_table.external_data_configuration._properties[
+                "referenceFileSchemaUri"
+            ],
+            REFERENCE_FILE_SCHEMA_URI_AVRO,
+        )
+
+        # Clean up test
+        self.to_delete.insert(0, generated_table)
+
+    def test_load_table_from_uri_with_reference_file_schema_uri_avro(self):
+        dataset_id = _make_dataset_id("test_reference_file_avro")
+        self.temp_dataset(dataset_id)
+        client = Config.CLIENT
+        dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
+        table_id = "test_ref_file_avro"
+        table_ref = bigquery.TableReference(dataset_ref=dataset_ref, table_id=table_id)
+
+        expected_schema = [
+            bigquery.SchemaField("username", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("tweet", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("timestamp", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("likes", "INTEGER", mode="NULLABLE"),
+        ]
+
+        # By default, the table would get the c-twitter schema, because it is
+        # lexicographically last in the `SOURCE_URIS_AVRO` list:
+        # a-twitter schema: (username, tweet, timestamp, likes)
+        # b-twitter schema: (username, tweet, timestamp)
+        # c-twitter schema: (username, tweet)
+
+        # Because `referenceFileSchemaUri` points at a-twitter, the table gets
+        # the a-twitter schema instead.
+
+        # Create load job configuration
+        load_job_config = bigquery.LoadJobConfig(
+            source_format=bigquery.SourceFormat.AVRO
+        )
+        load_job_config.reference_file_schema_uri = REFERENCE_FILE_SCHEMA_URI_AVRO
+
+        load_job = client.load_table_from_uri(
+            source_uris=SOURCE_URIS_AVRO,
+            destination=table_ref,
+            job_config=load_job_config,
+        )
+        # Wait for load job to complete
+        result = load_job.result()
+
+        # Get table created by the load job
+        generated_table = client.get_table(table_ref)
+        self.assertEqual(generated_table.schema, expected_schema)
+        self.assertEqual(
+            result._properties["configuration"]["load"]["referenceFileSchemaUri"],
+            REFERENCE_FILE_SCHEMA_URI_AVRO,
+        )
+
+        # Clean up test
+        self.to_delete.insert(0, generated_table)
+
+    def test_create_external_table_with_reference_file_schema_uri_parquet(self):
+        client = Config.CLIENT
+        dataset_id = _make_dataset_id("external_table_ref_file_parquet")
+        self.temp_dataset(dataset_id)
+        dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
+        table_id = "test_ref_file_parquet"
+        table_ref = bigquery.TableReference(dataset_ref=dataset_ref, table_id=table_id)
+
+        expected_schema = [
+            bigquery.SchemaField("username", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("tweet", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("timestamp", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("likes", "INTEGER", mode="NULLABLE"),
+        ]
+
+        # By default, the table would get the c-twitter schema, because it is
+        # lexicographically last in the `SOURCE_URIS_PARQUET` list:
+        # a-twitter schema: (username, tweet, timestamp, likes)
+        # b-twitter schema: (username, tweet, timestamp)
+        # c-twitter schema: (username, tweet)
+
+        # Because `referenceFileSchemaUri` points at a-twitter, the table gets
+        # the a-twitter schema instead.
+
+        # Create external data configuration
+        external_config = bigquery.ExternalConfig(bigquery.ExternalSourceFormat.PARQUET)
+        external_config.source_uris = SOURCE_URIS_PARQUET
+        external_config.reference_file_schema_uri = REFERENCE_FILE_SCHEMA_URI_PARQUET
+
+        table = bigquery.Table(table_ref)
+        table.external_data_configuration = external_config
+
+        table = client.create_table(table)
+
+        # Get table created by the create_table API call
+        generated_table = client.get_table(table_ref)
+        self.assertEqual(generated_table.schema, expected_schema)
+        self.assertEqual(
+            generated_table.external_data_configuration._properties[
+                "referenceFileSchemaUri"
+            ],
+            REFERENCE_FILE_SCHEMA_URI_PARQUET,
+        )
+
+        # Clean up test
+        self.to_delete.insert(0, generated_table)
+
+    def test_load_table_from_uri_with_reference_file_schema_uri_parquet(self):
+        dataset_id = _make_dataset_id("test_reference_file_parquet")
+        self.temp_dataset(dataset_id)
+        client = Config.CLIENT
+        dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
+        table_id = "test_ref_file_parquet"
+        table_ref = bigquery.TableReference(dataset_ref=dataset_ref, table_id=table_id)
+
+        expected_schema = [
+            bigquery.SchemaField("username", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("tweet", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("timestamp", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("likes", "INTEGER", mode="NULLABLE"),
+        ]
+
+        # By default, the table would get the c-twitter schema, because it is
+        # lexicographically last in the `SOURCE_URIS_PARQUET` list:
+        # a-twitter schema: (username, tweet, timestamp, likes)
+        # b-twitter schema: (username, tweet, timestamp)
+        # c-twitter schema: (username, tweet)
+
+        # Because `referenceFileSchemaUri` points at a-twitter, the table gets
+        # the a-twitter schema instead.
+
+        # Create load job configuration
+        load_job_config = bigquery.LoadJobConfig(
+            source_format=bigquery.SourceFormat.PARQUET
+        )
+        load_job_config.reference_file_schema_uri = REFERENCE_FILE_SCHEMA_URI_PARQUET
+
+        load_job = client.load_table_from_uri(
+            source_uris=SOURCE_URIS_PARQUET,
+            destination=table_ref,
+            job_config=load_job_config,
+        )
+        # Wait for load job to complete
+        result = load_job.result()
+
+        # Get table created by the load job
+        generated_table = client.get_table(table_ref)
+        self.assertEqual(generated_table.schema, expected_schema)
+        self.assertEqual(
+            result._properties["configuration"]["load"]["referenceFileSchemaUri"],
+            REFERENCE_FILE_SCHEMA_URI_PARQUET,
+        )
+
+        # Clean up test
+        self.to_delete.insert(0, generated_table)
+
     def _write_csv_to_storage(self, bucket_name, blob_name, header_row, data_rows):
         from google.cloud._testing import _NamedTemporaryFile
tests/unit/job/test_base.py (+1 -4)
@@ -943,7 +943,6 @@ def test_result_default_wo_state(self):
         conn = make_connection(
             _make_retriable_exception(),
             begun_job_resource,
-            _make_retriable_exception(),
             done_job_resource,
         )
         client = _make_client(project=self.PROJECT, connection=conn)
@@ -963,9 +962,7 @@ def test_result_default_wo_state(self):
             query_params={"location": "US"},
             timeout=None,
         )
-        conn.api_request.assert_has_calls(
-            [begin_call, begin_call, reload_call, reload_call]
-        )
+        conn.api_request.assert_has_calls([begin_call, begin_call, reload_call])

     def test_result_w_retry_wo_state(self):
         begun_job_resource = _make_job_resource(

tests/unit/job/test_load.py (+12)
@@ -37,6 +37,7 @@ def _setUpConstants(self):
         self.INPUT_BYTES = 12345
         self.OUTPUT_BYTES = 23456
         self.OUTPUT_ROWS = 345
+        self.REFERENCE_FILE_SCHEMA_URI = "gs://path/to/reference"

     def _make_resource(self, started=False, ended=False):
         resource = super(TestLoadJob, self)._make_resource(started, ended)
@@ -47,6 +48,7 @@ def _make_resource(self, started=False, ended=False):
             "datasetId": self.DS_ID,
             "tableId": self.TABLE_ID,
         }
+        config["referenceFileSchemaUri"] = self.REFERENCE_FILE_SCHEMA_URI

         if ended:
             resource["status"] = {"state": "DONE"}
@@ -136,6 +138,12 @@ def _verifyResourceProperties(self, job, resource):
             self.assertEqual(str(job.skip_leading_rows), config["skipLeadingRows"])
         else:
             self.assertIsNone(job.skip_leading_rows)
+        if "referenceFileSchemaUri" in config:
+            self.assertEqual(
+                job.reference_file_schema_uri, config["referenceFileSchemaUri"]
+            )
+        else:
+            self.assertIsNone(job.reference_file_schema_uri)

         if "destinationEncryptionConfiguration" in config:
             self.assertIsNotNone(job.destination_encryption_configuration)
@@ -186,6 +194,7 @@ def test_ctor(self):
         self.assertIsNone(job.use_avro_logical_types)
         self.assertIsNone(job.clustering_fields)
         self.assertIsNone(job.schema_update_options)
+        self.assertIsNone(job.reference_file_schema_uri)

     def test_ctor_w_config(self):
         from google.cloud.bigquery.schema import SchemaField
@@ -461,6 +470,7 @@ def test_begin_w_bound_client(self):
                         "datasetId": self.DS_ID,
                         "tableId": self.TABLE_ID,
                     },
+                    "referenceFileSchemaUri": self.REFERENCE_FILE_SCHEMA_URI,
                 }
             },
         },
@@ -503,6 +513,7 @@ def test_begin_w_autodetect(self):
                         "datasetId": self.DS_ID,
                         "tableId": self.TABLE_ID,
                     },
+                    "referenceFileSchemaUri": self.REFERENCE_FILE_SCHEMA_URI,
                     "autodetect": True,
                 }
             },
@@ -585,6 +596,7 @@ def test_begin_w_alternate_client(self):
         config.use_avro_logical_types = True
         config.write_disposition = WriteDisposition.WRITE_TRUNCATE
         config.schema_update_options = [SchemaUpdateOption.ALLOW_FIELD_ADDITION]
+        config.reference_file_schema_uri = "gs://path/to/reference"
         with mock.patch(
             "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes"
         ) as final_attributes:

tests/unit/test_external_config.py (+6)
@@ -99,6 +99,12 @@ def test_connection_id(self):
         ec.connection_id = "path/to/connection"
         self.assertEqual(ec.connection_id, "path/to/connection")

+    def test_reference_file_schema_uri(self):
+        ec = external_config.ExternalConfig("")
+        self.assertIsNone(ec.reference_file_schema_uri)
+        ec.reference_file_schema_uri = "path/to/reference"
+        self.assertEqual(ec.reference_file_schema_uri, "path/to/reference")
+
     def test_schema_None(self):
         ec = external_config.ExternalConfig("")
         ec.schema = None
