Skip to content

Commit 217bd2b

Browse files
authored
feat(bigquery): add create_bqstorage_client param to to_dataframe and to_arrow (#9573)
* feat(bigquery): add `create_bqstorage_client` param to `to_dataframe` and `to_arrow` When the `create_bqstorage_client` parameter is set to `True`, the BigQuery client constructs a BigQuery Storage API client for you. This removes the need for boilerplate code to manually construct both clients explicitly with the same credentials. Does this make the `bqstorage_client` parameter unnecessary? In most cases, yes, but there are a few cases where we'll want to continue using it. * When partner tools use `to_dataframe`, they should continue to use `bqstorage_client` so that they can set the correct amended user-agent strings. * When a developer needs to override the default API endpoint for the BQ Storage API, they'll need to manually supply a `bqstorage_client`. * test for BQ Storage API usage in samples tests. * fix: close bqstorage client if created by to_dataframe/to_arrow * chore: blacken * doc: update versionadded * doc: update versionadded
1 parent 5447d1c commit 217bd2b

9 files changed

+431
-37
lines changed

bigquery/google/cloud/bigquery/client.py

+13
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,19 @@ def dataset(self, dataset_id, project=None):
353353

354354
return DatasetReference(project, dataset_id)
355355

356+
def _create_bqstorage_client(self):
    """Build a BigQuery Storage API client that shares this client's credentials.

    Returns:
        google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient:
            A BigQuery Storage API client.
    """
    # Imported locally so that google-cloud-bigquery-storage remains an
    # optional dependency — it is only required when this helper is called.
    from google.cloud import bigquery_storage_v1beta1

    storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
        credentials=self._credentials
    )
    return storage_client
368+
356369
def create_dataset(self, dataset, exists_ok=False, retry=DEFAULT_RETRY):
357370
"""API call: create the dataset via a POST request.
358371

bigquery/google/cloud/bigquery/job.py

+37-3
Original file line numberDiff line numberDiff line change
@@ -3152,7 +3152,12 @@ def result(
31523152

31533153
# If changing the signature of this method, make sure to apply the same
31543154
# changes to table.RowIterator.to_arrow()
3155-
def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
3155+
def to_arrow(
3156+
self,
3157+
progress_bar_type=None,
3158+
bqstorage_client=None,
3159+
create_bqstorage_client=False,
3160+
):
31563161
"""[Beta] Create a class:`pyarrow.Table` by loading all pages of a
31573162
table or query.
31583163
@@ -3185,6 +3190,16 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
31853190
31863191
Reading from a specific partition or snapshot is not
31873192
currently supported by this method.
3193+
create_bqstorage_client (bool):
3194+
**Beta Feature** Optional. If ``True``, create a BigQuery
3195+
Storage API client using the default API settings. The
3196+
BigQuery Storage API is a faster way to fetch rows from
3197+
BigQuery. See the ``bqstorage_client`` parameter for more
3198+
information.
3199+
3200+
This argument does nothing if ``bqstorage_client`` is supplied.
3201+
3202+
..versionadded:: 1.24.0
31883203
31893204
Returns:
31903205
pyarrow.Table
@@ -3199,12 +3214,20 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
31993214
..versionadded:: 1.17.0
32003215
"""
32013216
return self.result().to_arrow(
3202-
progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client
3217+
progress_bar_type=progress_bar_type,
3218+
bqstorage_client=bqstorage_client,
3219+
create_bqstorage_client=create_bqstorage_client,
32033220
)
32043221

32053222
# If changing the signature of this method, make sure to apply the same
32063223
# changes to table.RowIterator.to_dataframe()
3207-
def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
3224+
def to_dataframe(
3225+
self,
3226+
bqstorage_client=None,
3227+
dtypes=None,
3228+
progress_bar_type=None,
3229+
create_bqstorage_client=False,
3230+
):
32083231
"""Return a pandas DataFrame from a QueryJob
32093232
32103233
Args:
@@ -3237,6 +3260,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
32373260
for details.
32383261
32393262
..versionadded:: 1.11.0
3263+
create_bqstorage_client (bool):
3264+
**Beta Feature** Optional. If ``True``, create a BigQuery
3265+
Storage API client using the default API settings. The
3266+
BigQuery Storage API is a faster way to fetch rows from
3267+
BigQuery. See the ``bqstorage_client`` parameter for more
3268+
information.
3269+
3270+
This argument does nothing if ``bqstorage_client`` is supplied.
3271+
3272+
..versionadded:: 1.24.0
32403273
32413274
Returns:
32423275
A :class:`~pandas.DataFrame` populated with row data and column
@@ -3250,6 +3283,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
32503283
bqstorage_client=bqstorage_client,
32513284
dtypes=dtypes,
32523285
progress_bar_type=progress_bar_type,
3286+
create_bqstorage_client=create_bqstorage_client,
32533287
)
32543288

32553289
def __iter__(self):

bigquery/google/cloud/bigquery/table.py

+102-34
Original file line numberDiff line numberDiff line change
@@ -1456,7 +1456,12 @@ def _to_arrow_iterable(self, bqstorage_client=None):
14561456

14571457
# If changing the signature of this method, make sure to apply the same
14581458
# changes to job.QueryJob.to_arrow()
1459-
def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
1459+
def to_arrow(
1460+
self,
1461+
progress_bar_type=None,
1462+
bqstorage_client=None,
1463+
create_bqstorage_client=False,
1464+
):
14601465
"""[Beta] Create a class:`pyarrow.Table` by loading all pages of a
14611466
table or query.
14621467
@@ -1489,6 +1494,16 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
14891494
14901495
Reading from a specific partition or snapshot is not
14911496
currently supported by this method.
1497+
create_bqstorage_client (bool):
1498+
**Beta Feature** Optional. If ``True``, create a BigQuery
1499+
Storage API client using the default API settings. The
1500+
BigQuery Storage API is a faster way to fetch rows from
1501+
BigQuery. See the ``bqstorage_client`` parameter for more
1502+
information.
1503+
1504+
This argument does nothing if ``bqstorage_client`` is supplied.
1505+
1506+
..versionadded:: 1.24.0
14921507
14931508
Returns:
14941509
pyarrow.Table
@@ -1504,22 +1519,33 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
15041519
if pyarrow is None:
15051520
raise ValueError(_NO_PYARROW_ERROR)
15061521

1507-
progress_bar = self._get_progress_bar(progress_bar_type)
1522+
owns_bqstorage_client = False
1523+
if not bqstorage_client and create_bqstorage_client:
1524+
owns_bqstorage_client = True
1525+
bqstorage_client = self.client._create_bqstorage_client()
15081526

1509-
record_batches = []
1510-
for record_batch in self._to_arrow_iterable(bqstorage_client=bqstorage_client):
1511-
record_batches.append(record_batch)
1527+
try:
1528+
progress_bar = self._get_progress_bar(progress_bar_type)
15121529

1513-
if progress_bar is not None:
1514-
# In some cases, the number of total rows is not populated
1515-
# until the first page of rows is fetched. Update the
1516-
# progress bar's total to keep an accurate count.
1517-
progress_bar.total = progress_bar.total or self.total_rows
1518-
progress_bar.update(record_batch.num_rows)
1530+
record_batches = []
1531+
for record_batch in self._to_arrow_iterable(
1532+
bqstorage_client=bqstorage_client
1533+
):
1534+
record_batches.append(record_batch)
15191535

1520-
if progress_bar is not None:
1521-
# Indicate that the download has finished.
1522-
progress_bar.close()
1536+
if progress_bar is not None:
1537+
# In some cases, the number of total rows is not populated
1538+
# until the first page of rows is fetched. Update the
1539+
# progress bar's total to keep an accurate count.
1540+
progress_bar.total = progress_bar.total or self.total_rows
1541+
progress_bar.update(record_batch.num_rows)
1542+
1543+
if progress_bar is not None:
1544+
# Indicate that the download has finished.
1545+
progress_bar.close()
1546+
finally:
1547+
if owns_bqstorage_client:
1548+
bqstorage_client.transport.channel.close()
15231549

15241550
if record_batches:
15251551
return pyarrow.Table.from_batches(record_batches)
@@ -1558,14 +1584,20 @@ def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
15581584

15591585
# If changing the signature of this method, make sure to apply the same
15601586
# changes to job.QueryJob.to_dataframe()
1561-
def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
1587+
def to_dataframe(
1588+
self,
1589+
bqstorage_client=None,
1590+
dtypes=None,
1591+
progress_bar_type=None,
1592+
create_bqstorage_client=False,
1593+
):
15621594
"""Create a pandas DataFrame by loading all pages of a query.
15631595
15641596
Args:
15651597
bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient):
15661598
**Beta Feature** Optional. A BigQuery Storage API client. If
15671599
supplied, use the faster BigQuery Storage API to fetch rows
1568-
from BigQuery. This API is a billable API.
1600+
from BigQuery.
15691601
15701602
This method requires the ``pyarrow`` and
15711603
``google-cloud-bigquery-storage`` libraries.
@@ -1602,6 +1634,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
16021634
progress bar as a graphical dialog box.
16031635
16041636
..versionadded:: 1.11.0
1637+
create_bqstorage_client (bool):
1638+
**Beta Feature** Optional. If ``True``, create a BigQuery
1639+
Storage API client using the default API settings. The
1640+
BigQuery Storage API is a faster way to fetch rows from
1641+
BigQuery. See the ``bqstorage_client`` parameter for more
1642+
information.
1643+
1644+
This argument does nothing if ``bqstorage_client`` is supplied.
1645+
1646+
..versionadded:: 1.24.0
16051647
16061648
Returns:
16071649
pandas.DataFrame:
@@ -1621,32 +1663,44 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
16211663
if dtypes is None:
16221664
dtypes = {}
16231665

1624-
if bqstorage_client and self.max_results is not None:
1666+
if (
1667+
bqstorage_client or create_bqstorage_client
1668+
) and self.max_results is not None:
16251669
warnings.warn(
16261670
"Cannot use bqstorage_client if max_results is set, "
16271671
"reverting to fetching data with the tabledata.list endpoint.",
16281672
stacklevel=2,
16291673
)
1674+
create_bqstorage_client = False
16301675
bqstorage_client = None
16311676

1632-
progress_bar = self._get_progress_bar(progress_bar_type)
1677+
owns_bqstorage_client = False
1678+
if not bqstorage_client and create_bqstorage_client:
1679+
owns_bqstorage_client = True
1680+
bqstorage_client = self.client._create_bqstorage_client()
16331681

1634-
frames = []
1635-
for frame in self._to_dataframe_iterable(
1636-
bqstorage_client=bqstorage_client, dtypes=dtypes
1637-
):
1638-
frames.append(frame)
1682+
try:
1683+
progress_bar = self._get_progress_bar(progress_bar_type)
16391684

1640-
if progress_bar is not None:
1641-
# In some cases, the number of total rows is not populated
1642-
# until the first page of rows is fetched. Update the
1643-
# progress bar's total to keep an accurate count.
1644-
progress_bar.total = progress_bar.total or self.total_rows
1645-
progress_bar.update(len(frame))
1685+
frames = []
1686+
for frame in self._to_dataframe_iterable(
1687+
bqstorage_client=bqstorage_client, dtypes=dtypes
1688+
):
1689+
frames.append(frame)
1690+
1691+
if progress_bar is not None:
1692+
# In some cases, the number of total rows is not populated
1693+
# until the first page of rows is fetched. Update the
1694+
# progress bar's total to keep an accurate count.
1695+
progress_bar.total = progress_bar.total or self.total_rows
1696+
progress_bar.update(len(frame))
16461697

1647-
if progress_bar is not None:
1648-
# Indicate that the download has finished.
1649-
progress_bar.close()
1698+
if progress_bar is not None:
1699+
# Indicate that the download has finished.
1700+
progress_bar.close()
1701+
finally:
1702+
if owns_bqstorage_client:
1703+
bqstorage_client.transport.channel.close()
16501704

16511705
# Avoid concatting an empty list.
16521706
if not frames:
@@ -1667,11 +1721,18 @@ class _EmptyRowIterator(object):
16671721
pages = ()
16681722
total_rows = 0
16691723

1670-
def to_arrow(self, progress_bar_type=None):
1724+
def to_arrow(
1725+
self,
1726+
progress_bar_type=None,
1727+
bqstorage_client=None,
1728+
create_bqstorage_client=False,
1729+
):
16711730
"""[Beta] Create an empty class:`pyarrow.Table`.
16721731
16731732
Args:
16741733
progress_bar_type (Optional[str]): Ignored. Added for compatibility with RowIterator.
1734+
bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
1735+
create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.
16751736
16761737
Returns:
16771738
pyarrow.Table: An empty :class:`pyarrow.Table`.
@@ -1680,13 +1741,20 @@ def to_arrow(self, progress_bar_type=None):
16801741
raise ValueError(_NO_PYARROW_ERROR)
16811742
return pyarrow.Table.from_arrays(())
16821743

1683-
def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
1744+
def to_dataframe(
1745+
self,
1746+
bqstorage_client=None,
1747+
dtypes=None,
1748+
progress_bar_type=None,
1749+
create_bqstorage_client=False,
1750+
):
16841751
"""Create an empty dataframe.
16851752
16861753
Args:
16871754
bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
16881755
dtypes (Any): Ignored. Added for compatibility with RowIterator.
16891756
progress_bar_type (Any): Ignored. Added for compatibility with RowIterator.
1757+
create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.
16901758
16911759
Returns:
16921760
pandas.DataFrame: An empty :class:`~pandas.DataFrame`.
+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Copyright 2019 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
def download_public_data(client):
    """Download a full public table into a pandas DataFrame.

    Args:
        client (google.cloud.bigquery.Client): A BigQuery client.
    """
    # [START bigquery_pandas_public_data]
    # TODO(developer): Import the client library.
    # from google.cloud import bigquery

    # TODO(developer): Construct a BigQuery client object.
    # client = bigquery.Client()

    # TODO(developer): Set table_id to the fully-qualified table ID in standard
    # SQL format, including the project ID and dataset ID.
    table_id = "bigquery-public-data.usa_names.usa_1910_current"

    rows = client.list_rows(table_id)

    # Setting create_bqstorage_client=True uses the BigQuery Storage API to
    # speed up downloads of large tables.
    dataframe = rows.to_dataframe(create_bqstorage_client=True)

    print(dataframe.info())
    # [END bigquery_pandas_public_data]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Copyright 2019 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
def download_public_data_sandbox(client):
    """Download public query results into a pandas DataFrame (Sandbox-safe).

    Args:
        client (google.cloud.bigquery.Client): A BigQuery client.
    """
    # [START bigquery_pandas_public_data_sandbox]
    # TODO(developer): Import the client library.
    # from google.cloud import bigquery

    # TODO(developer): Construct a BigQuery client object.
    # client = bigquery.Client()

    # `SELECT *` is an anti-pattern in BigQuery because it is cheaper and
    # faster to use the BigQuery Storage API directly, but BigQuery Sandbox
    # users can only use the BigQuery Storage API to download query results.
    query_string = "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_current`"

    query_job = client.query(query_string)

    # Setting create_bqstorage_client=True uses the BigQuery Storage API to
    # speed up downloads of large tables.
    dataframe = query_job.to_dataframe(create_bqstorage_client=True)

    print(dataframe.info())
    # [END bigquery_pandas_public_data_sandbox]

0 commit comments

Comments
 (0)