Skip to content

Commit 93d47e7

Browse files
committed
feat: add Client.query_and_wait which directly returns a RowIterator of results
Set the `QUERY_PREVIEW_ENABLED=TRUE` environment variable to use this with the new JOB_CREATION_OPTIONAL mode (currently in preview).
1 parent 58b3152 commit 93d47e7

File tree

3 files changed

+137
-12
lines changed

3 files changed

+137
-12
lines changed

google/cloud/bigquery/_job_helpers.py

+86-11
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,37 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
"""Helpers for interacting with the job REST APIs from the client."""
15+
"""Helpers for interacting with the job REST APIs from the client.
16+
17+
For queries, there are three cases to consider:
18+
19+
1. jobs.insert: This always returns a job resource.
20+
2. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED:
21+
This sometimes can return the results inline, but always includes a job ID.
22+
3. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL:
23+
This sometimes doesn't create a job at all, instead returning the results.
24+
25+
Client.query() calls either (1) or (2), depending on what the user provides
26+
for the api_method parameter. query() always returns a QueryJob object, which
27+
can retry the query when the query job fails for a retriable reason.
28+
29+
Client.query_and_wait() calls (3). This returns a RowIterator that may wrap
30+
local results from the response or may wrap a query job containing multiple
31+
pages of results. Even though query_and_wait() waits for the job to complete,
32+
we still need a separate job_retry object because there are different
33+
predicates where it is safe to generate a new query ID.
34+
"""
1635

1736
import copy
37+
import os
1838
import uuid
1939
from typing import Any, Dict, TYPE_CHECKING, Optional
2040

2141
import google.api_core.exceptions as core_exceptions
2242
from google.api_core import retry as retries
2343

2444
from google.cloud.bigquery import job
45+
from google.cloud.bigquery import table
2546

2647
# Avoid circular imports
2748
if TYPE_CHECKING: # pragma: NO COVER
@@ -122,7 +143,12 @@ def do_query():
122143
return future
123144

124145

125-
def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any]:
146+
def _to_query_request(
147+
query: str,
148+
job_config: Optional[job.QueryJobConfig],
149+
location: Optional[str],
150+
timeout: Optional[float],
151+
) -> Dict[str, Any]:
126152
"""Transform from Job resource to QueryRequest resource.
127153
128154
Most of the keys in job.configuration.query are in common with
@@ -149,6 +175,12 @@ def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any
149175
request_body.setdefault("formatOptions", {})
150176
request_body["formatOptions"]["useInt64Timestamp"] = True # type: ignore
151177

178+
if timeout is not None:
179+
# Subtract a buffer for context switching, network latency, etc.
180+
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
181+
request_body["location"] = location
182+
request_body["query"] = query
183+
152184
return request_body
153185

154186

@@ -211,6 +243,10 @@ def _to_query_job(
211243
return query_job
212244

213245

246+
def _to_query_path(project: str) -> str:
247+
return f"/projects/{project}/queries"
248+
249+
214250
def query_jobs_query(
215251
client: "Client",
216252
query: str,
@@ -221,18 +257,12 @@ def query_jobs_query(
221257
timeout: Optional[float],
222258
job_retry: retries.Retry,
223259
) -> job.QueryJob:
224-
"""Initiate a query using jobs.query.
260+
"""Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.
225261
226262
See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
227263
"""
228-
path = f"/projects/{project}/queries"
229-
request_body = _to_query_request(job_config)
230-
231-
if timeout is not None:
232-
# Subtract a buffer for context switching, network latency, etc.
233-
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
234-
request_body["location"] = location
235-
request_body["query"] = query
264+
path = _to_query_path(project)
265+
request_body = _to_query_request(query, job_config, location, timeout)
236266

237267
def do_query():
238268
request_body["requestId"] = make_job_id()
@@ -257,3 +287,48 @@ def do_query():
257287
future._job_retry = job_retry
258288

259289
return future
290+
291+
292+
def query_and_wait(
293+
client: "Client",
294+
query: str,
295+
job_config: Optional[job.QueryJobConfig],
296+
location: Optional[str],
297+
project: str,
298+
retry: retries.Retry,
299+
timeout: Optional[float],
300+
job_retry: retries.Retry,
301+
) -> table.RowIterator:
302+
"""Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.
303+
304+
See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
305+
"""
306+
path = _to_query_path(project)
307+
request_body = _to_query_request(query, job_config, location, timeout)
308+
309+
if os.getenv("QUERY_PREVIEW_ENABLED").casefold() == "true":
310+
request_body["jobCreationMode"] = "JOB_CREATION_OPTIONAL"
311+
312+
@job_retry
313+
def do_query():
314+
request_body["requestId"] = make_job_id()
315+
span_attributes = {"path": path}
316+
return client._call_api(
317+
retry,
318+
span_name="BigQuery.query",
319+
span_attributes=span_attributes,
320+
method="POST",
321+
path=path,
322+
data=request_body,
323+
timeout=timeout,
324+
)
325+
326+
results_or_not = do_query()
327+
328+
# The future might be in a failed state now, but if it's
329+
# unrecoverable, we'll find out when we ask for it's result, at which
330+
# point, we may retry.
331+
future._retry_do_query = do_query # in case we have to retry later
332+
future._job_retry = job_retry
333+
334+
return future

google/cloud/bigquery/client.py

+14
Original file line numberDiff line numberDiff line change
@@ -3405,6 +3405,20 @@ def query(
34053405
else:
34063406
raise ValueError(f"Got unexpected value for api_method: {repr(api_method)}")
34073407

3408+
def query_and_wait(self) -> RowIterator:
3409+
"""[Preview] Run the query and return the results."""
3410+
maybe_job = _job_helpers.query_jobs_query(
3411+
self,
3412+
query,
3413+
job_config,
3414+
location,
3415+
project,
3416+
retry,
3417+
timeout,
3418+
job_retry,
3419+
# TODO: add no job mode
3420+
)
3421+
34083422
def insert_rows(
34093423
self,
34103424
table: Union[Table, TableReference, str],

google/cloud/bigquery/table.py

+37-1
Original file line numberDiff line numberDiff line change
@@ -1558,6 +1558,10 @@ def __init__(
15581558
selected_fields=None,
15591559
total_rows=None,
15601560
first_page_response=None,
1561+
location: Optional[str] = None,
1562+
job_id: Optional[str] = None,
1563+
query_id: Optional[str] = None,
1564+
project: Optional[str] = None,
15611565
):
15621566
super(RowIterator, self).__init__(
15631567
client,
@@ -1575,12 +1579,44 @@ def __init__(
15751579
self._field_to_index = _helpers._field_to_index_mapping(schema)
15761580
self._page_size = page_size
15771581
self._preserve_order = False
1578-
self._project = client.project if client is not None else None
15791582
self._schema = schema
15801583
self._selected_fields = selected_fields
15811584
self._table = table
15821585
self._total_rows = total_rows
15831586
self._first_page_response = first_page_response
1587+
self._location = location
1588+
self._job_id = job_id
1589+
self._query_id = query_id
1590+
1591+
if project:
1592+
self._project = project
1593+
elif client is not None:
1594+
self._project = client.project
1595+
else:
1596+
self._project = None
1597+
1598+
def project(self) -> Optional[str]:
1599+
"""GCP Project ID where these rows are read from."""
1600+
return self._project
1601+
1602+
def location(self) -> Optional[str]:
1603+
"""Location where the query executed (if applicable).
1604+
1605+
See: https://cloud.google.com/bigquery/docs/locations
1606+
"""
1607+
self._location
1608+
1609+
def job_id(self) -> Optional[str]:
1610+
"""ID of the query job (if applicable).
1611+
1612+
To get the job metadata, call
1613+
``job = client.get_job(rows.job_id, location=rows.location)``.
1614+
"""
1615+
return self._job_id
1616+
1617+
def query_id(self) -> Optional[str]:
1618+
"""ID of the stateless query (if applicable)."""
1619+
return self._query_id
15841620

15851621
def _is_completely_cached(self):
15861622
"""Check if all results are completely cached.

0 commit comments

Comments
 (0)