Skip to content

feat: add interpolate() to series and dataframe #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,97 @@ def indicate_duplicates(
)


def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block:
if method != "linear":
raise NotImplementedError(
f"Only 'linear' interpolate method supported. {constants.FEEDBACK_LINK}"
)
backwards_window = windows.WindowSpec(following=0)
forwards_window = windows.WindowSpec(preceding=0)

output_column_ids = []

original_columns = block.value_columns
original_labels = block.column_labels
block, offsets = block.promote_offsets()
for column in original_columns:
# null in same places column is null
should_interpolate = block._column_type(column) in [
pd.Float64Dtype(),
pd.Int64Dtype(),
]
if should_interpolate:
block, notnull = block.apply_unary_op(column, ops.notnull_op)
block, masked_offsets = block.apply_binary_op(
offsets, notnull, ops.partial_arg3(ops.where_op, None)
)

block, previous_value = block.apply_window_op(
column, agg_ops.LastNonNullOp(), backwards_window
)
block, next_value = block.apply_window_op(
column, agg_ops.FirstNonNullOp(), forwards_window
)
block, previous_value_offset = block.apply_window_op(
masked_offsets,
agg_ops.LastNonNullOp(),
backwards_window,
skip_reproject_unsafe=True,
)
block, next_value_offset = block.apply_window_op(
masked_offsets,
agg_ops.FirstNonNullOp(),
forwards_window,
skip_reproject_unsafe=True,
)

block, prediction_id = _interpolate(
block,
previous_value_offset,
previous_value,
next_value_offset,
next_value,
offsets,
)

block, interpolated_column = block.apply_binary_op(
column, prediction_id, ops.fillna_op
)
# Pandas performs ffill-like behavior to extrapolate forwards
block, interpolated_and_ffilled = block.apply_binary_op(
interpolated_column, previous_value, ops.fillna_op
)

output_column_ids.append(interpolated_and_ffilled)
else:
output_column_ids.append(column)

# Force reproject since used `skip_project_unsafe` perviously
block = block.select_columns(output_column_ids)._force_reproject()
return block.with_column_labels(original_labels)


def _interpolate(
block: blocks.Block,
x0_id: str,
y0_id: str,
x1_id: str,
y1_id: str,
xpredict_id: str,
) -> typing.Tuple[blocks.Block, str]:
"""Applies linear interpolation equation to predict y values for xpredict."""
block, x1x0diff = block.apply_binary_op(x1_id, x0_id, ops.sub_op)
block, y1y0diff = block.apply_binary_op(y1_id, y0_id, ops.sub_op)
block, xpredictx0diff = block.apply_binary_op(xpredict_id, x0_id, ops.sub_op)

block, y1_weight = block.apply_binary_op(y1y0diff, x1x0diff, ops.div_op)
block, y1_part = block.apply_binary_op(xpredictx0diff, y1_weight, ops.mul_op)

block, prediction_id = block.apply_binary_op(y0_id, y1_part, ops.add_op)
block = block.drop_columns([x1x0diff, y1y0diff, xpredictx0diff, y1_weight, y1_part])
return block, prediction_id


def drop_duplicates(
block: blocks.Block, columns: typing.Sequence[str], keep: str = "first"
) -> blocks.Block:
Expand Down
4 changes: 4 additions & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1434,6 +1434,10 @@ def _reindex_columns(self, columns):
def reindex_like(self, other: DataFrame, *, validate: typing.Optional[bool] = None):
return self.reindex(index=other.index, columns=other.columns, validate=validate)

def interpolate(self, method: str = "linear") -> DataFrame:
result = block_ops.interpolate(self._block, method)
return DataFrame(result)

def fillna(self, value=None) -> DataFrame:
return self._apply_binop(value, ops.fillna_op, how="left")

Expand Down
4 changes: 4 additions & 0 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,10 @@ def replace(
)
return Series(block.select_column(result_col))

def interpolate(self, method: str = "linear") -> Series:
result = block_ops.interpolate(self._block, method)
return Series(result)

def dropna(
self,
*,
Expand Down
16 changes: 16 additions & 0 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,22 @@ def test_df_dropna(scalars_dfs, axis, how, ignore_index):
pandas.testing.assert_frame_equal(bf_result, pd_result)


def test_df_interpolate(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs
columns = ["int64_col", "int64_too", "float64_col"]
bf_result = scalars_df[columns].interpolate().to_pandas()
# Pandas can only interpolate on "float64" columns
# https://github.com/pandas-dev/pandas/issues/40252
pd_result = scalars_pandas_df[columns].astype("float64").interpolate()

pandas.testing.assert_frame_equal(
bf_result,
pd_result,
check_index_type=False,
check_dtype=False,
)


def test_df_fillna(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs
df = scalars_df[["int64_col", "float64_col"]].fillna(3)
Expand Down
26 changes: 26 additions & 0 deletions tests/system/small/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,32 @@ def test_series_replace_list_scalar(scalars_dfs):
)


@pytest.mark.parametrize(
("values",),
(
([None, 1, 2, None, None, 16, None],),
([None, None, 3.6, None],),
([403.2, None, 352.1, None, None, 111.9],),
),
)
def test_series_interpolate(values):
pd_series = pd.Series(values)
bf_series = series.Series(pd_series)

# Pandas can only interpolate on "float64" columns
# https://github.com/pandas-dev/pandas/issues/40252
pd_result = pd_series.astype("float64").interpolate()
bf_result = bf_series.interpolate().to_pandas()

# pd uses non-null types, while bf uses nullable types
pd.testing.assert_series_equal(
pd_result,
bf_result,
check_index_type=False,
check_dtype=False,
)


@pytest.mark.parametrize(
("ignore_index",),
(
Expand Down
37 changes: 37 additions & 0 deletions third_party/bigframes_vendored/pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -2756,6 +2756,43 @@ def value_counts(
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def interpolate(self, method: str = "linear"):
"""
Fill NaN values using an interpolation method.

Args:
method (str, default 'linear'):
Interpolation technique to use. Only 'linear' supported.
'linear': Ignore the index and treat the values as equally spaced.
This is the only method supported on MultiIndexes.
Copy link
Contributor

@shobsi shobsi Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ("This is the only method supported on MultiIndexes.") probably can be removed


Returns:
DataFrame:
Returns the same object type as the caller, interpolated at
some or all ``NaN`` values

**Examples:**

>>> import bigframes.pandas as bpd
>>> bpd.options.display.progress_bar = None

>>> df = bpd.DataFrame({
... 'A': [1, 2, 3, None, None, 6],
... 'B': [None, 6, None, 2, None, 3],
... })
>>> df.interpolate()
A B
0 1.0 <NA>
1 2.0 6.0
2 3.0 4.0
3 4.0 2.0
4 5.0 2.5
5 6.0 3.0
<BLANKLINE>
[6 rows x 2 columns]
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def fillna(self, value):
"""
Fill NA/NaN values using the specified method.
Expand Down
32 changes: 32 additions & 0 deletions third_party/bigframes_vendored/pandas/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,38 @@ def droplevel(self, level, axis):
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def interpolate(self, method: str = "linear"):
"""
Fill NaN values using an interpolation method.

Args:
method (str, default 'linear'):
Interpolation technique to use. Only 'linear' supported.
'linear': Ignore the index and treat the values as equally spaced.
This is the only method supported on MultiIndexes.

Returns:
Series:
Returns the same object type as the caller, interpolated at
some or all ``NaN`` values

**Examples:**

>>> import bigframes.pandas as bpd
>>> bpd.options.display.progress_bar = None

>>> series = bpd.Series([1, 2, 3, None, None, 6])
>>> series.interpolate()
0 1.0
1 2.0
2 3.0
3 4.0
4 5.0
5 6.0
dtype: Float64
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def fillna(
self,
value=None,
Expand Down