Skip to content

feat: Add Series.combine #680

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions bigframes/core/compile/scalar_op_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,22 +1298,36 @@ def coalesce_impl(
return ibis.coalesce(x, y)


@scalar_op_compiler.register_binary_op(ops.cliplower_op)
def clip_lower(
@scalar_op_compiler.register_binary_op(ops.maximum_op)
def maximum_impl(
value: ibis_types.Value,
lower: ibis_types.Value,
):
# Note: propagates nulls
return ibis.case().when(lower.isnull() | (value < lower), lower).else_(value).end()


@scalar_op_compiler.register_binary_op(ops.clipupper_op)
def clip_upper(
@scalar_op_compiler.register_binary_op(ops.minimum_op)
def minimum_impl(
value: ibis_types.Value,
upper: ibis_types.Value,
):
# Note: propagates nulls
return ibis.case().when(upper.isnull() | (value > upper), upper).else_(value).end()


@scalar_op_compiler.register_binary_op(ops.BinaryRemoteFunctionOp, pass_op=True)
def binary_remote_function_op_impl(
    x: ibis_types.Value, y: ibis_types.Value, op: ops.BinaryRemoteFunctionOp
):
    """Compile a binary remote-function op by calling ``op.func`` on both operands.

    Args:
        x: Left operand expression.
        y: Right operand expression.
        op: The op instance carrying the callable in ``op.func``.

    Returns:
        Whatever ``op.func(x, y)`` returns.

    Raises:
        TypeError: If ``op.func`` has no ``bigframes_remote_function`` attribute.
    """
    remote_callable = op.func
    # Only callables tagged with ``bigframes_remote_function`` are accepted here.
    if hasattr(remote_callable, "bigframes_remote_function"):
        return remote_callable(x, y)
    raise TypeError(
        f"only a bigframes remote function is supported as a callable. {constants.FEEDBACK_LINK}"
    )


# Ternary Operations
@scalar_op_compiler.register_ternary_op(ops.where_op)
def where_op(
Expand Down
16 changes: 14 additions & 2 deletions bigframes/operations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,8 @@ def output_type(self, *input_types):

# Binary Ops
fillna_op = create_binary_op(name="fillna", type_signature=op_typing.COERCE)
cliplower_op = create_binary_op(name="clip_lower", type_signature=op_typing.COERCE)
clipupper_op = create_binary_op(name="clip_upper", type_signature=op_typing.COERCE)
maximum_op = create_binary_op(name="maximum", type_signature=op_typing.COERCE)
minimum_op = create_binary_op(name="minimum", type_signature=op_typing.COERCE)
coalesce_op = create_binary_op(name="coalesce", type_signature=op_typing.COERCE)


Expand Down Expand Up @@ -587,6 +587,16 @@ def output_type(self, *input_types):
raise TypeError(f"Cannot subtract dtypes {left_type} and {right_type}")


@dataclasses.dataclass(frozen=True)
class BinaryRemoteFunctionOp(BinaryOp):
    """Binary op that wraps a callable and reports that callable's output dtype."""

    name: typing.ClassVar[str] = "binary_remote_function"
    # The wrapped callable; ``output_type`` reads its ``output_dtype`` attribute.
    func: typing.Callable

    def output_type(self, *input_types):
        """Return ``self.func.output_dtype``; ``input_types`` is not consulted."""
        wrapped = self.func
        # This property should be set to a valid Dtype by the @remote_function
        # decorator or read_gbq_function method
        return wrapped.output_dtype


add_op = AddOp()
sub_op = SubOp()
mul_op = create_binary_op(name="mul", type_signature=op_typing.BINARY_NUMERIC)
Expand Down Expand Up @@ -713,4 +723,6 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
np.divide: div_op,
np.power: pow_op,
np.arctan2: arctan2_op,
np.maximum: maximum_op,
np.minimum: minimum_op,
}
36 changes: 34 additions & 2 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1031,9 +1031,9 @@ def clip(self, lower, upper):
if lower is None and upper is None:
return self
if lower is None:
return self._apply_binary_op(upper, ops.clipupper_op, alignment="left")
return self._apply_binary_op(upper, ops.minimum_op, alignment="left")
if upper is None:
return self._apply_binary_op(lower, ops.cliplower_op, alignment="left")
return self._apply_binary_op(lower, ops.maximum_op, alignment="left")
value_id, lower_id, upper_id, block = self._align3(lower, upper)
block, result_id = block.apply_ternary_op(
value_id, lower_id, upper_id, ops.clip_op
Expand Down Expand Up @@ -1374,6 +1374,38 @@ def apply(
materialized_series = result_series._cached()
return materialized_series

def combine(
    self,
    other,
    func,
) -> Series:
    """Combine this Series with ``other`` using ``func``.

    A callable tagged with ``bigframes_remote_function`` is applied through
    ``ops.BinaryRemoteFunctionOp`` and the result is cached before being
    returned. Any other callable is invoked directly as ``func(self, other)``.

    Args:
        other: The second operand handed to ``func`` or to the binary op.
        func: A callable taking two arguments.

    Returns:
        Series: The cached remote-function result, or whatever
        ``func(self, other)`` returns for a plain callable.

    Raises:
        ValueError: If ``func`` is not callable.
    """
    if not callable(func):
        raise ValueError(
            "Only a ufunc (a function that applies to the entire Series) or a remote function that only works on single values are supported."
        )

    if hasattr(func, "bigframes_remote_function"):
        reprojected = Series(self._block._force_reproject())
        remote_op = ops.BinaryRemoteFunctionOp(func=func)
        combined = reprojected._apply_binary_op(other, remote_op)
        # return Series with materialized result so that any error in the
        # remote function is caught early
        return combined._cached()

    # Keep this in sync with .apply
    try:
        return func(self, other)
    except Exception as ex:
        # This could happen if any of the operators in func is not
        # supported on a Series. Let's guide the customer to use a
        # remote function instead
        if hasattr(ex, "message"):
            ex.message += f"\n{_remote_function_recommendation_message}"
        raise

def add_prefix(self, prefix: str, axis: int | str | None = None) -> Series:
return Series(self._get_block().add_prefix(prefix))

Expand Down
35 changes: 35 additions & 0 deletions tests/system/large/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,41 @@ def stringify(x):
)


# @pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_binop(session, scalars_dfs, dataset_id, bq_cf_connection):
    """Check ``Series.combine`` with a two-argument remote function against pandas."""
    # Bind the name before the ``try`` so the ``finally`` clause can tell whether
    # the remote function was created. Without this, a failure inside
    # ``session.remote_function`` would also raise UnboundLocalError from the
    # cleanup call, on top of the real error.
    remote_func = None
    try:

        def func(x, y):
            return x * abs(y % 4)

        remote_func = session.remote_function(
            [str, int],
            str,
            dataset_id,
            bq_cf_connection,
            reuse=False,
        )(func)

        scalars_df, scalars_pandas_df = scalars_dfs

        scalars_df = scalars_df.dropna()
        scalars_pandas_df = scalars_pandas_df.dropna()
        bf_result = (
            scalars_df["string_col"]
            .combine(scalars_df["int64_col"], remote_func)
            .to_pandas()
        )
        pd_result = scalars_pandas_df["string_col"].combine(
            scalars_pandas_df["int64_col"], func
        )
        pandas.testing.assert_series_equal(bf_result, pd_result)
    finally:
        # clean up the gcp assets created for the remote function
        if remote_func is not None:
            cleanup_remote_function_assets(
                session.bqclient, session.cloudfunctionsclient, remote_func
            )


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_decorator_with_bigframes_series(
session, scalars_dfs, dataset_id, bq_cf_connection
Expand Down
53 changes: 22 additions & 31 deletions tests/system/small/test_numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,27 +73,6 @@ def test_df_ufuncs(scalars_dfs, opname):
pd.testing.assert_frame_equal(bf_result, pd_result)


@pytest.mark.parametrize(
("opname",),
[
("add",),
("subtract",),
("multiply",),
("divide",),
("power",),
("arctan2",),
],
)
def test_series_binary_ufuncs(floats_product_pd, floats_product_bf, opname):
bf_result = getattr(np, opname)(
floats_product_bf.float64_col_x, floats_product_bf.float64_col_y
).to_pandas()
pd_result = getattr(np, opname)(
floats_product_pd.float64_col_x, floats_product_pd.float64_col_y
)
pd.testing.assert_series_equal(bf_result, pd_result)


@pytest.mark.parametrize(
("opname",),
[
Expand All @@ -106,30 +85,42 @@ def test_series_binary_ufuncs(floats_product_pd, floats_product_bf, opname):
)
def test_df_binary_ufuncs(scalars_dfs, opname):
scalars_df, scalars_pandas_df = scalars_dfs
op = getattr(np, opname)

bf_result = getattr(np, opname)(
scalars_df[["float64_col", "int64_col"]], 5.1
).to_pandas()
pd_result = getattr(np, opname)(
scalars_pandas_df[["float64_col", "int64_col"]], 5.1
)
bf_result = op(scalars_df[["float64_col", "int64_col"]], 5.1).to_pandas()
pd_result = op(scalars_pandas_df[["float64_col", "int64_col"]], 5.1)

pd.testing.assert_frame_equal(bf_result, pd_result)


# Operations tested here don't work on full dataframe in numpy+pandas
# Maybe because of nullable dtypes?
@pytest.mark.parametrize(
("x", "y"),
[
("int64_col", "int64_col"),
("float64_col", "int64_col"),
],
)
def test_series_atan2(scalars_dfs, x, y):
# Test atan2 separately as pandas errors when passing entire df as input, so pass only series
@pytest.mark.parametrize(
("opname",),
[
("add",),
("subtract",),
("multiply",),
("divide",),
("arctan2",),
("minimum",),
("maximum",),
],
)
def test_series_binary_ufuncs(scalars_dfs, x, y, opname):
scalars_df, scalars_pandas_df = scalars_dfs

bf_result = np.arctan2(scalars_df[x], scalars_df[y]).to_pandas()
pd_result = np.arctan2(scalars_pandas_df[x], scalars_pandas_df[y])
op = getattr(np, opname)

bf_result = op(scalars_df[x], scalars_df[y]).to_pandas()
pd_result = op(scalars_pandas_df[x], scalars_pandas_df[y])

pd.testing.assert_series_equal(bf_result, pd_result)

Expand Down
35 changes: 35 additions & 0 deletions tests/system/small/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -3509,6 +3509,41 @@ def test_apply_numpy_ufunc(scalars_dfs, ufunc):
assert_series_equal(bf_result, pd_result)


@pytest.mark.parametrize(
    ("ufunc",),
    [pytest.param(numpy.add), pytest.param(numpy.divide)],
    ids=["add", "divide"],
)
def test_combine_series_ufunc(scalars_dfs, ufunc):
    """Combining ``int64_col`` with itself via a numpy ufunc matches pandas."""
    bf_df, pd_df = scalars_dfs

    # Expected result from pandas.
    pd_col = pd_df["int64_col"].dropna()
    expected = pd_col.combine(pd_col, ufunc)

    # Result from the bigframes Series.
    bf_col = bf_df["int64_col"].dropna()
    actual = bf_col.combine(bf_col, ufunc).to_pandas()

    assert_series_equal(actual, expected, check_dtype=False)


def test_combine_scalar_ufunc(scalars_dfs):
    """Combining ``int64_col`` with the scalar 2.5 via ``numpy.add`` matches pandas."""
    bf_df, pd_df = scalars_dfs

    # Expected result from pandas.
    pd_col = pd_df["int64_col"].dropna()
    expected = pd_col.combine(2.5, numpy.add)

    # Result from the bigframes Series.
    bf_col = bf_df["int64_col"].dropna()
    actual = bf_col.combine(2.5, numpy.add).to_pandas()

    assert_series_equal(actual, expected, check_dtype=False)


def test_apply_simple_udf(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs

Expand Down
56 changes: 56 additions & 0 deletions third_party/bigframes_vendored/pandas/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1279,6 +1279,62 @@ def apply(
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def combine(
self,
other: Series | Hashable,
func,
) -> Series:
"""
Combine the Series with a Series or scalar according to `func`.

Combine the Series and `other` using `func` to perform elementwise
selection for combined Series.
`fill_value` is assumed when value is missing at some index
from one of the two objects being combined.

**Examples:**

>>> import bigframes.pandas as bpd
>>> import numpy as np
>>> bpd.options.display.progress_bar = None

Consider 2 Datasets ``s1`` and ``s2`` containing
highest clocked speeds of different birds.

>>> s1 = bpd.Series({'falcon': 330.0, 'eagle': 160.0})
>>> s1
falcon 330.0
eagle 160.0
dtype: Float64
>>> s2 = bpd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
>>> s2
falcon 345.0
eagle 200.0
duck 30.0
dtype: Float64

Now, to combine the two datasets and view the highest speeds
of the birds across the two datasets

>>> s1.combine(s2, np.maximum)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order of the outputs is different from pandas. How do we decide the ordering of the output?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pandas checks the index of both sides; if they are not exactly the same, it sorts the index. If we wanted to copy this behavior, we would need to download the index from both sides of the operation, which I don't think is worth it. Users can always call sort_index() after combining if needed.

falcon 345.0
eagle 200.0
duck <NA>
dtype: Float64

Args:
other (Series or scalar):
The value(s) to be combined with the `Series`.
func (function):
BigFrames DataFrames ``remote_function`` to apply.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is np.maximum a remote function in the example?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a NumPy ufunc. Having remote functions in the doctests has been problematic so far.

Takes two scalars as inputs and returns an element.
Also accepts some numpy binary functions.

Returns:
Series: The result of combining the Series with the other object.
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def groupby(
self,
by=None,
Expand Down