feat: add Series.to_pandas_batches() method #1592

Merged: 2 commits, Apr 7, 2025
bigframes/core/blocks.py (6 changes: 5 additions & 1 deletion)
@@ -590,6 +590,7 @@ def to_pandas_batches(
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
         allow_large_results: Optional[bool] = None,
+        squeeze: Optional[bool] = False,
     ):
         """Download results one message at a time.

@@ -605,7 +606,10 @@ def to_pandas_batches(
         for record_batch in execute_result.arrow_batches():
             df = io_pandas.arrow_to_pandas(record_batch, self.expr.schema)
             self._copy_index_to_pandas(df)
-            yield df
+            if squeeze:
+                yield df.squeeze(axis=1)
+            else:
+                yield df

     def _copy_index_to_pandas(self, df: pd.DataFrame):
         """Set the index on pandas DataFrame to match this block.
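The new `squeeze` flag is what lets the Series-level wrapper reuse the existing block iterator: when set, each one-column DataFrame batch is collapsed into a pandas Series before being yielded. A minimal sketch of the underlying pandas behavior, using an illustrative column name and values:

```python
import pandas as pd

# A one-column batch, shaped like what the block iterator produces for a Series.
batch = pd.DataFrame({"int64_col": [4, 3]}, dtype="Int64")

# squeeze(axis=1) collapses the single column into a pandas Series; the index
# and dtype are preserved and the column label becomes the Series name.
squeezed = batch.squeeze(axis=1)
print(squeezed)
# 0    4
# 1    3
# Name: int64_col, dtype: Int64
```

If the frame had more than one column, `squeeze(axis=1)` would return it unchanged, so the flag is only meaningful for single-column blocks.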
bigframes/series.py (77 changes: 76 additions & 1 deletion)
@@ -23,7 +23,18 @@
 import numbers
 import textwrap
 import typing
-from typing import Any, cast, List, Literal, Mapping, Optional, Sequence, Tuple, Union
+from typing import (
+    Any,
+    cast,
+    Iterable,
+    List,
+    Literal,
+    Mapping,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+)

 import bigframes_vendored.constants as constants
 import bigframes_vendored.pandas.core.series as vendored_pandas_series
@@ -478,6 +489,70 @@ def to_pandas(
         series.name = self._name
         return series

+    def to_pandas_batches(
+        self,
+        page_size: Optional[int] = None,
+        max_results: Optional[int] = None,
+        *,
+        allow_large_results: Optional[bool] = None,
+    ) -> Iterable[pandas.Series]:
+        """Stream Series results to an iterable of pandas Series.
+
+        `page_size` and `max_results` determine the size and number of batches;
+        see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result
+
+        **Examples:**
+
+            >>> import bigframes.pandas as bpd
+            >>> bpd.options.display.progress_bar = None
+            >>> s = bpd.Series([4, 3, 2, 2, 3])
+
+        Iterate through the results in batches, limiting the total rows yielded
+        across all batches via `max_results`:
+
+            >>> for s_batch in s.to_pandas_batches(max_results=3):
+            ...     print(s_batch)
+            0    4
+            1    3
+            2    2
+            dtype: Int64
+
+        Alternatively, control the approximate size of each batch using `page_size`
+        and fetch batches manually using `next()`:
+
+            >>> it = s.to_pandas_batches(page_size=2)
+            >>> next(it)
+            0    4
+            1    3
+            dtype: Int64
+            >>> next(it)
+            2    2
+            3    2
+            dtype: Int64
+
+        Args:
+            page_size (int, default None):
+                The maximum number of rows of each batch. Non-positive values are ignored.
+            max_results (int, default None):
+                The maximum total number of rows of all batches.
+            allow_large_results (bool, default None):
+                If not None, overrides the global setting to allow or disallow large query results
+                over the default size limit of 10 GB.
+
+        Returns:
+            Iterable[pandas.Series]:
+                An iterable of smaller Series which combine to
+                form the original Series. Results stream from BigQuery;
+                see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable
+        """
+        batches = self._block.to_pandas_batches(
+            page_size=page_size,
+            max_results=max_results,
+            allow_large_results=allow_large_results,
+            squeeze=True,
+        )
+        return batches
+
     def _compute_dry_run(self) -> bigquery.QueryJob:
         _, query_job = self._block._compute_dry_run((self._value_column,))
         return query_job
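As the Returns section notes, the yielded batches partition the original Series in order, so they can be reassembled locally. A short sketch of that round trip, using the same toy data as the docstring (the name check is relaxed as a precaution, since batch naming isn't pinned down here):

```python
import pandas as pd

import bigframes.pandas as bpd

bpd.options.display.progress_bar = None
s = bpd.Series([4, 3, 2, 2, 3])

# Stream the Series in pages of at most two rows, then stitch the pieces back
# together; concatenating every batch should reproduce s.to_pandas().
parts = list(s.to_pandas_batches(page_size=2))
combined = pd.concat(parts)

pd.testing.assert_series_equal(combined, s.to_pandas(), check_names=False)
```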
tests/system/small/test_series_io.py (51 changes: 51 additions & 0 deletions)
@@ -11,6 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import pandas as pd
+import pytest
+
 import bigframes

@@ -32,3 +35,51 @@ def test_to_pandas_override_global_option(scalars_df_index):
     bf_series.to_pandas(allow_large_results=False)
     assert bf_series._query_job.destination.table_id == table_id
     assert session._metrics.execution_count - execution_count == 1
+
+
+@pytest.mark.parametrize(
+    ("page_size", "max_results", "allow_large_results"),
+    [
+        pytest.param(None, None, True),
+        pytest.param(2, None, False),
+        pytest.param(None, 1, True),
+        pytest.param(2, 5, False),
+        pytest.param(3, 6, True),
+        pytest.param(3, 100, False),
+        pytest.param(100, 100, True),
+    ],
+)
+def test_to_pandas_batches(scalars_dfs, page_size, max_results, allow_large_results):
+    scalars_df, scalars_pandas_df = scalars_dfs
+    bf_series = scalars_df["int64_col"]
+    pd_series = scalars_pandas_df["int64_col"]
+
+    total_rows = 0
+    expected_total_rows = (
+        min(max_results, len(pd_series)) if max_results else len(pd_series)
+    )
+
+    hit_last_page = False
+    for s in bf_series.to_pandas_batches(
+        page_size=page_size,
+        max_results=max_results,
+        allow_large_results=allow_large_results,
+    ):
+        assert not hit_last_page
+
+        actual_rows = s.shape[0]
+        expected_rows = (
+            min(page_size, expected_total_rows) if page_size else expected_total_rows
+        )
+
+        assert actual_rows <= expected_rows
+        if actual_rows < expected_rows:
+            assert page_size
+            hit_last_page = True
+
+        pd.testing.assert_series_equal(
+            s, pd_series[total_rows : total_rows + actual_rows]
+        )
+        total_rows += actual_rows
+
+    assert total_rows == expected_total_rows
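The test encodes the contract between `page_size` and `max_results`: the total rows across all batches are capped by `max_results`, each batch holds at most `page_size` rows, and only the final batch may come up short. A simplified, illustrative model of the strictest split the test allows (the real server-side paging only promises these as upper bounds, so actual batches may be smaller):

```python
def expected_batch_sizes(total_rows, page_size=None, max_results=None):
    # Cap the rows first, then emit full pages; only the last page may be short.
    # Illustrative helper only -- not part of bigframes or its test suite.
    rows = min(total_rows, max_results) if max_results else total_rows
    if not page_size:
        return [rows]
    sizes = []
    while rows > 0:
        sizes.append(min(page_size, rows))
        rows -= sizes[-1]
    return sizes

print(expected_batch_sizes(9, page_size=2, max_results=5))  # [2, 2, 1]
print(expected_batch_sizes(9, page_size=3))                 # [3, 3, 3]
print(expected_batch_sizes(9, max_results=100))             # [9]
```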