Skip to content

Commit aa70d64

Browse files
author
Liang Zhao
committed
feat(hash agg): do state cleaning by watermarks in hash agg (close #6112)
1 parent c6c4365 commit aa70d64

File tree

12 files changed

+54
-22
lines changed

12 files changed

+54
-22
lines changed

src/storage/src/table/streaming_table/state_table.rs

Lines changed: 23 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ use itertools::{izip, Itertools};
2727
use risingwave_common::array::{Op, Row, RowDeserializer, StreamChunk, Vis};
2828
use risingwave_common::buffer::Bitmap;
2929
use risingwave_common::catalog::{ColumnDesc, TableId, TableOption};
30-
use risingwave_common::types::VirtualNode;
30+
use risingwave_common::types::{ScalarImpl, VirtualNode};
3131
use risingwave_common::util::epoch::EpochPair;
3232
use risingwave_common::util::ordered::OrderedRowSerde;
3333
use risingwave_common::util::sort_util::OrderType;
@@ -583,18 +583,25 @@ impl<S: StateStore> StateTable<S> {
583583
self.epoch = Some(new_epoch);
584584
}
585585

586-
pub async fn commit(&mut self, new_epoch: EpochPair) -> StorageResult<()> {
586+
pub async fn commit(
587+
&mut self,
588+
new_epoch: EpochPair,
589+
watermark: Option<&ScalarImpl>,
590+
) -> StorageResult<()> {
587591
assert_eq!(self.epoch(), new_epoch.prev);
588592
let mem_table = std::mem::take(&mut self.mem_table).into_parts();
589-
self.batch_write_rows(mem_table, new_epoch.prev).await?;
593+
self.batch_write_rows(mem_table, new_epoch.prev, watermark)
594+
.await?;
590595
self.update_epoch(new_epoch);
591596
Ok(())
592597
}
593598

594599
/// used for unit test, and do not need to assert epoch.
600+
#[cfg(any(test, feature = "test"))]
595601
pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StorageResult<()> {
596602
let mem_table = std::mem::take(&mut self.mem_table).into_parts();
597-
self.batch_write_rows(mem_table, new_epoch.prev).await?;
603+
self.batch_write_rows(mem_table, new_epoch.prev, None)
604+
.await?;
598605
self.update_epoch(new_epoch);
599606
Ok(())
600607
}
@@ -612,6 +619,7 @@ impl<S: StateStore> StateTable<S> {
612619
&mut self,
613620
buffer: BTreeMap<Vec<u8>, RowOp>,
614621
epoch: u64,
622+
watermark: Option<&ScalarImpl>,
615623
) -> StorageResult<()> {
616624
let mut write_batch = self.keyspace.start_write_batch(WriteOptions {
617625
epoch,
@@ -643,6 +651,17 @@ impl<S: StateStore> StateTable<S> {
643651
}
644652
}
645653
}
654+
if let Some(watermark) = watermark {
655+
let prefix_serializer = self.pk_serde.prefix(1);
656+
let encoded_prefix =
657+
serialize_pk(&Row::new(vec![Some(watermark.clone())]), &prefix_serializer);
658+
for vnode in self.vnodes.ones() {
659+
let vnode_bytes = vnode.to_be_bytes().to_vec();
660+
let mut range_end = vnode_bytes.clone();
661+
range_end.extend(&encoded_prefix);
662+
write_batch.delete_range(vnode_bytes, range_end);
663+
}
664+
}
646665
write_batch.ingest().await?;
647666
Ok(())
648667
}

src/stream/src/executor/dynamic_filter.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -390,7 +390,7 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
390390
self.right_table.insert(row.clone());
391391
last_committed_epoch_row = Some(row.clone());
392392
}
393-
self.right_table.commit(barrier.epoch).await?;
393+
self.right_table.commit(barrier.epoch, None).await?;
394394
} else {
395395
self.right_table.commit_no_data_expected(barrier.epoch);
396396
}

src/stream/src/executor/global_simple_agg.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -214,7 +214,7 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
214214

215215
// Commit all state tables except for result table.
216216
futures::future::try_join_all(
217-
iter_table_storage(storages).map(|state_table| state_table.commit(epoch)),
217+
iter_table_storage(storages).map(|state_table| state_table.commit(epoch, None)),
218218
)
219219
.await?;
220220

@@ -248,7 +248,7 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
248248
} else {
249249
result_table.insert(result_row);
250250
}
251-
result_table.commit(epoch).await?;
251+
result_table.commit(epoch, None).await?;
252252

253253
let columns = builders
254254
.into_iter()

src/stream/src/executor/hash_agg.rs

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -395,13 +395,12 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
395395
ref total_lookup_count,
396396
ref metrics,
397397
ref chunk_size,
398-
buffered_watermarks: ref _buffered_watermarks,
398+
ref buffered_watermarks,
399399
..
400400
}: &'a mut HashAggExecutorExtra<K, S>,
401401
agg_groups: &'a mut AggGroupMap<K, S>,
402402
epoch: EpochPair,
403403
) {
404-
// TODO("https://github.com/risingwavelabs/risingwave/issues/6112"): use buffered_watermarks[0] do some state cleaning
405404
let actor_id_str = ctx.id.to_string();
406405
metrics
407406
.agg_lookup_miss_count
@@ -481,12 +480,17 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
481480
yield chunk;
482481
}
483482

483+
let state_clean_watermark = buffered_watermarks
484+
.first()
485+
.and_then(|opt_watermark| opt_watermark.as_ref().map(|watermark| &watermark.val));
486+
484487
// Commit all state tables.
485488
futures::future::try_join_all(
486-
iter_table_storage(storages).map(|state_table| state_table.commit(epoch)),
489+
iter_table_storage(storages)
490+
.map(|state_table| state_table.commit(epoch, state_clean_watermark)),
487491
)
488492
.await?;
489-
result_table.commit(epoch).await?;
493+
result_table.commit(epoch, state_clean_watermark).await?;
490494

491495
// Evict cache to target capacity.
492496
agg_groups.evict();

src/stream/src/executor/managed_state/dynamic_filter.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -203,7 +203,7 @@ impl<S: StateStore> RangeCache<S> {
203203
/// Flush writes to the `StateTable` from the in-memory buffer.
204204
pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
205205
// self.metrics.flush();
206-
self.state_table.commit(epoch).await?;
206+
self.state_table.commit(epoch, None).await?;
207207
Ok(())
208208
}
209209
}

src/stream/src/executor/managed_state/join/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -477,8 +477,8 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
477477

478478
pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
479479
self.metrics.flush();
480-
self.state.table.commit(epoch).await?;
481-
self.degree_state.table.commit(epoch).await?;
480+
self.state.table.commit(epoch, None).await?;
481+
self.degree_state.table.commit(epoch, None).await?;
482482
Ok(())
483483
}
484484

src/stream/src/executor/managed_state/top_n/top_n_state.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -280,7 +280,7 @@ impl<S: StateStore> ManagedTopNState<S> {
280280
}
281281

282282
pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
283-
self.state_table.commit(epoch).await?;
283+
self.state_table.commit(epoch, None).await?;
284284
Ok(())
285285
}
286286
}

src/stream/src/executor/mview/materialize.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -143,7 +143,7 @@ impl<S: StateStore> MaterializeExecutor<S> {
143143
Message::Chunk(chunk)
144144
}
145145
Message::Barrier(b) => {
146-
self.state_table.commit(b.epoch).await?;
146+
self.state_table.commit(b.epoch, None).await?;
147147

148148
// Update the vnode bitmap for the state table if asked.
149149
if let Some(vnode_bitmap) = b.as_update_vnode_bitmap(self.actor_context.id) {

src/stream/src/executor/sort.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -216,7 +216,7 @@ impl<S: StateStore> SortExecutor<S> {
216216
}
217217
}
218218
// Commit the epoch.
219-
self.state_table.commit(barrier.epoch).await?;
219+
self.state_table.commit(barrier.epoch, None).await?;
220220
} else {
221221
// If the barrier is not a checkpoint, then there is no actual data to
222222
// commit. Therefore, we simply update the epoch of state table.

src/stream/src/executor/source/source_executor.rs

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -215,7 +215,10 @@ impl<S: StateStore> SourceExecutor<S> {
215215
self.split_state_store.take_snapshot(cache).await?
216216
}
217217
// commit anyway, even if no message saved
218-
self.split_state_store.state_store.commit(epoch).await?;
218+
self.split_state_store
219+
.state_store
220+
.commit(epoch, None)
221+
.await?;
219222

220223
Ok(())
221224
}

src/stream/src/executor/source/state_table_handler.rs

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -176,7 +176,7 @@ pub(crate) mod tests {
176176

177177
state_table.init_epoch(init_epoch);
178178
state_table.insert(Row::new(vec![a.clone(), b.clone()]));
179-
state_table.commit(next_epoch).await.unwrap();
179+
state_table.commit(next_epoch, None).await.unwrap();
180180

181181
let a: Arc<str> = String::from("a").into();
182182
let a: Datum = Some(ScalarImpl::Utf8(a.as_ref().into()));
@@ -202,9 +202,15 @@ pub(crate) mod tests {
202202
state_table_handler
203203
.take_snapshot(vec![split_impl.clone()])
204204
.await?;
205-
state_table_handler.state_store.commit(epoch_2).await?;
205+
state_table_handler
206+
.state_store
207+
.commit(epoch_2, None)
208+
.await?;
206209

207-
state_table_handler.state_store.commit(epoch_3).await?;
210+
state_table_handler
211+
.state_store
212+
.commit(epoch_3, None)
213+
.await?;
208214

209215
match state_table_handler
210216
.try_recover_from_state_store(&split_impl)

src/stream/src/executor/watermark_filter.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -212,7 +212,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
212212
// FIXME(yuhao): use upsert.
213213
table.insert(row);
214214
}
215-
table.commit(barrier.epoch).await?;
215+
table.commit(barrier.epoch, None).await?;
216216
} else {
217217
table.commit_no_data_expected(barrier.epoch);
218218
}

0 commit comments

Comments
 (0)