Skip to content

Commit 256767f

Browse files
chloeh13qcpcloud
authored and committed
feat(flink): implement windowed computations
1 parent 751cfcf commit 256767f

File tree

8 files changed

+43
-20
lines changed

8 files changed

+43
-20
lines changed

ibis/backends/flink/registry.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,6 @@ def _format_window_frame(translator: ExprTranslator, func, frame):
156156
components.append(f"PARTITION BY {partition_args}")
157157

158158
(order_by,) = frame.order_by
159-
if order_by.descending is True:
160-
raise com.UnsupportedOperationError(
161-
"Flink only supports windows ordered in ASCENDING mode"
162-
)
163159
components.append(f"ORDER BY {translator.translate(order_by)}")
164160

165161
if frame.start is None and frame.end is None:
@@ -221,8 +217,7 @@ def _window(translator: ExprTranslator, op: ops.Node) -> str:
221217

222218
if isinstance(func, (ops.RankBase, ops.NTile)):
223219
return f"({result} - 1)"
224-
else:
225-
return result
220+
return result
226221

227222

228223
def _clip(translator: ExprTranslator, op: ops.Node) -> str:
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
SELECT t0.`window_start`, t0.`window_end`, t0.`g`, avg(t0.`d`) AS `mean`
2+
FROM TABLE(TUMBLE(TABLE `table`, DESCRIPTOR(`i`), INTERVAL '15' MINUTE)) t0
3+
GROUP BY t0.`window_start`, t0.`window_end`, t0.`g`
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
WITH t0 AS (
2+
SELECT t2.`a`, t2.`b`, t2.`c`, t2.`d`, t2.`g`, t2.`window_start`,
3+
t2.`window_end`
4+
FROM TABLE(TUMBLE(TABLE `table`, DESCRIPTOR(`i`), INTERVAL '10' MINUTE)) t2
5+
)
6+
SELECT t1.*
7+
FROM (
8+
SELECT t0.*,
9+
(row_number() OVER (PARTITION BY t0.`window_start`, t0.`window_end` ORDER BY t0.`g` DESC) - 1) AS `rownum`
10+
FROM t0
11+
) t1
12+
WHERE t1.`rownum` <= CAST(3 AS TINYINT)

ibis/backends/flink/tests/test_compiler.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from pytest import param
55

66
import ibis
7+
from ibis.common.deferred import _
78

89

910
def test_sum(con, snapshot, simple_table):
@@ -137,9 +138,34 @@ def test_having(con, snapshot, simple_table):
137138
),
138139
],
139140
)
140-
def test_tvf(con, snapshot, simple_table, function_type, params):
141+
def test_windowing_tvf(con, snapshot, simple_table, function_type, params):
141142
expr = getattr(simple_table.window_by(time_col=simple_table.i), function_type)(
142143
**params
143144
)
144145
result = con.compile(expr)
145146
snapshot.assert_match(result, "out.sql")
147+
148+
149+
def test_window_aggregation(con, snapshot, simple_table):
    """Snapshot the SQL compiled for a mean over 15-minute tumbling windows.

    The table is windowed on column ``i``, grouped by the window bounds
    (``window_start``, ``window_end``) together with ``g``, and ``d`` is
    averaged within each group under the name ``mean``.
    """
    # Build the expression one step at a time so each stage is named.
    windowed = simple_table.window_by(time_col=simple_table.i)
    tumbled = windowed.tumble(window_size=ibis.interval(minutes=15))
    grouped = tumbled.group_by(["window_start", "window_end", "g"])
    expr = grouped.aggregate(mean=_.d.mean())

    # The compiled SQL is compared against the stored "out.sql" snapshot.
    snapshot.assert_match(con.compile(expr), "out.sql")
158+
159+
160+
def test_window_topn(con, snapshot, simple_table):
    """Snapshot the SQL compiled for a row-number filter inside tumbling windows.

    Rows are windowed on column ``i`` with a 600-second tumble, projected to a
    fixed set of columns, numbered per window in descending order of ``g``,
    and kept only when ``rownum <= 3``.
    """
    # Kept as a tuple so the subscript receives the same key as a
    # comma-separated index would.
    selected_columns = ("a", "b", "c", "d", "g", "window_start", "window_end")

    tumbled = simple_table.window_by(time_col="i").tumble(
        window_size=ibis.interval(seconds=600)
    )
    projected = tumbled[selected_columns]

    # Row number within each (window_start, window_end) pair, ordered by g DESC.
    row_rank = ibis.row_number().over(
        group_by=["window_start", "window_end"],
        order_by=ibis.desc("g"),
    )
    ranked = projected.mutate(rownum=row_rank)

    # NOTE(review): the stored snapshot renders rownum as `row_number() - 1`,
    # so `<= 3` presumably keeps up to four rows per window -- confirm that
    # this is the intended N before relying on it as a top-3 example.
    top_rows = ranked[ranked.rownum <= 3]

    snapshot.assert_match(con.compile(top_rows), "out.sql")

ibis/backends/flink/tests/test_window.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,6 @@ def test_window_does_not_support_multiple_order_by(con, simple_table):
2929
con.compile(expr)
3030

3131

32-
def test_window_does_not_support_desc_order(con, simple_table):
33-
expr = simple_table.f.sum().over(
34-
rows=(-1, 1),
35-
group_by=[simple_table.g, simple_table.a],
36-
order_by=[simple_table.f.desc()],
37-
)
38-
with pytest.raises(
39-
UnsupportedOperationError,
40-
match="Flink only supports windows ordered in ASCENDING mode",
41-
):
42-
con.compile(expr)
43-
44-
4532
@pytest.mark.parametrize(
4633
("window", "err"),
4734
[

0 commit comments

Comments
 (0)