Skip to content

Commit d225a94

Browse files
authored
perf: DB-API uses more efficient query_and_wait when no job ID is provided (#1747)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #1745 🦕
1 parent 02a7d12 commit d225a94

File tree

7 files changed

+219
-115
lines changed

7 files changed

+219
-115
lines changed

google/cloud/bigquery/_job_helpers.py

+1
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,7 @@ def do_query():
491491
job_id=query_results.job_id,
492492
query_id=query_results.query_id,
493493
project=query_results.project,
494+
num_dml_affected_rows=query_results.num_dml_affected_rows,
494495
)
495496

496497
if job_retry is not None:

google/cloud/bigquery/client.py

+6
Original file line numberDiff line numberDiff line change
@@ -3963,6 +3963,7 @@ def _list_rows_from_query_results(
39633963
timeout: TimeoutType = DEFAULT_TIMEOUT,
39643964
query_id: Optional[str] = None,
39653965
first_page_response: Optional[Dict[str, Any]] = None,
3966+
num_dml_affected_rows: Optional[int] = None,
39663967
) -> RowIterator:
39673968
"""List the rows of a completed query.
39683969
See
@@ -4007,6 +4008,10 @@ def _list_rows_from_query_results(
40074008
and not guaranteed to be populated.
40084009
first_page_response (Optional[dict]):
40094010
API response for the first page of results (if available).
4011+
num_dml_affected_rows (Optional[int]):
4012+
If this RowIterator is the result of a DML query, the number of
4013+
rows that were affected.
4014+
40104015
Returns:
40114016
google.cloud.bigquery.table.RowIterator:
40124017
Iterator of row data
@@ -4047,6 +4052,7 @@ def _list_rows_from_query_results(
40474052
job_id=job_id,
40484053
query_id=query_id,
40494054
first_page_response=first_page_response,
4055+
num_dml_affected_rows=num_dml_affected_rows,
40504056
)
40514057
return row_iterator
40524058

google/cloud/bigquery/dbapi/cursor.py

+69-53
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@
1414

1515
"""Cursor for the Google BigQuery DB-API."""
1616

17+
from __future__ import annotations
18+
1719
import collections
1820
from collections import abc as collections_abc
19-
import copy
20-
import logging
2121
import re
22+
from typing import Optional
2223

2324
try:
2425
from google.cloud.bigquery_storage import ArrowSerializationOptions
@@ -34,8 +35,6 @@
3435
import google.cloud.exceptions # type: ignore
3536

3637

37-
_LOGGER = logging.getLogger(__name__)
38-
3938
# Per PEP 249: A 7-item sequence containing information describing one result
4039
# column. The first two items (name and type_code) are mandatory, the other
4140
# five are optional and are set to None if no meaningful values can be
@@ -76,18 +75,31 @@ def __init__(self, connection):
7675
# most appropriate size.
7776
self.arraysize = None
7877
self._query_data = None
79-
self._query_job = None
78+
self._query_rows = None
8079
self._closed = False
8180

8281
@property
83-
def query_job(self):
84-
"""google.cloud.bigquery.job.query.QueryJob: The query job created by
85-
the last ``execute*()`` call.
82+
def query_job(self) -> Optional[job.QueryJob]:
83+
"""google.cloud.bigquery.job.query.QueryJob | None: The query job
84+
created by the last ``execute*()`` call, if a query job was created.
8685
8786
.. note::
8887
If the last ``execute*()`` call was ``executemany()``, this is the
8988
last job created by ``executemany()``."""
90-
return self._query_job
89+
rows = self._query_rows
90+
91+
if rows is None:
92+
return None
93+
94+
job_id = rows.job_id
95+
project = rows.project
96+
location = rows.location
97+
client = self.connection._client
98+
99+
if job_id is None:
100+
return None
101+
102+
return client.get_job(job_id, location=location, project=project)
91103

92104
def close(self):
93105
"""Mark the cursor as closed, preventing its further use."""
@@ -117,8 +129,8 @@ def _set_description(self, schema):
117129
for field in schema
118130
)
119131

120-
def _set_rowcount(self, query_results):
121-
"""Set the rowcount from query results.
132+
def _set_rowcount(self, rows):
133+
"""Set the rowcount from a RowIterator.
122134
123135
Normally, this sets rowcount to the number of rows returned by the
124136
query, but if it was a DML statement, it sets rowcount to the number
@@ -129,10 +141,10 @@ def _set_rowcount(self, query_results):
129141
Results of a query.
130142
"""
131143
total_rows = 0
132-
num_dml_affected_rows = query_results.num_dml_affected_rows
144+
num_dml_affected_rows = rows.num_dml_affected_rows
133145

134-
if query_results.total_rows is not None and query_results.total_rows > 0:
135-
total_rows = query_results.total_rows
146+
if rows.total_rows is not None and rows.total_rows > 0:
147+
total_rows = rows.total_rows
136148
if num_dml_affected_rows is not None and num_dml_affected_rows > 0:
137149
total_rows = num_dml_affected_rows
138150
self.rowcount = total_rows
@@ -165,9 +177,10 @@ def execute(self, operation, parameters=None, job_id=None, job_config=None):
165177
parameters (Union[Mapping[str, Any], Sequence[Any]]):
166178
(Optional) dictionary or sequence of parameter values.
167179
168-
job_id (str):
169-
(Optional) The job_id to use. If not set, a job ID
170-
is generated at random.
180+
job_id (str | None):
181+
(Optional and discouraged) The job ID to use when creating
182+
the query job. For best performance and reliability, manually
183+
setting a job ID is discouraged.
171184
172185
job_config (google.cloud.bigquery.job.QueryJobConfig):
173186
(Optional) Extra configuration options for the query job.
@@ -181,7 +194,7 @@ def _execute(
181194
self, formatted_operation, parameters, job_id, job_config, parameter_types
182195
):
183196
self._query_data = None
184-
self._query_job = None
197+
self._query_results = None
185198
client = self.connection._client
186199

187200
# The DB-API uses the pyformat formatting, since the way BigQuery does
@@ -190,33 +203,35 @@ def _execute(
190203
# libraries.
191204
query_parameters = _helpers.to_query_parameters(parameters, parameter_types)
192205

193-
if client._default_query_job_config:
194-
if job_config:
195-
config = job_config._fill_from_default(client._default_query_job_config)
196-
else:
197-
config = copy.deepcopy(client._default_query_job_config)
198-
else:
199-
config = job_config or job.QueryJobConfig(use_legacy_sql=False)
200-
206+
config = job_config or job.QueryJobConfig()
201207
config.query_parameters = query_parameters
202-
self._query_job = client.query(
203-
formatted_operation, job_config=config, job_id=job_id
204-
)
205208

206-
if self._query_job.dry_run:
207-
self._set_description(schema=None)
208-
self.rowcount = 0
209-
return
210-
211-
# Wait for the query to finish.
209+
# Start the query and wait for the query to finish.
212210
try:
213-
self._query_job.result()
211+
if job_id is not None:
212+
rows = client.query(
213+
formatted_operation,
214+
job_config=job_config,
215+
job_id=job_id,
216+
).result(
217+
page_size=self.arraysize,
218+
)
219+
else:
220+
rows = client.query_and_wait(
221+
formatted_operation,
222+
job_config=config,
223+
page_size=self.arraysize,
224+
)
214225
except google.cloud.exceptions.GoogleCloudError as exc:
215226
raise exceptions.DatabaseError(exc)
216227

217-
query_results = self._query_job._query_results
218-
self._set_rowcount(query_results)
219-
self._set_description(query_results.schema)
228+
self._query_rows = rows
229+
self._set_description(rows.schema)
230+
231+
if config.dry_run:
232+
self.rowcount = 0
233+
else:
234+
self._set_rowcount(rows)
220235

221236
def executemany(self, operation, seq_of_parameters):
222237
"""Prepare and execute a database operation multiple times.
@@ -250,25 +265,26 @@ def _try_fetch(self, size=None):
250265
251266
Mutates self to indicate that iteration has started.
252267
"""
253-
if self._query_job is None:
268+
if self._query_data is not None:
269+
# Already started fetching the data.
270+
return
271+
272+
rows = self._query_rows
273+
if rows is None:
254274
raise exceptions.InterfaceError(
255275
"No query results: execute() must be called before fetch."
256276
)
257277

258-
if self._query_job.dry_run:
259-
self._query_data = iter([])
278+
bqstorage_client = self.connection._bqstorage_client
279+
if rows._should_use_bqstorage(
280+
bqstorage_client,
281+
create_bqstorage_client=False,
282+
):
283+
rows_iterable = self._bqstorage_fetch(bqstorage_client)
284+
self._query_data = _helpers.to_bq_table_rows(rows_iterable)
260285
return
261286

262-
if self._query_data is None:
263-
bqstorage_client = self.connection._bqstorage_client
264-
265-
if bqstorage_client is not None:
266-
rows_iterable = self._bqstorage_fetch(bqstorage_client)
267-
self._query_data = _helpers.to_bq_table_rows(rows_iterable)
268-
return
269-
270-
rows_iter = self._query_job.result(page_size=self.arraysize)
271-
self._query_data = iter(rows_iter)
287+
self._query_data = iter(rows)
272288

273289
def _bqstorage_fetch(self, bqstorage_client):
274290
"""Start fetching data with the BigQuery Storage API.
@@ -290,7 +306,7 @@ def _bqstorage_fetch(self, bqstorage_client):
290306
# bigquery_storage can indeed be imported here without errors.
291307
from google.cloud import bigquery_storage
292308

293-
table_reference = self._query_job.destination
309+
table_reference = self._query_rows._table
294310

295311
requested_session = bigquery_storage.types.ReadSession(
296312
table=table_reference.to_bqstorage(),

google/cloud/bigquery/job/query.py

+2
Original file line numberDiff line numberDiff line change
@@ -1614,6 +1614,7 @@ def do_get_result():
16141614
project=self.project,
16151615
job_id=self.job_id,
16161616
query_id=self.query_id,
1617+
num_dml_affected_rows=self._query_results.num_dml_affected_rows,
16171618
)
16181619

16191620
# We know that there's at least 1 row, so only treat the response from
@@ -1639,6 +1640,7 @@ def do_get_result():
16391640
timeout=timeout,
16401641
query_id=self.query_id,
16411642
first_page_response=first_page_response,
1643+
num_dml_affected_rows=self._query_results.num_dml_affected_rows,
16421644
)
16431645
rows._preserve_order = _contains_order_by(self.query)
16441646
return rows

google/cloud/bigquery/table.py

+24-8
Original file line numberDiff line numberDiff line change
@@ -1566,6 +1566,7 @@ def __init__(
15661566
job_id: Optional[str] = None,
15671567
query_id: Optional[str] = None,
15681568
project: Optional[str] = None,
1569+
num_dml_affected_rows: Optional[int] = None,
15691570
):
15701571
super(RowIterator, self).__init__(
15711572
client,
@@ -1592,6 +1593,7 @@ def __init__(
15921593
self._job_id = job_id
15931594
self._query_id = query_id
15941595
self._project = project
1596+
self._num_dml_affected_rows = num_dml_affected_rows
15951597

15961598
@property
15971599
def _billing_project(self) -> Optional[str]:
@@ -1616,6 +1618,16 @@ def location(self) -> Optional[str]:
16161618
"""
16171619
return self._location
16181620

1621+
@property
1622+
def num_dml_affected_rows(self) -> Optional[int]:
1623+
"""If this RowIterator is the result of a DML query, the number of
1624+
rows that were affected.
1625+
1626+
See:
1627+
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#body.QueryResponse.FIELDS.num_dml_affected_rows
1628+
"""
1629+
return self._num_dml_affected_rows
1630+
16191631
@property
16201632
def project(self) -> Optional[str]:
16211633
"""GCP Project ID where these rows are read from."""
@@ -1635,7 +1647,10 @@ def _is_almost_completely_cached(self):
16351647
This is useful to know, because we can avoid alternative download
16361648
mechanisms.
16371649
"""
1638-
if self._first_page_response is None:
1650+
if (
1651+
not hasattr(self, "_first_page_response")
1652+
or self._first_page_response is None
1653+
):
16391654
return False
16401655

16411656
total_cached_rows = len(self._first_page_response.get(self._items_key, []))
@@ -1655,7 +1670,7 @@ def _is_almost_completely_cached(self):
16551670

16561671
return False
16571672

1658-
def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client):
1673+
def _should_use_bqstorage(self, bqstorage_client, create_bqstorage_client):
16591674
"""Returns True if the BigQuery Storage API can be used.
16601675
16611676
Returns:
@@ -1669,8 +1684,9 @@ def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client):
16691684
if self._table is None:
16701685
return False
16711686

1672-
# The developer is manually paging through results if this is set.
1673-
if self.next_page_token is not None:
1687+
# The developer has already started paging through results if
1688+
# next_page_token is set.
1689+
if hasattr(self, "next_page_token") and self.next_page_token is not None:
16741690
return False
16751691

16761692
if self._is_almost_completely_cached():
@@ -1726,7 +1742,7 @@ def schema(self):
17261742

17271743
@property
17281744
def total_rows(self):
1729-
"""int: The total number of rows in the table."""
1745+
"""int: The total number of rows in the table or query results."""
17301746
return self._total_rows
17311747

17321748
def _maybe_warn_max_results(
@@ -1752,7 +1768,7 @@ def _maybe_warn_max_results(
17521768
def _to_page_iterable(
17531769
self, bqstorage_download, tabledata_list_download, bqstorage_client=None
17541770
):
1755-
if not self._validate_bqstorage(bqstorage_client, False):
1771+
if not self._should_use_bqstorage(bqstorage_client, False):
17561772
bqstorage_client = None
17571773

17581774
result_pages = (
@@ -1882,7 +1898,7 @@ def to_arrow(
18821898

18831899
self._maybe_warn_max_results(bqstorage_client)
18841900

1885-
if not self._validate_bqstorage(bqstorage_client, create_bqstorage_client):
1901+
if not self._should_use_bqstorage(bqstorage_client, create_bqstorage_client):
18861902
create_bqstorage_client = False
18871903
bqstorage_client = None
18881904

@@ -2223,7 +2239,7 @@ def to_dataframe(
22232239

22242240
self._maybe_warn_max_results(bqstorage_client)
22252241

2226-
if not self._validate_bqstorage(bqstorage_client, create_bqstorage_client):
2242+
if not self._should_use_bqstorage(bqstorage_client, create_bqstorage_client):
22272243
create_bqstorage_client = False
22282244
bqstorage_client = None
22292245

0 commit comments

Comments
 (0)