Skip to content

Commit cc3faf6

Browse files
authored
feat(watermark): handle watermark in project_set (risingwavelabs#12128)
1 parent 4bb7c66 commit cc3faf6

File tree

9 files changed

+270
-60
lines changed

9 files changed

+270
-60
lines changed

e2e_test/streaming/watermark.slt

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ create table t (
99
) append only;
1010

1111
statement ok
12-
create materialized view mv as select * from t emit on window close;
12+
create materialized view mv1 as select * from t emit on window close;
13+
14+
statement ok
15+
create materialized view mv2 as select t.ts, unnest(Array[1,2,3]) from t emit on window close;
1316

1417
statement ok
1518
insert into t values ('2023-05-06 16:51:00', 1), ('2023-05-06 16:51:00', 2), ('2023-05-06 16:51:00', 3);
@@ -22,14 +25,31 @@ sleep 5s
2225

2326
skipif in-memory
2427
query TI
25-
select * from mv;
28+
select * from mv1;
29+
----
30+
2023-05-06 16:51:00 1
31+
2023-05-06 16:51:00 2
32+
2023-05-06 16:51:00 3
33+
34+
skipif in-memory
35+
query TI
36+
select * from mv2;
2637
----
2738
2023-05-06 16:51:00 1
2839
2023-05-06 16:51:00 2
2940
2023-05-06 16:51:00 3
41+
2023-05-06 16:51:00 1
42+
2023-05-06 16:51:00 2
43+
2023-05-06 16:51:00 3
44+
2023-05-06 16:51:00 1
45+
2023-05-06 16:51:00 2
46+
2023-05-06 16:51:00 3
47+
48+
statement ok
49+
drop materialized view mv1;
3050

3151
statement ok
32-
drop materialized view mv;
52+
drop materialized view mv2;
3353

3454
statement ok
3555
drop table t;

proto/stream_plan.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,12 @@ message ExpandNode {
540540

541541
message ProjectSetNode {
542542
repeated expr.ProjectSetSelectItem select_list = 1;
543+
// this two field is expressing a list of usize pair, which means when project receives a
544+
// watermark with `watermark_input_cols[i]` column index, it should derive a new watermark
545+
// with `watermark_output_cols[i]`th expression
546+
repeated uint32 watermark_input_cols = 2;
547+
repeated uint32 watermark_expr_indices = 3;
548+
repeated uint32 nondecreasing_exprs = 4;
543549
}
544550

545551
// Sorts inputs and outputs ordered data based on watermark.

src/frontend/planner_test/tests/testdata/input/watermark.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,9 @@
9696
select window_start from hop(t, ts, interval '1' minute, interval '3' minute);
9797
expected_outputs:
9898
- stream_plan
99+
- name: unnest
100+
sql: |
101+
create table t (ts timestamp with time zone, v1 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
102+
explain create materialized view mv as select t.ts, unnest(Array[1,2,3]) from t emit on window close;
103+
expected_outputs:
104+
- explain_output

src/frontend/planner_test/tests/testdata/output/watermark.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,3 +210,12 @@
210210
└─StreamHopWindow { time_col: t.ts, slide: 00:01:00, size: 00:03:00, output: [window_start, t._row_id], output_watermarks: [window_start] }
211211
└─StreamFilter { predicate: IsNotNull(t.ts) }
212212
└─StreamTableScan { table: t, columns: [t.ts, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
213+
- name: unnest
214+
sql: |
215+
create table t (ts timestamp with time zone, v1 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
216+
explain create materialized view mv as select t.ts, unnest(Array[1,2,3]) from t emit on window close;
217+
explain_output: |
218+
StreamMaterialize { columns: [projected_row_id(hidden), ts, unnest, t._row_id(hidden)], stream_key: [t._row_id, projected_row_id], pk_columns: [t._row_id, projected_row_id], pk_conflict: NoCheck, watermark_columns: [ts] }
219+
└─StreamEowcSort { sort_column: t.ts }
220+
└─StreamProjectSet { select_list: [$0, Unnest(ARRAY[1, 2, 3]:List(Int32)), $1] }
221+
└─StreamTableScan { table: t, columns: [ts, _row_id] }

src/frontend/src/optimizer/plan_node/stream.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -711,14 +711,8 @@ pub fn to_stream_prost_body(
711711
table: Some(me.table.to_internal_table_prost()),
712712
})
713713
}
714-
Node::ProjectSet(me) => {
715-
let me = &me.core;
716-
let select_list = me
717-
.select_list
718-
.iter()
719-
.map(ExprImpl::to_project_set_select_item_proto)
720-
.collect();
721-
PbNodeBody::ProjectSet(ProjectSetNode { select_list })
714+
Node::ProjectSet(_) => {
715+
unreachable!()
722716
}
723717
Node::Project(me) => PbNodeBody::Project(ProjectNode {
724718
select_list: me.core.exprs.iter().map(|x| x.to_expr_proto()).collect(),

src/frontend/src/optimizer/plan_node/stream_project_set.rs

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ use crate::utils::ColIndexMappingRewriteExt;
2828
pub struct StreamProjectSet {
2929
pub base: PlanBase,
3030
logical: generic::ProjectSet<PlanRef>,
31+
/// All the watermark derivations, (input_column_idx, expr_idx). And the
32+
/// derivation expression is the project_set's expression itself.
33+
watermark_derivations: Vec<(usize, usize)>,
34+
/// Nondecreasing expression indices. `ProjectSet` can produce watermarks for these
35+
/// expressions.
36+
nondecreasing_exprs: Vec<usize>,
3137
}
3238

3339
impl StreamProjectSet {
@@ -37,15 +43,26 @@ impl StreamProjectSet {
3743
.i2o_col_mapping()
3844
.rewrite_provided_distribution(input.distribution());
3945

46+
let mut watermark_derivations = vec![];
47+
let mut nondecreasing_exprs = vec![];
4048
let mut watermark_columns = FixedBitSet::with_capacity(logical.output_len());
4149
for (expr_idx, expr) in logical.select_list.iter().enumerate() {
42-
if let WatermarkDerivation::Watermark(input_idx) = try_derive_watermark(expr) {
43-
if input.watermark_columns().contains(input_idx) {
44-
// The first column of ProjectSet is `projected_row_id`.
50+
match try_derive_watermark(expr) {
51+
WatermarkDerivation::Watermark(input_idx) => {
52+
if input.watermark_columns().contains(input_idx) {
53+
watermark_derivations.push((input_idx, expr_idx));
54+
watermark_columns.insert(expr_idx + 1);
55+
}
56+
}
57+
WatermarkDerivation::Nondecreasing => {
58+
nondecreasing_exprs.push(expr_idx);
4559
watermark_columns.insert(expr_idx + 1);
4660
}
61+
WatermarkDerivation::Constant => {
62+
// XXX(rc): we can produce one watermark on each recovery for this case.
63+
}
64+
WatermarkDerivation::None => {}
4765
}
48-
// XXX(rc): do we need to handle `WatermarkDerivation::Nondecreasing` here?
4966
}
5067

5168
// ProjectSet executor won't change the append-only behavior of the stream, so it depends on
@@ -57,7 +74,12 @@ impl StreamProjectSet {
5774
input.emit_on_window_close(),
5875
watermark_columns,
5976
);
60-
StreamProjectSet { base, logical }
77+
StreamProjectSet {
78+
base,
79+
logical,
80+
watermark_derivations,
81+
nondecreasing_exprs,
82+
}
6183
}
6284
}
6385
impl_distill_by_unit!(StreamProjectSet, logical, "StreamProjectSet");
@@ -77,13 +99,21 @@ impl PlanTreeNodeUnary for StreamProjectSet {
7799

78100
impl StreamNode for StreamProjectSet {
79101
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
102+
let (watermark_input_cols, watermark_expr_indices) = self
103+
.watermark_derivations
104+
.iter()
105+
.map(|(i, o)| (*i as u32, *o as u32))
106+
.unzip();
80107
PbNodeBody::ProjectSet(ProjectSetNode {
81108
select_list: self
82109
.logical
83110
.select_list
84111
.iter()
85112
.map(|select_item| select_item.to_project_set_select_item_proto())
86113
.collect_vec(),
114+
watermark_input_cols,
115+
watermark_expr_indices,
116+
nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
87117
})
88118
}
89119
}

0 commit comments

Comments
 (0)