Skip to content

Commit c19fc72

Browse files
authored
feat: Support optional parameter offset in tumble and hop (risingwavelabs#8490)
1 parent 777e836 commit c19fc72

File tree

14 files changed

+593
-70
lines changed

14 files changed

+593
-70
lines changed

e2e_test/batch/basic/time_window.slt.part

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,21 @@ from tumble(t1, created_at, interval '30' minute) order by row_id, window_start;
2828
7 1 2022-01-01 10:51:00 2022-01-01 10:30:00 2022-01-01 11:00:00
2929
8 3 2022-01-01 11:02:00 2022-01-01 11:00:00 2022-01-01 11:30:00
3030

31+
32+
query IITTT
33+
select row_id, uid, created_at, window_start, window_end
34+
from tumble(t1, created_at, interval '30' minute, interval '13' minute) order by row_id, window_start;
35+
----
36+
1 1 2022-01-01 10:00:00 2022-01-01 09:43:00 2022-01-01 10:13:00
37+
2 3 2022-01-01 10:05:00 2022-01-01 09:43:00 2022-01-01 10:13:00
38+
3 2 2022-01-01 10:14:00 2022-01-01 10:13:00 2022-01-01 10:43:00
39+
4 1 2022-01-01 10:22:00 2022-01-01 10:13:00 2022-01-01 10:43:00
40+
5 3 2022-01-01 10:33:00 2022-01-01 10:13:00 2022-01-01 10:43:00
41+
6 2 2022-01-01 10:42:00 2022-01-01 10:13:00 2022-01-01 10:43:00
42+
7 1 2022-01-01 10:51:00 2022-01-01 10:43:00 2022-01-01 11:13:00
43+
8 3 2022-01-01 11:02:00 2022-01-01 10:43:00 2022-01-01 11:13:00
44+
45+
3146
query IITTT
3247
select row_id, uid, created_at, window_start, window_end
3348
from hop(t1, created_at, interval '15' minute, interval '30' minute) order by row_id, window_start;
@@ -49,6 +64,27 @@ from hop(t1, created_at, interval '15' minute, interval '30' minute) order by ro
4964
8 3 2022-01-01 11:02:00 2022-01-01 10:45:00 2022-01-01 11:15:00
5065
8 3 2022-01-01 11:02:00 2022-01-01 11:00:00 2022-01-01 11:30:00
5166

67+
query IITTT
68+
select row_id, uid, created_at, window_start, window_end
69+
from hop(t1, created_at, interval '15' minute, interval '30' minute, interval '13' minute) order by row_id, window_start;
70+
----
71+
1 1 2022-01-01 10:00:00 2022-01-01 09:43:00 2022-01-01 10:13:00
72+
1 1 2022-01-01 10:00:00 2022-01-01 09:58:00 2022-01-01 10:28:00
73+
2 3 2022-01-01 10:05:00 2022-01-01 09:43:00 2022-01-01 10:13:00
74+
2 3 2022-01-01 10:05:00 2022-01-01 09:58:00 2022-01-01 10:28:00
75+
3 2 2022-01-01 10:14:00 2022-01-01 09:58:00 2022-01-01 10:28:00
76+
3 2 2022-01-01 10:14:00 2022-01-01 10:13:00 2022-01-01 10:43:00
77+
4 1 2022-01-01 10:22:00 2022-01-01 09:58:00 2022-01-01 10:28:00
78+
4 1 2022-01-01 10:22:00 2022-01-01 10:13:00 2022-01-01 10:43:00
79+
5 3 2022-01-01 10:33:00 2022-01-01 10:13:00 2022-01-01 10:43:00
80+
5 3 2022-01-01 10:33:00 2022-01-01 10:28:00 2022-01-01 10:58:00
81+
6 2 2022-01-01 10:42:00 2022-01-01 10:13:00 2022-01-01 10:43:00
82+
6 2 2022-01-01 10:42:00 2022-01-01 10:28:00 2022-01-01 10:58:00
83+
7 1 2022-01-01 10:51:00 2022-01-01 10:28:00 2022-01-01 10:58:00
84+
7 1 2022-01-01 10:51:00 2022-01-01 10:43:00 2022-01-01 11:13:00
85+
8 3 2022-01-01 11:02:00 2022-01-01 10:43:00 2022-01-01 11:13:00
86+
8 3 2022-01-01 11:02:00 2022-01-01 10:58:00 2022-01-01 11:28:00
87+
5288
query IIT rowsort
5389
select row_id, uid, created_at
5490
from hop(t1, created_at, interval '15' minute, interval '30' minute);
@@ -70,6 +106,29 @@ from hop(t1, created_at, interval '15' minute, interval '30' minute);
70106
8 3 2022-01-01 11:02:00
71107
8 3 2022-01-01 11:02:00
72108

109+
110+
query IIT rowsort
111+
select row_id, uid, created_at
112+
from hop(t1, created_at, interval '15' minute, interval '30' minute, interval '13' minute);
113+
----
114+
1 1 2022-01-01 10:00:00
115+
1 1 2022-01-01 10:00:00
116+
2 3 2022-01-01 10:05:00
117+
2 3 2022-01-01 10:05:00
118+
3 2 2022-01-01 10:14:00
119+
3 2 2022-01-01 10:14:00
120+
4 1 2022-01-01 10:22:00
121+
4 1 2022-01-01 10:22:00
122+
5 3 2022-01-01 10:33:00
123+
5 3 2022-01-01 10:33:00
124+
6 2 2022-01-01 10:42:00
125+
6 2 2022-01-01 10:42:00
126+
7 1 2022-01-01 10:51:00
127+
7 1 2022-01-01 10:51:00
128+
8 3 2022-01-01 11:02:00
129+
8 3 2022-01-01 11:02:00
130+
131+
73132
query IT
74133
select sum(v), window_start
75134
from tumble(t1, created_at, interval '30' minute)
@@ -79,6 +138,15 @@ group by window_start order by window_start;
79138
18 2022-01-01 10:30:00
80139
8 2022-01-01 11:00:00
81140

141+
query IT
142+
select sum(v), window_start
143+
from tumble(t1, created_at, interval '30' minute, interval '13' minute)
144+
group by window_start order by window_start;
145+
----
146+
7 2022-01-01 09:43:00
147+
15 2022-01-01 10:13:00
148+
14 2022-01-01 10:43:00
149+
82150
query IIT
83151
select uid, sum(v), window_start
84152
from tumble(t1, created_at, interval '30' minute)
@@ -92,6 +160,20 @@ group by window_start, uid order by window_start, uid;
92160
3 5 2022-01-01 10:30:00
93161
3 8 2022-01-01 11:00:00
94162

163+
query IIT
164+
select uid, sum(v), window_start
165+
from tumble(t1, created_at, interval '30' minute, interval '13' minute)
166+
group by window_start, uid order by window_start, uid;
167+
----
168+
1 4 2022-01-01 09:43:00
169+
3 3 2022-01-01 09:43:00
170+
1 1 2022-01-01 10:13:00
171+
2 9 2022-01-01 10:13:00
172+
3 5 2022-01-01 10:13:00
173+
1 6 2022-01-01 10:43:00
174+
3 8 2022-01-01 10:43:00
175+
176+
95177
query IT
96178
select sum(v), window_start
97179
from hop(t1, created_at, interval '15' minute, interval '30' minute)
@@ -104,6 +186,19 @@ group by window_start order by window_start;
104186
14 2022-01-01 10:45:00
105187
8 2022-01-01 11:00:00
106188

189+
190+
query IT
191+
select sum(v), window_start
192+
from hop(t1, created_at, interval '15' minute, interval '30' minute, interval '13' minute)
193+
group by window_start order by window_start;
194+
----
195+
7 2022-01-01 09:43:00
196+
10 2022-01-01 09:58:00
197+
15 2022-01-01 10:13:00
198+
18 2022-01-01 10:28:00
199+
14 2022-01-01 10:43:00
200+
8 2022-01-01 10:58:00
201+
107202
query IIT
108203
select uid, sum(v), window_start
109204
from hop(t1, created_at, interval '15' minute, interval '30' minute)
@@ -125,6 +220,28 @@ group by window_start, uid order by window_start, uid;
125220
3 8 2022-01-01 10:45:00
126221
3 8 2022-01-01 11:00:00
127222

223+
224+
225+
query IIT
226+
select uid, sum(v), window_start
227+
from hop(t1, created_at, interval '15' minute, interval '30' minute, interval '13' minute)
228+
group by window_start, uid order by window_start, uid;
229+
----
230+
1 4 2022-01-01 09:43:00
231+
3 3 2022-01-01 09:43:00
232+
1 5 2022-01-01 09:58:00
233+
2 2 2022-01-01 09:58:00
234+
3 3 2022-01-01 09:58:00
235+
1 1 2022-01-01 10:13:00
236+
2 9 2022-01-01 10:13:00
237+
3 5 2022-01-01 10:13:00
238+
1 6 2022-01-01 10:28:00
239+
2 7 2022-01-01 10:28:00
240+
3 5 2022-01-01 10:28:00
241+
1 6 2022-01-01 10:43:00
242+
3 8 2022-01-01 10:43:00
243+
3 8 2022-01-01 10:58:00
244+
128245
statement error
129246
select * from hop(t1, created_at, interval '0', interval '1');
130247

src/batch/src/executor/hop_window.rs

Lines changed: 103 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,6 @@ impl HopWindowExecutor {
151151
async fn do_execute(self: Box<Self>) {
152152
let Self {
153153
child,
154-
155154
window_slide,
156155
window_size,
157156
output_indices,
@@ -219,7 +218,12 @@ mod tests {
219218
use super::*;
220219
use crate::executor::test_utils::MockExecutor;
221220

222-
fn create_executor(output_indices: Vec<usize>) -> Box<HopWindowExecutor> {
221+
fn create_executor(
222+
output_indices: Vec<usize>,
223+
window_slide: IntervalUnit,
224+
window_size: IntervalUnit,
225+
window_offset: IntervalUnit,
226+
) -> Box<HopWindowExecutor> {
223227
let field1 = Field::unnamed(DataType::Int64);
224228
let field2 = Field::unnamed(DataType::Int64);
225229
let field3 = Field::with_name(DataType::Timestamp, "created_at");
@@ -237,14 +241,17 @@ mod tests {
237241
8 3 ^11:02:00"
238242
.replace('^', "2022-2-2T"),
239243
);
240-
241244
let mut mock_executor = MockExecutor::new(schema.clone());
242245
mock_executor.add(chunk);
243246

244-
let window_slide = IntervalUnit::from_minutes(15);
245-
let window_size = IntervalUnit::from_minutes(30);
246-
let (window_start_exprs, window_end_exprs) =
247-
make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide).unwrap();
247+
let (window_start_exprs, window_end_exprs) = make_hop_window_expression(
248+
DataType::Timestamp,
249+
2,
250+
window_size,
251+
window_slide,
252+
window_offset,
253+
)
254+
.unwrap();
248255

249256
Box::new(HopWindowExecutor::new(
250257
Box::new(mock_executor),
@@ -259,10 +266,94 @@ mod tests {
259266
))
260267
}
261268

269+
#[tokio::test]
270+
async fn test_window_offset() {
271+
async fn test_window_offset_helper(window_offset: IntervalUnit) -> DataChunk {
272+
let default_indices = (0..3 + 2).collect_vec();
273+
let window_slide = IntervalUnit::from_minutes(15);
274+
let window_size = IntervalUnit::from_minutes(30);
275+
let executor =
276+
create_executor(default_indices, window_slide, window_size, window_offset);
277+
let mut stream = executor.execute();
278+
stream.next().await.unwrap().unwrap()
279+
}
280+
281+
let window_size = 30;
282+
for offset in 0..window_size {
283+
for coefficient in -5..0 {
284+
assert_eq!(
285+
test_window_offset_helper(IntervalUnit::from_minutes(
286+
coefficient * window_size + offset
287+
))
288+
.await,
289+
test_window_offset_helper(IntervalUnit::from_minutes(
290+
(coefficient - 1) * window_size + offset
291+
))
292+
.await
293+
);
294+
}
295+
}
296+
for offset in 0..window_size {
297+
for coefficient in 0..5 {
298+
assert_eq!(
299+
test_window_offset_helper(IntervalUnit::from_minutes(
300+
coefficient * window_size + offset
301+
))
302+
.await,
303+
test_window_offset_helper(IntervalUnit::from_minutes(
304+
(coefficient + 1) * window_size + offset
305+
))
306+
.await
307+
);
308+
}
309+
}
310+
for offset in -window_size..window_size {
311+
assert_eq!(
312+
test_window_offset_helper(IntervalUnit::from_minutes(window_size + offset)).await,
313+
test_window_offset_helper(IntervalUnit::from_minutes(-window_size + offset)).await
314+
);
315+
}
316+
317+
assert_eq!(
318+
test_window_offset_helper(IntervalUnit::from_minutes(-31)).await,
319+
DataChunk::from_pretty(
320+
&"I I TS TS TS
321+
1 1 ^10:00:00 ^09:44:00 ^10:14:00
322+
2 3 ^10:05:00 ^09:44:00 ^10:14:00
323+
3 2 ^10:14:00 ^09:59:00 ^10:29:00
324+
4 1 ^10:22:00 ^09:59:00 ^10:29:00
325+
5 3 ^10:33:00 ^10:14:00 ^10:44:00
326+
6 2 ^10:42:00 ^10:14:00 ^10:44:00
327+
7 1 ^10:51:00 ^10:29:00 ^10:59:00
328+
8 3 ^11:02:00 ^10:44:00 ^11:14:00"
329+
.replace('^', "2022-2-2T"),
330+
)
331+
);
332+
assert_eq!(
333+
test_window_offset_helper(IntervalUnit::from_minutes(29)).await,
334+
DataChunk::from_pretty(
335+
&"I I TS TS TS
336+
1 1 ^10:00:00 ^09:44:00 ^10:14:00
337+
2 3 ^10:05:00 ^09:44:00 ^10:14:00
338+
3 2 ^10:14:00 ^09:59:00 ^10:29:00
339+
4 1 ^10:22:00 ^09:59:00 ^10:29:00
340+
5 3 ^10:33:00 ^10:14:00 ^10:44:00
341+
6 2 ^10:42:00 ^10:14:00 ^10:44:00
342+
7 1 ^10:51:00 ^10:29:00 ^10:59:00
343+
8 3 ^11:02:00 ^10:44:00 ^11:14:00"
344+
.replace('^', "2022-2-2T"),
345+
)
346+
);
347+
}
348+
262349
#[tokio::test]
263350
async fn test_execute() {
264351
let default_indices = (0..3 + 2).collect_vec();
265-
let executor = create_executor(default_indices);
352+
353+
let window_slide = IntervalUnit::from_minutes(15);
354+
let window_size = IntervalUnit::from_minutes(30);
355+
let window_offset = IntervalUnit::from_minutes(0);
356+
let executor = create_executor(default_indices, window_slide, window_size, window_offset);
266357

267358
let mut stream = executor.execute();
268359
// TODO: add more test infra to reduce the duplicated codes below.
@@ -303,7 +394,10 @@ mod tests {
303394
}
304395
#[tokio::test]
305396
async fn test_output_indices() {
306-
let executor = create_executor(vec![1, 3, 4, 2]);
397+
let window_slide = IntervalUnit::from_minutes(15);
398+
let window_size = IntervalUnit::from_minutes(30);
399+
let window_offset = IntervalUnit::from_minutes(0);
400+
let executor = create_executor(vec![1, 3, 4, 2], window_slide, window_size, window_offset);
307401

308402
let mut stream = executor.execute();
309403
// TODO: add more test infra to reduce the duplicated codes below.

src/common/src/types/interval.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ pub struct IntervalUnit {
4545
usecs: i64,
4646
}
4747

48-
const USECS_PER_SEC: i64 = 1_000_000;
49-
const USECS_PER_DAY: i64 = 86400 * USECS_PER_SEC;
50-
const USECS_PER_MONTH: i64 = 30 * USECS_PER_DAY;
48+
pub const USECS_PER_SEC: i64 = 1_000_000;
49+
pub const USECS_PER_DAY: i64 = 86400 * USECS_PER_SEC;
50+
pub const USECS_PER_MONTH: i64 = 30 * USECS_PER_DAY;
5151

5252
impl IntervalUnit {
5353
/// Smallest interval value.

src/expr/src/expr/build_expr_from_prost.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ use super::expr_unary::{
5151
use super::expr_vnode::VnodeExpression;
5252
use crate::expr::expr_array_distinct::ArrayDistinctExpression;
5353
use crate::expr::expr_array_to_string::ArrayToStringExpression;
54+
use crate::expr::expr_binary_nonnull::new_tumble_start;
55+
use crate::expr::expr_ternary::new_tumble_start_offset;
5456
use crate::expr::{
5557
build_from_prost as expr_build_from_prost, BoxedExpression, Expression, InputRefExpression,
5658
LiteralExpression,
@@ -69,9 +71,9 @@ pub fn build_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
6971
build_unary_expr_prost(prost)
7072
}
7173
Equal | NotEqual | LessThan | LessThanOrEqual | GreaterThan | GreaterThanOrEqual | Add
72-
| Subtract | Multiply | Divide | Modulus | Extract | RoundDigit | Pow | TumbleStart
73-
| Position | BitwiseShiftLeft | BitwiseShiftRight | BitwiseAnd | BitwiseOr | BitwiseXor
74-
| ConcatOp | AtTimeZone | CastWithTimeZone | JsonbAccessInner | JsonbAccessStr => {
74+
| Subtract | Multiply | Divide | Modulus | Extract | RoundDigit | Pow | Position
75+
| BitwiseShiftLeft | BitwiseShiftRight | BitwiseAnd | BitwiseOr | BitwiseXor | ConcatOp
76+
| AtTimeZone | CastWithTimeZone | JsonbAccessInner | JsonbAccessStr => {
7577
build_binary_expr_prost(prost)
7678
}
7779
And | Or | IsDistinctFrom | IsNotDistinctFrom | ArrayAccess | FormatType => {
@@ -87,6 +89,7 @@ pub fn build_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
8789
Translate => build_translate_expr(prost),
8890

8991
// Variable number of arguments and based on `Unary/Binary/Ternary/...Expression`
92+
TumbleStart => build_tumble_start_expr(prost),
9093
Substr => build_substr_expr(prost),
9194
Overlay => build_overlay_expr(prost),
9295
Trim => build_trim_expr(prost),
@@ -272,6 +275,21 @@ fn build_date_trunc_expr(prost: &ExprNode) -> Result<BoxedExpression> {
272275
Ok(new_date_trunc_expr(ret_type, field, source, time_zone))
273276
}
274277

278+
fn build_tumble_start_expr(prost: &ExprNode) -> Result<BoxedExpression> {
279+
let (children, ret_type) = get_children_and_return_type(prost)?;
280+
ensure!(children.len() == 2 || children.len() == 3);
281+
let time = expr_build_from_prost(&children[0])?;
282+
let window_size = expr_build_from_prost(&children[1])?;
283+
if children.len() == 2 {
284+
new_tumble_start(time, window_size, ret_type)
285+
} else if children.len() == 3 {
286+
let offset = expr_build_from_prost(&children[2])?;
287+
new_tumble_start_offset(time, window_size, offset, ret_type)
288+
} else {
289+
unreachable!()
290+
}
291+
}
292+
275293
fn build_length_expr(prost: &ExprNode) -> Result<BoxedExpression> {
276294
let (children, ret_type) = get_children_and_return_type(prost)?;
277295
// TODO: add encoding length expr

0 commit comments

Comments
 (0)