-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support time range rolling on Series. #1590
Changes from all commits
da0372e
87884ba
d253ca7
e873c07
87f265b
6222c5e
850ea41
db6a353
1928992
6967e24
91af2c5
f5113c3
ad53e3d
a242824
2e86a47
9a835cb
ea59217
91d349e
0bdaf17
cd6a666
4b4c379
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# Copyright 2025 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from __future__ import annotations | ||
|
||
import dataclasses | ||
|
||
from bigframes import operations as ops | ||
from bigframes.core import nodes | ||
|
||
|
||
def rewrite_range_rolling(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
    """Rewrite a range-bounded window node to order by Unix microseconds.

    Nodes that are not ``WindowOpNode`` instances, and window nodes whose spec
    is not range bounded, are returned unchanged. For a range-bounded window,
    the single ordering expression is wrapped in ``ops.UnixMicros`` and a copy
    of the node carrying the updated window spec is returned.

    Args:
        node: The plan node to inspect.

    Returns:
        ``node`` itself when no rewrite applies, otherwise a copy whose window
        spec orders by the ``UnixMicros`` form of the original ordering key.

    Raises:
        ValueError: If a range-bounded window spec does not have exactly one
            ordering expression.
    """
    if (
        not isinstance(node, nodes.WindowOpNode)
        or not node.window_spec.is_range_bounded
    ):
        return node

    spec = node.window_spec
    # Fail-fast guard only: per the review discussion, the rolling window key
    # is meant to be validated at the API level as well, so a valid tree
    # should never trip this.
    if len(spec.ordering) != 1:
        raise ValueError(
            "Range rolling should only be performed on exactly one column."
        )

    (rolling_key,) = spec.ordering
    micros_expr = ops.UnixMicros().as_expr(rolling_key.scalar_expression)
    micros_key = dataclasses.replace(rolling_key, scalar_expression=micros_expr)

    # Only the ordering is replaced; the window bounds are left untouched
    # (the review discussion settled on keeping them expressed as timedeltas).
    micros_spec = dataclasses.replace(spec, ordering=(micros_key,))
    return dataclasses.replace(node, window_spec=micros_spec)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
# Copyright 2025 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from __future__ import annotations | ||
|
||
from functools import singledispatch | ||
|
||
from bigframes.core import expression as ex | ||
from bigframes.core import nodes, ordering | ||
|
||
|
||
@singledispatch
def find_order_direction(
    root: nodes.BigFrameNode, column_id: str
) -> ordering.OrderingDirection | None:
    """Find the ordering direction of a column by walking the node tree.

    This is the single-dispatch fallback: it is used for every node type that
    has no handler registered on ``find_order_direction``.

    Args:
        root: The node at which the search starts.
        column_id: Name of the column whose ordering direction is wanted.

    Returns:
        Always ``None`` for the fallback, meaning the column could not be
        found or no ordering information is available for it.
    """
    return None
|
||
|
||
@find_order_direction.register
def _(root: nodes.OrderByNode, column_id: str):
    """Resolve the direction of ``column_id`` at an ``OrderByNode``.

    With no sort keys the node is a no-op and the lookup continues in the
    child. Otherwise only the first sort key is examined: its direction is
    returned when that key is a direct reference (``DerefOp``) to
    ``column_id``; in every other case the result is ``None``.
    """
    if not root.by:
        # An OrderByNode without sort keys is a no-op; defer to its child.
        return find_order_direction(root.child, column_id)

    # The window key has to be the prefix of the sorting keys, so only the
    # leading key is inspected.
    leading_key = root.by[0]
    target = leading_key.scalar_expression
    is_window_key = isinstance(target, ex.DerefOp) and target.id.name == column_id
    return leading_key.direction if is_window_key else None
|
||
|
||
@find_order_direction.register
def _(root: nodes.ReversedNode, column_id: str):
    """Resolve the direction of ``column_id`` at a ``ReversedNode``.

    The direction is looked up in the child and then flipped with
    ``reverse()``. ``None`` from the child is passed through unchanged.
    """
    child_direction = find_order_direction(root.child, column_id)
    return None if child_direction is None else child_direction.reverse()
|
||
|
||
@find_order_direction.register
def _(root: nodes.SelectionNode, column_id: str):
    """Resolve the direction of ``column_id`` at a ``SelectionNode``.

    The first input/output pair whose output id is named ``column_id`` is
    located, and the lookup continues in the child under the name of the
    column that pair refers to. Returns ``None`` when no pair matches.
    """
    matching_pair = next(
        (pair for pair in root.input_output_pairs if pair.id.name == column_id),
        None,
    )
    if matching_pair is None:
        return None
    # Continue the search under the child's name for the same column.
    return find_order_direction(root.child, matching_pair.ref.id.name)
|
||
|
||
@find_order_direction.register
def _(root: nodes.FilterNode, column_id: str):
    """Resolve the direction of ``column_id`` at a ``FilterNode``.

    The node is treated as transparent: the answer is whatever the child
    reports for the same column id.
    """
    child = root.child
    return find_order_direction(child, column_id)
|
||
|
||
@find_order_direction.register
def _(root: nodes.InNode, column_id: str):
    """Resolve the direction of ``column_id`` at an ``InNode``.

    Only the left child is consulted; the answer is whatever it reports for
    the same column id.
    """
    left = root.left_child
    return find_order_direction(left, column_id)
|
||
|
||
@find_order_direction.register
def _(root: nodes.WindowOpNode, column_id: str):
    """Resolve the direction of ``column_id`` at a ``WindowOpNode``.

    The node is treated as transparent: the answer is whatever the child
    reports for the same column id.
    """
    child = root.child
    return find_order_direction(child, column_id)
|
||
|
||
@find_order_direction.register
def _(root: nodes.ProjectionNode, column_id: str):
    """Resolve the direction of ``column_id`` at a ``ProjectionNode``.

    The node is treated as transparent: the answer is whatever the child
    reports for the same column id.
    """
    child = root.child
    return find_order_direction(child, column_id)
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,12 +14,18 @@ | |
|
||
from __future__ import annotations | ||
|
||
import datetime | ||
import typing | ||
|
||
import bigframes_vendored.pandas.core.window.rolling as vendored_pandas_rolling | ||
import numpy | ||
import pandas | ||
|
||
from bigframes.core import log_adapter, window_spec | ||
from bigframes import dtypes | ||
from bigframes.core import expression as ex | ||
from bigframes.core import log_adapter, ordering, window_spec | ||
import bigframes.core.blocks as blocks | ||
from bigframes.core.window import ordering as window_ordering | ||
import bigframes.operations.aggregations as agg_ops | ||
|
||
|
||
|
@@ -118,3 +124,38 @@ def _aggregate_block( | |
|
||
labels = [self._block.col_id_to_label[col] for col in agg_col_ids] | ||
return block.select_columns(result_ids).with_column_labels(labels) | ||
|
||
|
||
def create_range_window(
    block: blocks.Block,
    window: pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str,
    min_periods: int | None,
    closed: typing.Literal["right", "left", "both", "neither"],
    is_series: bool,
) -> Window:
    """Build a ``Window`` that rolls over a time range keyed on the block's index.

    Args:
        block: Block whose single index column is used as the rolling key.
        window: Size of the window. A string is converted with
            ``pandas.Timedelta`` first.
        min_periods: Minimum number of observations for the window spec;
            ``None`` is replaced by 1.
        closed: Which ends of the window are closed, forwarded to
            ``RangeWindowBounds.from_timedelta_window``.
        is_series: Forwarded unchanged to ``Window``.

    Returns:
        A ``Window`` over all value columns of ``block``.

    Raises:
        ValueError: If the block has more than one index column, if the index
            dtype is not ``dtypes.TIMESTAMP_DTYPE``, or if no ordering
            direction can be determined for the index column.
    """
    index_dtypes = block.index.dtypes
    if len(index_dtypes) > 1:
        raise ValueError("Range rolling on MultiIndex is not supported")
    # NOTE(review): an empty ``index_dtypes`` would raise IndexError on the
    # next line -- confirm callers always provide an index.
    if index_dtypes[0] != dtypes.TIMESTAMP_DTYPE:
        raise ValueError("Index type should be timestamps with timezones")

    # Pick the ordering column and validate it immediately, so the window
    # spec is only built once the index is known to be sorted on this key.
    rolling_key = block.index_columns[0]
    direction = window_ordering.find_order_direction(block.expr.node, rolling_key)
    if direction is None:
        raise ValueError(
            "The index might not be in a monotonic order. Please sort the index before rolling."
        )

    window_size = pandas.Timedelta(window) if isinstance(window, str) else window
    bounds = window_spec.RangeWindowBounds.from_timedelta_window(window_size, closed)
    key_ordering = ordering.OrderingExpression(ex.deref(rolling_key), direction)
    spec = window_spec.WindowSpec(
        bounds=bounds,
        min_periods=1 if min_periods is None else min_periods,
        ordering=(key_ordering,),
    )

    return Window(block, spec, block.value_columns, is_series=is_series)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What makes this one different? Is it that it doesn't allow NULLS FIRST/LAST overrides and only allows one column?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. If we have to deal with NULLs, there will be multiple expressions after "ORDER BY", but it is not allowed by SQL window syntax.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a doc that explains the reasons, for future reference.