
Commit 3b0c7ae
Preserve order in to_dataframe with BQ Storage and queries containing ORDER BY
This fixes an issue where, because rows were read from multiple streams in parallel, their order was not preserved. Normally this doesn't matter, but it does when the rows are the results of an ORDER BY query.
1 parent 4dc8c36 commit 3b0c7ae
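
For context, here is a minimal standalone sketch (not code from this library) of the underlying problem: when chunks are read from several streams in parallel and collected as each worker finishes, the final order depends on thread timing.

    # Sketch only: gathering results in completion order scrambles chunk order.
    import concurrent.futures
    import random
    import time

    def read_stream(stream_id):
        time.sleep(random.random())  # simulate variable network latency
        return ["row-{}-{}".format(stream_id, i) for i in range(2)]

    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures = [pool.submit(read_stream, sid) for sid in range(3)]
        chunks = [f.result() for f in concurrent.futures.as_completed(futures)]

    print(chunks)  # chunk order varies run to run; with a single stream it cannot

Requesting a single read stream for ORDER BY queries sidesteps this, at the cost of download parallelism.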

File tree

3 files changed: +173 -29

bigquery/google/cloud/bigquery/job.py (+13 -2)

@@ -15,6 +15,7 @@
 """Define API Jobs."""
 
 import copy
+import re
 import threading
 
 import six
@@ -92,6 +93,14 @@ def _error_result_to_exception(error_result):
     )
 
 
+def _contains_order_by(query):
+    """Do we need to preserve the order of the query results?"""
+    if query is None:
+        return False
+
+    return re.search(r"ORDER\s+BY", query, re.IGNORECASE) is not None
+
+
 class Compression(object):
     """The compression type to use for exported files. The default value is
     :attr:`NONE`.
@@ -2546,7 +2555,7 @@ def from_api_repr(cls, resource, client):
         :returns: Job parsed from ``resource``.
         """
         job_id, config = cls._get_resource_config(resource)
-        query = config["query"]["query"]
+        query = _helpers._get_sub_prop(config, ["query", "query"])
         job = cls(job_id, query, client=client)
         job._set_properties(resource)
         return job
@@ -2849,7 +2858,9 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
         dest_table_ref = self.destination
         dest_table = Table(dest_table_ref, schema=schema)
         dest_table._properties["numRows"] = self._query_results.total_rows
-        return self._client.list_rows(dest_table, retry=retry)
+        rows = self._client.list_rows(dest_table, retry=retry)
+        rows._preserve_order = _contains_order_by(self.query)
+        return rows
 
     def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
         """Return a pandas DataFrame from a QueryJob

bigquery/google/cloud/bigquery/table.py (+9 -1)

@@ -1348,6 +1348,7 @@ def __init__(
         )
         self._field_to_index = _helpers._field_to_index_mapping(schema)
         self._page_size = page_size
+        self._preserve_order = False
         self._project = client.project
         self._schema = schema
         self._selected_fields = selected_fields
@@ -1496,10 +1497,15 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
             for field in self._selected_fields:
                 read_options.selected_fields.append(field.name)
 
+        requested_streams = 0
+        if self._preserve_order:
+            requested_streams = 1
+
         session = bqstorage_client.create_read_session(
             self._table.to_bqstorage(),
             "projects/{}".format(self._project),
             read_options=read_options,
+            requested_streams=requested_streams,
         )
 
         # We need to parse the schema manually so that we can rearrange the
@@ -1512,6 +1518,8 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
         if not session.streams:
             return pandas.DataFrame(columns=columns)
 
+        total_streams = len(session.streams)
+
         # Use _to_dataframe_finished to notify worker threads when to quit.
         # See: https://stackoverflow.com/a/29237343/101923
         self._to_dataframe_finished = False
@@ -1560,7 +1568,7 @@ def get_frames(pool):
 
             return frames
 
-        with concurrent.futures.ThreadPoolExecutor() as pool:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
             try:
                 frames = get_frames(pool)
             finally:
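
Taken together, the changes to job.py and table.py mean an ORDER BY query downloaded through the BigQuery Storage API keeps its ordering. A minimal usage sketch (assuming standard credentials; the table name is a placeholder):

    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1

    client = bigquery.Client()
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

    query_job = client.query(
        "SELECT name, age FROM `my_dataset.my_table` ORDER BY age"
    )
    # result() marks the row iterator with _preserve_order=True, so
    # to_dataframe() requests a single read stream instead of many.
    df = query_job.to_dataframe(bqstorage_client=bqstorage_client)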

bigquery/tests/unit/test_job.py (+151 -26)

@@ -17,6 +17,7 @@
 import unittest
 
 import mock
+import pytest
 from six.moves import http_client
 
 try:
@@ -59,6 +60,47 @@ def _make_connection(*responses):
     return mock_conn
 
 
+def _make_job_resource(
+    creation_time_ms=1437767599006,
+    started_time_ms=1437767600007,
+    ended_time_ms=1437767601008,
+    started=False,
+    ended=False,
+    etag="abc-def-hjk",
+    endpoint="https://www.googleapis.com",
+    job_type="load",
+    job_id="a-random-id",
+    project_id="some-project",
+    user_email="[email protected]",
+):
+    resource = {
+        "configuration": {job_type: {}},
+        "statistics": {"creationTime": creation_time_ms, job_type: {}},
+        "etag": etag,
+        "id": "{}:{}".format(project_id, job_id),
+        "jobReference": {"projectId": project_id, "jobId": job_id},
+        "selfLink": "{}/bigquery/v2/projects/{}/jobs/{}".format(
+            endpoint, project_id, job_id
+        ),
+        "user_email": user_email,
+    }
+
+    if started or ended:
+        resource["statistics"]["startTime"] = started_time_ms
+
+    if ended:
+        resource["statistics"]["endTime"] = ended_time_ms
+
+    if job_type == "query":
+        resource["configuration"]["query"]["destinationTable"] = {
+            "projectId": project_id,
+            "datasetId": "_temp_dataset",
+            "tableId": "_temp_table",
+        }
+
+    return resource
+
+
 class Test__error_result_to_exception(unittest.TestCase):
     def _call_fut(self, *args, **kwargs):
         from google.cloud.bigquery import job
@@ -974,6 +1016,7 @@ class _Base(object):
     from google.cloud.bigquery.dataset import DatasetReference
     from google.cloud.bigquery.table import TableReference
 
+    ENDPOINT = "https://www.googleapis.com"
     PROJECT = "project"
     SOURCE1 = "http://example.com/source1.csv"
     DS_ID = "dataset_id"
@@ -994,7 +1037,9 @@ def _setUpConstants(self):
         self.WHEN = datetime.datetime.utcfromtimestamp(self.WHEN_TS).replace(tzinfo=UTC)
         self.ETAG = "ETAG"
         self.FULL_JOB_ID = "%s:%s" % (self.PROJECT, self.JOB_ID)
-        self.RESOURCE_URL = "http://example.com/path/to/resource"
+        self.RESOURCE_URL = "{}/bigquery/v2/projects/{}/jobs/{}".format(
+            self.ENDPOINT, self.PROJECT, self.JOB_ID
+        )
         self.USER_EMAIL = "[email protected]"
 
     def _table_ref(self, table_id):
@@ -1004,30 +1049,19 @@ def _table_ref(self, table_id):
 
     def _make_resource(self, started=False, ended=False):
         self._setUpConstants()
-        resource = {
-            "configuration": {self.JOB_TYPE: {}},
-            "statistics": {"creationTime": self.WHEN_TS * 1000, self.JOB_TYPE: {}},
-            "etag": self.ETAG,
-            "id": self.FULL_JOB_ID,
-            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
-            "selfLink": self.RESOURCE_URL,
-            "user_email": self.USER_EMAIL,
-        }
-
-        if started or ended:
-            resource["statistics"]["startTime"] = self.WHEN_TS * 1000
-
-        if ended:
-            resource["statistics"]["endTime"] = (self.WHEN_TS + 1000) * 1000
-
-        if self.JOB_TYPE == "query":
-            resource["configuration"]["query"]["destinationTable"] = {
-                "projectId": self.PROJECT,
-                "datasetId": "_temp_dataset",
-                "tableId": "_temp_table",
-            }
-
-        return resource
+        return _make_job_resource(
+            creation_time_ms=int(self.WHEN_TS * 1000),
+            started_time_ms=int(self.WHEN_TS * 1000),
+            ended_time_ms=int(self.WHEN_TS * 1000) + 1000000,
+            started=started,
+            ended=ended,
+            etag=self.ETAG,
+            endpoint=self.ENDPOINT,
+            job_type=self.JOB_TYPE,
+            job_id=self.JOB_ID,
+            project_id=self.PROJECT,
+            user_email=self.USER_EMAIL,
+        )
 
     def _verifyInitialReadonlyProperties(self, job):
         # root elements of resource
@@ -4684,7 +4718,11 @@ def test_to_dataframe_bqstorage(self):
         job.to_dataframe(bqstorage_client=bqstorage_client)
 
         bqstorage_client.create_read_session.assert_called_once_with(
-            mock.ANY, "projects/{}".format(self.PROJECT), read_options=mock.ANY
+            mock.ANY,
+            "projects/{}".format(self.PROJECT),
+            read_options=mock.ANY,
+            # Use default number of streams for best performance.
+            requested_streams=0,
         )
 
     @unittest.skipIf(pandas is None, "Requires `pandas`")
@@ -5039,3 +5077,90 @@ def test_from_api_repr_normal(self):
         self.assertEqual(entry.pending_units, self.PENDING_UNITS)
         self.assertEqual(entry.completed_units, self.COMPLETED_UNITS)
         self.assertEqual(entry.slot_millis, self.SLOT_MILLIS)
+
+
+@pytest.mark.parametrize(
+    "query,expected",
+    (
+        (None, False),
+        ("", False),
+        ("select name, age from table", False),
+        ("select name, age from table LIMIT 10;", False),
+        ("select name, age from table order by other_column;", True),
+        ("Select name, age From table Order By other_column", True),
+        ("SELECT name, age FROM table ORDER BY other_column;", True),
+        ("select name, age from table order\nby other_column", True),
+        ("Select name, age From table Order\nBy other_column;", True),
+        ("SELECT name, age FROM table ORDER\nBY other_column", True),
+        ("SelecT name, age froM table OrdeR \n\t BY other_column;", True),
+    ),
+)
+def test__contains_order_by(query, expected):
+    from google.cloud.bigquery import job as mut
+
+    assert mut._contains_order_by(query) == expected
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(
+    bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`"
+)
+@pytest.mark.parametrize(
+    "query",
+    (
+        "select name, age from table order by other_column;",
+        "Select name, age From table Order By other_column;",
+        "SELECT name, age FROM table ORDER BY other_column;",
+        "select name, age from table order\nby other_column;",
+        "Select name, age From table Order\nBy other_column;",
+        "SELECT name, age FROM table ORDER\nBY other_column;",
+        "SelecT name, age froM table OrdeR \n\t BY other_column;",
+    ),
+)
+def test_to_dataframe_bqstorage_preserve_order(query):
+    from google.cloud.bigquery.job import QueryJob as target_class
+
+    job_resource = _make_job_resource(
+        project_id="test-project", job_type="query", ended=True
+    )
+    job_resource["configuration"]["query"]["query"] = query
+    job_resource["status"] = {"state": "DONE"}
+    get_query_results_resource = {
+        "jobComplete": True,
+        "jobReference": {"projectId": "test-project", "jobId": "test-job"},
+        "schema": {
+            "fields": [
+                {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+                {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
+            ]
+        },
+        "totalRows": "4",
+    }
+    connection = _make_connection(get_query_results_resource, job_resource)
+    client = _make_client(connection=connection)
+    job = target_class.from_api_repr(job_resource, client)
+    bqstorage_client = mock.create_autospec(
+        bigquery_storage_v1beta1.BigQueryStorageClient
+    )
+    session = bigquery_storage_v1beta1.types.ReadSession()
+    session.avro_schema.schema = json.dumps(
+        {
+            "type": "record",
+            "name": "__root__",
+            "fields": [
+                {"name": "name", "type": ["null", "string"]},
+                {"name": "age", "type": ["null", "long"]},
+            ],
+        }
+    )
+    bqstorage_client.create_read_session.return_value = session
+
+    job.to_dataframe(bqstorage_client=bqstorage_client)
+
+    bqstorage_client.create_read_session.assert_called_once_with(
+        mock.ANY,
+        "projects/test-project",
+        read_options=mock.ANY,
+        # Use a single stream to preserve row order.
+        requested_streams=1,
+    )
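
The new tests are plain pytest functions rather than unittest methods; from the repository root they can be run with, for example, `pytest bigquery/tests/unit/test_job.py -k "contains_order_by or preserve_order"`.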
