Skip to content

Commit 494f275

Browse files
tswastLinchin
andauthored
feat: add job_id, location, project, and query_id properties on RowIterator (#1733)
* feat: add `job_id`, `location`, `project`, and `query_id` properties on `RowIterator` These can be used to recover the original job metadata when `RowIterator` is the result of a `QueryJob`. * rename bqstorage_project to billing project * Update google/cloud/bigquery/table.py Co-authored-by: Lingqing Gan <[email protected]> --------- Co-authored-by: Lingqing Gan <[email protected]>
1 parent f804d63 commit 494f275

File tree

9 files changed

+163
-11
lines changed

9 files changed

+163
-11
lines changed

google/cloud/bigquery/client.py

+10
Original file line numberDiff line numberDiff line change
@@ -3843,6 +3843,8 @@ def list_rows(
38433843
# tables can be fetched without a column filter.
38443844
selected_fields=selected_fields,
38453845
total_rows=getattr(table, "num_rows", None),
3846+
project=table.project,
3847+
location=table.location,
38463848
)
38473849
return row_iterator
38483850

@@ -3859,6 +3861,7 @@ def _list_rows_from_query_results(
38593861
page_size: Optional[int] = None,
38603862
retry: retries.Retry = DEFAULT_RETRY,
38613863
timeout: TimeoutType = DEFAULT_TIMEOUT,
3864+
query_id: Optional[str] = None,
38623865
) -> RowIterator:
38633866
"""List the rows of a completed query.
38643867
See
@@ -3898,6 +3901,9 @@ def _list_rows_from_query_results(
38983901
would otherwise be a successful response.
38993902
If multiple requests are made under the hood, ``timeout``
39003903
applies to each individual request.
3904+
query_id (Optional[str]):
3905+
[Preview] ID of a completed query. This ID is auto-generated
3906+
and not guaranteed to be populated.
39013907
Returns:
39023908
google.cloud.bigquery.table.RowIterator:
39033909
Iterator of row data
@@ -3928,6 +3934,10 @@ def _list_rows_from_query_results(
39283934
table=destination,
39293935
extra_params=params,
39303936
total_rows=total_rows,
3937+
project=project,
3938+
location=location,
3939+
job_id=job_id,
3940+
query_id=query_id,
39313941
)
39323942
return row_iterator
39333943

google/cloud/bigquery/job/query.py

+22-2
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,15 @@ def query(self):
930930
self._properties, ["configuration", "query", "query"]
931931
)
932932

933+
@property
934+
def query_id(self) -> Optional[str]:
935+
"""[Preview] ID of a completed query.
936+
937+
This ID is auto-generated and not guaranteed to be populated.
938+
"""
939+
query_results = self._query_results
940+
return query_results.query_id if query_results is not None else None
941+
933942
@property
934943
def query_parameters(self):
935944
"""See
@@ -1525,7 +1534,12 @@ def result( # type: ignore # (complaints about the overloaded signature)
15251534
provided and the job is not retryable.
15261535
"""
15271536
if self.dry_run:
1528-
return _EmptyRowIterator()
1537+
return _EmptyRowIterator(
1538+
project=self.project,
1539+
location=self.location,
1540+
# Intentionally omit job_id and query_id since this doesn't
1541+
# actually correspond to a finished query job.
1542+
)
15291543
try:
15301544
retry_do_query = getattr(self, "_retry_do_query", None)
15311545
if retry_do_query is not None:
@@ -1594,7 +1608,12 @@ def do_get_result():
15941608
# indicate success and avoid calling tabledata.list on a table which
15951609
# can't be read (such as a view table).
15961610
if self._query_results.total_rows is None:
1597-
return _EmptyRowIterator()
1611+
return _EmptyRowIterator(
1612+
location=self.location,
1613+
project=self.project,
1614+
job_id=self.job_id,
1615+
query_id=self.query_id,
1616+
)
15981617

15991618
rows = self._client._list_rows_from_query_results(
16001619
self.job_id,
@@ -1608,6 +1627,7 @@ def do_get_result():
16081627
start_index=start_index,
16091628
retry=retry,
16101629
timeout=timeout,
1630+
query_id=self.query_id,
16111631
)
16121632
rows._preserve_order = _contains_order_by(self.query)
16131633
return rows

google/cloud/bigquery/query.py

+8
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,14 @@ def job_id(self):
911911
"""
912912
return self._properties.get("jobReference", {}).get("jobId")
913913

914+
@property
915+
def query_id(self) -> Optional[str]:
916+
"""[Preview] ID of a completed query.
917+
918+
This ID is auto-generated and not guaranteed to be populated.
919+
"""
920+
return self._properties.get("queryId")
921+
914922
@property
915923
def page_token(self):
916924
"""Token for fetching next bach of results.

google/cloud/bigquery/table.py

+46-3
Original file line numberDiff line numberDiff line change
@@ -1558,6 +1558,10 @@ def __init__(
15581558
selected_fields=None,
15591559
total_rows=None,
15601560
first_page_response=None,
1561+
location: Optional[str] = None,
1562+
job_id: Optional[str] = None,
1563+
query_id: Optional[str] = None,
1564+
project: Optional[str] = None,
15611565
):
15621566
super(RowIterator, self).__init__(
15631567
client,
@@ -1575,12 +1579,51 @@ def __init__(
15751579
self._field_to_index = _helpers._field_to_index_mapping(schema)
15761580
self._page_size = page_size
15771581
self._preserve_order = False
1578-
self._project = client.project if client is not None else None
15791582
self._schema = schema
15801583
self._selected_fields = selected_fields
15811584
self._table = table
15821585
self._total_rows = total_rows
15831586
self._first_page_response = first_page_response
1587+
self._location = location
1588+
self._job_id = job_id
1589+
self._query_id = query_id
1590+
self._project = project
1591+
1592+
@property
1593+
def _billing_project(self) -> Optional[str]:
1594+
"""GCP Project ID where BQ API will bill to (if applicable)."""
1595+
client = self.client
1596+
return client.project if client is not None else None
1597+
1598+
@property
1599+
def job_id(self) -> Optional[str]:
1600+
"""ID of the query job (if applicable).
1601+
1602+
To get the job metadata, call
1603+
``job = client.get_job(rows.job_id, location=rows.location)``.
1604+
"""
1605+
return self._job_id
1606+
1607+
@property
1608+
def location(self) -> Optional[str]:
1609+
"""Location where the query executed (if applicable).
1610+
1611+
See: https://cloud.google.com/bigquery/docs/locations
1612+
"""
1613+
return self._location
1614+
1615+
@property
1616+
def project(self) -> Optional[str]:
1617+
"""GCP Project ID where these rows are read from."""
1618+
return self._project
1619+
1620+
@property
1621+
def query_id(self) -> Optional[str]:
1622+
"""[Preview] ID of a completed query.
1623+
1624+
This ID is auto-generated and not guaranteed to be populated.
1625+
"""
1626+
return self._query_id
15841627

15851628
def _is_completely_cached(self):
15861629
"""Check if all results are completely cached.
@@ -1723,7 +1766,7 @@ def to_arrow_iterable(
17231766

17241767
bqstorage_download = functools.partial(
17251768
_pandas_helpers.download_arrow_bqstorage,
1726-
self._project,
1769+
self._billing_project,
17271770
self._table,
17281771
bqstorage_client,
17291772
preserve_order=self._preserve_order,
@@ -1903,7 +1946,7 @@ def to_dataframe_iterable(
19031946
column_names = [field.name for field in self._schema]
19041947
bqstorage_download = functools.partial(
19051948
_pandas_helpers.download_dataframe_bqstorage,
1906-
self._project,
1949+
self._billing_project,
19071950
self._table,
19081951
bqstorage_client,
19091952
column_names,

tests/unit/job/test_query.py

+17-1
Original file line numberDiff line numberDiff line change
@@ -952,6 +952,7 @@ def test_result(self):
952952
},
953953
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
954954
"totalRows": "2",
955+
"queryId": "abc-def",
955956
}
956957
job_resource = self._make_resource(started=True, location="EU")
957958
job_resource_done = self._make_resource(started=True, ended=True, location="EU")
@@ -980,6 +981,10 @@ def test_result(self):
980981
rows = list(result)
981982
self.assertEqual(len(rows), 1)
982983
self.assertEqual(rows[0].col1, "abc")
984+
self.assertEqual(result.job_id, self.JOB_ID)
985+
self.assertEqual(result.location, "EU")
986+
self.assertEqual(result.project, self.PROJECT)
987+
self.assertEqual(result.query_id, "abc-def")
983988
# Test that the total_rows property has changed during iteration, based
984989
# on the response from tabledata.list.
985990
self.assertEqual(result.total_rows, 1)
@@ -1023,6 +1028,12 @@ def test_result_dry_run(self):
10231028
calls = conn.api_request.mock_calls
10241029
self.assertIsInstance(result, _EmptyRowIterator)
10251030
self.assertEqual(calls, [])
1031+
self.assertEqual(result.location, "EU")
1032+
self.assertEqual(result.project, self.PROJECT)
1033+
# Intentionally omit job_id and query_id since this doesn't
1034+
# actually correspond to a finished query job.
1035+
self.assertIsNone(result.job_id)
1036+
self.assertIsNone(result.query_id)
10261037

10271038
def test_result_with_done_job_calls_get_query_results(self):
10281039
query_resource_done = {
@@ -1180,16 +1191,21 @@ def test_result_w_empty_schema(self):
11801191
"jobComplete": True,
11811192
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
11821193
"schema": {"fields": []},
1194+
"queryId": "xyz-abc",
11831195
}
11841196
connection = make_connection(query_resource, query_resource)
11851197
client = _make_client(self.PROJECT, connection=connection)
1186-
resource = self._make_resource(ended=True)
1198+
resource = self._make_resource(ended=True, location="asia-northeast1")
11871199
job = self._get_target_class().from_api_repr(resource, client)
11881200

11891201
result = job.result()
11901202

11911203
self.assertIsInstance(result, _EmptyRowIterator)
11921204
self.assertEqual(list(result), [])
1205+
self.assertEqual(result.project, self.PROJECT)
1206+
self.assertEqual(result.job_id, self.JOB_ID)
1207+
self.assertEqual(result.location, "asia-northeast1")
1208+
self.assertEqual(result.query_id, "xyz-abc")
11931209

11941210
def test_result_invokes_begins(self):
11951211
begun_resource = self._make_resource()

tests/unit/job/test_query_pandas.py

+8-4
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ def test_to_dataframe_bqstorage(table_read_options_kwarg):
560560
[name_array, age_array], schema=arrow_schema
561561
)
562562
connection = make_connection(query_resource)
563-
client = _make_client(connection=connection)
563+
client = _make_client(connection=connection, project="bqstorage-billing-project")
564564
job = target_class.from_api_repr(resource, client)
565565
session = bigquery_storage.types.ReadSession()
566566
session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
@@ -597,7 +597,9 @@ def test_to_dataframe_bqstorage(table_read_options_kwarg):
597597
**table_read_options_kwarg,
598598
)
599599
bqstorage_client.create_read_session.assert_called_once_with(
600-
parent=f"projects/{client.project}",
600+
# The billing project can differ from the data project. Make sure we
601+
# are charging to the billing project, not the data project.
602+
parent="projects/bqstorage-billing-project",
601603
read_session=expected_session,
602604
max_stream_count=0, # Use default number of streams for best performance.
603605
)
@@ -618,7 +620,7 @@ def test_to_dataframe_bqstorage_no_pyarrow_compression():
618620
"schema": {"fields": [{"name": "name", "type": "STRING", "mode": "NULLABLE"}]},
619621
}
620622
connection = make_connection(query_resource)
621-
client = _make_client(connection=connection)
623+
client = _make_client(connection=connection, project="bqstorage-billing-project")
622624
job = target_class.from_api_repr(resource, client)
623625
bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
624626
session = bigquery_storage.types.ReadSession()
@@ -646,7 +648,9 @@ def test_to_dataframe_bqstorage_no_pyarrow_compression():
646648
data_format=bigquery_storage.DataFormat.ARROW,
647649
)
648650
bqstorage_client.create_read_session.assert_called_once_with(
649-
parent=f"projects/{client.project}",
651+
# The billing project can differ from the data project. Make sure we
652+
# are charging to the billing project, not the data project.
653+
parent="projects/bqstorage-billing-project",
650654
read_session=expected_session,
651655
max_stream_count=0,
652656
)

tests/unit/test_client.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -6401,11 +6401,16 @@ def test_list_rows(self):
64016401
age = SchemaField("age", "INTEGER", mode="NULLABLE")
64026402
joined = SchemaField("joined", "TIMESTAMP", mode="NULLABLE")
64036403
table = Table(self.TABLE_REF, schema=[full_name, age, joined])
6404+
table._properties["location"] = "us-central1"
64046405
table._properties["numRows"] = 7
64056406

64066407
iterator = client.list_rows(table, timeout=7.5)
64076408

6408-
# Check that initial total_rows is populated from the table.
6409+
# Check that initial RowIterator is populated from the table metadata.
6410+
self.assertIsNone(iterator.job_id)
6411+
self.assertEqual(iterator.location, "us-central1")
6412+
self.assertEqual(iterator.project, table.project)
6413+
self.assertIsNone(iterator.query_id)
64096414
self.assertEqual(iterator.total_rows, 7)
64106415
page = next(iterator.pages)
64116416
rows = list(page)
@@ -6521,6 +6526,10 @@ def test_list_rows_empty_table(self):
65216526
selected_fields=[],
65226527
)
65236528

6529+
self.assertIsNone(rows.job_id)
6530+
self.assertIsNone(rows.location)
6531+
self.assertEqual(rows.project, self.TABLE_REF.project)
6532+
self.assertIsNone(rows.query_id)
65246533
# When a table reference / string and selected_fields is provided,
65256534
# total_rows can't be populated until iteration starts.
65266535
self.assertIsNone(rows.total_rows)

tests/unit/test_query.py

+10
Original file line numberDiff line numberDiff line change
@@ -1386,6 +1386,16 @@ def test_page_token_present(self):
13861386
query = self._make_one(resource)
13871387
self.assertEqual(query.page_token, "TOKEN")
13881388

1389+
def test_query_id_missing(self):
1390+
query = self._make_one(self._make_resource())
1391+
self.assertIsNone(query.query_id)
1392+
1393+
def test_query_id_present(self):
1394+
resource = self._make_resource()
1395+
resource["queryId"] = "test-query-id"
1396+
query = self._make_one(resource)
1397+
self.assertEqual(query.query_id, "test-query-id")
1398+
13891399
def test_total_rows_present_integer(self):
13901400
resource = self._make_resource()
13911401
resource["totalRows"] = 42

tests/unit/test_table.py

+32
Original file line numberDiff line numberDiff line change
@@ -2113,6 +2113,38 @@ def test_constructor_with_dict_schema(self):
21132113
]
21142114
self.assertEqual(iterator.schema, expected_schema)
21152115

2116+
def test_job_id_missing(self):
2117+
rows = self._make_one()
2118+
self.assertIsNone(rows.job_id)
2119+
2120+
def test_job_id_present(self):
2121+
rows = self._make_one(job_id="abc-123")
2122+
self.assertEqual(rows.job_id, "abc-123")
2123+
2124+
def test_location_missing(self):
2125+
rows = self._make_one()
2126+
self.assertIsNone(rows.location)
2127+
2128+
def test_location_present(self):
2129+
rows = self._make_one(location="asia-northeast1")
2130+
self.assertEqual(rows.location, "asia-northeast1")
2131+
2132+
def test_project_missing(self):
2133+
rows = self._make_one()
2134+
self.assertIsNone(rows.project)
2135+
2136+
def test_project_present(self):
2137+
rows = self._make_one(project="test-project")
2138+
self.assertEqual(rows.project, "test-project")
2139+
2140+
def test_query_id_missing(self):
2141+
rows = self._make_one()
2142+
self.assertIsNone(rows.query_id)
2143+
2144+
def test_query_id_present(self):
2145+
rows = self._make_one(query_id="xyz-987")
2146+
self.assertEqual(rows.query_id, "xyz-987")
2147+
21162148
def test_iterate(self):
21172149
from google.cloud.bigquery.schema import SchemaField
21182150

0 commit comments

Comments
 (0)