Skip to content

Commit 380e104

Browse files
authored
perf(stream): add simple strategy for if the stream project compact the chunk (risingwavelabs#8758)
1 parent 8bedb3c commit 380e104

File tree

5 files changed

+41
-6
lines changed

5 files changed

+41
-6
lines changed

src/common/src/array/data_chunk.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,19 @@ impl DataChunk {
138138
&self.vis2
139139
}
140140

141+
pub fn selectivity(&self) -> f64 {
142+
match &self.vis2 {
143+
Vis::Bitmap(b) => {
144+
if b.is_empty() {
145+
0.0
146+
} else {
147+
b.count_ones() as f64 / b.len() as f64
148+
}
149+
}
150+
Vis::Compact(_) => 1.0,
151+
}
152+
}
153+
141154
pub fn with_visibility(&self, visibility: Bitmap) -> Self {
142155
DataChunk::new(self.columns.clone(), visibility)
143156
}

src/common/src/array/stream_chunk.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ impl StreamChunk {
133133
self.data.capacity()
134134
}
135135

136+
pub fn selectivity(&self) -> f64 {
137+
self.data.selectivity()
138+
}
139+
136140
/// Get the reference of the underlying data chunk.
137141
pub fn data_chunk(&self) -> &DataChunk {
138142
&self.data

src/stream/src/executor/integration_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ async fn test_merger_sum_aggr() {
196196
],
197197
3,
198198
MultiMap::new(),
199+
0.0,
199200
);
200201

201202
let items = Arc::new(Mutex::new(vec![]));

src/stream/src/executor/project.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ struct Inner {
4040
/// All the watermark derivations, (input_column_index, output_column_index). And the
4141
/// derivation expression is the project's expression itself.
4242
watermark_derivations: MultiMap<usize, usize>,
43+
44+
/// the selectivity threshold which should be in [0,1]. for the chunk with selectivity less
45+
/// than the threshold, the Project executor will construct a new chunk before expr evaluation,
46+
materialize_selectivity_threshold: f64,
4347
}
4448

4549
impl ProjectExecutor {
@@ -50,6 +54,7 @@ impl ProjectExecutor {
5054
exprs: Vec<BoxedExpression>,
5155
executor_id: u64,
5256
watermark_derivations: MultiMap<usize, usize>,
57+
materialize_selectivity_threshold: f64,
5358
) -> Self {
5459
let info = ExecutorInfo {
5560
schema: input.schema().to_owned(),
@@ -74,6 +79,7 @@ impl ProjectExecutor {
7479
},
7580
exprs,
7681
watermark_derivations,
82+
materialize_selectivity_threshold,
7783
},
7884
}
7985
}
@@ -110,10 +116,12 @@ impl Inner {
110116
&self,
111117
chunk: StreamChunk,
112118
) -> StreamExecutorResult<Option<StreamChunk>> {
113-
let chunk = chunk.compact();
114-
119+
let chunk = if chunk.selectivity() <= self.materialize_selectivity_threshold {
120+
chunk.compact()
121+
} else {
122+
chunk
123+
};
115124
let (data_chunk, ops) = chunk.into_parts();
116-
117125
let mut projected_columns = Vec::new();
118126

119127
for expr in &self.exprs {
@@ -125,8 +133,9 @@ impl Inner {
125133
let new_column = Column::new(evaluated_expr);
126134
projected_columns.push(new_column);
127135
}
128-
129-
let new_chunk = StreamChunk::new(ops, projected_columns, None);
136+
let (_, vis) = data_chunk.into_parts();
137+
let vis = vis.into_visibility();
138+
let new_chunk = StreamChunk::new(ops, projected_columns, vis);
130139
Ok(Some(new_chunk))
131140
}
132141

@@ -233,6 +242,7 @@ mod tests {
233242
vec![test_expr],
234243
1,
235244
MultiMap::new(),
245+
0.0,
236246
));
237247
let mut project = project.execute();
238248

@@ -299,6 +309,7 @@ mod tests {
299309
vec![a_expr, b_expr],
300310
1,
301311
MultiMap::from_iter(vec![(0, 0), (0, 1)].into_iter()),
312+
0.0,
302313
));
303314
let mut project = project.execute();
304315

src/stream/src/from_proto/project.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use multimap::MultiMap;
1616
use risingwave_common::util::iter_util::ZipEqFast;
1717
use risingwave_expr::expr::build_from_prost;
18+
use risingwave_pb::expr::expr_node;
1819
use risingwave_pb::stream_plan::ProjectNode;
1920

2021
use super::*;
@@ -49,14 +50,19 @@ impl ExecutorBuilder for ProjectExecutorBuilder {
4950
.map(|key| *key as usize),
5051
),
5152
);
52-
53+
let extremely_light = node.get_select_list().iter().all(|expr| {
54+
let expr_type = expr.get_expr_type().unwrap();
55+
expr_type == expr_node::Type::InputRef || expr_type == expr_node::Type::ConstantValue
56+
});
57+
let materialize_selectivity_threshold = if extremely_light { 0.0 } else { 0.5 };
5358
Ok(ProjectExecutor::new(
5459
params.actor_context,
5560
input,
5661
params.pk_indices,
5762
project_exprs,
5863
params.executor_id,
5964
watermark_derivations,
65+
materialize_selectivity_threshold,
6066
)
6167
.boxed())
6268
}

0 commit comments

Comments
 (0)