
Commit dd50bcf

feat: add Series.to_pandas_batches() method

1 parent 40c55a0

3 files changed: +132 -2 lines

bigframes/core/blocks.py (+5 -1)

@@ -590,6 +590,7 @@ def to_pandas_batches(
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
         allow_large_results: Optional[bool] = None,
+        squeeze: Optional[bool] = False,
     ):
         """Download results one message at a time.

@@ -605,7 +606,10 @@ def to_pandas_batches(
         for record_batch in execute_result.arrow_batches():
             df = io_pandas.arrow_to_pandas(record_batch, self.expr.schema)
             self._copy_index_to_pandas(df)
-            yield df
+            if squeeze:
+                yield df.squeeze(axis=1)
+            else:
+                yield df

     def _copy_index_to_pandas(self, df: pd.DataFrame):
         """Set the index on pandas DataFrame to match this block.

bigframes/series.py (+76 -1)

@@ -23,7 +23,18 @@
 import numbers
 import textwrap
 import typing
-from typing import Any, cast, List, Literal, Mapping, Optional, Sequence, Tuple, Union
+from typing import (
+    Any,
+    cast,
+    Iterable,
+    List,
+    Literal,
+    Mapping,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+)

 import bigframes_vendored.constants as constants
 import bigframes_vendored.pandas.core.series as vendored_pandas_series

@@ -478,6 +489,70 @@ def to_pandas(
         series.name = self._name
         return series

+    def to_pandas_batches(
+        self,
+        page_size: Optional[int] = None,
+        max_results: Optional[int] = None,
+        *,
+        allow_large_results: Optional[bool] = None,
+    ) -> Iterable[pandas.Series]:
+        """Stream Series results to an iterable of pandas Series.
+
+        page_size and max_results determine the size and number of batches,
+        see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result
+
+        **Examples:**
+
+            >>> import bigframes.pandas as bpd
+            >>> bpd.options.display.progress_bar = None
+            >>> s = bpd.Series([4, 3, 2, 2, 3])
+
+        Iterate through the results in batches, limiting the total rows yielded
+        across all batches via `max_results`:
+
+            >>> for s_batch in s.to_pandas_batches(max_results=3):
+            ...     print(s_batch)
+            0    4
+            1    3
+            2    2
+            dtype: Int64
+
+        Alternatively, control the approximate size of each batch using `page_size`
+        and fetch batches manually using `next()`:
+
+            >>> it = s.to_pandas_batches(page_size=2)
+            >>> next(it)
+            0    4
+            1    3
+            dtype: Int64
+            >>> next(it)
+            2    2
+            3    2
+            dtype: Int64
+
+        Args:
+            page_size (int, default None):
+                The maximum number of rows of each batch. Non-positive values are ignored.
+            max_results (int, default None):
+                The maximum total number of rows of all batches.
+            allow_large_results (bool, default None):
+                If not None, overrides the global setting to allow or disallow large query results
+                over the default size limit of 10 GB.
+
+        Returns:
+            Iterable[pandas.Series]:
+                An iterable of smaller Series which combine to
+                form the original Series. Results stream from bigquery,
+                see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable
+        """
+        df = self._block.to_pandas_batches(
+            page_size=page_size,
+            max_results=max_results,
+            allow_large_results=allow_large_results,
+            squeeze=True,
+        )
+        return df
+
     def _compute_dry_run(self) -> bigquery.QueryJob:
         _, query_job = self._block._compute_dry_run((self._value_column,))
         return query_job
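Series.to_pandas_batches is deliberately a thin wrapper: it delegates to the existing Block.to_pandas_batches and relies on squeeze=True to turn each one-column DataFrame batch into a pandas Series. One pattern this enables (a hypothetical usage sketch, not part of the commit) is folding batches into an aggregate without materializing the whole result locally:

    import bigframes.pandas as bpd

    s = bpd.Series([4, 3, 2, 2, 3])

    # Consume each pandas Series batch as it arrives, keeping only a
    # running total in local memory.
    running_total = 0
    for batch in s.to_pandas_batches(page_size=2):
        running_total += batch.sum()

    print(running_total)  # 14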

tests/system/small/test_series_io.py (+51)

@@ -11,6 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import pandas as pd
+import pytest
+
 import bigframes

@@ -32,3 +35,51 @@ def test_to_pandas_override_global_option(scalars_df_index):
     bf_series.to_pandas(allow_large_results=False)
     assert bf_series._query_job.destination.table_id == table_id
     assert session._metrics.execution_count - execution_count == 1
+
+
+@pytest.mark.parametrize(
+    ("page_size", "max_results", "allow_large_results"),
+    [
+        pytest.param(None, None, True),
+        pytest.param(2, None, False),
+        pytest.param(None, 1, True),
+        pytest.param(2, 5, False),
+        pytest.param(3, 6, True),
+        pytest.param(3, 100, False),
+        pytest.param(100, 100, True),
+    ],
+)
+def test_to_pandas_batches(scalars_dfs, page_size, max_results, allow_large_results):
+    scalars_df, scalars_pandas_df = scalars_dfs
+    bf_series = scalars_df["int64_col"]
+    pd_series = scalars_pandas_df["int64_col"]
+
+    total_rows = 0
+    expected_total_rows = (
+        min(max_results, len(pd_series)) if max_results else len(pd_series)
+    )
+
+    hit_last_page = False
+    for s in bf_series.to_pandas_batches(
+        page_size=page_size,
+        max_results=max_results,
+        allow_large_results=allow_large_results,
+    ):
+        assert not hit_last_page
+
+        actual_rows = s.shape[0]
+        expected_rows = (
+            min(page_size, expected_total_rows) if page_size else expected_total_rows
+        )
+
+        assert actual_rows <= expected_rows
+        if actual_rows < expected_rows:
+            assert page_size
+            hit_last_page = True
+
+        pd.testing.assert_series_equal(
+            s, pd_series[total_rows : total_rows + actual_rows]
+        )
+        total_rows += actual_rows
+
+    assert total_rows == expected_total_rows
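The loop encodes the batching contract the test checks: each batch carries at most min(page_size, remaining) rows, only the final page may come up short (and only when page_size is set), and the concatenated batches must equal the first expected_total_rows rows of the pandas baseline. A small pure-Python model of the full-page sizes implied by page_size and max_results (a hypothetical helper for illustration; real BigQuery pages treat page_size as approximate, which is why the test accepts smaller batches):

    def expected_batch_sizes(total_rows, page_size=None, max_results=None):
        """Model the full-page batch sizes for a result of total_rows rows."""
        remaining = min(total_rows, max_results) if max_results else total_rows
        while remaining > 0:
            step = min(page_size, remaining) if page_size else remaining
            yield step
            remaining -= step

    assert list(expected_batch_sizes(5, page_size=2)) == [2, 2, 1]
    assert list(expected_batch_sizes(5, max_results=3)) == [3]
    assert list(expected_batch_sizes(5, page_size=3, max_results=100)) == [3, 2]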
