-
Notifications
You must be signed in to change notification settings - Fork 313
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add Client.query_and_wait
which directly returns a RowIterator
of results
#1722
Changes from 37 commits
b8c583a
6a8059d
1f0e38e
4401725
d078941
660aa76
476bcd7
05d6a3e
c16e4be
222f91b
73e5817
9508121
543481d
85f1cab
c0e6c86
d4a322d
e0b2d2e
9daccbd
bba36d2
adf0b49
6dfbf92
765a644
e461ebe
895b6d0
d5345cd
f75d8ab
baff9d6
221898d
18f825a
5afbc41
08167d8
3e10ea4
dc5e5be
f1556bc
db71a1b
bd7e767
4ffec17
95b3b0e
f08dac3
a376bd6
7c3d813
304799a
425f6b0
ae06cd2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,9 +12,32 @@ | |
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Helpers for interacting with the job REST APIs from the client.""" | ||
"""Helpers for interacting with the job REST APIs from the client. | ||
|
||
For queries, there are three cases to consider: | ||
|
||
1. jobs.insert: This always returns a job resource. | ||
2. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED: | ||
This sometimes can return the results inline, but always includes a job ID. | ||
3. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL: | ||
This sometimes doesn't create a job at all, instead returning the results. | ||
For better debugging, an auto-generated query ID is included in the | ||
response. | ||
|
||
Client.query() calls either (1) or (2), depending on what the user provides | ||
for the api_method parameter. query() always returns a QueryJob object, which | ||
can retry the query when the query job fails for a retriable reason. | ||
|
||
Client.query_and_wait() calls (3). This returns a RowIterator that may wrap | ||
local results from the response or may wrap a query job containing multiple | ||
pages of results. Even though query_and_wait() waits for the job to complete, | ||
we still need a separate job_retry object because there are different | ||
predicates where it is safe to generate a new query ID. | ||
""" | ||
|
||
import copy | ||
import functools | ||
import os | ||
import uuid | ||
from typing import Any, Dict, TYPE_CHECKING, Optional | ||
|
||
|
@@ -23,6 +46,7 @@ | |
|
||
from google.cloud.bigquery import job | ||
import google.cloud.bigquery.query | ||
from google.cloud.bigquery import table | ||
|
||
# Avoid circular imports | ||
if TYPE_CHECKING: # pragma: NO COVER | ||
|
@@ -59,6 +83,25 @@ def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> s | |
return str(uuid.uuid4()) | ||
|
||
|
||
def job_config_with_defaults(
    job_config: Optional[job.QueryJobConfig],
    default_job_config: Optional[job.QueryJobConfig],
) -> Optional[job.QueryJobConfig]:
    """Layer ``job_config`` on top of ``default_job_config``.

    Options explicitly set on ``job_config`` take precedence; anything left
    unset is filled in from ``default_job_config``. Either argument may be
    ``None``, in which case the other is returned unchanged.

    Note that ``_fill_from_default`` does not mutate its receiver (the
    config it is called on); a merged copy is returned.
    """
    if job_config is not None and default_job_config is not None:
        return job_config._fill_from_default(default_job_config)

    # At most one of the two is set; return whichever that is (or None).
    return job_config if job_config is not None else default_job_config
|
||
|
||
def query_jobs_insert( | ||
client: "Client", | ||
query: str, | ||
|
@@ -67,9 +110,9 @@ def query_jobs_insert( | |
job_id_prefix: Optional[str], | ||
location: Optional[str], | ||
project: str, | ||
retry: retries.Retry, | ||
retry: Optional[retries.Retry], | ||
timeout: Optional[float], | ||
job_retry: retries.Retry, | ||
job_retry: Optional[retries.Retry], | ||
) -> job.QueryJob: | ||
"""Initiate a query using jobs.insert. | ||
|
||
|
@@ -123,7 +166,13 @@ def do_query(): | |
return future | ||
|
||
|
||
def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any]: | ||
def _to_query_request( | ||
tswast marked this conversation as resolved.
Show resolved
Hide resolved
|
||
job_config: Optional[job.QueryJobConfig] = None, | ||
*, | ||
query: str, | ||
location: Optional[str] = None, | ||
timeout: Optional[float] = None, | ||
) -> Dict[str, Any]: | ||
"""Transform from Job resource to QueryRequest resource. | ||
|
||
Most of the keys in job.configuration.query are in common with | ||
|
@@ -150,6 +199,15 @@ def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any | |
request_body.setdefault("formatOptions", {}) | ||
request_body["formatOptions"]["useInt64Timestamp"] = True # type: ignore | ||
|
||
if timeout is not None: | ||
# Subtract a buffer for context switching, network latency, etc. | ||
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS) | ||
tswast marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if location is not None: | ||
request_body["location"] = location | ||
|
||
request_body["query"] = query | ||
|
||
return request_body | ||
|
||
|
||
|
@@ -207,6 +265,10 @@ def _to_query_job( | |
return query_job | ||
|
||
|
||
def _to_query_path(project: str) -> str: | ||
return f"/projects/{project}/queries" | ||
|
||
|
||
def query_jobs_query( | ||
client: "Client", | ||
query: str, | ||
|
@@ -217,18 +279,14 @@ def query_jobs_query( | |
timeout: Optional[float], | ||
job_retry: retries.Retry, | ||
) -> job.QueryJob: | ||
"""Initiate a query using jobs.query. | ||
"""Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED. | ||
|
||
See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query | ||
""" | ||
path = f"/projects/{project}/queries" | ||
request_body = _to_query_request(job_config) | ||
|
||
if timeout is not None: | ||
# Subtract a buffer for context switching, network latency, etc. | ||
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS) | ||
request_body["location"] = location | ||
request_body["query"] = query | ||
path = _to_query_path(project) | ||
request_body = _to_query_request( | ||
query=query, job_config=job_config, location=location, timeout=timeout | ||
) | ||
|
||
def do_query(): | ||
request_body["requestId"] = make_job_id() | ||
|
@@ -253,3 +311,235 @@ def do_query(): | |
future._job_retry = job_retry | ||
|
||
return future | ||
|
||
|
||
def query_and_wait(
    client: "Client",
    query: str,
    *,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    api_timeout: Optional[float] = None,
    wait_timeout: Optional[float] = None,
    retry: Optional[retries.Retry],
    job_retry: Optional[retries.Retry],
    page_size: Optional[int] = None,
    max_results: Optional[int] = None,
) -> table.RowIterator:
    """Execute ``query``, block until it finishes, and return its rows.

    While ``jobCreationMode=JOB_CREATION_OPTIONAL`` is in preview in the
    ``jobs.query`` REST API, the default ``jobCreationMode`` is used unless
    the environment variable ``QUERY_PREVIEW_ENABLED=true`` is set. After
    ``jobCreationMode`` is GA, this method will always use
    ``jobCreationMode=JOB_CREATION_OPTIONAL``. See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query

    Args:
        client:
            BigQuery client used to make API calls.
        query (str):
            SQL query to execute. Defaults to the standard SQL dialect;
            use ``job_config`` to select a different dialect.
        job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
            Extra configuration options for the job. To override options
            previously set via the ``default_query_job_config`` given to
            the ``Client`` constructor, manually set those options to
            ``None`` (or whatever value is preferred).
        location (Optional[str]):
            Location where the job runs. Must match the location of the
            tables used in the query as well as the destination table.
        project (Optional[str]):
            ID of the project in which to run the job. Defaults to the
            client's project.
        api_timeout (Optional[float]):
            Seconds to wait on the underlying HTTP transport before
            applying ``retry``.
        wait_timeout (Optional[float]):
            Seconds to wait for the query to finish. If the query has not
            finished by then, the client attempts to cancel it.
        retry (Optional[google.api_core.retry.Retry]):
            How to retry the RPC itself; not used to retry failed jobs.
            The default is reasonable and should rarely be overridden.
        job_retry (Optional[google.api_core.retry.Retry]):
            How to retry failed jobs. The default retries
            rate-limit-exceeded errors. ``None`` disables job retry. Not
            all jobs can be retried.
        page_size (Optional[int]):
            Maximum number of rows in each page of results from this
            request. Non-positive values are ignored.
        max_results (Optional[int]):
            Maximum total number of rows from this request.

    Returns:
        google.cloud.bigquery.table.RowIterator:
            Iterator of row data
            :class:`~google.cloud.bigquery.table.Row`-s. During each page
            the iterator has the ``total_rows`` attribute set, counting
            the total rows **in the result set** (distinct from the rows
            in the current page: ``iterator.page.num_items``).

            If the query is a special query producing no results, e.g. a
            DDL query, an ``_EmptyRowIterator`` instance is returned.

    Raises:
        TypeError:
            If ``job_config`` is not an instance of
            :class:`~google.cloud.bigquery.job.QueryJobConfig`.
    """
    # Some API parameters aren't supported by jobs.query; in those cases
    # fall back to an ordinary jobs.insert call.
    if not _supported_by_jobs_query(job_config):
        inserted_job = query_jobs_insert(
            client=client,
            query=query,
            job_id=None,
            job_id_prefix=None,
            job_config=job_config,
            location=location,
            project=project,
            retry=retry,
            timeout=api_timeout,
            job_retry=job_retry,
        )
        return _wait_or_cancel(
            inserted_job,
            api_timeout=api_timeout,
            wait_timeout=wait_timeout,
            retry=retry,
            page_size=page_size,
            max_results=max_results,
        )

    path = _to_query_path(project)
    request_body = _to_query_request(
        query=query, job_config=job_config, location=location, timeout=api_timeout
    )

    if page_size is not None and max_results is not None:
        request_body["maxResults"] = min(page_size, max_results)
    elif page_size is not None or max_results is not None:
        request_body["maxResults"] = page_size or max_results

    # Opt in to the jobCreationMode=JOB_CREATION_OPTIONAL preview only when
    # explicitly enabled via the environment variable.
    if os.getenv("QUERY_PREVIEW_ENABLED", "").casefold() == "true":
        request_body["jobCreationMode"] = "JOB_CREATION_OPTIONAL"

    def do_query():
        request_body["requestId"] = make_job_id()
        span_attributes = {"path": path}

        # For easier testing, handle the retries ourselves: wrap the raw
        # API call in the retry decorator (when one is given) and always
        # pass retry=None to the inner call.
        api_call = client._call_api if retry is None else retry(client._call_api)
        response = api_call(
            retry=None,  # We're calling the retry decorator ourselves.
            span_name="BigQuery.query",
            span_attributes=span_attributes,
            method="POST",
            path=path,
            data=request_body,
            timeout=api_timeout,
        )

        # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
        # to fetch, there will be a job ID for jobs.getQueryResults.
        query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
            response
        )
        has_more_pages = query_results.page_token is not None

        if has_more_pages or not query_results.complete:
            # TODO(swast): Avoid a call to jobs.get in some cases (few
            # remaining pages) by waiting for the query to finish and calling
            # client._list_rows_from_query_results directly. Need to update
            # RowIterator to fetch destination table via the job ID if needed.
            return _wait_or_cancel(
                _to_query_job(client, query, job_config, response),
                api_timeout=api_timeout,
                wait_timeout=wait_timeout,
                retry=retry,
                page_size=page_size,
                max_results=max_results,
            )

        # The whole result set fit in the first response; wrap it locally.
        return table.RowIterator(
            client=client,
            api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
            path=None,
            schema=query_results.schema,
            max_results=max_results,
            page_size=page_size,
            total_rows=query_results.total_rows,
            first_page_response=response,
            location=query_results.location,
            job_id=query_results.job_id,
            query_id=query_results.query_id,
            project=query_results.project,
        )

    return do_query() if job_retry is None else job_retry(do_query)()
|
||
|
||
def _supported_by_jobs_query(job_config: Optional[job.QueryJobConfig]) -> bool:
    """True if jobs.query can be used. False if jobs.insert is needed."""
    if job_config is None:
        return True

    # jobs.query has no equivalent for these jobs.insert-only options, so
    # the presence of any of them forces the jobs.insert code path.
    unsupported_options = (
        job_config.clustering_fields,
        job_config.destination,
        job_config.destination_encryption_configuration,
        job_config.range_partitioning,
        job_config.table_definitions,
        job_config.time_partitioning,
    )
    return all(option is None for option in unsupported_options)
|
||
|
||
def _wait_or_cancel(
    job: job.QueryJob,
    api_timeout: Optional[float],
    wait_timeout: Optional[float],
    retry: Optional[retries.Retry],
    page_size: Optional[int],
    max_results: Optional[int],
) -> table.RowIterator:
    """Wait for ``job`` to complete and return its result rows.

    If the results cannot be returned within ``wait_timeout``, a
    best-effort attempt is made to cancel the job before re-raising.
    """
    try:
        rows = job.result(
            retry=retry,
            timeout=wait_timeout,
            page_size=page_size,
            max_results=max_results,
        )
        return rows
    except Exception:
        try:
            # Best-effort cancellation, since we can't return the results.
            job.cancel(retry=retry, timeout=api_timeout)
        except Exception:
            # Don't let a cancel failure mask the original exception.
            pass
        raise
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is a bit unclear. Can we clean this up a bit?