
Commit 64b66da

Preserve order in to_dataframe with BQ Storage from queries containing ORDER BY (#7793)
* Preserve order in `to_dataframe` with BQ Storage and queries containing ORDER BY.

  This fixes an issue where, due to reading from multiple streams in parallel, the order of rows is not preserved. Normally this isn't an issue, but it is when the rows are query results from an ORDER BY query.

* Compile regex.
* Assert based on truthiness, not equality.
1 parent 86f81d1 commit 64b66da
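
For context, a minimal sketch of the behavior this commit targets. The client setup and the `my_dataset.people` table are illustrative, not part of the commit:

```python
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

client = bigquery.Client()
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

# Before this commit, to_dataframe() with a BQ Storage client read result
# streams in parallel, so rows could come back in any order despite the
# ORDER BY. With this change, such queries request a single stream.
job = client.query("SELECT name, age FROM `my_dataset.people` ORDER BY age")
df = job.to_dataframe(bqstorage_client=bqstorage_client)
```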

File tree

3 files changed: +192 −29 lines


bigquery/google/cloud/bigquery/job.py

+29 −2
@@ -15,6 +15,7 @@
 """Define API Jobs."""

 import copy
+import re
 import threading

 import six
@@ -45,6 +46,7 @@
 _DONE_STATE = "DONE"
 _STOPPED_REASON = "stopped"
 _TIMEOUT_BUFFER_SECS = 0.1
+_CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE)

 _ERROR_REASON_TO_EXCEPTION = {
     "accessDenied": http_client.FORBIDDEN,
@@ -92,6 +94,29 @@ def _error_result_to_exception(error_result):
     )


+def _contains_order_by(query):
+    """Do we need to preserve the order of the query results?
+
+    This function has known false positives, such as with ordered window
+    functions:
+
+    .. code-block:: sql
+
+        SELECT SUM(x) OVER (
+            window_name
+            PARTITION BY...
+            ORDER BY...
+            window_frame_clause)
+        FROM ...
+
+    This false positive failure case means the behavior will be correct, but
+    downloading results with the BigQuery Storage API may be slower than it
+    otherwise would be. This is preferable to the false negative case, where
+    results are expected to be in order but are not (due to parallel reads).
+    """
+    return query and _CONTAINS_ORDER_BY.search(query)
+
+
 class Compression(object):
     """The compression type to use for exported files. The default value is
     :attr:`NONE`.
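
As a quick illustration of the heuristic above (a sketch; `_contains_order_by` is module-private, and the queries are made up):

```python
from google.cloud.bigquery import job

assert not job._contains_order_by("SELECT name FROM dataset.table LIMIT 10")
assert job._contains_order_by("SELECT name FROM dataset.table ORDER BY name")

# The documented false positive: the ORDER BY belongs to a window function,
# so the overall results are unordered, yet a single stream is still used.
assert job._contains_order_by(
    "SELECT SUM(x) OVER (PARTITION BY y ORDER BY z) FROM dataset.table"
)
```

Note the truthiness-based asserts: `_contains_order_by` returns a match object or a falsy value, not a strict boolean, which is why the commit switched its tests away from equality checks.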
@@ -2546,7 +2571,7 @@ def from_api_repr(cls, resource, client):
         :returns: Job parsed from ``resource``.
         """
         job_id, config = cls._get_resource_config(resource)
-        query = config["query"]["query"]
+        query = _helpers._get_sub_prop(config, ["query", "query"])
         job = cls(job_id, query, client=client)
         job._set_properties(resource)
         return job
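
The `from_api_repr` change matters for job resources that lack a nested query configuration; assuming `_helpers._get_sub_prop` behaves like a nested `dict.get`, the difference is roughly:

```python
from google.cloud.bigquery import _helpers

config = {"query": {"query": "SELECT 1"}}
assert _helpers._get_sub_prop(config, ["query", "query"]) == "SELECT 1"

# A resource missing the nested key now yields None instead of a KeyError:
assert _helpers._get_sub_prop({}, ["query", "query"]) is None
```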
@@ -2849,7 +2874,9 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
         dest_table_ref = self.destination
         dest_table = Table(dest_table_ref, schema=schema)
         dest_table._properties["numRows"] = self._query_results.total_rows
-        return self._client.list_rows(dest_table, retry=retry)
+        rows = self._client.list_rows(dest_table, retry=retry)
+        rows._preserve_order = _contains_order_by(self.query)
+        return rows

     def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
         """Return a pandas DataFrame from a QueryJob

bigquery/google/cloud/bigquery/table.py

+9 −1
@@ -1348,6 +1348,7 @@ def __init__(
         )
         self._field_to_index = _helpers._field_to_index_mapping(schema)
         self._page_size = page_size
+        self._preserve_order = False
         self._project = client.project
         self._schema = schema
         self._selected_fields = selected_fields
@@ -1496,10 +1497,15 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
             for field in self._selected_fields:
                 read_options.selected_fields.append(field.name)

+        requested_streams = 0
+        if self._preserve_order:
+            requested_streams = 1
+
         session = bqstorage_client.create_read_session(
             self._table.to_bqstorage(),
             "projects/{}".format(self._project),
             read_options=read_options,
+            requested_streams=requested_streams,
         )

         # We need to parse the schema manually so that we can rearrange the
@@ -1512,6 +1518,8 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
         if not session.streams:
             return pandas.DataFrame(columns=columns)

+        total_streams = len(session.streams)
+
         # Use _to_dataframe_finished to notify worker threads when to quit.
         # See: https://stackoverflow.com/a/29237343/101923
         self._to_dataframe_finished = False
@@ -1560,7 +1568,7 @@ def get_frames(pool):

             return frames

-        with concurrent.futures.ThreadPoolExecutor() as pool:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
             try:
                 frames = get_frames(pool)
             finally:
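
A sketch of the stream-count semantics used above, assuming `bqstorage_client`, `table_reference`, `read_options`, and a `preserve_order` flag are already in scope (`requested_streams=0` lets the server choose):

```python
import concurrent.futures

# 0 lets the backend pick a stream count for throughput; 1 forces a single
# stream so rows arrive in the order the query produced them.
requested_streams = 1 if preserve_order else 0

session = bqstorage_client.create_read_session(
    table_reference,            # e.g. from Table.to_bqstorage()
    "projects/my-project",      # illustrative billing project
    read_options=read_options,
    requested_streams=requested_streams,
)

# One worker per stream; with a single stream the pool degenerates to one
# thread, so frames are collected in stream order.
pool = concurrent.futures.ThreadPoolExecutor(max_workers=len(session.streams))
```

Capping `max_workers` at the stream count also avoids spawning idle threads when the session returns fewer streams than the executor's default.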

bigquery/tests/unit/test_job.py

+154 −26
@@ -17,6 +17,7 @@
 import unittest

 import mock
+import pytest
 from six.moves import http_client

 try:
@@ -59,6 +60,47 @@ def _make_connection(*responses):
     return mock_conn


+def _make_job_resource(
+    creation_time_ms=1437767599006,
+    started_time_ms=1437767600007,
+    ended_time_ms=1437767601008,
+    started=False,
+    ended=False,
+    etag="abc-def-hjk",
+    endpoint="https://www.googleapis.com",
+    job_type="load",
+    job_id="a-random-id",
+    project_id="some-project",
+    user_email="[email protected]",
+):
+    resource = {
+        "configuration": {job_type: {}},
+        "statistics": {"creationTime": creation_time_ms, job_type: {}},
+        "etag": etag,
+        "id": "{}:{}".format(project_id, job_id),
+        "jobReference": {"projectId": project_id, "jobId": job_id},
+        "selfLink": "{}/bigquery/v2/projects/{}/jobs/{}".format(
+            endpoint, project_id, job_id
+        ),
+        "user_email": user_email,
+    }
+
+    if started or ended:
+        resource["statistics"]["startTime"] = started_time_ms
+
+    if ended:
+        resource["statistics"]["endTime"] = ended_time_ms
+
+    if job_type == "query":
+        resource["configuration"]["query"]["destinationTable"] = {
+            "projectId": project_id,
+            "datasetId": "_temp_dataset",
+            "tableId": "_temp_table",
+        }
+
+    return resource
+
+
 class Test__error_result_to_exception(unittest.TestCase):
     def _call_fut(self, *args, **kwargs):
         from google.cloud.bigquery import job
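
For instance, the helper builds a complete, finished job resource in one call (a usage sketch based on the defaults above):

```python
resource = _make_job_resource(job_type="query", started=True, ended=True)
assert resource["statistics"]["endTime"] == 1437767601008
assert (
    resource["configuration"]["query"]["destinationTable"]["tableId"]
    == "_temp_table"
)
```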
@@ -974,6 +1016,7 @@ class _Base(object):
     from google.cloud.bigquery.dataset import DatasetReference
     from google.cloud.bigquery.table import TableReference

+    ENDPOINT = "https://www.googleapis.com"
     PROJECT = "project"
     SOURCE1 = "http://example.com/source1.csv"
     DS_ID = "dataset_id"
@@ -994,7 +1037,9 @@ def _setUpConstants(self):
         self.WHEN = datetime.datetime.utcfromtimestamp(self.WHEN_TS).replace(tzinfo=UTC)
         self.ETAG = "ETAG"
         self.FULL_JOB_ID = "%s:%s" % (self.PROJECT, self.JOB_ID)
-        self.RESOURCE_URL = "http://example.com/path/to/resource"
+        self.RESOURCE_URL = "{}/bigquery/v2/projects/{}/jobs/{}".format(
+            self.ENDPOINT, self.PROJECT, self.JOB_ID
+        )
         self.USER_EMAIL = "[email protected]"

     def _table_ref(self, table_id):
@@ -1004,30 +1049,19 @@ def _table_ref(self, table_id):

     def _make_resource(self, started=False, ended=False):
         self._setUpConstants()
-        resource = {
-            "configuration": {self.JOB_TYPE: {}},
-            "statistics": {"creationTime": self.WHEN_TS * 1000, self.JOB_TYPE: {}},
-            "etag": self.ETAG,
-            "id": self.FULL_JOB_ID,
-            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
-            "selfLink": self.RESOURCE_URL,
-            "user_email": self.USER_EMAIL,
-        }
-
-        if started or ended:
-            resource["statistics"]["startTime"] = self.WHEN_TS * 1000
-
-        if ended:
-            resource["statistics"]["endTime"] = (self.WHEN_TS + 1000) * 1000
-
-        if self.JOB_TYPE == "query":
-            resource["configuration"]["query"]["destinationTable"] = {
-                "projectId": self.PROJECT,
-                "datasetId": "_temp_dataset",
-                "tableId": "_temp_table",
-            }
-
-        return resource
+        return _make_job_resource(
+            creation_time_ms=int(self.WHEN_TS * 1000),
+            started_time_ms=int(self.WHEN_TS * 1000),
+            ended_time_ms=int(self.WHEN_TS * 1000) + 1000000,
+            started=started,
+            ended=ended,
+            etag=self.ETAG,
+            endpoint=self.ENDPOINT,
+            job_type=self.JOB_TYPE,
+            job_id=self.JOB_ID,
+            project_id=self.PROJECT,
+            user_email=self.USER_EMAIL,
+        )

     def _verifyInitialReadonlyProperties(self, job):
         # root elements of resource
@@ -4684,7 +4718,11 @@ def test_to_dataframe_bqstorage(self):
         job.to_dataframe(bqstorage_client=bqstorage_client)

         bqstorage_client.create_read_session.assert_called_once_with(
-            mock.ANY, "projects/{}".format(self.PROJECT), read_options=mock.ANY
+            mock.ANY,
+            "projects/{}".format(self.PROJECT),
+            read_options=mock.ANY,
+            # Use default number of streams for best performance.
+            requested_streams=0,
         )

    @unittest.skipIf(pandas is None, "Requires `pandas`")
@@ -5039,3 +5077,93 @@ def test_from_api_repr_normal(self):
         self.assertEqual(entry.pending_units, self.PENDING_UNITS)
         self.assertEqual(entry.completed_units, self.COMPLETED_UNITS)
         self.assertEqual(entry.slot_millis, self.SLOT_MILLIS)
+
+
+@pytest.mark.parametrize(
+    "query,expected",
+    (
+        (None, False),
+        ("", False),
+        ("select name, age from table", False),
+        ("select name, age from table LIMIT 10;", False),
+        ("select name, age from table order by other_column;", True),
+        ("Select name, age From table Order By other_column", True),
+        ("SELECT name, age FROM table ORDER BY other_column;", True),
+        ("select name, age from table order\nby other_column", True),
+        ("Select name, age From table Order\nBy other_column;", True),
+        ("SELECT name, age FROM table ORDER\nBY other_column", True),
+        ("SelecT name, age froM table OrdeR \n\t BY other_column;", True),
+    ),
+)
+def test__contains_order_by(query, expected):
+    from google.cloud.bigquery import job as mut
+
+    if expected:
+        assert mut._contains_order_by(query)
+    else:
+        assert not mut._contains_order_by(query)
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(
+    bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`"
+)
+@pytest.mark.parametrize(
+    "query",
+    (
+        "select name, age from table order by other_column;",
+        "Select name, age From table Order By other_column;",
+        "SELECT name, age FROM table ORDER BY other_column;",
+        "select name, age from table order\nby other_column;",
+        "Select name, age From table Order\nBy other_column;",
+        "SELECT name, age FROM table ORDER\nBY other_column;",
+        "SelecT name, age froM table OrdeR \n\t BY other_column;",
+    ),
+)
+def test_to_dataframe_bqstorage_preserve_order(query):
+    from google.cloud.bigquery.job import QueryJob as target_class
+
+    job_resource = _make_job_resource(
+        project_id="test-project", job_type="query", ended=True
+    )
+    job_resource["configuration"]["query"]["query"] = query
+    job_resource["status"] = {"state": "DONE"}
+    get_query_results_resource = {
+        "jobComplete": True,
+        "jobReference": {"projectId": "test-project", "jobId": "test-job"},
+        "schema": {
+            "fields": [
+                {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+                {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
+            ]
+        },
+        "totalRows": "4",
+    }
+    connection = _make_connection(get_query_results_resource, job_resource)
+    client = _make_client(connection=connection)
+    job = target_class.from_api_repr(job_resource, client)
+    bqstorage_client = mock.create_autospec(
+        bigquery_storage_v1beta1.BigQueryStorageClient
+    )
+    session = bigquery_storage_v1beta1.types.ReadSession()
+    session.avro_schema.schema = json.dumps(
+        {
+            "type": "record",
+            "name": "__root__",
+            "fields": [
+                {"name": "name", "type": ["null", "string"]},
+                {"name": "age", "type": ["null", "long"]},
+            ],
+        }
+    )
+    bqstorage_client.create_read_session.return_value = session
+
+    job.to_dataframe(bqstorage_client=bqstorage_client)
+
+    bqstorage_client.create_read_session.assert_called_once_with(
+        mock.ANY,
+        "projects/test-project",
+        read_options=mock.ANY,
+        # Use a single stream to preserve row order.
+        requested_streams=1,
+    )
