Skip to content

Commit 7d1349d

Browse files
author
Liang Zhao
committed
feat(hash agg): do state cleaning by watermarks in hash agg (close #6112)
1 parent 38ec2ac commit 7d1349d

File tree

12 files changed

+50
-22
lines changed

12 files changed

+50
-22
lines changed

src/stream/src/common/table/state_table.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use risingwave_common::array::{Op, StreamChunk, Vis};
2828
use risingwave_common::buffer::Bitmap;
2929
use risingwave_common::catalog::{ColumnDesc, TableId, TableOption};
3030
use risingwave_common::row::{self, CompactedRow, Row, Row2, RowDeserializer, RowExt};
31-
use risingwave_common::types::VirtualNode;
31+
use risingwave_common::types::{ScalarImpl, VirtualNode};
3232
use risingwave_common::util::epoch::EpochPair;
3333
use risingwave_common::util::ordered::OrderedRowSerde;
3434
use risingwave_common::util::sort_util::OrderType;
@@ -630,18 +630,21 @@ impl<S: StateStore> StateTable<S> {
630630
self.epoch = Some(new_epoch);
631631
}
632632

633-
pub async fn commit(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
633+
pub async fn commit(&mut self, new_epoch: EpochPair,
634+
watermark: Option<&ScalarImpl>,) -> StreamExecutorResult<()> {
634635
assert_eq!(self.epoch(), new_epoch.prev);
635636
let mem_table = std::mem::take(&mut self.mem_table).into_parts();
636-
self.batch_write_rows(mem_table, new_epoch.prev).await?;
637+
self.batch_write_rows(mem_table, new_epoch.prev, watermark)
638+
.await?;
637639
self.update_epoch(new_epoch);
638640
Ok(())
639641
}
640642

641643
/// used for unit test, and do not need to assert epoch.
642644
pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
643645
let mem_table = std::mem::take(&mut self.mem_table).into_parts();
644-
self.batch_write_rows(mem_table, new_epoch.prev).await?;
646+
self.batch_write_rows(mem_table, new_epoch.prev, None)
647+
.await?;
645648
self.update_epoch(new_epoch);
646649
Ok(())
647650
}
@@ -660,6 +663,7 @@ impl<S: StateStore> StateTable<S> {
660663
&mut self,
661664
buffer: BTreeMap<Vec<u8>, RowOp>,
662665
epoch: u64,
666+
watermark: Option<&ScalarImpl>,
663667
) -> StreamExecutorResult<()> {
664668
let mut write_batch = self.local_store.start_write_batch(WriteOptions {
665669
epoch,
@@ -691,6 +695,17 @@ impl<S: StateStore> StateTable<S> {
691695
}
692696
}
693697
}
698+
if let Some(watermark) = watermark {
699+
let prefix_serializer = self.pk_serde.prefix(1);
700+
let encoded_prefix =
701+
serialize_pk(&Row::new(vec![Some(watermark.clone())]), &prefix_serializer);
702+
for vnode in self.vnodes.ones() {
703+
let vnode_bytes = vnode.to_be_bytes().to_vec();
704+
let mut range_end = vnode_bytes.clone();
705+
range_end.extend(&encoded_prefix);
706+
write_batch.delete_range(vnode_bytes, range_end);
707+
}
708+
}
694709
write_batch.ingest().await?;
695710
Ok(())
696711
}

src/stream/src/executor/dynamic_filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
387387
self.right_table.insert(row.clone());
388388
last_committed_epoch_row = Some(row.clone());
389389
}
390-
self.right_table.commit(barrier.epoch).await?;
390+
self.right_table.commit(barrier.epoch, None).await?;
391391
} else {
392392
self.right_table.commit_no_data_expected(barrier.epoch);
393393
}

src/stream/src/executor/global_simple_agg.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
215215

216216
// Commit all state tables except for result table.
217217
futures::future::try_join_all(
218-
iter_table_storage(storages).map(|state_table| state_table.commit(epoch)),
218+
iter_table_storage(storages).map(|state_table| state_table.commit(epoch, None)),
219219
)
220220
.await?;
221221

@@ -249,7 +249,7 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
249249
} else {
250250
result_table.insert(result_row);
251251
}
252-
result_table.commit(epoch).await?;
252+
result_table.commit(epoch, None).await?;
253253

254254
let columns = builders
255255
.into_iter()

src/stream/src/executor/hash_agg.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -397,13 +397,12 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
397397
ref total_lookup_count,
398398
ref metrics,
399399
ref chunk_size,
400-
buffered_watermarks: ref _buffered_watermarks,
400+
ref buffered_watermarks,
401401
..
402402
}: &'a mut HashAggExecutorExtra<K, S>,
403403
agg_groups: &'a mut AggGroupMap<K, S>,
404404
epoch: EpochPair,
405405
) {
406-
// TODO("https://github.com/risingwavelabs/risingwave/issues/6112"): use buffered_watermarks[0] do some state cleaning
407406
let actor_id_str = ctx.id.to_string();
408407
metrics
409408
.agg_lookup_miss_count
@@ -483,12 +482,17 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
483482
yield chunk;
484483
}
485484

485+
let state_clean_watermark = buffered_watermarks
486+
.first()
487+
.and_then(|opt_watermark| opt_watermark.as_ref().map(|watermark| &watermark.val));
488+
486489
// Commit all state tables.
487490
futures::future::try_join_all(
488-
iter_table_storage(storages).map(|state_table| state_table.commit(epoch)),
491+
iter_table_storage(storages)
492+
.map(|state_table| state_table.commit(epoch, state_clean_watermark)),
489493
)
490494
.await?;
491-
result_table.commit(epoch).await?;
495+
result_table.commit(epoch, state_clean_watermark).await?;
492496

493497
// Evict cache to target capacity.
494498
agg_groups.evict();

src/stream/src/executor/managed_state/dynamic_filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ impl<S: StateStore> RangeCache<S> {
210210
/// Flush writes to the `StateTable` from the in-memory buffer.
211211
pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
212212
// self.metrics.flush();
213-
self.state_table.commit(epoch).await?;
213+
self.state_table.commit(epoch, None).await?;
214214
Ok(())
215215
}
216216
}

src/stream/src/executor/managed_state/join/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,8 +465,8 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
465465

466466
pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
467467
self.metrics.flush();
468-
self.state.table.commit(epoch).await?;
469-
self.degree_state.table.commit(epoch).await?;
468+
self.state.table.commit(epoch, None).await?;
469+
self.degree_state.table.commit(epoch, None).await?;
470470
Ok(())
471471
}
472472

src/stream/src/executor/managed_state/top_n/top_n_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ impl<S: StateStore> ManagedTopNState<S> {
280280
}
281281

282282
pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
283-
self.state_table.commit(epoch).await?;
283+
self.state_table.commit(epoch, None).await?;
284284
Ok(())
285285
}
286286
}

src/stream/src/executor/mview/materialize.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ impl<S: StateStore> MaterializeExecutor<S> {
204204
}
205205
}
206206
Message::Barrier(b) => {
207-
self.state_table.commit(b.epoch).await?;
207+
self.state_table.commit(b.epoch, None).await?;
208208

209209
// Update the vnode bitmap for the state table if asked.
210210
if let Some(vnode_bitmap) = b.as_update_vnode_bitmap(self.actor_context.id) {

src/stream/src/executor/sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ impl<S: StateStore> SortExecutor<S> {
217217
}
218218
}
219219
// Commit the epoch.
220-
self.state_table.commit(barrier.epoch).await?;
220+
self.state_table.commit(barrier.epoch, None).await?;
221221
} else {
222222
// If the barrier is not a checkpoint, then there is no actual data to
223223
// commit. Therefore, we simply update the epoch of state table.

src/stream/src/executor/source/source_executor.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,10 @@ impl<S: StateStore> SourceExecutor<S> {
215215
self.split_state_store.take_snapshot(cache).await?
216216
}
217217
// commit anyway, even if no message saved
218-
self.split_state_store.state_store.commit(epoch).await?;
218+
self.split_state_store
219+
.state_store
220+
.commit(epoch, None)
221+
.await?;
219222

220223
Ok(())
221224
}

src/stream/src/executor/source/state_table_handler.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ pub(crate) mod tests {
176176

177177
state_table.init_epoch(init_epoch);
178178
state_table.insert(Row::new(vec![a.clone(), b.clone()]));
179-
state_table.commit(next_epoch).await.unwrap();
179+
state_table.commit(next_epoch, None).await.unwrap();
180180

181181
let a: Arc<str> = String::from("a").into();
182182
let a: Datum = Some(ScalarImpl::Utf8(a.as_ref().into()));
@@ -202,9 +202,15 @@ pub(crate) mod tests {
202202
state_table_handler
203203
.take_snapshot(vec![split_impl.clone()])
204204
.await?;
205-
state_table_handler.state_store.commit(epoch_2).await?;
205+
state_table_handler
206+
.state_store
207+
.commit(epoch_2, None)
208+
.await?;
206209

207-
state_table_handler.state_store.commit(epoch_3).await?;
210+
state_table_handler
211+
.state_store
212+
.commit(epoch_3, None)
213+
.await?;
208214

209215
match state_table_handler
210216
.try_recover_from_state_store(&split_impl)

src/stream/src/executor/watermark_filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
212212
// FIXME(yuhao): use upsert.
213213
table.insert(row);
214214
}
215-
table.commit(barrier.epoch).await?;
215+
table.commit(barrier.epoch, None).await?;
216216
} else {
217217
table.commit_no_data_expected(barrier.epoch);
218218
}

0 commit comments

Comments
 (0)