Skip to content

Commit c16e4be

Browse files
committed
feat: add Client.query_and_wait which directly returns a RowIterator of results
Set the `QUERY_PREVIEW_ENABLED=TRUE` environment variable to use this with the new JOB_CREATION_OPTIONAL mode (currently in preview).
1 parent 05d6a3e commit c16e4be

File tree

4 files changed

+181
-13
lines changed

4 files changed

+181
-13
lines changed

google/cloud/bigquery/_job_helpers.py

+124-11
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,31 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
"""Helpers for interacting with the job REST APIs from the client."""
15+
"""Helpers for interacting with the job REST APIs from the client.
16+
17+
For queries, there are three cases to consider:
18+
19+
1. jobs.insert: This always returns a job resource.
20+
2. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED:
21+
This sometimes can return the results inline, but always includes a job ID.
22+
3. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL:
23+
This sometimes doesn't create a job at all, instead returning the results.
24+
For better debugging, a query ID is included in the response (not always a
25+
job ID).
26+
27+
Client.query() calls either (1) or (2), depending on what the user provides
28+
for the api_method parameter. query() always returns a QueryJob object, which
29+
can retry the query when the query job fails for a retriable reason.
30+
31+
Client.query_and_wait() calls (3). This returns a RowIterator that may wrap
32+
local results from the response or may wrap a query job containing multiple
33+
pages of results. Even though query_and_wait() waits for the job to complete,
34+
we still need a separate job_retry object because there are different
35+
predicates where it is safe to generate a new query ID.
36+
"""
1637

1738
import copy
39+
import os
1840
import uuid
1941
from typing import Any, Dict, TYPE_CHECKING, Optional
2042

@@ -23,6 +45,7 @@
2345

2446
from google.cloud.bigquery import job
2547
import google.cloud.bigquery.query
48+
from google.cloud.bigquery import table
2649

2750
# Avoid circular imports
2851
if TYPE_CHECKING: # pragma: NO COVER
@@ -123,7 +146,12 @@ def do_query():
123146
return future
124147

125148

126-
def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any]:
149+
def _to_query_request(
150+
query: str,
151+
job_config: Optional[job.QueryJobConfig],
152+
location: Optional[str],
153+
timeout: Optional[float],
154+
) -> Dict[str, Any]:
127155
"""Transform from Job resource to QueryRequest resource.
128156
129157
Most of the keys in job.configuration.query are in common with
@@ -150,6 +178,12 @@ def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any
150178
request_body.setdefault("formatOptions", {})
151179
request_body["formatOptions"]["useInt64Timestamp"] = True # type: ignore
152180

181+
if timeout is not None:
182+
# Subtract a buffer for context switching, network latency, etc.
183+
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
184+
request_body["location"] = location
185+
request_body["query"] = query
186+
153187
return request_body
154188

155189

@@ -207,6 +241,10 @@ def _to_query_job(
207241
return query_job
208242

209243

244+
def _to_query_path(project: str) -> str:
245+
return f"/projects/{project}/queries"
246+
247+
210248
def query_jobs_query(
211249
client: "Client",
212250
query: str,
@@ -217,18 +255,12 @@ def query_jobs_query(
217255
timeout: Optional[float],
218256
job_retry: retries.Retry,
219257
) -> job.QueryJob:
220-
"""Initiate a query using jobs.query.
258+
"""Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.
221259
222260
See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
223261
"""
224-
path = f"/projects/{project}/queries"
225-
request_body = _to_query_request(job_config)
226-
227-
if timeout is not None:
228-
# Subtract a buffer for context switching, network latency, etc.
229-
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
230-
request_body["location"] = location
231-
request_body["query"] = query
262+
path = _to_query_path(project)
263+
request_body = _to_query_request(query, job_config, location, timeout)
232264

233265
def do_query():
234266
request_body["requestId"] = make_job_id()
@@ -253,3 +285,84 @@ def do_query():
253285
future._job_retry = job_retry
254286

255287
return future
288+
289+
290+
def query_and_wait(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    retry: retries.Retry,
    timeout: Optional[float],
    job_retry: retries.Retry,
) -> table.RowIterator:
    """Initiate a query using jobs.query and wait for the results.

    While ``jobCreationMode=JOB_CREATION_OPTIONAL`` is in preview, use the
    default ``jobCreationMode`` unless the environment variable
    ``QUERY_PREVIEW_ENABLED=true`` (compared case-insensitively). After
    ``jobCreationMode`` is GA, this method will always use
    ``jobCreationMode=JOB_CREATION_OPTIONAL``.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query

    Args:
        client: Client whose ``_call_api`` method issues the request.
        query: Query text forwarded to the request body.
        job_config: Optional query configuration converted into the request.
        location: Optional location forwarded to the request body.
        project: Project ID used to build the request path.
        retry: Retry object handed to ``client._call_api``.
        timeout: Optional timeout, in seconds, used both for the request
            body and for the API call itself.
        job_retry: Retry object wrapping the whole request, so that every
            attempt is sent with a freshly generated ``requestId``.

    Returns:
        A RowIterator over the query results. It wraps the inline response
        when all rows fit in it; otherwise it comes from waiting on the
        query job built from the response.
    """
    path = _to_query_path(project)
    request_body = _to_query_request(query, job_config, location, timeout)

    # os.getenv() returns None when the variable is unset. Default to an
    # empty string so that the (common) unset case means "preview disabled"
    # instead of raising AttributeError on None.casefold().
    if os.getenv("QUERY_PREVIEW_ENABLED", "").casefold() == "true":
        request_body["jobCreationMode"] = "JOB_CREATION_OPTIONAL"

    @job_retry
    def do_query():
        # A new request ID per attempt, so a job-level retry is sent as a
        # distinct request.
        request_body["requestId"] = make_job_id()
        span_attributes = {"path": path}

        response = client._call_api(
            retry,
            span_name="BigQuery.query",
            span_attributes=span_attributes,
            method="POST",
            path=path,
            data=request_body,
            timeout=timeout,
        )

        # The query hasn't finished, so we expect there to be a job ID now.
        # Wait until the query finishes.
        if not response.get("jobComplete", False):
            return _to_query_job(client, query, job_config, response).result()

        # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
        # to fetch, there will be a job ID for jobs.getQueryResults.
        query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
            response
        )
        job_id = query_results.job_id
        location = query_results.location
        rows = query_results.rows
        total_rows = query_results.total_rows
        more_pages = (
            job_id is not None
            and location is not None
            and len(rows) < total_rows
        )

        if more_pages:
            # TODO(swast): Call client._list_rows_from_query_results directly
            # after updating RowIterator to fetch destination only if needed.
            return _to_query_job(client, query, job_config, response).result()

        # Everything fit in the first response: serve rows from it directly.
        return table.RowIterator(
            client=client,
            api_request=client._call_api,
            path=None,
            schema=query_results.schema,
            # TODO(swast): Support max_results
            max_results=None,
            total_rows=total_rows,
            first_page_response=response,
            location=location,
            job_id=job_id,
            query_id=query_results.query_id,
            project=project,
        )

    return do_query()

google/cloud/bigquery/client.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -3405,6 +3405,20 @@ def query(
34053405
else:
34063406
raise ValueError(f"Got unexpected value for api_method: {repr(api_method)}")
34073407

3408+
def query_and_wait(self) -> RowIterator:
    """[Preview] Run the query and return the results.

    NOTE(review): this method looks unfinished. ``query``, ``job_config``,
    ``location``, ``project``, ``retry``, ``timeout`` and ``job_retry`` are
    neither parameters nor locals here, so unless they resolve to
    module-level names the call below raises ``NameError`` -- TODO add them
    to the signature. The result is also assigned to ``maybe_job`` and never
    returned, although the annotation promises a ``RowIterator``.
    """
    # Delegates to the jobs.query helper; the value is currently discarded.
    maybe_job = _job_helpers.query_jobs_query(
        self,
        query,
        job_config,
        location,
        project,
        retry,
        timeout,
        job_retry,
        # TODO: add no job mode
    )
3421+
34083422
def insert_rows(
34093423
self,
34103424
table: Union[Table, TableReference, str],
@@ -3853,7 +3867,7 @@ def _list_rows_from_query_results(
38533867
job_id: str,
38543868
location: str,
38553869
project: str,
3856-
schema: SchemaField,
3870+
schema: Sequence[SchemaField],
38573871
total_rows: Optional[int] = None,
38583872
destination: Optional[Union[Table, TableReference, TableListItem, str]] = None,
38593873
max_results: Optional[int] = None,

google/cloud/bigquery/query.py

+12
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,18 @@ def job_id(self):
911911
"""
912912
return self._properties.get("jobReference", {}).get("jobId")
913913

914+
@property
def location(self) -> Optional[str]:
    """Location of the query job these results are from.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#body.QueryResponse.FIELDS.job_reference

    Returns:
        Optional[str]: Location of the query job, or ``None`` when the
        response has no ``jobReference`` or the reference has no
        ``location``.
    """
    return self._properties.get("jobReference", {}).get("location")
925+
914926
@property
915927
def query_id(self) -> Optional[str]:
916928
"""[Preview] ID of a completed query.

google/cloud/bigquery/table.py

+30-1
Original file line numberDiff line numberDiff line change
@@ -1591,7 +1591,36 @@ def __init__(
15911591
self._location = location
15921592
self._job_id = job_id
15931593
self._query_id = query_id
1594-
self._project = project
1594+
1595+
if project:
1596+
self._project = project
1597+
elif client is not None:
1598+
self._project = client.project
1599+
else:
1600+
self._project = None
1601+
1602+
@property
def project(self) -> Optional[str]:
    """GCP Project ID where these rows are read from."""
    return self._project

@property
def location(self) -> Optional[str]:
    """Location where the query executed (if applicable).

    See: https://cloud.google.com/bigquery/docs/locations
    """
    # The original evaluated ``self._location`` without returning it, so
    # callers always received None.
    return self._location

@property
def job_id(self) -> Optional[str]:
    """ID of the query job (if applicable).

    To get the job metadata, call
    ``job = client.get_job(rows.job_id, location=rows.location)``.
    """
    return self._job_id

@property
def query_id(self) -> Optional[str]:
    """ID of the stateless query (if applicable)."""
    return self._query_id
15951624

15961625
@property
15971626
def _billing_project(self) -> Optional[str]:

0 commit comments

Comments
 (0)