Skip to content

feat: add items, apply methods to DataFrame #43

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,12 @@ def isin(self, values) -> DataFrame:
f"isin(), you passed a [{type(values).__name__}]"
)

def items(self):
    """Iterate over the columns as ``(label, Series)`` pairs.

    Column ids and column labels are read from the underlying block and
    paired positionally. Each column is wrapped in a
    ``bigframes.series.Series`` only when the generator reaches it.

    Yields:
        tuple: The column label and a Series built from that column alone.
    """
    ids_with_labels = zip(self._block.value_columns, self._block.column_labels)
    for column_id, label in ids_with_labels:
        # Narrow the block to a single column before wrapping it.
        single_column = self._block.select_column(column_id)
        yield label, bigframes.series.Series(single_column)

def dropna(
self,
*,
Expand Down Expand Up @@ -2382,6 +2388,18 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame:
ops.RemoteFunctionOp(func, apply_on_null=(na_action is None))
)

def apply(self, func, *, args: typing.Tuple = (), **kwargs):
    """Apply ``func`` to every column and combine the per-column results.

    Args:
        func:
            Callable invoked once per column as
            ``func(column, *args, **kwargs)``, where ``column`` is the
            Series produced by ``self.items()``.
        args:
            Extra positional arguments forwarded to ``func`` after the
            column.
        **kwargs:
            Extra keyword arguments forwarded to ``func``.

    Returns:
        A ``DataFrame`` keyed by column label when every result is a
        ``bigframes.series.Series`` or list-like; otherwise a
        ``pandas.Series`` of the per-column results.
    """
    results = {name: func(col, *args, **kwargs) for name, col in self.items()}

    # A generator lets ``all`` stop at the first non-column-shaped result
    # instead of first materializing a list of booleans.
    every_result_is_columnar = all(
        isinstance(val, bigframes.series.Series) or utils.is_list_like(val)
        for val in results.values()
    )
    if every_result_is_columnar:
        return DataFrame(data=results)
    # At least one result is a scalar, so return a local pandas Series
    # indexed by column label.
    return pandas.Series(data=results)

def drop_duplicates(
self,
subset: typing.Union[blocks.Label, typing.Sequence[blocks.Label]] = None,
Expand Down
51 changes: 51 additions & 0 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,57 @@ def test_df_bfill(scalars_dfs):
pandas.testing.assert_frame_equal(bf_result, pd_result)


def test_apply_series_series_callable(
    scalars_df_index,
    scalars_pandas_df_index,
):
    """Check ``apply`` with a Series-returning callable against pandas.

    The callable takes extra positional and keyword-only arguments, so the
    test also covers forwarding of ``args`` and ``**kwargs``.
    """
    columns = ["int64_too", "int64_col"]
    extra_args = (33, 61)
    extra_kwargs = {"kwarg1": 52, "kwarg2": 21}

    def foo(series, arg1, arg2, *, kwarg1=0, kwarg2=0):
        return series**2 + (arg1 * arg2 % 4) + (kwarg1 * kwarg2 % 7)

    bf_applied = scalars_df_index[columns].apply(
        foo, args=extra_args, **extra_kwargs
    )
    bf_result = bf_applied.to_pandas()

    pd_result = scalars_pandas_df_index[columns].apply(
        foo, args=extra_args, **extra_kwargs
    )

    pandas.testing.assert_frame_equal(bf_result, pd_result)


def test_apply_series_listlike_callable(
    scalars_df_index,
    scalars_pandas_df_index,
):
    """Check ``apply`` with a list-returning callable against pandas."""
    columns = ["int64_too", "int64_col"]

    def summarize(x):
        # Three entries per column: row count, minimum, and a constant.
        return [len(x), x.min(), 24]

    bf_result = scalars_df_index[columns].apply(summarize).to_pandas()

    expected = scalars_pandas_df_index[columns].apply(summarize)
    # Convert default pandas dtypes `int64` to match BigQuery DataFrames dtypes.
    expected.index = expected.index.astype("Int64")
    pd_result = expected.astype("Int64")

    pandas.testing.assert_frame_equal(bf_result, pd_result)


def test_apply_series_scalar_callable(
    scalars_df_index,
    scalars_pandas_df_index,
):
    """Check ``apply`` with a scalar-returning callable against pandas.

    The BigQuery DataFrames result is compared directly, with no
    ``to_pandas()`` conversion, using ``assert_series_equal``.
    """
    columns = ["int64_too", "int64_col"]

    def column_total(x):
        return x.sum()

    bf_result = scalars_df_index[columns].apply(column_total)
    pd_result = scalars_pandas_df_index[columns].apply(column_total)

    pandas.testing.assert_series_equal(bf_result, pd_result)


def test_df_isin_list(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs
values = ["Hello, World!", 55555, 2.51, pd.NA, True]
Expand Down
34 changes: 34 additions & 0 deletions third_party/bigframes_vendored/pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,18 @@ def isin(self, values):
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def items(self):
    """
    Iterate over (column name, Series) pairs.

    Iterates over the DataFrame columns, returning a tuple with
    the column name and the content as a Series.

    Returns:
        Iterator: Iterator of (label, Series) tuples, one per column.

    Raises:
        NotImplementedError: Always in this abstract definition; the
            method is expected to be overridden by an implementation.
    """
    # Abstract placeholder: this definition only carries the documentation.
    raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

# ----------------------------------------------------------------------
# Sorting

Expand Down Expand Up @@ -1420,6 +1432,28 @@ def merge(
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def apply(self, func, *, args=(), **kwargs):
    """Apply a function along an axis of the DataFrame.

    Objects passed to the function are Series objects whose index is
    the DataFrame's index (``axis=0``). The final return type
    is inferred from the return type of the applied function.

    This signature takes no ``axis`` argument, so only the column-wise
    (``axis=0``) form described above is available.

    Args:
        func (function):
            Function to apply to each column.
        args (tuple):
            Positional arguments to pass to `func` in addition to the
            array/series.
        **kwargs:
            Additional keyword arguments to pass as keyword arguments to
            `func`.

    Returns:
        pandas.Series or bigframes.DataFrame: Result of applying ``func``
        to each column of the DataFrame.

    Raises:
        NotImplementedError: Always in this abstract definition; the
            method is expected to be overridden by an implementation.
    """
    # Abstract placeholder: this definition only carries the documentation.
    raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

# ----------------------------------------------------------------------
# ndarray-like stats methods

Expand Down