Skip to content

Commit 9e774bb

Browse files
authored
refactor(hash agg): split building change and applying change (risingwavelabs#8706)
Signed-off-by: Richard Chien <[email protected]>
1 parent b474059 commit 9e774bb

File tree

3 files changed

+126
-169
lines changed

3 files changed

+126
-169
lines changed

src/stream/src/executor/aggregation/agg_group.rs

Lines changed: 113 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -31,80 +31,15 @@ use crate::common::table::state_table::StateTable;
3131
use crate::executor::error::StreamExecutorResult;
3232
use crate::executor::PkIndices;
3333

34-
mod changes_builder {
35-
use super::*;
36-
37-
pub(super) fn insert_new_outputs(
38-
curr_outputs: &OwnedRow,
39-
builders: &mut [ArrayBuilderImpl],
40-
new_ops: &mut Vec<Op>,
41-
) -> usize {
42-
new_ops.push(Op::Insert);
43-
44-
for (builder, new_value) in builders.iter_mut().zip_eq_fast(curr_outputs.iter()) {
45-
trace!("insert datum: {:?}", new_value);
46-
builder.append_datum(new_value);
47-
}
48-
49-
1
50-
}
51-
52-
pub(super) fn delete_old_outputs(
53-
prev_outputs: &OwnedRow,
54-
builders: &mut [ArrayBuilderImpl],
55-
new_ops: &mut Vec<Op>,
56-
) -> usize {
57-
new_ops.push(Op::Delete);
58-
59-
for (builder, old_value) in builders.iter_mut().zip_eq_fast(prev_outputs.iter()) {
60-
trace!("delete datum: {:?}", old_value);
61-
builder.append_datum(old_value);
62-
}
63-
64-
1
65-
}
66-
67-
pub(super) fn update_outputs(
68-
prev_outputs: &OwnedRow,
69-
curr_outputs: &OwnedRow,
70-
builders: &mut [ArrayBuilderImpl],
71-
new_ops: &mut Vec<Op>,
72-
) -> usize {
73-
if prev_outputs == curr_outputs {
74-
// Fast path for no change.
75-
return 0;
76-
}
77-
78-
new_ops.push(Op::UpdateDelete);
79-
new_ops.push(Op::UpdateInsert);
80-
81-
for (builder, old_value, new_value) in itertools::multizip((
82-
builders.iter_mut(),
83-
prev_outputs.iter(),
84-
curr_outputs.iter(),
85-
)) {
86-
trace!(
87-
"update datum: prev = {:?}, curr = {:?}",
88-
old_value,
89-
new_value
90-
);
91-
builder.append_datum(old_value);
92-
builder.append_datum(new_value);
93-
}
94-
95-
2
96-
}
97-
}
98-
9934
pub trait Strategy {
100-
fn build_changes(
35+
/// Infer the change type of the aggregation result. Don't need to take the ownership of
36+
/// `prev_outputs` and `curr_outputs`.
37+
fn infer_change_type(
10138
prev_row_count: usize,
10239
curr_row_count: usize,
10340
prev_outputs: Option<&OwnedRow>,
10441
curr_outputs: &OwnedRow,
105-
builders: &mut [ArrayBuilderImpl],
106-
new_ops: &mut Vec<Op>,
107-
) -> usize;
42+
) -> Option<AggChangeType>;
10843
}
10944

11045
/// The strategy that always outputs the aggregation result no matter there're input rows or not.
@@ -114,14 +49,12 @@ pub struct AlwaysOutput;
11449
pub struct OnlyOutputIfHasInput;
11550

11651
impl Strategy for AlwaysOutput {
117-
fn build_changes(
52+
fn infer_change_type(
11853
prev_row_count: usize,
11954
curr_row_count: usize,
12055
prev_outputs: Option<&OwnedRow>,
12156
curr_outputs: &OwnedRow,
122-
builders: &mut [ArrayBuilderImpl],
123-
new_ops: &mut Vec<Op>,
124-
) -> usize {
57+
) -> Option<AggChangeType> {
12558
match prev_outputs {
12659
None => {
12760
// First time to build changes, assert to ensure correctness.
@@ -130,46 +63,48 @@ impl Strategy for AlwaysOutput {
13063
assert_eq!(prev_row_count, 0);
13164

13265
// Generate output no matter whether current row count is 0 or not.
133-
changes_builder::insert_new_outputs(curr_outputs, builders, new_ops)
66+
Some(AggChangeType::Insert)
13467
}
13568
Some(prev_outputs) => {
136-
if prev_row_count == 0 && curr_row_count == 0 {
137-
// No rows exist.
138-
return 0;
69+
if prev_row_count == 0 && curr_row_count == 0 || prev_outputs == curr_outputs {
70+
// No rows exist, or output is not changed.
71+
None
72+
} else {
73+
Some(AggChangeType::Update)
13974
}
140-
changes_builder::update_outputs(prev_outputs, curr_outputs, builders, new_ops)
14175
}
14276
}
14377
}
14478
}
14579

14680
impl Strategy for OnlyOutputIfHasInput {
147-
fn build_changes(
81+
fn infer_change_type(
14882
prev_row_count: usize,
14983
curr_row_count: usize,
15084
prev_outputs: Option<&OwnedRow>,
15185
curr_outputs: &OwnedRow,
152-
builders: &mut [ArrayBuilderImpl],
153-
new_ops: &mut Vec<Op>,
154-
) -> usize {
86+
) -> Option<AggChangeType> {
15587
match (prev_row_count, curr_row_count) {
15688
(0, 0) => {
15789
// No rows of current group exist.
158-
0
90+
None
15991
}
16092
(0, _) => {
16193
// Insert new output row for this newly emerged group.
162-
changes_builder::insert_new_outputs(curr_outputs, builders, new_ops)
94+
Some(AggChangeType::Insert)
16395
}
16496
(_, 0) => {
16597
// Delete old output row for this newly disappeared group.
166-
let prev_outputs = prev_outputs.expect("must exist previous outputs");
167-
changes_builder::delete_old_outputs(prev_outputs, builders, new_ops)
98+
Some(AggChangeType::Delete)
16899
}
169100
(_, _) => {
170101
// Update output row.
171-
let prev_outputs = prev_outputs.expect("must exist previous outputs");
172-
changes_builder::update_outputs(prev_outputs, curr_outputs, builders, new_ops)
102+
if prev_outputs.expect("must exist previous outputs") == curr_outputs {
103+
// No output change.
104+
None
105+
} else {
106+
Some(AggChangeType::Update)
107+
}
173108
}
174109
}
175110
}
@@ -178,7 +113,7 @@ impl Strategy for OnlyOutputIfHasInput {
178113
/// [`AggGroup`] manages agg states of all agg calls for one `group_key`.
179114
pub struct AggGroup<S: StateStore, Strtg: Strategy> {
180115
/// Group key.
181-
group_key: Option<OwnedRow>, // TODO(rc): we can remove this
116+
group_key: Option<OwnedRow>,
182117

183118
/// Current managed states for all [`AggCall`]s.
184119
states: Vec<AggState<S>>,
@@ -201,14 +136,25 @@ impl<S: StateStore, Strtg: Strategy> Debug for AggGroup<S, Strtg> {
201136
}
202137
}
203138

204-
/// Information about the changes built by `AggState::build_changes`.
205-
pub struct AggChangesInfo {
206-
/// The number of rows and corresponding ops in the changes.
207-
pub n_appended_ops: usize,
208-
/// The result row containing group key prefix. To be inserted into result table.
209-
pub result_row: OwnedRow,
210-
/// The previous outputs of all agg calls recorded in the `AggState`.
211-
pub prev_outputs: Option<OwnedRow>,
139+
/// Type of aggregation change.
140+
pub enum AggChangeType {
141+
Insert,
142+
Delete,
143+
Update,
144+
}
145+
146+
/// Aggregation change. The result rows include group key prefix.
147+
pub enum AggChange {
148+
Insert {
149+
new_row: OwnedRow,
150+
},
151+
Delete {
152+
old_row: OwnedRow,
153+
},
154+
Update {
155+
old_row: OwnedRow,
156+
new_row: OwnedRow,
157+
},
212158
}
213159

214160
impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
@@ -318,8 +264,10 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
318264
self.states.iter_mut().for_each(|state| state.reset());
319265
}
320266

321-
/// Get the outputs of all managed agg states.
267+
/// Get the outputs of all managed agg states, without group key prefix.
322268
/// Possibly need to read/sync from state table if the state not cached in memory.
269+
/// This method is idempotent, i.e. it can be called multiple times and the outputs are
270+
/// guaranteed to be the same.
323271
pub async fn get_outputs(
324272
&mut self,
325273
storages: &[AggStateStorage<S>],
@@ -349,15 +297,9 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
349297
.map(OwnedRow::new)
350298
}
351299

352-
/// Build changes into `builders` and `new_ops`, according to previous and current agg outputs.
353-
/// Returns [`AggChangesInfo`] contains information about changes built.
354-
/// The saved previous outputs will be updated to the latest outputs after building changes.
355-
pub fn build_changes(
356-
&mut self,
357-
curr_outputs: OwnedRow,
358-
builders: &mut [ArrayBuilderImpl],
359-
new_ops: &mut Vec<Op>,
360-
) -> AggChangesInfo {
300+
/// Build aggregation result change, according to previous and current agg outputs.
301+
/// The saved previous outputs will be updated to the latest outputs after this method.
302+
pub fn build_change(&mut self, curr_outputs: OwnedRow) -> Option<AggChange> {
361303
let prev_row_count = self.prev_row_count();
362304
let curr_row_count = curr_outputs[self.row_count_index]
363305
.as_ref()
@@ -370,27 +312,78 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
370312
curr_row_count
371313
);
372314

373-
let n_appended_ops = Strtg::build_changes(
315+
let change_type = Strtg::infer_change_type(
374316
prev_row_count,
375317
curr_row_count,
376318
self.prev_outputs.as_ref(),
377319
&curr_outputs,
378-
builders,
379-
new_ops,
380320
);
381321

382-
let result_row = self.group_key().chain(&curr_outputs).into_owned_row();
322+
// Split `AggChangeType` and `AggChange` to avoid unnecessary cloning.
323+
change_type.map(|change_type| match change_type {
324+
AggChangeType::Insert => {
325+
let new_row = self.group_key().chain(&curr_outputs).into_owned_row();
326+
self.prev_outputs = Some(curr_outputs);
327+
AggChange::Insert { new_row }
328+
}
329+
AggChangeType::Delete => {
330+
let prev_outputs = self.prev_outputs.take();
331+
let old_row = self.group_key().chain(prev_outputs).into_owned_row();
332+
AggChange::Delete { old_row }
333+
}
334+
AggChangeType::Update => {
335+
let new_row = self.group_key().chain(&curr_outputs).into_owned_row();
336+
let prev_outputs = self.prev_outputs.replace(curr_outputs);
337+
let old_row = self.group_key().chain(prev_outputs).into_owned_row();
338+
AggChange::Update { old_row, new_row }
339+
}
340+
})
341+
}
383342

384-
let prev_outputs = if n_appended_ops == 0 {
385-
self.prev_outputs.clone()
386-
} else {
387-
std::mem::replace(&mut self.prev_outputs, Some(curr_outputs))
388-
};
343+
pub fn apply_change_to_builders(
344+
&self,
345+
change: &AggChange,
346+
builders: &mut [ArrayBuilderImpl],
347+
ops: &mut Vec<Op>,
348+
) {
349+
match change {
350+
AggChange::Insert { new_row } => {
351+
trace!("insert row: {:?}", new_row);
352+
ops.push(Op::Insert);
353+
for (builder, new_value) in builders.iter_mut().zip_eq_fast(new_row.iter()) {
354+
builder.append_datum(new_value);
355+
}
356+
}
357+
AggChange::Delete { old_row } => {
358+
trace!("delete row: {:?}", old_row);
359+
ops.push(Op::Delete);
360+
for (builder, old_value) in builders.iter_mut().zip_eq_fast(old_row.iter()) {
361+
builder.append_datum(old_value);
362+
}
363+
}
364+
AggChange::Update { old_row, new_row } => {
365+
trace!("update row: prev = {:?}, curr = {:?}", old_row, new_row);
366+
ops.push(Op::UpdateDelete);
367+
ops.push(Op::UpdateInsert);
368+
for (builder, old_value, new_value) in
369+
itertools::multizip((builders.iter_mut(), old_row.iter(), new_row.iter()))
370+
{
371+
builder.append_datum(old_value);
372+
builder.append_datum(new_value);
373+
}
374+
}
375+
}
376+
}
389377

390-
AggChangesInfo {
391-
n_appended_ops,
392-
result_row,
393-
prev_outputs,
378+
pub fn apply_change_to_result_table(
379+
&self,
380+
change: &AggChange,
381+
result_table: &mut StateTable<S>,
382+
) {
383+
match change {
384+
AggChange::Insert { new_row } => result_table.insert(new_row),
385+
AggChange::Delete { old_row } => result_table.delete(old_row),
386+
AggChange::Update { old_row, new_row } => result_table.update(old_row, new_row),
394387
}
395388
}
396389
}

src/stream/src/executor/global_simple_agg.rs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,12 @@ use futures::StreamExt;
1616
use futures_async_stream::try_stream;
1717
use risingwave_common::array::StreamChunk;
1818
use risingwave_common::catalog::Schema;
19-
use risingwave_common::row::RowExt;
2019
use risingwave_common::util::iter_util::ZipEqFast;
2120
use risingwave_storage::StateStore;
2221

2322
use super::agg_common::AggExecutorArgs;
2423
use super::aggregation::{
25-
agg_call_filter_res, iter_table_storage, AggChangesInfo, AggStateStorage, AlwaysOutput,
26-
DistinctDeduplicater,
24+
agg_call_filter_res, iter_table_storage, AggStateStorage, AlwaysOutput, DistinctDeduplicater,
2725
};
2826
use super::*;
2927
use crate::common::table::state_table::StateTable;
@@ -251,29 +249,18 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
251249
let mut new_ops = Vec::with_capacity(2);
252250
// Retrieve modified states and put the changes into the builders.
253251
let curr_outputs = vars.agg_group.get_outputs(&this.storages).await?;
254-
let AggChangesInfo {
255-
result_row,
256-
prev_outputs,
257-
n_appended_ops,
258-
} = vars
259-
.agg_group
260-
.build_changes(curr_outputs, &mut builders, &mut new_ops);
261-
262-
if n_appended_ops == 0 {
252+
if let Some(change) = vars.agg_group.build_change(curr_outputs) {
253+
vars.agg_group
254+
.apply_change_to_builders(&change, &mut builders, &mut new_ops);
255+
vars.agg_group
256+
.apply_change_to_result_table(&change, &mut this.result_table);
257+
this.result_table.commit(epoch).await?;
258+
} else {
263259
// Agg result is not changed.
264260
this.result_table.commit_no_data_expected(epoch);
265261
return Ok(None);
266262
}
267263

268-
// Update the result table with latest agg outputs.
269-
if let Some(prev_outputs) = prev_outputs {
270-
let old_row = vars.agg_group.group_key().chain(prev_outputs);
271-
this.result_table.update(old_row, result_row);
272-
} else {
273-
this.result_table.insert(result_row);
274-
}
275-
this.result_table.commit(epoch).await?;
276-
277264
let columns = builders
278265
.into_iter()
279266
.map(|builder| builder.finish().into())

0 commit comments

Comments
 (0)