Skip to content

Commit 89a647e

Browse files
tswast and parthea authored
feat: add Client.query_and_wait which directly returns a RowIterator of results (#1722)
* perf: use the first page of results when `query(api_method="QUERY")` * add tests * respect max_results with cached page * respect page_size, also avoid bqstorage if almost fully downloaded * skip true test if bqstorage not installed * coverage * feat: add `Client.query_and_wait` which directly returns a `RowIterator` of results Set the `QUERY_PREVIEW_ENABLED=TRUE` environment variable to use this with the new JOB_CREATION_OPTIONAL mode (currently in preview). * implement basic query_and_wait and add code sample to test * avoid duplicated QueryJob construction * update unit tests * fix merge conflict in rowiterator * support max_results, add tests * retry tests * unit test coverage * don't retry twice * fix mypy_samples session * consolidate docstrings for query_and_wait * remove mention of job ID * fallback to jobs.insert for unsupported features * distinguish API timeout from wait timeout * add test for jobs.insert fallback * populate default job config * refactor default config * add coverage for job_config * cancel job if hasn't finished * mypy * allow unreleased features in samples * fix for 3.12 * fix: keep `RowIterator.total_rows` populated after iteration This was being reset in some cases when the rows were all available in the first page of results. * Update google/cloud/bigquery/table.py Co-authored-by: Anthonios Partheniou <[email protected]> * fix comments --------- Co-authored-by: Anthonios Partheniou <[email protected]>
1 parent 8482f47 commit 89a647e

File tree

12 files changed

+1550
-80
lines changed

12 files changed

+1550
-80
lines changed

google/cloud/bigquery/_job_helpers.py

+303-13
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,32 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
"""Helpers for interacting with the job REST APIs from the client."""
15+
"""Helpers for interacting with the job REST APIs from the client.
16+
17+
For queries, there are three cases to consider:
18+
19+
1. jobs.insert: This always returns a job resource.
20+
2. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED:
21+
This sometimes can return the results inline, but always includes a job ID.
22+
3. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL:
23+
This sometimes doesn't create a job at all, instead returning the results.
24+
For better debugging, an auto-generated query ID is included in the
25+
response.
26+
27+
Client.query() calls either (1) or (2), depending on what the user provides
28+
for the api_method parameter. query() always returns a QueryJob object, which
29+
can retry the query when the query job fails for a retriable reason.
30+
31+
Client.query_and_wait() calls (3). This returns a RowIterator that may wrap
32+
local results from the response or may wrap a query job containing multiple
33+
pages of results. Even though query_and_wait() waits for the job to complete,
34+
we still need a separate job_retry object because there are different
35+
predicates where it is safe to generate a new query ID.
36+
"""
1637

1738
import copy
39+
import functools
40+
import os
1841
import uuid
1942
from typing import Any, Dict, TYPE_CHECKING, Optional
2043

@@ -23,6 +46,7 @@
2346

2447
from google.cloud.bigquery import job
2548
import google.cloud.bigquery.query
49+
from google.cloud.bigquery import table
2650

2751
# Avoid circular imports
2852
if TYPE_CHECKING: # pragma: NO COVER
@@ -59,6 +83,25 @@ def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> s
5983
return str(uuid.uuid4())
6084

6185

86+
def job_config_with_defaults(
    job_config: Optional[job.QueryJobConfig],
    default_job_config: Optional[job.QueryJobConfig],
) -> Optional[job.QueryJobConfig]:
    """Merge ``job_config`` with ``default_job_config``.

    Returns a copy of ``job_config`` in which any unset option is filled in
    from ``default_job_config``. When either argument is ``None``, the other
    is returned as-is (which may also be ``None``).
    """
    # Nothing to merge when at most one config is present.
    if job_config is None or default_job_config is None:
        return default_job_config if job_config is None else job_config

    # Both configs are present. Build a merged copy in which options
    # explicitly set on job_config always win over the defaults.
    return job_config._fill_from_default(default_job_config)
103+
104+
62105
def query_jobs_insert(
63106
client: "Client",
64107
query: str,
@@ -67,9 +110,9 @@ def query_jobs_insert(
67110
job_id_prefix: Optional[str],
68111
location: Optional[str],
69112
project: str,
70-
retry: retries.Retry,
113+
retry: Optional[retries.Retry],
71114
timeout: Optional[float],
72-
job_retry: retries.Retry,
115+
job_retry: Optional[retries.Retry],
73116
) -> job.QueryJob:
74117
"""Initiate a query using jobs.insert.
75118
@@ -123,7 +166,13 @@ def do_query():
123166
return future
124167

125168

126-
def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any]:
169+
def _to_query_request(
170+
job_config: Optional[job.QueryJobConfig] = None,
171+
*,
172+
query: str,
173+
location: Optional[str] = None,
174+
timeout: Optional[float] = None,
175+
) -> Dict[str, Any]:
127176
"""Transform from Job resource to QueryRequest resource.
128177
129178
Most of the keys in job.configuration.query are in common with
@@ -150,6 +199,15 @@ def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any
150199
request_body.setdefault("formatOptions", {})
151200
request_body["formatOptions"]["useInt64Timestamp"] = True # type: ignore
152201

202+
if timeout is not None:
203+
# Subtract a buffer for context switching, network latency, etc.
204+
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
205+
206+
if location is not None:
207+
request_body["location"] = location
208+
209+
request_body["query"] = query
210+
153211
return request_body
154212

155213

@@ -207,6 +265,10 @@ def _to_query_job(
207265
return query_job
208266

209267

268+
def _to_query_path(project: str) -> str:
269+
return f"/projects/{project}/queries"
270+
271+
210272
def query_jobs_query(
211273
client: "Client",
212274
query: str,
@@ -217,18 +279,14 @@ def query_jobs_query(
217279
timeout: Optional[float],
218280
job_retry: retries.Retry,
219281
) -> job.QueryJob:
220-
"""Initiate a query using jobs.query.
282+
"""Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.
221283
222284
See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
223285
"""
224-
path = f"/projects/{project}/queries"
225-
request_body = _to_query_request(job_config)
226-
227-
if timeout is not None:
228-
# Subtract a buffer for context switching, network latency, etc.
229-
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
230-
request_body["location"] = location
231-
request_body["query"] = query
286+
path = _to_query_path(project)
287+
request_body = _to_query_request(
288+
query=query, job_config=job_config, location=location, timeout=timeout
289+
)
232290

233291
def do_query():
234292
request_body["requestId"] = make_job_id()
@@ -253,3 +311,235 @@ def do_query():
253311
future._job_retry = job_retry
254312

255313
return future
314+
315+
316+
def query_and_wait(
    client: "Client",
    query: str,
    *,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    api_timeout: Optional[float] = None,
    wait_timeout: Optional[float] = None,
    retry: Optional[retries.Retry],
    job_retry: Optional[retries.Retry],
    page_size: Optional[int] = None,
    max_results: Optional[int] = None,
) -> table.RowIterator:
    """Run the query, wait for it to finish, and return the results.

    While ``jobCreationMode=JOB_CREATION_OPTIONAL`` is in preview in the
    ``jobs.query`` REST API, use the default ``jobCreationMode`` unless
    the environment variable ``QUERY_PREVIEW_ENABLED=true``. After
    ``jobCreationMode`` is GA, this method will always use
    ``jobCreationMode=JOB_CREATION_OPTIONAL``. See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query

    Args:
        client:
            BigQuery client to make API calls.
        query (str):
            SQL query to be executed. Defaults to the standard SQL
            dialect. Use the ``job_config`` parameter to change dialects.
        job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
            Extra configuration options for the job.
            To override any options that were previously set in
            the ``default_query_job_config`` given to the
            ``Client`` constructor, manually set those options to ``None``,
            or whatever value is preferred.
        location (Optional[str]):
            Location where to run the job. Must match the location of the
            table used in the query as well as the destination table.
        project (Optional[str]):
            Project ID of the project of where to run the job. Defaults
            to the client's project.
        api_timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.
        wait_timeout (Optional[float]):
            The number of seconds to wait for the query to finish. If the
            query doesn't finish before this timeout, the client attempts
            to cancel the query.
        retry (Optional[google.api_core.retry.Retry]):
            How to retry the RPC. This only applies to making RPC
            calls. It isn't used to retry failed jobs. This has
            a reasonable default that should only be overridden
            with care.
        job_retry (Optional[google.api_core.retry.Retry]):
            How to retry failed jobs. The default retries
            rate-limit-exceeded errors. Passing ``None`` disables
            job retry. Not all jobs can be retried.
        page_size (Optional[int]):
            The maximum number of rows in each page of results from this
            request. Non-positive values are ignored.
        max_results (Optional[int]):
            The maximum total number of rows from this request.

    Returns:
        google.cloud.bigquery.table.RowIterator:
            Iterator of row data
            :class:`~google.cloud.bigquery.table.Row`-s. During each
            page, the iterator will have the ``total_rows`` attribute
            set, which counts the total number of rows **in the result
            set** (this is distinct from the total number of rows in the
            current page: ``iterator.page.num_items``).

            If the query is a special query that produces no results, e.g.
            a DDL query, an ``_EmptyRowIterator`` instance is returned.

    Raises:
        TypeError:
            If ``job_config`` is not an instance of
            :class:`~google.cloud.bigquery.job.QueryJobConfig`
            class.
    """
    # Some API parameters aren't supported by the jobs.query API. In these
    # cases, fallback to a jobs.insert call.
    if not _supported_by_jobs_query(job_config):
        return _wait_or_cancel(
            query_jobs_insert(
                client=client,
                query=query,
                job_id=None,
                job_id_prefix=None,
                job_config=job_config,
                location=location,
                project=project,
                retry=retry,
                timeout=api_timeout,
                job_retry=job_retry,
            ),
            api_timeout=api_timeout,
            wait_timeout=wait_timeout,
            retry=retry,
            page_size=page_size,
            max_results=max_results,
        )

    path = _to_query_path(project)
    request_body = _to_query_request(
        query=query, job_config=job_config, location=location, timeout=api_timeout
    )

    # maxResults caps the rows per response page. When both page_size and
    # max_results are given, the smaller of the two is the effective cap.
    if page_size is not None and max_results is not None:
        request_body["maxResults"] = min(page_size, max_results)
    elif page_size is not None or max_results is not None:
        request_body["maxResults"] = page_size or max_results

    # Opt in to the (preview) job-optional mode only when explicitly enabled
    # via the environment, per the docstring above.
    if os.getenv("QUERY_PREVIEW_ENABLED", "").casefold() == "true":
        request_body["jobCreationMode"] = "JOB_CREATION_OPTIONAL"

    def do_query():
        # Each (re)try issues a fresh request ID so the backend doesn't
        # de-duplicate a retried query against a failed one.
        request_body["requestId"] = make_job_id()
        span_attributes = {"path": path}

        # For easier testing, handle the retries ourselves.
        if retry is not None:
            response = retry(client._call_api)(
                retry=None,  # We're calling the retry decorator ourselves.
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )
        else:
            response = client._call_api(
                retry=None,
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )

        # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
        # to fetch, there will be a job ID for jobs.getQueryResults.
        query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
            response
        )
        page_token = query_results.page_token
        more_pages = page_token is not None

        if more_pages or not query_results.complete:
            # TODO(swast): Avoid a call to jobs.get in some cases (few
            # remaining pages) by waiting for the query to finish and calling
            # client._list_rows_from_query_results directly. Need to update
            # RowIterator to fetch destination table via the job ID if needed.
            return _wait_or_cancel(
                _to_query_job(client, query, job_config, response),
                api_timeout=api_timeout,
                wait_timeout=wait_timeout,
                retry=retry,
                page_size=page_size,
                max_results=max_results,
            )

        # The query finished and all rows fit in the first response page, so
        # wrap the cached page directly — no further job polling is needed.
        return table.RowIterator(
            client=client,
            api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
            path=None,
            schema=query_results.schema,
            max_results=max_results,
            page_size=page_size,
            total_rows=query_results.total_rows,
            first_page_response=response,
            location=query_results.location,
            job_id=query_results.job_id,
            query_id=query_results.query_id,
            project=query_results.project,
        )

    # job_retry wraps the whole request-and-wait so a brand-new query (with a
    # new request ID) can be issued when a job fails for a retriable reason.
    if job_retry is not None:
        return job_retry(do_query)()
    else:
        return do_query()
500+
501+
502+
def _supported_by_jobs_query(job_config: Optional[job.QueryJobConfig]) -> bool:
    """True if jobs.query can be used. False if jobs.insert is needed."""
    if job_config is None:
        return True

    # The jobs.query API has no equivalent for these jobs.insert-only
    # options, so jobs.query is usable only when none of them is set.
    insert_only_options = (
        job_config.clustering_fields,
        job_config.destination,
        job_config.destination_encryption_configuration,
        job_config.range_partitioning,
        job_config.table_definitions,
        job_config.time_partitioning,
    )
    return all(option is None for option in insert_only_options)
516+
517+
518+
def _wait_or_cancel(
    job: job.QueryJob,
    api_timeout: Optional[float],
    wait_timeout: Optional[float],
    retry: Optional[retries.Retry],
    page_size: Optional[int],
    max_results: Optional[int],
) -> table.RowIterator:
    """Block until ``job`` finishes and return its results.

    If the results can't be returned within ``wait_timeout``, a best-effort
    cancellation of the job is attempted before re-raising.
    """
    try:
        rows = job.result(
            page_size=page_size,
            max_results=max_results,
            retry=retry,
            timeout=wait_timeout,
        )
    except Exception:
        # We can't return results, so stop the job from consuming resources.
        # Cancellation is best-effort: a failure to cancel must not mask the
        # exception that brought us here.
        try:
            job.cancel(retry=retry, timeout=api_timeout)
        except Exception:
            pass
        raise
    return rows

0 commit comments

Comments
 (0)