Skip to content

Commit 694c446

Browse files
fix(watermark): avoid panic in watermark derivation (close risingwavelabs#8689) (risingwavelabs#8690)
1 parent 870ba34 commit 694c446

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

src/frontend/planner_test/tests/testdata/watermark.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,18 @@
6464
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
6565
└─StreamExchange { dist: HashShard(t2.ts) }
6666
└─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
67+
- name: left semi hash join
68+
sql: |
69+
create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
70+
create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
71+
select t1.ts as t1_ts, t1.v1 as t1_v1, t1.v2 as t1_v2 from t1 where exists (select * from t2 where t1.ts = t2.ts);
72+
stream_plan: |
73+
StreamMaterialize { columns: [t1_ts, t1_v1, t1_v2, t1._row_id(hidden)], pk_columns: [t1._row_id, t1_ts], pk_conflict: "no check", watermark_columns: [t1_ts] }
74+
└─StreamHashJoin { type: LeftSemi, predicate: t1.ts = t2.ts, output_watermarks: [t1.ts], output: all }
75+
├─StreamExchange { dist: HashShard(t1.ts) }
76+
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
77+
└─StreamExchange { dist: HashShard(t2.ts) }
78+
└─StreamTableScan { table: t2, columns: [t2.ts, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
6779
- name: union all
6880
sql: |
6981
create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;

src/frontend/src/optimizer/plan_node/stream_hash_join.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,12 @@ impl StreamHashJoin {
7474
if logical.left().watermark_columns().contains(left_key)
7575
&& logical.right().watermark_columns().contains(right_key)
7676
{
77-
watermark_columns.insert(l2i.map(left_key));
78-
watermark_columns.insert(r2i.map(right_key));
77+
if let Some(internal) = l2i.try_map(left_key) {
78+
watermark_columns.insert(internal);
79+
}
80+
if let Some(internal) = r2i.try_map(right_key) {
81+
watermark_columns.insert(internal);
82+
}
7983
}
8084
}
8185
logical.i2o_col_mapping().rewrite_bitset(&watermark_columns)

0 commit comments

Comments
 (0)