Skip to content

Commit e1ae04e

Browse files
authored
fix(streaming): hop executor handle watermark (risingwavelabs#8498)
1 parent c0aa78b commit e1ae04e

File tree

1 file changed

+297
-48
lines changed

1 file changed

+297
-48
lines changed

src/stream/src/executor/hop_window.rs

Lines changed: 297 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,23 @@ impl Executor for HopWindowExecutor {
8585
}
8686

8787
impl HopWindowExecutor {
88+
fn derive_watermarks(
89+
input_len: usize,
90+
time_col_idx: usize,
91+
output_indices: &[usize],
92+
) -> Vec<Vec<usize>> {
93+
let mut watermark_derivations = vec![vec![]; input_len];
94+
for (out_i, in_i) in output_indices.iter().enumerate() {
95+
let in_i = *in_i;
96+
if in_i >= input_len {
97+
watermark_derivations[time_col_idx].push(out_i);
98+
} else {
99+
watermark_derivations[in_i].push(out_i);
100+
}
101+
}
102+
watermark_derivations
103+
}
104+
88105
#[try_stream(ok = Message, error = StreamExecutorError)]
89106
async fn execute_inner(self: Box<Self>) {
90107
let Self {
@@ -95,6 +112,7 @@ impl HopWindowExecutor {
95112
window_size,
96113
output_indices,
97114
info,
115+
time_col_idx,
98116
..
99117
} = *self;
100118
let units = window_size
@@ -111,55 +129,64 @@ impl HopWindowExecutor {
111129

112130
let window_start_col_index = input.schema().len();
113131
let window_end_col_index = input.schema().len() + 1;
132+
let watermark_derivations =
133+
Self::derive_watermarks(input.schema().len(), time_col_idx, &output_indices);
114134
#[for_await]
115135
for msg in input.execute() {
116136
let msg = msg?;
117-
if let Message::Chunk(chunk) = msg {
118-
// TODO: compact may be not necessary here.
119-
let chunk = chunk.compact();
120-
let (data_chunk, ops) = chunk.into_parts();
121-
// SAFETY: Already compacted.
122-
assert!(matches!(data_chunk.vis(), Vis::Compact(_)));
123-
let _len = data_chunk.cardinality();
124-
for i in 0..units {
125-
let window_start_col = if output_indices.contains(&window_start_col_index) {
126-
Some(
127-
self.window_start_exprs[i].eval_infallible(&data_chunk, |err| {
128-
ctx.on_compute_error(err, &info.identity)
129-
}),
130-
)
131-
} else {
132-
None
133-
};
134-
let window_end_col = if output_indices.contains(&window_end_col_index) {
135-
Some(
136-
self.window_end_exprs[i].eval_infallible(&data_chunk, |err| {
137-
ctx.on_compute_error(err, &info.identity)
138-
}),
139-
)
140-
} else {
141-
None
142-
};
143-
let new_cols = output_indices
144-
.iter()
145-
.filter_map(|&idx| {
146-
if idx < window_start_col_index {
147-
Some(data_chunk.column_at(idx).clone())
148-
} else if idx == window_start_col_index {
149-
Some(Column::new(window_start_col.clone().unwrap()))
150-
} else if idx == window_end_col_index {
151-
Some(Column::new(window_end_col.clone().unwrap()))
152-
} else {
153-
None
154-
}
155-
})
156-
.collect();
157-
let new_chunk = StreamChunk::new(ops.clone(), new_cols, None);
158-
yield Message::Chunk(new_chunk);
137+
match msg {
138+
Message::Chunk(chunk) => {
139+
// TODO: compact may be not necessary here.
140+
let chunk = chunk.compact();
141+
let (data_chunk, ops) = chunk.into_parts();
142+
// SAFETY: Already compacted.
143+
assert!(matches!(data_chunk.vis(), Vis::Compact(_)));
144+
let _len = data_chunk.cardinality();
145+
for i in 0..units {
146+
let window_start_col = if output_indices.contains(&window_start_col_index) {
147+
Some(
148+
self.window_start_exprs[i].eval_infallible(&data_chunk, |err| {
149+
ctx.on_compute_error(err, &info.identity)
150+
}),
151+
)
152+
} else {
153+
None
154+
};
155+
let window_end_col = if output_indices.contains(&window_end_col_index) {
156+
Some(
157+
self.window_end_exprs[i].eval_infallible(&data_chunk, |err| {
158+
ctx.on_compute_error(err, &info.identity)
159+
}),
160+
)
161+
} else {
162+
None
163+
};
164+
let new_cols = output_indices
165+
.iter()
166+
.filter_map(|&idx| {
167+
if idx < window_start_col_index {
168+
Some(data_chunk.column_at(idx).clone())
169+
} else if idx == window_start_col_index {
170+
Some(Column::new(window_start_col.clone().unwrap()))
171+
} else if idx == window_end_col_index {
172+
Some(Column::new(window_end_col.clone().unwrap()))
173+
} else {
174+
None
175+
}
176+
})
177+
.collect();
178+
let new_chunk = StreamChunk::new(ops.clone(), new_cols, None);
179+
yield Message::Chunk(new_chunk);
180+
}
181+
}
182+
Message::Barrier(b) => {
183+
yield Message::Barrier(b);
184+
}
185+
Message::Watermark(w) => {
186+
for i in &watermark_derivations[w.col_idx] {
187+
yield Message::Watermark(w.clone().with_idx(*i));
188+
}
159189
}
160-
} else {
161-
yield msg;
162-
continue;
163190
};
164191
}
165192
}
@@ -174,9 +201,9 @@ mod tests {
174201
use risingwave_common::types::{DataType, IntervalUnit};
175202
use risingwave_expr::expr::test_utils::make_hop_window_expression;
176203

177-
use crate::executor::test_utils::MockSource;
178-
use crate::executor::{ActorContext, Executor, ExecutorInfo, StreamChunk};
179-
204+
use super::super::*;
205+
use crate::executor::test_utils::{MessageSender, MockSource};
206+
use crate::executor::{ActorContext, Executor, ExecutorInfo, ScalarImpl, StreamChunk};
180207
fn create_executor(output_indices: Vec<usize>) -> Box<dyn Executor> {
181208
let field1 = Field::unnamed(DataType::Int64);
182209
let field2 = Field::unnamed(DataType::Int64);
@@ -305,4 +332,226 @@ mod tests {
305332
)
306333
);
307334
}
335+
336+
fn create_executor2(output_indices: Vec<usize>) -> (MessageSender, Box<dyn Executor>) {
337+
let field1 = Field::unnamed(DataType::Int64);
338+
let field2 = Field::unnamed(DataType::Int64);
339+
let field3 = Field::with_name(DataType::Timestamp, "created_at");
340+
let schema = Schema::new(vec![field1, field2, field3]);
341+
let pk_indices = vec![0];
342+
let (tx, source) = MockSource::channel(schema.clone(), pk_indices.clone());
343+
344+
let window_slide = IntervalUnit::from_minutes(15);
345+
let window_size = IntervalUnit::from_minutes(30);
346+
let (window_start_exprs, window_end_exprs) =
347+
make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide).unwrap();
348+
349+
(
350+
tx,
351+
super::HopWindowExecutor::new(
352+
ActorContext::create(123),
353+
Box::new(source),
354+
ExecutorInfo {
355+
// TODO: the schema is incorrect, but it seems useless here.
356+
schema,
357+
pk_indices,
358+
identity: "test".to_string(),
359+
},
360+
2,
361+
window_slide,
362+
window_size,
363+
window_start_exprs,
364+
window_end_exprs,
365+
output_indices,
366+
)
367+
.boxed(),
368+
)
369+
}
370+
371+
#[tokio::test]
372+
async fn test_watermark_full_output() {
373+
let (mut tx, hop) = create_executor2((0..5).collect());
374+
let mut hop = hop.execute();
375+
376+
// TODO: the datatype is incorrect, but it seems useless here.
377+
tx.push_int64_watermark(0, 100);
378+
tx.push_int64_watermark(1, 100);
379+
tx.push_int64_watermark(2, 100);
380+
381+
let w = hop.next().await.unwrap().unwrap();
382+
let w = w.as_watermark().unwrap();
383+
assert_eq!(
384+
w,
385+
&Watermark {
386+
col_idx: 0,
387+
data_type: DataType::Int64,
388+
val: ScalarImpl::Int64(100)
389+
}
390+
);
391+
392+
let w = hop.next().await.unwrap().unwrap();
393+
let w = w.as_watermark().unwrap();
394+
assert_eq!(
395+
w,
396+
&Watermark {
397+
col_idx: 1,
398+
data_type: DataType::Int64,
399+
val: ScalarImpl::Int64(100)
400+
}
401+
);
402+
403+
let w = hop.next().await.unwrap().unwrap();
404+
let w = w.as_watermark().unwrap();
405+
assert_eq!(
406+
w,
407+
&Watermark {
408+
col_idx: 2,
409+
data_type: DataType::Int64,
410+
val: ScalarImpl::Int64(100)
411+
}
412+
);
413+
414+
let w = hop.next().await.unwrap().unwrap();
415+
let w = w.as_watermark().unwrap();
416+
assert_eq!(
417+
w,
418+
&Watermark {
419+
col_idx: 3,
420+
data_type: DataType::Int64,
421+
val: ScalarImpl::Int64(100)
422+
}
423+
);
424+
425+
let w = hop.next().await.unwrap().unwrap();
426+
let w = w.as_watermark().unwrap();
427+
assert_eq!(
428+
w,
429+
&Watermark {
430+
col_idx: 4,
431+
data_type: DataType::Int64,
432+
val: ScalarImpl::Int64(100)
433+
}
434+
);
435+
}
436+
437+
#[tokio::test]
438+
async fn test_watermark_output_indices1() {
439+
let (mut tx, hop) = create_executor2(vec![4, 1, 0, 2]);
440+
let mut hop = hop.execute();
441+
442+
// TODO: the datatype is incorrect, but it seems useless here.
443+
tx.push_int64_watermark(0, 100);
444+
tx.push_int64_watermark(1, 100);
445+
tx.push_int64_watermark(2, 100);
446+
447+
let w = hop.next().await.unwrap().unwrap();
448+
let w = w.as_watermark().unwrap();
449+
assert_eq!(
450+
w,
451+
&Watermark {
452+
col_idx: 2,
453+
data_type: DataType::Int64,
454+
val: ScalarImpl::Int64(100)
455+
}
456+
);
457+
458+
let w = hop.next().await.unwrap().unwrap();
459+
let w = w.as_watermark().unwrap();
460+
assert_eq!(
461+
w,
462+
&Watermark {
463+
col_idx: 1,
464+
data_type: DataType::Int64,
465+
val: ScalarImpl::Int64(100)
466+
}
467+
);
468+
469+
let w = hop.next().await.unwrap().unwrap();
470+
let w = w.as_watermark().unwrap();
471+
assert_eq!(
472+
w,
473+
&Watermark {
474+
col_idx: 0,
475+
data_type: DataType::Int64,
476+
val: ScalarImpl::Int64(100)
477+
}
478+
);
479+
480+
let w = hop.next().await.unwrap().unwrap();
481+
let w = w.as_watermark().unwrap();
482+
assert_eq!(
483+
w,
484+
&Watermark {
485+
col_idx: 3,
486+
data_type: DataType::Int64,
487+
val: ScalarImpl::Int64(100)
488+
}
489+
);
490+
}
491+
492+
#[tokio::test]
493+
async fn test_watermark_output_indices2() {
494+
let (mut tx, hop) = create_executor2(vec![4, 1, 5, 0, 2]);
495+
let mut hop = hop.execute();
496+
497+
// TODO: the datatype is incorrect, but it seems useless here.
498+
tx.push_int64_watermark(0, 100);
499+
tx.push_int64_watermark(1, 100);
500+
tx.push_int64_watermark(2, 100);
501+
502+
let w = hop.next().await.unwrap().unwrap();
503+
let w = w.as_watermark().unwrap();
504+
assert_eq!(
505+
w,
506+
&Watermark {
507+
col_idx: 3,
508+
data_type: DataType::Int64,
509+
val: ScalarImpl::Int64(100)
510+
}
511+
);
512+
513+
let w = hop.next().await.unwrap().unwrap();
514+
let w = w.as_watermark().unwrap();
515+
assert_eq!(
516+
w,
517+
&Watermark {
518+
col_idx: 1,
519+
data_type: DataType::Int64,
520+
val: ScalarImpl::Int64(100)
521+
}
522+
);
523+
524+
let w = hop.next().await.unwrap().unwrap();
525+
let w = w.as_watermark().unwrap();
526+
assert_eq!(
527+
w,
528+
&Watermark {
529+
col_idx: 0,
530+
data_type: DataType::Int64,
531+
val: ScalarImpl::Int64(100)
532+
}
533+
);
534+
535+
let w = hop.next().await.unwrap().unwrap();
536+
let w = w.as_watermark().unwrap();
537+
assert_eq!(
538+
w,
539+
&Watermark {
540+
col_idx: 2,
541+
data_type: DataType::Int64,
542+
val: ScalarImpl::Int64(100)
543+
}
544+
);
545+
546+
let w = hop.next().await.unwrap().unwrap();
547+
let w = w.as_watermark().unwrap();
548+
assert_eq!(
549+
w,
550+
&Watermark {
551+
col_idx: 4,
552+
data_type: DataType::Int64,
553+
val: ScalarImpl::Int64(100)
554+
}
555+
);
556+
}
308557
}

0 commit comments

Comments
 (0)