
Commit 4dc8c36

Respect progress_bar_type in to_dataframe when used with BQ Storage API (#7697)
* fix: `to_dataframe` respects `progress_bar_type` with BQ Storage API
* Add unit test for progress bar.
* Add test for full queue.
* Add worker queue for progress bar to prevent lost tqdm updates. The worker
  queue runs in a background thread, so it's more likely to be able to keep up
  with the other workers that are adding to the worker queue.
* Test that progress bar updates more than once.
1 parent fb8c802 commit 4dc8c36
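For readers skimming the diff below: the core idea is a two-stage queue. Download workers push per-page row counts into a worker queue, a background thread sums those into batches, and the main thread drains the batches into tqdm. The following is a standalone sketch of that pattern; every name in it is illustrative, not the library's actual API:

    # Standalone sketch of the update-batching pattern; all names here are
    # illustrative and not part of google-cloud-bigquery itself.
    import threading
    import time
    from six.moves import queue

    INTERVAL = 0.2  # seconds between flushes to the progress-bar queue

    def worker(worker_queue, n_pages):
        """Simulate a download worker reporting per-page row counts."""
        for _ in range(n_pages):
            time.sleep(0.05)  # stand-in for parsing a page of rows
            try:
                worker_queue.put_nowait(100)  # rows parsed in this page
            except queue.Full:
                pass  # dropping one update beats stalling the download

    def aggregator(worker_queue, progress_queue, done):
        """Background thread: sum fine-grained updates into timed batches."""
        pending, last_flush = 0, time.time()
        while True:
            try:
                pending += worker_queue.get(timeout=INTERVAL)
                if time.time() - last_flush > INTERVAL / 3:
                    progress_queue.put(pending)
                    pending, last_flush = 0, time.time()
            except queue.Empty:
                if done.is_set():
                    progress_queue.put(pending)  # flush the remainder
                    return

    worker_queue, progress_queue = queue.Queue(), queue.Queue()
    done = threading.Event()
    workers = [
        threading.Thread(target=worker, args=(worker_queue, 5)) for _ in range(2)
    ]
    agg = threading.Thread(target=aggregator, args=(worker_queue, progress_queue, done))
    for thread in workers + [agg]:
        thread.start()
    for thread in workers:
        thread.join()
    done.set()
    agg.join()

    # Only the main thread touches the "progress bar" (stdout), mirroring the
    # main-thread tqdm updates in the real change.
    total = 0
    while not progress_queue.empty():
        total += progress_queue.get_nowait()
    print("rows reported:", total)  # 2 workers * 5 pages * 100 rows = 1000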

File tree

2 files changed (+200 −15 lines)

bigquery/google/cloud/bigquery/table.py (+97 −4)
@@ -22,9 +22,12 @@
 import datetime
 import json
 import operator
+import threading
+import time
 import warnings
 
 import six
+from six.moves import queue
 
 try:
     from google.cloud import bigquery_storage_v1beta1
@@ -66,7 +69,12 @@
 )
 _TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
 _MARKER = object()
-_PROGRESS_INTERVAL = 1.0  # Time between download status updates, in seconds.
+_PROGRESS_INTERVAL = 0.2  # Time between download status updates, in seconds.
+
+# Send multiple updates from the worker threads, so there are at least a few
+# waiting next time the progress bar is updated.
+_PROGRESS_UPDATES_PER_INTERVAL = 3
+_PROGRESS_WORKER_INTERVAL = _PROGRESS_INTERVAL / _PROGRESS_UPDATES_PER_INTERVAL
 
 
 def _reference_getter(table):
@@ -1274,6 +1282,16 @@ def __repr__(self):
         return "Row({}, {})".format(self._xxx_values, f2i)
 
 
+class _NoopProgressBarQueue(object):
+    """A fake Queue class that does nothing.
+
+    This is used when there is no progress bar to send updates to.
+    """
+
+    def put_nowait(self, item):
+        """Don't actually do anything with the item."""
+
+
 class RowIterator(HTTPIterator):
     """A class for iterating through HTTP/JSON API row list responses.
@@ -1392,7 +1410,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
         return pandas.concat(frames)
 
     def _to_dataframe_bqstorage_stream(
-        self, bqstorage_client, dtypes, columns, session, stream
+        self, bqstorage_client, dtypes, columns, session, stream, worker_queue
     ):
         position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
         rowstream = bqstorage_client.read_rows(position).rows(session)
@@ -1403,6 +1421,13 @@
                 return
             frames.append(page.to_dataframe(dtypes=dtypes))
 
+            try:
+                worker_queue.put_nowait(page.num_items)
+            except queue.Full:
+                # It's okay if we miss a few progress updates. Don't slow
+                # down parsing for that.
+                pass
+
         # Avoid errors on unlucky streams with no blocks. pandas.concat
         # will fail on an empty list.
         if not frames:
@@ -1412,7 +1437,47 @@
         # the end using manually-parsed schema.
         return pandas.concat(frames)[columns]
 
-    def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
+    def _process_worker_updates(self, worker_queue, progress_queue):
+        last_update_time = time.time()
+        current_update = 0
+
+        # Sum all updates in a constant loop.
+        while True:
+            try:
+                current_update += worker_queue.get(timeout=_PROGRESS_INTERVAL)
+
+                # Time to send to the progress bar queue?
+                current_time = time.time()
+                elapsed_time = current_time - last_update_time
+                if elapsed_time > _PROGRESS_WORKER_INTERVAL:
+                    progress_queue.put(current_update)
+                    last_update_time = current_time
+                    current_update = 0
+
+            except queue.Empty:
+                # Keep going, unless there probably aren't going to be any
+                # additional updates.
+                if self._to_dataframe_finished:
+                    progress_queue.put(current_update)
+                    return
+
+    def _process_progress_updates(self, progress_queue, progress_bar):
+        if progress_bar is None:
+            return
+
+        # Output all updates since the last interval.
+        while True:
+            try:
+                next_update = progress_queue.get_nowait()
+                progress_bar.update(next_update)
+            except queue.Empty:
+                break
+
+        if self._to_dataframe_finished:
+            progress_bar.close()
+            return
+
+    def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
         """Use (faster, but billable) BQ Storage API to construct DataFrame."""
         if bigquery_storage_v1beta1 is None:
             raise ValueError(_NO_BQSTORAGE_ERROR)
@@ -1451,6 +1516,18 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
         # See: https://stackoverflow.com/a/29237343/101923
         self._to_dataframe_finished = False
 
+        # Create a queue to track progress updates across threads.
+        worker_queue = _NoopProgressBarQueue()
+        progress_queue = None
+        progress_thread = None
+        if progress_bar is not None:
+            worker_queue = queue.Queue()
+            progress_queue = queue.Queue()
+            progress_thread = threading.Thread(
+                target=self._process_worker_updates, args=(worker_queue, progress_queue)
+            )
+            progress_thread.start()
+
         def get_frames(pool):
             frames = []

@@ -1466,6 +1543,7 @@ def get_frames(pool):
                     columns,
                     session,
                     stream,
+                    worker_queue,
                 )
                 for stream in session.streams
             ]
@@ -1475,6 +1553,11 @@ def get_frames(pool):
                     not_done, timeout=_PROGRESS_INTERVAL
                 )
                 frames.extend([future.result() for future in done])
+
+                # The progress bar needs to update on the main thread to avoid
+                # contention over stdout / stderr.
+                self._process_progress_updates(progress_queue, progress_bar)
+
             return frames
 
         with concurrent.futures.ThreadPoolExecutor() as pool:
@@ -1486,6 +1569,14 @@ def get_frames(pool):
             # definition (enforced by the global interpreter lock).
             self._to_dataframe_finished = True
 
+            # Shutdown all background threads, now that they should know to
+            # exit early.
+            pool.shutdown(wait=True)
+            if progress_thread is not None:
+                progress_thread.join()
+
+        # Update the progress bar one last time to close it.
+        self._process_progress_updates(progress_queue, progress_bar)
         return pandas.concat(frames)
 
     def _get_progress_bar(self, progress_bar_type):
@@ -1585,7 +1676,9 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
 
         if bqstorage_client is not None:
             try:
-                return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
+                return self._to_dataframe_bqstorage(
+                    bqstorage_client, dtypes, progress_bar=progress_bar
+                )
             except google.api_core.exceptions.Forbidden:
                 # Don't hide errors such as insufficient permissions to create
                 # a read session, or the API is not enabled. Both of those are
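To make the behavior change concrete, here is a minimal usage sketch. The project, dataset, and table names are placeholder assumptions, not values from this commit:

    # Hypothetical caller code; project/dataset/table names are placeholders.
    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1

    client = bigquery.Client(project="my-project")
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

    table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")
    rows = client.list_rows(client.get_table(table_ref))

    # Before this commit, passing both arguments silently skipped the tqdm
    # bar; now the BQ Storage download path reports progress too.
    df = rows.to_dataframe(
        bqstorage_client=bqstorage_client, progress_bar_type="tqdm"
    )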

bigquery/tests/unit/test_table.py (+103 −11)
@@ -22,6 +22,7 @@
 import mock
 import pytest
 import six
+from six.moves import queue
 
 import google.api_core.exceptions

@@ -1816,9 +1817,12 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
-        session = bigquery_storage_v1beta1.types.ReadSession(
-            streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
-        )
+        streams = [
+            # Use two streams; we want to check frames are read from each stream.
+            {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
+            {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
+        ]
+        session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
         session.avro_schema.schema = json.dumps(
             {
                 "fields": [
@@ -1836,20 +1840,25 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
 
         mock_rows = mock.create_autospec(reader.ReadRowsIterable)
         mock_rowstream.rows.return_value = mock_rows
+        page_items = [
+            {"colA": 1, "colB": "abc", "colC": 2.0},
+            {"colA": -1, "colB": "def", "colC": 4.0},
+        ]
 
         def blocking_to_dataframe(*args, **kwargs):
             # Sleep for longer than the waiting interval so that we know we're
             # only reading one page per loop at most.
             time.sleep(2 * mut._PROGRESS_INTERVAL)
-            return pandas.DataFrame(
-                {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
-                columns=["colA", "colB", "colC"],
-            )
+            return pandas.DataFrame(page_items, columns=["colA", "colB", "colC"])
 
         mock_page = mock.create_autospec(reader.ReadRowsPage)
         mock_page.to_dataframe.side_effect = blocking_to_dataframe
-        mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
-        type(mock_rows).pages = mock_pages
+        mock_pages = (mock_page, mock_page, mock_page)
+        type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
+
+        # Test that full queue errors are ignored.
+        mock_queue = mock.create_autospec(mut._NoopProgressBarQueue)
+        mock_queue().put_nowait.side_effect = queue.Full
 
         schema = [
             schema.SchemaField("colA", "IGNORED"),
@@ -1866,17 +1875,100 @@ def blocking_to_dataframe(*args, **kwargs):
             selected_fields=schema,
         )
 
-        with mock.patch(
+        with mock.patch.object(mut, "_NoopProgressBarQueue", mock_queue), mock.patch(
             "concurrent.futures.wait", wraps=concurrent.futures.wait
         ) as mock_wait:
             got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)
 
+        # Are the columns in the expected order?
         column_names = ["colA", "colC", "colB"]
         self.assertEqual(list(got), column_names)
-        self.assertEqual(len(got.index), 6)
+
+        # Have the expected number of rows?
+        total_pages = len(streams) * len(mock_pages)
+        total_rows = len(page_items) * total_pages
+        self.assertEqual(len(got.index), total_rows)
+
         # Make sure that this test looped through multiple progress intervals.
         self.assertGreaterEqual(mock_wait.call_count, 2)
 
+        # Make sure that this test pushed to the progress queue.
+        self.assertEqual(mock_queue().put_nowait.call_count, total_pages)
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    @unittest.skipIf(tqdm is None, "Requires `tqdm`")
+    @mock.patch("tqdm.tqdm")
+    def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
+        from google.cloud.bigquery import schema
+        from google.cloud.bigquery import table as mut
+        from google.cloud.bigquery_storage_v1beta1 import reader
+
+        # Speed up testing.
+        mut._PROGRESS_INTERVAL = 0.01
+
+        bqstorage_client = mock.create_autospec(
+            bigquery_storage_v1beta1.BigQueryStorageClient
+        )
+        streams = [
+            # Use two streams; we want to check that progress bar updates are
+            # sent from each stream.
+            {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
+            {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
+        ]
+        session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
+        session.avro_schema.schema = json.dumps({"fields": [{"name": "testcol"}]})
+        bqstorage_client.create_read_session.return_value = session
+
+        mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
+        bqstorage_client.read_rows.return_value = mock_rowstream
+
+        mock_rows = mock.create_autospec(reader.ReadRowsIterable)
+        mock_rowstream.rows.return_value = mock_rows
+        mock_page = mock.create_autospec(reader.ReadRowsPage)
+        page_items = [-1, 0, 1]
+        type(mock_page).num_items = mock.PropertyMock(return_value=len(page_items))
+
+        def blocking_to_dataframe(*args, **kwargs):
+            # Sleep for longer than the waiting interval. This ensures the
+            # progress_queue gets written to more than once, because it gives
+            # the worker->progress updater time to sum intermediate updates.
+            time.sleep(2 * mut._PROGRESS_INTERVAL)
+            return pandas.DataFrame({"testcol": page_items})
+
+        mock_page.to_dataframe.side_effect = blocking_to_dataframe
+        mock_pages = (mock_page, mock_page, mock_page, mock_page, mock_page)
+        type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
+
+        schema = [schema.SchemaField("testcol", "IGNORED")]
+
+        row_iterator = mut.RowIterator(
+            _mock_client(),
+            None,  # api_request: ignored
+            None,  # path: ignored
+            schema,
+            table=mut.TableReference.from_string("proj.dset.tbl"),
+            selected_fields=schema,
+        )
+
+        row_iterator.to_dataframe(
+            bqstorage_client=bqstorage_client, progress_bar_type="tqdm"
+        )
+
+        # Make sure that this test updated the progress bar once per page from
+        # each stream.
+        total_pages = len(streams) * len(mock_pages)
+        expected_total_rows = total_pages * len(page_items)
+        progress_updates = [
+            args[0] for args, kwargs in tqdm_mock().update.call_args_list
+        ]
+        # Should have sent >1 update due to the delay in blocking_to_dataframe.
+        self.assertGreater(len(progress_updates), 1)
+        self.assertEqual(sum(progress_updates), expected_total_rows)
+        tqdm_mock().close.assert_called_once()
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
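To run just these tests locally, an invocation along these lines should work from the repository root, assuming `pandas`, `tqdm`, and `google-cloud-bigquery-storage` are installed; the `-k` expression is an assumption based on the test names shown above:

    # Hypothetical test-runner snippet; adjust the path to your checkout.
    import pytest

    pytest.main(
        ["bigquery/tests/unit/test_table.py", "-k", "bqstorage and progress", "-v"]
    )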
