Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support time range rolling on Series. #1590

Merged
merged 21 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bigframes/core/array_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ def project_window_op(
skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection
"""
# TODO: Support non-deterministic windowing
if window_spec.row_bounded or not op.order_independent:
if window_spec.is_row_bounded or not op.order_independent:
if self.node.order_ambiguous and not self.session._strictly_ordered:
if not self.session._allows_ambiguity:
raise ValueError(
Expand Down
67 changes: 56 additions & 11 deletions bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from google.cloud import bigquery
import pyarrow as pa

from bigframes.core import utils
import bigframes.core.compile.aggregate_compiler as agg_compiler
import bigframes.core.compile.googlesql
import bigframes.core.compile.ibis_types
Expand Down Expand Up @@ -231,7 +232,7 @@ def aggregate(
col_out: agg_compiler.compile_aggregate(
aggregate,
bindings,
order_by=_convert_ordering_to_table_values(table, order_by),
order_by=_convert_row_ordering_to_table_values(table, order_by),
)
for aggregate, col_out in aggregations
}
Expand Down Expand Up @@ -439,7 +440,7 @@ def project_window_op(
never_skip_nulls=never_skip_nulls,
)

if expression.op.order_independent and not window_spec.row_bounded:
if expression.op.order_independent and window_spec.is_unbounded:
# notably percentile_cont does not support ordering clause
window_spec = window_spec.without_order()
window = self._ibis_window_from_spec(window_spec)
Expand Down Expand Up @@ -517,16 +518,30 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec):
# 1. Order-independent op (aggregation, cut, rank) with unbound window - no ordering clause needed
# 2. Order-independent op (aggregation, cut, rank) with range window - use ordering clause, ties allowed
# 3. Order-dependent op (navigation functions, array_agg) or rows bounds - use total row order to break ties.
if window_spec.ordering:
order_by = _convert_ordering_to_table_values(
if window_spec.is_row_bounded:
if not window_spec.ordering:
# If window spec has following or preceding bounds, we need to apply an unambiguous ordering.
raise ValueError("No ordering provided for ordered analytic function")
order_by = _convert_row_ordering_to_table_values(
self._column_names,
window_spec.ordering,
)
elif window_spec.row_bounded:
# If window spec has following or preceding bounds, we need to apply an unambiguous ordering.
raise ValueError("No ordering provided for ordered analytic function")
else:

elif window_spec.is_range_bounded:
order_by = [
_convert_range_ordering_to_table_value(
self._column_names,
window_spec.ordering[0],
)
]
# The rest if branches are for unbounded windows
elif window_spec.ordering:
# Unbound grouping window. Suitable for aggregations but not for analytic function application.
order_by = _convert_row_ordering_to_table_values(
self._column_names,
window_spec.ordering,
)
else:
order_by = None

window = bigframes_vendored.ibis.window(order_by=order_by, group_by=group_by)
Expand All @@ -551,7 +566,7 @@ def is_window(column: ibis_types.Value) -> bool:
return any(isinstance(op, ibis_ops.WindowFunction) for op in matches)


def _convert_ordering_to_table_values(
def _convert_row_ordering_to_table_values(
value_lookup: typing.Mapping[str, ibis_types.Value],
ordering_columns: typing.Sequence[OrderingExpression],
) -> typing.Sequence[ibis_types.Value]:
Expand Down Expand Up @@ -579,6 +594,30 @@ def _convert_ordering_to_table_values(
return ordering_values


def _convert_range_ordering_to_table_value(
    value_lookup: typing.Mapping[str, ibis_types.Value],
    ordering_column: OrderingExpression,
) -> ibis_types.Value:
    """Converts a single range-window ordering key into an Ibis sort expression.

    Note that this helper differs from `_convert_row_ordering_to_table_values` in
    that it performs no explicit null arrangement. There are two reasons:
    1. Arranging null positions would require more than one ordering key, which
       SQL forbids in the ORDER BY clause of a range (rolling) window.
    2. Pandas does not allow range rolling on timeseries with nulls.

    Therefore, we opt for the simplest approach here: generate the simplest SQL
    and follow the BigQuery engine behavior.
    """
    sort_key = op_compiler.compile_expression(
        ordering_column.scalar_expression, value_lookup
    )

    if ordering_column.direction.is_ascending:
        return bigframes_vendored.ibis.asc(sort_key)  # type: ignore
    return bigframes_vendored.ibis.desc(sort_key)  # type: ignore


def _string_cast_join_cond(
lvalue: ibis_types.Column, rvalue: ibis_types.Column
) -> ibis_types.BooleanColumn:
Expand Down Expand Up @@ -668,8 +707,14 @@ def _add_boundary(
) -> ibis_expr_builders.LegacyWindowBuilder:
if isinstance(bounds, RangeWindowBounds):
return ibis_window.range(
start=_to_ibis_boundary(bounds.start),
end=_to_ibis_boundary(bounds.end),
start=_to_ibis_boundary(
None
if bounds.start is None
else utils.timedelta_to_micros(bounds.start)
),
end=_to_ibis_boundary(
None if bounds.end is None else utils.timedelta_to_micros(bounds.end)
),
)
if isinstance(bounds, RowsWindowBounds):
if bounds.start is not None or bounds.end is not None:
Expand Down
1 change: 1 addition & 0 deletions bigframes/core/compile/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def _replace_unsupported_ops(node: nodes.BigFrameNode):
# TODO: Run all replacement rules as single bottom-up pass
node = nodes.bottom_up(node, rewrites.rewrite_slice)
node = nodes.bottom_up(node, rewrites.rewrite_timedelta_expressions)
node = nodes.bottom_up(node, rewrites.rewrite_range_rolling)
return node


Expand Down
7 changes: 3 additions & 4 deletions bigframes/core/compile/polars/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import dataclasses
import functools
import itertools
from typing import cast, Optional, Sequence, Tuple, TYPE_CHECKING, Union
from typing import cast, Optional, Sequence, Tuple, TYPE_CHECKING

import bigframes.core
from bigframes.core import window_spec
Expand Down Expand Up @@ -359,6 +359,7 @@ def compile_window(self, node: nodes.WindowOpNode):
return df.with_columns([agg_expr])

else: # row-bounded window
assert isinstance(window.bounds, window_spec.RowsWindowBounds)
# Polars API semi-bounded, and any grouped rolling window challenging
# https://github.com/pola-rs/polars/issues/4799
# https://github.com/pola-rs/polars/issues/8976
Expand All @@ -382,9 +383,7 @@ def compile_window(self, node: nodes.WindowOpNode):
return pl.concat([df, results], how="horizontal")


def _get_period(
bounds: Union[window_spec.RowsWindowBounds, window_spec.RangeWindowBounds]
) -> Optional[int]:
def _get_period(bounds: window_spec.RowsWindowBounds) -> Optional[int]:
"""Returns None if the boundary is infinite."""
if bounds.start is None or bounds.end is None:
return None
Expand Down
6 changes: 4 additions & 2 deletions bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ def _validate(self):
"""Validate the local data in the node."""
# Since inner order and row bounds are coupled, rank ops can't be row bounded
assert (
not self.window_spec.row_bounded
not self.window_spec.is_row_bounded
) or self.expression.op.implicitly_inherits_order
assert all(ref in self.child.ids for ref in self.expression.column_references)

Expand Down Expand Up @@ -1415,7 +1415,9 @@ def inherits_order(self) -> bool:
op_inherits_order = (
not self.expression.op.order_independent
) and self.expression.op.implicitly_inherits_order
return op_inherits_order or self.window_spec.row_bounded
# range-bounded windows do not inherit order because their ordering is
# already defined before rewrite time.
return op_inherits_order or self.window_spec.is_row_bounded

@property
def additive_base(self) -> BigFrameNode:
Expand Down
2 changes: 2 additions & 0 deletions bigframes/core/rewrite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from bigframes.core.rewrite.pruning import column_pruning
from bigframes.core.rewrite.slices import pullup_limit_from_slice, rewrite_slice
from bigframes.core.rewrite.timedeltas import rewrite_timedelta_expressions
from bigframes.core.rewrite.windows import rewrite_range_rolling

__all__ = [
"legacy_join_as_projection",
Expand All @@ -29,4 +30,5 @@
"remap_variables",
"pull_up_order",
"column_pruning",
"rewrite_range_rolling",
]
45 changes: 45 additions & 0 deletions bigframes/core/rewrite/windows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import dataclasses

from bigframes import operations as ops
from bigframes.core import nodes


def rewrite_range_rolling(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
    """Rewrites a range-bounded window op so its ordering key is in unix microseconds.

    Only range-bounded `WindowOpNode`s are affected; every other node passes
    through unchanged. The single ordering expression is wrapped with
    `ops.UnixMicros` so the ordering values are comparable with the window's
    timedelta-derived bounds.

    Raises:
        ValueError: if the range window spec does not have exactly one ordering
            column. This is validated at the API level too; the check here is a
            fail-fast guard against invalid trees sneaking in.
    """
    if not isinstance(node, nodes.WindowOpNode):
        return node
    if not node.window_spec.is_range_bounded:
        return node

    if len(node.window_spec.ordering) != 1:
        raise ValueError(
            "Range rolling should only be performed on exactly one column."
        )

    key_ordering = node.window_spec.ordering[0]
    micros_ordering = dataclasses.replace(
        key_ordering,
        scalar_expression=ops.UnixMicros().as_expr(key_ordering.scalar_expression),
    )
    rewritten_spec = dataclasses.replace(
        node.window_spec, ordering=(micros_ordering,)
    )
    return dataclasses.replace(node, window_spec=rewritten_spec)
81 changes: 81 additions & 0 deletions bigframes/core/window/ordering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from functools import singledispatch

from bigframes.core import expression as ex
from bigframes.core import nodes, ordering


@singledispatch
def find_order_direction(
    root: nodes.BigFrameNode, column_id: str
) -> ordering.OrderingDirection | None:
    """Returns the ordering direction of the given column via tree traversal.

    This is the fallback for node types without a registered overload: it
    conservatively returns None, meaning the column's ordering cannot be
    determined (or the column cannot be found) from this subtree.
    """
    return None


@find_order_direction.register
def _(root: nodes.OrderByNode, column_id: str):
    # Sorting by nothing is a no-op; keep searching below this node.
    if not root.by:
        return find_order_direction(root.child, column_id)

    # The window key must be the first (prefix) sort key for its order to hold.
    primary = root.by[0]
    expr = primary.scalar_expression
    if isinstance(expr, ex.DerefOp) and expr.id.name == column_id:
        return primary.direction
    return None


@find_order_direction.register
def _(root: nodes.ReversedNode, column_id: str):
    # A reversal flips whatever direction the subtree establishes.
    child_direction = find_order_direction(root.child, column_id)
    return None if child_direction is None else child_direction.reverse()


@find_order_direction.register
def _(root: nodes.SelectionNode, column_id: str):
    # Map the output alias back to its source column, then keep searching below.
    source_id = next(
        (
            pair.ref.id.name
            for pair in root.input_output_pairs
            if pair.id.name == column_id
        ),
        None,
    )
    if source_id is None:
        # The column is not produced by this selection; ordering is unknown.
        return None
    return find_order_direction(root.child, source_id)


@find_order_direction.register
def _(root: nodes.FilterNode, column_id: str):
    # Ordering lookup passes through filter nodes to their input child.
    return find_order_direction(root.child, column_id)


@find_order_direction.register
def _(root: nodes.InNode, column_id: str):
    # Ordering lookup follows the left (primary) input of the node.
    return find_order_direction(root.left_child, column_id)


@find_order_direction.register
def _(root: nodes.WindowOpNode, column_id: str):
    # Ordering lookup passes through window op nodes to their input child.
    return find_order_direction(root.child, column_id)


@find_order_direction.register
def _(root: nodes.ProjectionNode, column_id: str):
    # Ordering lookup passes through projection nodes to their input child.
    return find_order_direction(root.child, column_id)
43 changes: 42 additions & 1 deletion bigframes/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@

from __future__ import annotations

import datetime
import typing

import bigframes_vendored.pandas.core.window.rolling as vendored_pandas_rolling
import numpy
import pandas

from bigframes.core import log_adapter, window_spec
from bigframes import dtypes
from bigframes.core import expression as ex
from bigframes.core import log_adapter, ordering, window_spec
import bigframes.core.blocks as blocks
from bigframes.core.window import ordering as window_ordering
import bigframes.operations.aggregations as agg_ops


Expand Down Expand Up @@ -118,3 +124,38 @@ def _aggregate_block(

labels = [self._block.col_id_to_label[col] for col in agg_col_ids]
return block.select_columns(result_ids).with_column_labels(labels)


def create_range_window(
    block: blocks.Block,
    window: pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str,
    min_periods: int | None,
    closed: typing.Literal["right", "left", "both", "neither"],
    is_series: bool,
) -> Window:
    """Creates a time-range rolling window keyed on the block's timestamp index.

    Args:
        block: the block whose single timestamp index defines the rolling key.
        window: the window size, as a timedelta-like value or a pandas offset string.
        min_periods: minimum observations per window; treated as 1 when None.
        closed: which side(s) of the window interval are inclusive.
        is_series: whether the resulting window wraps a Series.

    Raises:
        ValueError: when the index is a MultiIndex, is not timestamp-typed, or
            its ordering direction cannot be determined from the tree.
    """
    index_dtypes = block.index.dtypes
    if len(index_dtypes) > 1:
        raise ValueError("Range rolling on MultiIndex is not supported")
    if index_dtypes[0] != dtypes.TIMESTAMP_DTYPE:
        raise ValueError("Index type should be timestamps with timezones")

    rolling_key_col = block.index_columns[0]
    order_direction = window_ordering.find_order_direction(
        block.expr.node, rolling_key_col
    )
    if order_direction is None:
        raise ValueError(
            "The index might not be in a monotonic order. Please sort the index before rolling."
        )

    if isinstance(window, str):
        window = pandas.Timedelta(window)

    spec = window_spec.WindowSpec(
        bounds=window_spec.RangeWindowBounds.from_timedelta_window(window, closed),
        min_periods=1 if min_periods is None else min_periods,
        ordering=(
            ordering.OrderingExpression(
                ex.deref(rolling_key_col), order_direction
            ),
        ),
    )
    return Window(block, spec, block.value_columns, is_series=is_series)
Loading