Skip to content

Commit d49a4c5

Browse files
authored
refactor(optimizer): move fd dervie into core (risingwavelabs#8540)
1 parent 305c864 commit d49a4c5

28 files changed

+397
-347
lines changed

src/frontend/src/optimizer/plan_node/generic/agg.rs

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use super::super::utils::TableCatalogBuilder;
2727
use super::{stream, GenericPlanNode, GenericPlanRef};
2828
use crate::expr::{Expr, ExprRewriter, InputRef, InputRefDisplay};
2929
use crate::optimizer::optimizer_context::OptimizerContextRef;
30+
use crate::optimizer::property::FunctionalDependencySet;
3031
use crate::stream_fragmenter::BuildFragmentGraphState;
3132
use crate::utils::{
3233
ColIndexMapping, ColIndexMappingRewriteExt, Condition, ConditionDisplay, IndexRewriter,
@@ -46,12 +47,31 @@ pub struct Agg<PlanRef> {
4647
pub input: PlanRef,
4748
}
4849

49-
impl<PlanRef> Agg<PlanRef> {
50+
impl<PlanRef: GenericPlanRef> Agg<PlanRef> {
5051
pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
5152
self.agg_calls.iter_mut().for_each(|call| {
5253
call.filter = call.filter.clone().rewrite_expr(r);
5354
});
5455
}
56+
57+
fn output_len(&self) -> usize {
58+
self.group_key.len() + self.agg_calls.len()
59+
}
60+
61+
/// get the Mapping of columnIndex from input column index to output column index,if a input
62+
/// column corresponds more than one out columns, mapping to any one
63+
pub fn o2i_col_mapping(&self) -> ColIndexMapping {
64+
let mut map = vec![None; self.output_len()];
65+
for (i, key) in self.group_key.iter().enumerate() {
66+
map[i] = Some(*key);
67+
}
68+
ColIndexMapping::with_target_size(map, self.input.schema().len())
69+
}
70+
71+
/// get the Mapping of columnIndex from input column index to out column index
72+
pub fn i2o_col_mapping(&self) -> ColIndexMapping {
73+
self.o2i_col_mapping().inverse()
74+
}
5575
}
5676

5777
impl<PlanRef: GenericPlanRef> GenericPlanNode for Agg<PlanRef> {
@@ -80,6 +100,21 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Agg<PlanRef> {
80100
fn ctx(&self) -> OptimizerContextRef {
81101
self.input.ctx()
82102
}
103+
104+
fn functional_dependency(&self) -> FunctionalDependencySet {
105+
let output_len = self.output_len();
106+
let _input_len = self.input.schema().len();
107+
let mut fd_set =
108+
FunctionalDependencySet::with_key(output_len, &(0..self.group_key.len()).collect_vec());
109+
// take group keys from input_columns, then grow the target size to column_cnt
110+
let i2o = self.i2o_col_mapping();
111+
for fd in self.input.functional_dependency().as_dependencies() {
112+
if let Some(fd) = i2o.rewrite_functional_dependency(fd) {
113+
fd_set.add_functional_dependency(fd);
114+
}
115+
}
116+
fd_set
117+
}
83118
}
84119

85120
pub enum AggCallState {
@@ -318,24 +353,6 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
318353
.collect()
319354
}
320355

321-
/// get the Mapping of columnIndex from input column index to output column index,if a input
322-
/// column corresponds more than one out columns, mapping to any one
323-
pub fn o2i_col_mapping(&self) -> ColIndexMapping {
324-
let input_len = self.input.schema().len();
325-
let agg_cal_num = self.agg_calls.len();
326-
let group_key = &self.group_key;
327-
let mut map = vec![None; agg_cal_num + group_key.len()];
328-
for (i, key) in group_key.iter().enumerate() {
329-
map[i] = Some(*key);
330-
}
331-
ColIndexMapping::with_target_size(map, input_len)
332-
}
333-
334-
/// get the Mapping of columnIndex from input column index to out column index
335-
pub fn i2o_col_mapping(&self) -> ColIndexMapping {
336-
self.o2i_col_mapping().inverse()
337-
}
338-
339356
pub fn infer_result_table(
340357
&self,
341358
me: &impl GenericPlanRef,

src/frontend/src/optimizer/plan_node/generic/expand.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
use itertools::Itertools;
1616
use risingwave_common::catalog::{Field, FieldDisplay, Schema};
1717
use risingwave_common::types::DataType;
18+
use risingwave_common::util::column_index_mapping::ColIndexMapping;
1819

1920
use super::{GenericPlanNode, GenericPlanRef};
2021
use crate::optimizer::optimizer_context::OptimizerContextRef;
22+
use crate::optimizer::property::FunctionalDependencySet;
2123

2224
/// [`Expand`] expand one row multiple times according to `column_subsets` and also keep
2325
/// original columns of input. It can be used to implement distinct aggregation and group set.
@@ -35,6 +37,16 @@ pub struct Expand<PlanRef> {
3537
pub input: PlanRef,
3638
}
3739

40+
impl<PlanRef: GenericPlanRef> Expand<PlanRef> {
41+
fn output_len(&self) -> usize {
42+
self.input.schema().len() * 2 + 1
43+
}
44+
45+
fn flag_index(&self) -> usize {
46+
self.output_len() - 1
47+
}
48+
}
49+
3850
impl<PlanRef: GenericPlanRef> GenericPlanNode for Expand<PlanRef> {
3951
fn schema(&self) -> Schema {
4052
let mut fields = self.input.schema().clone().into_fields();
@@ -59,6 +71,31 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Expand<PlanRef> {
5971
fn ctx(&self) -> OptimizerContextRef {
6072
self.input.ctx()
6173
}
74+
75+
fn functional_dependency(&self) -> FunctionalDependencySet {
76+
let input_fd = self
77+
.input
78+
.functional_dependency()
79+
.clone()
80+
.into_dependencies();
81+
let output_len = self.output_len();
82+
let flag_index = self.flag_index();
83+
84+
self.input
85+
.functional_dependency()
86+
.as_dependencies()
87+
.iter()
88+
.map(|_input_fd| {})
89+
.collect_vec();
90+
91+
let mut current_fd = FunctionalDependencySet::new(output_len);
92+
for mut fd in input_fd {
93+
fd.grow(output_len);
94+
fd.set_from(flag_index, true);
95+
current_fd.add_functional_dependency(fd);
96+
}
97+
current_fd
98+
}
6299
}
63100

64101
impl<PlanRef: GenericPlanRef> Expand<PlanRef> {
@@ -73,4 +110,16 @@ impl<PlanRef: GenericPlanRef> Expand<PlanRef> {
73110
})
74111
.collect_vec()
75112
}
113+
114+
pub fn i2o_col_mapping(&self) -> ColIndexMapping {
115+
let input_len = self.input.schema().len();
116+
let map = (0..input_len)
117+
.map(|source| Some(source + input_len))
118+
.collect_vec();
119+
ColIndexMapping::with_target_size(map, self.output_len())
120+
}
121+
122+
pub fn o2i_col_mapping(&self) -> ColIndexMapping {
123+
self.i2o_col_mapping().inverse()
124+
}
76125
}

src/frontend/src/optimizer/plan_node/generic/filter.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use risingwave_common::catalog::Schema;
1717
use super::{GenericPlanNode, GenericPlanRef};
1818
use crate::expr::ExprRewriter;
1919
use crate::optimizer::optimizer_context::OptimizerContextRef;
20+
use crate::optimizer::property::FunctionalDependencySet;
2021
use crate::utils::Condition;
2122

2223
/// [`Filter`] iterates over its input and returns elements for which `predicate` evaluates to
@@ -29,6 +30,7 @@ pub struct Filter<PlanRef> {
2930
pub input: PlanRef,
3031
}
3132

33+
impl<PlanRef: GenericPlanRef> Filter<PlanRef> {}
3234
impl<PlanRef: GenericPlanRef> GenericPlanNode for Filter<PlanRef> {
3335
fn schema(&self) -> Schema {
3436
self.input.schema().clone()
@@ -41,6 +43,21 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Filter<PlanRef> {
4143
fn ctx(&self) -> OptimizerContextRef {
4244
self.input.ctx()
4345
}
46+
47+
fn functional_dependency(&self) -> FunctionalDependencySet {
48+
let mut functional_dependency = self.input.functional_dependency().clone();
49+
for i in &self.predicate.conjunctions {
50+
if let Some((col, _)) = i.as_eq_const() {
51+
functional_dependency.add_constant_columns(&[col.index()])
52+
} else if let Some((left, right)) = i.as_eq_cond() {
53+
functional_dependency
54+
.add_functional_dependency_by_column_indices(&[left.index()], &[right.index()]);
55+
functional_dependency
56+
.add_functional_dependency_by_column_indices(&[right.index()], &[left.index()]);
57+
}
58+
}
59+
functional_dependency
60+
}
4461
}
4562

4663
impl<PlanRef> Filter<PlanRef> {

src/frontend/src/optimizer/plan_node/generic/hop_window.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use super::super::utils::IndicesDisplay;
2626
use super::{GenericPlanNode, GenericPlanRef};
2727
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, InputRefDisplay, Literal};
2828
use crate::optimizer::optimizer_context::OptimizerContextRef;
29+
use crate::optimizer::property::FunctionalDependencySet;
30+
use crate::utils::ColIndexMappingRewriteExt;
2931

3032
/// [`HopWindow`] implements Hop Table Function.
3133
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -95,6 +97,24 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
9597
fn ctx(&self) -> OptimizerContextRef {
9698
self.input.ctx()
9799
}
100+
101+
fn functional_dependency(&self) -> FunctionalDependencySet {
102+
let mut fd_set = self
103+
.i2o_col_mapping()
104+
.rewrite_functional_dependency_set(self.input.functional_dependency().clone());
105+
let (start_idx_in_output, end_idx_in_output) = {
106+
let internal2output = self.internal2output_col_mapping();
107+
(
108+
internal2output.try_map(self.internal_window_start_col_idx()),
109+
internal2output.try_map(self.internal_window_end_col_idx()),
110+
)
111+
};
112+
if let Some(start_idx) = start_idx_in_output && let Some(end_idx) = end_idx_in_output {
113+
fd_set.add_functional_dependency_by_column_indices(&[start_idx], &[end_idx]);
114+
fd_set.add_functional_dependency_by_column_indices(&[end_idx], &[start_idx]);
115+
}
116+
fd_set
117+
}
98118
}
99119

100120
impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
@@ -113,7 +133,7 @@ impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
113133
}
114134

115135
pub fn internal_window_end_col_idx(&self) -> usize {
116-
self.internal_window_start_col_idx() + 1
136+
self.input.schema().len() + 1
117137
}
118138

119139
pub fn o2i_col_mapping(&self) -> ColIndexMapping {
@@ -127,7 +147,7 @@ impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
127147
}
128148

129149
pub fn internal_column_num(&self) -> usize {
130-
self.internal_window_start_col_idx() + 2
150+
self.input.schema().len() + 2
131151
}
132152

133153
pub fn output2internal_col_mapping(&self) -> ColIndexMapping {
@@ -139,17 +159,11 @@ impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
139159
}
140160

141161
pub fn input2internal_col_mapping(&self) -> ColIndexMapping {
142-
ColIndexMapping::identity_or_none(
143-
self.internal_window_start_col_idx(),
144-
self.internal_column_num(),
145-
)
162+
ColIndexMapping::identity_or_none(self.input.schema().len(), self.internal_column_num())
146163
}
147164

148165
pub fn internal2input_col_mapping(&self) -> ColIndexMapping {
149-
ColIndexMapping::identity_or_none(
150-
self.internal_column_num(),
151-
self.internal_window_start_col_idx(),
152-
)
166+
ColIndexMapping::identity_or_none(self.internal_column_num(), self.input.schema().len())
153167
}
154168

155169
pub fn derive_window_start_and_end_exprs(&self) -> Result<(Vec<ExprImpl>, Vec<ExprImpl>)> {

src/frontend/src/optimizer/plan_node/generic/join.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use risingwave_pb::plan_common::JoinType;
1919
use super::{EqJoinPredicate, GenericPlanNode, GenericPlanRef};
2020
use crate::expr::ExprRewriter;
2121
use crate::optimizer::optimizer_context::OptimizerContextRef;
22-
use crate::utils::{ColIndexMapping, Condition};
22+
use crate::optimizer::property::FunctionalDependencySet;
23+
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};
2324

2425
/// [`Join`] combines two relations according to some condition.
2526
///
@@ -141,6 +142,62 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
141142
fn ctx(&self) -> OptimizerContextRef {
142143
self.left.ctx()
143144
}
145+
146+
fn functional_dependency(&self) -> FunctionalDependencySet {
147+
let left_len = self.left.schema().len();
148+
let right_len = self.right.schema().len();
149+
let left_fd_set = self.left.functional_dependency().clone();
150+
let right_fd_set = self.right.functional_dependency().clone();
151+
152+
let full_out_col_num = self.internal_column_num();
153+
154+
let get_new_left_fd_set = |left_fd_set: FunctionalDependencySet| {
155+
ColIndexMapping::with_shift_offset(left_len, 0)
156+
.composite(&ColIndexMapping::identity(full_out_col_num))
157+
.rewrite_functional_dependency_set(left_fd_set)
158+
};
159+
let get_new_right_fd_set = |right_fd_set: FunctionalDependencySet| {
160+
ColIndexMapping::with_shift_offset(right_len, left_len.try_into().unwrap())
161+
.rewrite_functional_dependency_set(right_fd_set)
162+
};
163+
let fd_set: FunctionalDependencySet = match self.join_type {
164+
JoinType::Inner => {
165+
let mut fd_set = FunctionalDependencySet::new(full_out_col_num);
166+
for i in &self.on.conjunctions {
167+
if let Some((col, _)) = i.as_eq_const() {
168+
fd_set.add_constant_columns(&[col.index()])
169+
} else if let Some((left, right)) = i.as_eq_cond() {
170+
fd_set.add_functional_dependency_by_column_indices(
171+
&[left.index()],
172+
&[right.index()],
173+
);
174+
fd_set.add_functional_dependency_by_column_indices(
175+
&[right.index()],
176+
&[left.index()],
177+
);
178+
}
179+
}
180+
get_new_left_fd_set(left_fd_set)
181+
.into_dependencies()
182+
.into_iter()
183+
.chain(
184+
get_new_right_fd_set(right_fd_set)
185+
.into_dependencies()
186+
.into_iter(),
187+
)
188+
.for_each(|fd| fd_set.add_functional_dependency(fd));
189+
fd_set
190+
}
191+
JoinType::LeftOuter => get_new_left_fd_set(left_fd_set),
192+
JoinType::RightOuter => get_new_right_fd_set(right_fd_set),
193+
JoinType::FullOuter => FunctionalDependencySet::new(full_out_col_num),
194+
JoinType::LeftSemi | JoinType::LeftAnti => left_fd_set,
195+
JoinType::RightSemi | JoinType::RightAnti => right_fd_set,
196+
JoinType::Unspecified => unreachable!(),
197+
};
198+
ColIndexMapping::with_remaining_columns(&self.output_indices, full_out_col_num)
199+
.rewrite_functional_dependency_set(fd_set)
200+
}
144201
}
145202

146203
impl<PlanRef> Join<PlanRef> {

src/frontend/src/optimizer/plan_node/generic/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use risingwave_common::catalog::Schema;
1616

1717
use super::{stream, EqJoinPredicate};
1818
use crate::optimizer::optimizer_context::OptimizerContextRef;
19+
use crate::optimizer::property::FunctionalDependencySet;
1920

2021
pub mod dynamic_filter;
2122
pub use dynamic_filter::*;
@@ -47,10 +48,20 @@ pub use share::*;
4748
pub trait GenericPlanRef {
4849
fn schema(&self) -> &Schema;
4950
fn logical_pk(&self) -> &[usize];
51+
fn functional_dependency(&self) -> &FunctionalDependencySet;
5052
fn ctx(&self) -> OptimizerContextRef;
5153
}
5254

5355
pub trait GenericPlanNode {
56+
/// return (schema, `logical_pk`, fds)
57+
fn logical_properties(&self) -> (Schema, Option<Vec<usize>>, FunctionalDependencySet) {
58+
(
59+
self.schema(),
60+
self.logical_pk(),
61+
self.functional_dependency(),
62+
)
63+
}
64+
fn functional_dependency(&self) -> FunctionalDependencySet;
5465
fn schema(&self) -> Schema;
5566
fn logical_pk(&self) -> Option<Vec<usize>>;
5667
fn ctx(&self) -> OptimizerContextRef;

src/frontend/src/optimizer/plan_node/generic/project.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ use risingwave_common::util::iter_util::ZipEqFast;
2424
use super::{GenericPlanNode, GenericPlanRef};
2525
use crate::expr::{assert_input_ref, Expr, ExprDisplay, ExprImpl, ExprRewriter, InputRef};
2626
use crate::optimizer::optimizer_context::OptimizerContextRef;
27-
use crate::utils::ColIndexMapping;
27+
use crate::optimizer::property::FunctionalDependencySet;
28+
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};
2829

2930
fn check_expr_type(expr: &ExprImpl) -> std::result::Result<(), &'static str> {
3031
if expr.has_subquery() {
@@ -109,6 +110,11 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Project<PlanRef> {
109110
fn ctx(&self) -> OptimizerContextRef {
110111
self.input.ctx()
111112
}
113+
114+
fn functional_dependency(&self) -> FunctionalDependencySet {
115+
let i2o = self.i2o_col_mapping();
116+
i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone())
117+
}
112118
}
113119

114120
impl<PlanRef: GenericPlanRef> Project<PlanRef> {

0 commit comments

Comments
 (0)