Skip to content

Commit 97b021d

Browse files
authored
fix(streaming): ignore null stream key from full outer join to workaround (risingwavelabs#8520)
1 parent 53da2e3 commit 97b021d

File tree

4 files changed

+100
-21
lines changed

4 files changed

+100
-21
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# https://github.com/risingwavelabs/risingwave/issues/8084
2+
3+
statement ok
4+
SET RW_IMPLICIT_FLUSH TO true;
5+
6+
statement ok
7+
create table t (a int primary key);
8+
9+
statement ok
10+
create materialized view mv as select t1.* from t as t1 full join t as t2 on t1.a = t2.a;
11+
12+
statement ok
13+
insert into t values(null);
14+
15+
# TODO: https://github.com/risingwavelabs/risingwave/issues/8084
16+
query I
17+
select * from mv;
18+
----
19+
20+
statement ok
21+
drop materialized view mv;
22+
23+
statement ok
24+
drop table t;

src/frontend/planner_test/tests/testdata/join.yaml

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -198,19 +198,20 @@
198198
StreamMaterialize { columns: [x, i.t._row_id(hidden), i.t._row_id#1(hidden), i.x(hidden), i.t._row_id#2(hidden), i.t._row_id#3(hidden), i.x#1(hidden)], pk_columns: [i.t._row_id, i.t._row_id#1, i.x, i.t._row_id#2, i.t._row_id#3, i.x#1], pk_conflict: "no check" }
199199
└─StreamExchange { dist: HashShard(i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x) }
200200
└─StreamProject { exprs: [Coalesce(i.x, i.x) as $expr1, i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x] }
201-
└─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.t._row_id, i.t._row_id] }
202-
├─StreamShare { id = 5 }
203-
| └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
204-
| ├─StreamExchange { dist: HashShard(i.x) }
205-
| | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
206-
| └─StreamExchange { dist: HashShard(i.x) }
207-
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
208-
└─StreamShare { id = 5 }
209-
└─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
210-
├─StreamExchange { dist: HashShard(i.x) }
211-
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
212-
└─StreamExchange { dist: HashShard(i.x) }
213-
└─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
201+
└─StreamFilter { predicate: (((((IsNotNull(i.t._row_id) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) }
202+
└─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.t._row_id, i.t._row_id] }
203+
├─StreamShare { id = 5 }
204+
| └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
205+
| ├─StreamExchange { dist: HashShard(i.x) }
206+
| | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
207+
| └─StreamExchange { dist: HashShard(i.x) }
208+
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
209+
└─StreamShare { id = 5 }
210+
└─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
211+
├─StreamExchange { dist: HashShard(i.x) }
212+
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
213+
└─StreamExchange { dist: HashShard(i.x) }
214+
└─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
214215
- name: Use lookup join
215216
sql: |
216217
create table t1 (v1 int, v2 int);
@@ -505,11 +506,12 @@
505506
└─StreamExchange { dist: HashShard(a._row_id, b._row_id, a.x, b.x) }
506507
└─StreamProject { exprs: [(2:Int32 * Coalesce(a.x, b.x)) as $expr1, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr2, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr3, a._row_id, b._row_id, a.x, b.x] }
507508
└─StreamFilter { predicate: ((2:Int32 * Coalesce(a.x, b.x)) < 10:Int32) }
508-
└─StreamHashJoin { type: FullOuter, predicate: a.x = b.x, output: [a.x, b.x, a._row_id, b._row_id] }
509-
├─StreamExchange { dist: HashShard(a.x) }
510-
| └─StreamTableScan { table: a, columns: [a.x, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
511-
└─StreamExchange { dist: HashShard(b.x) }
512-
└─StreamTableScan { table: b, columns: [b.x, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
509+
└─StreamFilter { predicate: (IsNotNull(a._row_id) OR IsNotNull(b._row_id)) }
510+
└─StreamHashJoin { type: FullOuter, predicate: a.x = b.x, output: [a.x, b.x, a._row_id, b._row_id] }
511+
├─StreamExchange { dist: HashShard(a.x) }
512+
| └─StreamTableScan { table: a, columns: [a.x, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
513+
└─StreamExchange { dist: HashShard(b.x) }
514+
└─StreamTableScan { table: b, columns: [b.x, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
513515
- sql: |
514516
CREATE TABLE test (a INTEGER, b INTEGER);
515517
CREATE TABLE test2 (a INTEGER, c INTEGER);

src/frontend/src/optimizer/plan_node/logical_filter.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@ use fixedbitset::FixedBitSet;
1818
use itertools::Itertools;
1919
use risingwave_common::bail;
2020
use risingwave_common::error::Result;
21+
use risingwave_common::types::DataType;
2122

2223
use super::generic::{self, GenericPlanNode};
2324
use super::{
2425
ColPrunable, CollectInputRef, ExprRewritable, LogicalProject, PlanBase, PlanRef,
2526
PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream,
2627
};
27-
use crate::expr::{assert_input_ref, ExprImpl, ExprRewriter};
28+
use crate::expr::{assert_input_ref, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef};
2829
use crate::optimizer::plan_node::{
2930
BatchFilter, ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext,
3031
StreamFilter, ToStreamContext,
@@ -79,7 +80,30 @@ impl LogicalFilter {
7980
}
8081
}
8182

82-
/// the function will check if the predicate is bool expression
83+
/// Create a `LogicalFilter` to filter the rows with all keys are null.
84+
pub fn filter_if_keys_all_null(input: PlanRef, key: &[usize]) -> PlanRef {
85+
let schema = input.schema();
86+
let cond = key.iter().fold(ExprImpl::literal_bool(false), |expr, i| {
87+
ExprImpl::FunctionCall(
88+
FunctionCall::new_unchecked(
89+
ExprType::Or,
90+
vec![
91+
expr,
92+
FunctionCall::new_unchecked(
93+
ExprType::IsNotNull,
94+
vec![InputRef::new(*i, schema.fields()[*i].data_type.clone()).into()],
95+
DataType::Boolean,
96+
)
97+
.into(),
98+
],
99+
DataType::Boolean,
100+
)
101+
.into(),
102+
)
103+
});
104+
LogicalFilter::create_with_expr(input, cond)
105+
}
106+
83107
pub fn create_with_expr(input: PlanRef, predicate: ExprImpl) -> PlanRef {
84108
let predicate = Condition::with_expr(predicate);
85109
Self::new(input, predicate).into()

src/frontend/src/optimizer/plan_node/logical_join.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1480,8 +1480,37 @@ impl ToStream for LogicalJoin {
14801480
}
14811481

14821482
let join_with_pk = join.clone_with_output_indices(new_output_indices);
1483+
1484+
let plan = if join_with_pk.join_type() == JoinType::FullOuter {
1485+
// ignore the all NULL to maintain the stream key's uniqueness, see https://github.com/risingwavelabs/risingwave/issues/8084 for more information
1486+
1487+
let l2o = join_with_pk
1488+
.l2i_col_mapping()
1489+
.composite(&join_with_pk.i2o_col_mapping());
1490+
let r2o = join_with_pk
1491+
.r2i_col_mapping()
1492+
.composite(&join_with_pk.i2o_col_mapping());
1493+
let left_right_stream_keys = join_with_pk
1494+
.left()
1495+
.logical_pk()
1496+
.iter()
1497+
.map(|i| l2o.map(*i))
1498+
.chain(
1499+
join_with_pk
1500+
.right()
1501+
.logical_pk()
1502+
.iter()
1503+
.map(|i| r2o.map(*i)),
1504+
)
1505+
.collect_vec();
1506+
let plan: PlanRef = join_with_pk.into();
1507+
LogicalFilter::filter_if_keys_all_null(plan, &left_right_stream_keys)
1508+
} else {
1509+
join_with_pk.into()
1510+
};
1511+
14831512
// the added columns is at the end, so it will not change the exists column index
1484-
Ok((join_with_pk.into(), out_col_change))
1513+
Ok((plan, out_col_change))
14851514
}
14861515
}
14871516

0 commit comments

Comments
 (0)