Skip to content

Commit eddb2fc

Browse files
authored
fix(optimizer): ApplyAggTransposeRule should handle CorrelatedInputRef in agg filter (risingwavelabs#8650)
1 parent 32f4925 commit eddb2fc

File tree

6 files changed

+108
-97
lines changed

6 files changed

+108
-97
lines changed

src/frontend/planner_test/tests/testdata/subquery_expr_correlated.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,22 @@
739739
├─LogicalAgg { group_key: [strings.v1], aggs: [] }
740740
| └─LogicalScan { table: strings, columns: [strings.v1] }
741741
└─LogicalScan { table: strings, columns: [strings.v1] }
742+
- name: issue 4762 correlated input in agg filter
743+
sql: |
744+
CREATE TABLE strings(v1 VARCHAR);
745+
SELECT (SELECT STRING_AGG(v1, ',') FILTER (WHERE v1 < t.v1) FROM strings) FROM strings AS t;
746+
optimized_logical_plan_for_batch: |
747+
LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(strings.v1, strings.v1), output: [string_agg(strings.v1, ',':Varchar) filter((strings.v1 < strings.v1))] }
748+
├─LogicalScan { table: strings, columns: [strings.v1] }
749+
└─LogicalAgg { group_key: [strings.v1], aggs: [string_agg(strings.v1, ',':Varchar) filter((strings.v1 < strings.v1))] }
750+
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(strings.v1, strings.v1), output: [strings.v1, strings.v1, ',':Varchar] }
751+
├─LogicalAgg { group_key: [strings.v1], aggs: [] }
752+
| └─LogicalScan { table: strings, columns: [strings.v1] }
753+
└─LogicalProject { exprs: [strings.v1, strings.v1, ',':Varchar] }
754+
└─LogicalJoin { type: Inner, on: true, output: all }
755+
├─LogicalAgg { group_key: [strings.v1], aggs: [] }
756+
| └─LogicalScan { table: strings, columns: [strings.v1] }
757+
└─LogicalScan { table: strings, columns: [strings.v1] }
742758
- name: Existential join on outer join with correlated condition
743759
sql: |
744760
create table t1(x int, y int);

src/frontend/src/optimizer/rule/apply_agg_transpose_rule.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ use risingwave_common::types::DataType;
1616
use risingwave_expr::expr::AggKind;
1717
use risingwave_pb::plan_common::JoinType;
1818

19-
use super::{BoxedRule, Rule};
19+
use super::{ApplyOffsetRewriter, BoxedRule, Rule};
2020
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
2121
use crate::optimizer::plan_node::{LogicalAgg, LogicalApply, LogicalFilter, LogicalProject};
2222
use crate::optimizer::PlanRef;
23-
use crate::utils::{ColIndexMapping, Condition};
23+
use crate::utils::Condition;
2424

2525
/// Transpose `LogicalApply` and `LogicalAgg`.
2626
///
@@ -53,7 +53,6 @@ impl Rule for ApplyAggTransposeRule {
5353
let agg: &LogicalAgg = right.as_logical_agg()?;
5454
let (mut agg_calls, agg_group_key, agg_input) = agg.clone().decompose();
5555
let is_scalar_agg = agg_group_key.is_empty();
56-
let agg_input_len = agg_input.schema().len();
5756
let apply_left_len = left.schema().len();
5857

5958
if !is_scalar_agg && max_one_row {
@@ -102,7 +101,7 @@ impl Rule for ApplyAggTransposeRule {
102101
JoinType::LeftOuter,
103102
Condition::true_cond(),
104103
correlated_id,
105-
correlated_indices,
104+
correlated_indices.clone(),
106105
false,
107106
)
108107
.translate_apply(left, eq_predicates)
@@ -113,7 +112,7 @@ impl Rule for ApplyAggTransposeRule {
113112
JoinType::Inner,
114113
Condition::true_cond(),
115114
correlated_id,
116-
correlated_indices,
115+
correlated_indices.clone(),
117116
false,
118117
)
119118
.into()
@@ -122,7 +121,8 @@ impl Rule for ApplyAggTransposeRule {
122121
let group_agg = {
123122
// shift index of agg_calls' `InputRef` with `apply_left_len`.
124123
let offset = apply_left_len as isize;
125-
let mut shift_index = ColIndexMapping::with_shift_offset(agg_input_len, offset);
124+
let mut rewriter =
125+
ApplyOffsetRewriter::new(apply_left_len, &correlated_indices, correlated_id);
126126
agg_calls.iter_mut().for_each(|agg_call| {
127127
agg_call.inputs.iter_mut().for_each(|input_ref| {
128128
input_ref.shift_with_offset(offset);
@@ -131,7 +131,7 @@ impl Rule for ApplyAggTransposeRule {
131131
.order_by
132132
.iter_mut()
133133
.for_each(|o| o.shift_with_offset(offset));
134-
agg_call.filter = agg_call.filter.clone().rewrite_expr(&mut shift_index);
134+
agg_call.filter = agg_call.filter.clone().rewrite_expr(&mut rewriter);
135135
});
136136
if is_scalar_agg {
137137
// convert count(*) to count(1).

src/frontend/src/optimizer/rule/apply_filter_transpose_rule.rs

Lines changed: 7 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
use itertools::{Either, Itertools};
1616
use risingwave_pb::plan_common::JoinType;
1717

18-
use super::{BoxedRule, Rule};
19-
use crate::expr::{CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, InputRef};
18+
use super::{ApplyOffsetRewriter, BoxedRule, Rule};
19+
use crate::expr::ExprRewriter;
2020
use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, PlanTreeNodeUnary};
2121
use crate::optimizer::PlanRef;
22-
use crate::utils::{ColIndexMapping, Condition};
22+
use crate::utils::Condition;
2323

2424
/// Transpose `LogicalApply` and `LogicalFilter`.
2525
///
@@ -57,19 +57,8 @@ impl Rule for ApplyFilterTransposeRule {
5757
let filter = right.as_logical_filter()?;
5858
let input = filter.input();
5959

60-
let mut rewriter = Rewriter {
61-
offset: left.schema().len(),
62-
index_mapping: ColIndexMapping::new(
63-
correlated_indices
64-
.clone()
65-
.into_iter()
66-
.map(Some)
67-
.collect_vec(),
68-
)
69-
.inverse(),
70-
has_correlated_input_ref: false,
71-
correlated_id,
72-
};
60+
let mut rewriter =
61+
ApplyOffsetRewriter::new(left.schema().len(), &correlated_indices, correlated_id);
7362
// Split predicates in LogicalFilter into correlated expressions and uncorrelated
7463
// expressions.
7564
let (cor_exprs, uncor_exprs) =
@@ -79,8 +68,8 @@ impl Rule for ApplyFilterTransposeRule {
7968
.into_iter()
8069
.partition_map(|expr| {
8170
let expr = rewriter.rewrite_expr(expr);
82-
if rewriter.has_correlated_input_ref {
83-
rewriter.has_correlated_input_ref = false;
71+
if rewriter.has_correlated_input_ref() {
72+
rewriter.reset_state();
8473
Either::Left(expr)
8574
} else {
8675
Either::Right(expr)
@@ -115,33 +104,3 @@ impl ApplyFilterTransposeRule {
115104
Box::new(ApplyFilterTransposeRule {})
116105
}
117106
}
118-
119-
/// Convert `CorrelatedInputRef` to `InputRef` and shift `InputRef` with offset.
120-
struct Rewriter {
121-
offset: usize,
122-
index_mapping: ColIndexMapping,
123-
has_correlated_input_ref: bool,
124-
correlated_id: CorrelatedId,
125-
}
126-
impl ExprRewriter for Rewriter {
127-
fn rewrite_correlated_input_ref(
128-
&mut self,
129-
correlated_input_ref: CorrelatedInputRef,
130-
) -> ExprImpl {
131-
let found = correlated_input_ref.correlated_id() == self.correlated_id;
132-
self.has_correlated_input_ref |= found;
133-
if found {
134-
InputRef::new(
135-
self.index_mapping.map(correlated_input_ref.index()),
136-
correlated_input_ref.return_type(),
137-
)
138-
.into()
139-
} else {
140-
correlated_input_ref.into()
141-
}
142-
}
143-
144-
fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
145-
InputRef::new(input_ref.index() + self.offset, input_ref.return_type()).into()
146-
}
147-
}
src/frontend/src/optimizer/rule/apply_offset_rewriter.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use itertools::Itertools;
16+
17+
use crate::expr::{CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, InputRef};
18+
use crate::utils::ColIndexMapping;
19+
20+
/// Convert `CorrelatedInputRef` to `InputRef` and shift `InputRef` with offset.
///
/// Only a `CorrelatedInputRef` whose id equals `correlated_id` is converted; any other
/// one is returned unchanged. The rewriter also remembers whether it has converted at
/// least one such reference, which can be queried with `has_correlated_input_ref` and
/// cleared with `reset_state`.
21+
pub struct ApplyOffsetRewriter {
22+
/// Added to the index of every plain `InputRef` that is rewritten.
offset: usize,
23+
/// Inverse of the mapping built from `correlated_indices`; applied to the index of a
/// matching `CorrelatedInputRef` to obtain the index of the `InputRef` replacing it.
index_mapping: ColIndexMapping,
24+
/// Becomes `true` once a matching `CorrelatedInputRef` has been rewritten and stays
/// `true` until `reset_state` is called.
has_correlated_input_ref: bool,
25+
/// Id of the correlation whose `CorrelatedInputRef`s this rewriter converts.
correlated_id: CorrelatedId,
26+
}
27+
28+
impl ExprRewriter for ApplyOffsetRewriter {
29+
/// Replaces a `CorrelatedInputRef` carrying this rewriter's `correlated_id` with an
/// `InputRef` of the same return type whose index is looked up in `index_mapping`.
/// A reference with a different id is returned unchanged.
///
/// Note that the mapped index is used as-is: `offset` is not added here.
fn rewrite_correlated_input_ref(
30+
&mut self,
31+
correlated_input_ref: CorrelatedInputRef,
32+
) -> ExprImpl {
33+
let found = correlated_input_ref.correlated_id() == self.correlated_id;
34+
// `|=` makes the flag sticky: a later non-matching reference cannot clear it.
self.has_correlated_input_ref |= found;
35+
if found {
36+
InputRef::new(
37+
self.index_mapping.map(correlated_input_ref.index()),
38+
correlated_input_ref.return_type(),
39+
)
40+
.into()
41+
} else {
42+
// Belongs to some other correlation; leave it for whoever owns that id.
correlated_input_ref.into()
43+
}
44+
}
45+
46+
/// Shifts the index of a plain `InputRef` by `offset`, keeping its return type.
fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
47+
InputRef::new(input_ref.index() + self.offset, input_ref.return_type()).into()
48+
}
49+
}
50+
51+
impl ApplyOffsetRewriter {
52+
/// Creates a rewriter for the correlation identified by `correlated_id`.
///
/// * `offset` - amount added to the index of every plain `InputRef`.
/// * `correlated_indices` - used to build a `ColIndexMapping` whose inverse translates
///   the index of a matching `CorrelatedInputRef`.
/// * `correlated_id` - only `CorrelatedInputRef`s with this id are converted.
///
/// The "has correlated input ref" flag starts out as `false`.
pub fn new(offset: usize, correlated_indices: &[usize], correlated_id: CorrelatedId) -> Self {
53+
Self {
54+
offset,
55+
// Wrap every index in `Some` to form the forward mapping, then invert it.
index_mapping: ColIndexMapping::new(
56+
correlated_indices.iter().copied().map(Some).collect_vec(),
57+
)
58+
.inverse(),
59+
has_correlated_input_ref: false,
60+
correlated_id,
61+
}
62+
}
63+
64+
/// Returns `true` if a matching `CorrelatedInputRef` has been rewritten since
/// construction or since the last call to `reset_state`.
pub fn has_correlated_input_ref(&self) -> bool {
65+
self.has_correlated_input_ref
66+
}
67+
68+
/// Clears the "has correlated input ref" flag so the rewriter can be reused to
/// classify another expression.
pub fn reset_state(&mut self) {
69+
self.has_correlated_input_ref = false;
70+
}
71+
}

src/frontend/src/optimizer/rule/apply_project_transpose_rule.rs

Lines changed: 4 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,10 @@
1515
use itertools::Itertools;
1616
use risingwave_pb::plan_common::JoinType;
1717

18-
use super::{BoxedRule, Rule};
19-
use crate::expr::{CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, InputRef};
18+
use super::{ApplyOffsetRewriter, BoxedRule, Rule};
19+
use crate::expr::{ExprImpl, ExprRewriter, InputRef};
2020
use crate::optimizer::plan_node::{LogicalApply, LogicalProject};
2121
use crate::optimizer::PlanRef;
22-
use crate::utils::ColIndexMapping;
2322

2423
/// Transpose `LogicalApply` and `LogicalProject`.
2524
///
@@ -64,18 +63,8 @@ impl Rule for ApplyProjectTransposeRule {
6463
let (proj_exprs, proj_input) = project.clone().decompose();
6564

6665
// replace correlated_input_ref in project exprs
67-
let mut rewriter = Rewriter {
68-
offset: left.schema().len(),
69-
index_mapping: ColIndexMapping::new(
70-
correlated_indices
71-
.clone()
72-
.into_iter()
73-
.map(Some)
74-
.collect_vec(),
75-
)
76-
.inverse(),
77-
correlated_id,
78-
};
66+
let mut rewriter =
67+
ApplyOffsetRewriter::new(left.schema().len(), &correlated_indices, correlated_id);
7968

8069
let new_proj_exprs: Vec<ExprImpl> = proj_exprs
8170
.into_iter()
@@ -124,30 +113,3 @@ impl ExprRewriter for ApplyOnConditionRewriter {
124113
}
125114
}
126115
}
127-
128-
/// Convert `CorrelatedInputRef` to `InputRef` and shift `InputRef` with offset.
129-
struct Rewriter {
130-
offset: usize,
131-
index_mapping: ColIndexMapping,
132-
correlated_id: CorrelatedId,
133-
}
134-
impl ExprRewriter for Rewriter {
135-
fn rewrite_correlated_input_ref(
136-
&mut self,
137-
correlated_input_ref: CorrelatedInputRef,
138-
) -> ExprImpl {
139-
if correlated_input_ref.correlated_id() == self.correlated_id {
140-
InputRef::new(
141-
self.index_mapping.map(correlated_input_ref.index()),
142-
correlated_input_ref.return_type(),
143-
)
144-
.into()
145-
} else {
146-
correlated_input_ref.into()
147-
}
148-
}
149-
150-
fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
151-
InputRef::new(input_ref.index() + self.offset, input_ref.return_type()).into()
152-
}
153-
}

src/frontend/src/optimizer/rule/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ pub use avoid_exchange_share_rule::*;
9797
mod min_max_on_index_rule;
9898
pub use min_max_on_index_rule::*;
9999

100+
mod apply_offset_rewriter;
101+
use apply_offset_rewriter::ApplyOffsetRewriter;
102+
100103
#[macro_export]
101104
macro_rules! for_all_rules {
102105
($macro:ident) => {

0 commit comments

Comments
 (0)