Skip to content

Commit 66fd69c

Browse files
kszucscpcloud
authored andcommitted
refactor(analysis): always merge frames during windowization
1 parent e12ce8d commit 66fd69c

File tree

11 files changed

+61
-21
lines changed

11 files changed

+61
-21
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
SELECT max(t0.`f`) OVER (ORDER BY t0.`d` ASC) AS `foo`
1+
SELECT max(t0.`f`) OVER (ORDER BY t0.`d` ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `foo`
22
FROM `alltypes` t0
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
SELECT avg(t0.`f`) OVER (ORDER BY t0.`d` ASC) AS `foo`
1+
SELECT avg(t0.`f`) OVER (ORDER BY t0.`d` ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `foo`
22
FROM `alltypes` t0
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
SELECT min(t0.`f`) OVER (ORDER BY t0.`d` ASC) AS `foo`
1+
SELECT min(t0.`f`) OVER (ORDER BY t0.`d` ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `foo`
22
FROM `alltypes` t0
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
SELECT sum(t0.`f`) OVER (ORDER BY t0.`d` ASC) AS `foo`
1+
SELECT sum(t0.`f`) OVER (ORDER BY t0.`d` ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `foo`
22
FROM `alltypes` t0

ibis/backends/tests/test_window.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,8 @@ def calc_zscore(s):
207207
),
208208
pytest.mark.broken(
209209
["flink"],
210-
raises=Py4JJavaError,
211-
reason="CalciteContextException: Argument to function 'NTILE' must be a literal",
210+
raises=com.UnsupportedOperationError,
211+
reason="Windows in Flink can only be ordered by a single time column",
212212
),
213213
],
214214
),
@@ -1250,6 +1250,21 @@ def test_range_expression_bounds(backend):
12501250
assert len(result) == con.execute(t.count())
12511251

12521252

1253+
@pytest.mark.notimpl(["polars", "dask"], raises=com.OperationNotDefinedError)
1254+
@pytest.mark.notyet(
1255+
["clickhouse"],
1256+
reason="clickhouse doesn't implement percent_rank",
1257+
raises=com.OperationNotDefinedError,
1258+
)
1259+
@pytest.mark.broken(
1260+
["pandas"], reason="missing column during execution", raises=KeyError
1261+
)
1262+
@pytest.mark.broken(
1263+
["mssql"], reason="lack of support for booleans", raises=sa.exc.OperationalError
1264+
)
1265+
@pytest.mark.broken(
1266+
["pyspark"], reason="pyspark requires CURRENT ROW", raises=AnalysisException
1267+
)
12531268
def test_rank_followed_by_over_call_merge_frames(backend, alltypes, df):
12541269
# GH #7631
12551270
t = alltypes

ibis/expr/analysis.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import ibis.expr.types as ir
1212
from ibis import util
1313
from ibis.common.deferred import deferred, var
14-
from ibis.common.exceptions import IbisTypeError, IntegrityError
14+
from ibis.common.exceptions import ExpressionError, IbisTypeError, IntegrityError
1515
from ibis.common.patterns import Eq, In, pattern, replace
1616
from ibis.util import Namespace
1717

@@ -170,17 +170,30 @@ def wrap_analytic(_, default_frame):
170170

171171
@replace(p.WindowFunction)
172172
def merge_windows(_, default_frame):
173+
if _.frame.start and default_frame.start and _.frame.start != default_frame.start:
174+
raise ExpressionError(
175+
"Unable to merge windows with conflicting `start` boundary"
176+
)
177+
if _.frame.end and default_frame.end and _.frame.end != default_frame.end:
178+
raise ExpressionError("Unable to merge windows with conflicting `end` boundary")
179+
180+
start = _.frame.start or default_frame.start
181+
end = _.frame.end or default_frame.end
173182
group_by = tuple(toolz.unique(_.frame.group_by + default_frame.group_by))
174-
order_by = tuple(toolz.unique(_.frame.order_by + default_frame.order_by))
175-
frame = _.frame.copy(group_by=group_by, order_by=order_by)
183+
184+
order_by = {}
185+
for sort_key in _.frame.order_by + default_frame.order_by:
186+
order_by[sort_key.expr] = sort_key.ascending
187+
order_by = tuple(ops.SortKey(k, v) for k, v in order_by.items())
188+
189+
frame = _.frame.copy(start=start, end=end, group_by=group_by, order_by=order_by)
176190
return ops.WindowFunction(_.func, frame)
177191

178192

179-
def windowize_function(expr, default_frame, merge_frames=False):
193+
def windowize_function(expr, default_frame):
180194
ctx = {"default_frame": default_frame}
181195
node = expr.op()
182-
if merge_frames:
183-
node = node.replace(merge_windows, filter=p.Value, context=ctx)
196+
node = node.replace(merge_windows, filter=p.Value, context=ctx)
184197
node = node.replace(wrap_analytic, filter=p.Value & ~p.WindowFunction, context=ctx)
185198
return node.to_expr()
186199

ibis/expr/types/core.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,8 @@ def to_torch(
589589

590590
def unbind(self) -> ir.Table:
591591
"""Return an expression built on `UnboundTable` instead of backend-specific objects."""
592-
from ibis.expr.analysis import p, c, _
592+
from ibis.expr.analysis import p, c
593+
from ibis.common.deferred import _
593594

594595
rule = p.DatabaseTable >> c.UnboundTable(name=_.name, schema=_.schema)
595596
return self.op().replace(rule).to_expr()

ibis/expr/types/generic.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -758,7 +758,7 @@ def over(
758758

759759
def bind(table):
760760
frame = window.bind(table)
761-
expr = an.windowize_function(self, frame, merge_frames=True)
761+
expr = an.windowize_function(self, frame)
762762
if expr.equals(self):
763763
raise com.IbisTypeError(
764764
"No reduction or analytic function found to construct a window expression"

ibis/expr/types/groupby.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,12 +254,12 @@ def _selectables(self, *exprs, **kwexprs):
254254
order_by=bind_expr(self.table, self._order_by),
255255
)
256256
return [
257-
an.windowize_function(e2, default_frame, merge_frames=True)
257+
an.windowize_function(e2, default_frame)
258258
for expr in exprs
259259
for e1 in util.promote_list(expr)
260260
for e2 in util.promote_list(table._ensure_expr(e1))
261261
] + [
262-
an.windowize_function(e, default_frame, merge_frames=True).name(k)
262+
an.windowize_function(e, default_frame).name(k)
263263
for k, expr in kwexprs.items()
264264
for e in util.promote_list(table._ensure_expr(expr))
265265
]

ibis/expr/types/relations.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4338,7 +4338,8 @@ def _resolve_predicates(
43384338
table: Table, predicates
43394339
) -> tuple[list[ir.BooleanValue], list[tuple[ir.BooleanValue, ir.Table]]]:
43404340
import ibis.expr.types as ir
4341-
from ibis.expr.analysis import _, flatten_predicate, p
4341+
from ibis.common.deferred import _
4342+
from ibis.expr.analysis import flatten_predicate, p
43424343

43434344
# TODO(kszucs): clean this up, too much flattening and resolving happens here
43444345
predicates = [

ibis/tests/expr/test_window_functions.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
from __future__ import annotations
22

3+
import pytest
4+
35
import ibis
46
import ibis.expr.operations as ops
7+
from ibis.common.exceptions import ExpressionError
58

69

710
def test_mutate_with_analytic_functions(alltypes):
@@ -48,15 +51,22 @@ def test_value_over_api(alltypes):
4851
w1 = ibis.window(rows=(0, 1), group_by=t.g, order_by=[t.f, t.h])
4952
w2 = ibis.window(range=(-1, 1), group_by=[t.g, t.a], order_by=[t.f])
5053

51-
expr = t.f.cumsum().over(rows=(0, 1), group_by=t.g, order_by=[t.f, t.h])
52-
expected = t.f.cumsum().over(w1)
54+
expr = t.f.sum().over(rows=(0, 1), group_by=t.g, order_by=[t.f, t.h])
55+
expected = t.f.sum().over(w1)
5356
assert expr.equals(expected)
5457

55-
expr = t.f.cumsum().over(range=(-1, 1), group_by=[t.g, t.a], order_by=[t.f])
56-
expected = t.f.cumsum().over(w2)
58+
expr = t.f.sum().over(range=(-1, 1), group_by=[t.g, t.a], order_by=[t.f])
59+
expected = t.f.sum().over(w2)
5760
assert expr.equals(expected)
5861

5962

63+
def test_conflicting_window_boundaries(alltypes):
64+
t = alltypes
65+
66+
with pytest.raises(ExpressionError, match="Unable to merge windows"):
67+
t.f.cumsum().over(rows=(0, 1))
68+
69+
6070
def test_rank_followed_by_over_call_merge_frames(alltypes):
6171
t = alltypes
6272
expr1 = t.f.percent_rank().over(ibis.window(group_by=t.f.notnull()))

0 commit comments

Comments
 (0)