Skip to content

Commit 4664a38

Browse files
author
Liang Zhao
committed
refine update watermark
1 parent 7d04b19 commit 4664a38

File tree

1 file changed

+21
-5
lines changed

1 file changed

+21
-5
lines changed

src/stream/src/executor/hash_agg.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -396,13 +396,17 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
396396
ref total_lookup_count,
397397
ref metrics,
398398
ref chunk_size,
399-
buffered_watermarks: ref _buffered_watermarks,
399+
ref buffered_watermarks,
400400
..
401401
}: &'a mut HashAggExecutorExtra<K, S>,
402402
agg_groups: &'a mut AggGroupMap<K, S>,
403403
epoch: EpochPair,
404404
) {
405-
// TODO("https://github.com/risingwavelabs/risingwave/issues/6112"): use buffered_watermarks[0] do some state cleaning
405+
let state_clean_watermark = buffered_watermarks
406+
.first()
407+
.and_then(|opt_watermark| opt_watermark.as_ref())
408+
.map(|watermark| watermark.val.clone());
409+
406410
let actor_id_str = ctx.id.to_string();
407411
metrics
408412
.agg_lookup_miss_count
@@ -483,10 +487,16 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
483487
}
484488

485489
// Commit all state tables.
486-
futures::future::try_join_all(
487-
iter_table_storage(storages).map(|state_table| state_table.commit(epoch)),
488-
)
490+
futures::future::try_join_all(iter_table_storage(storages).map(|state_table| async {
491+
if let Some(watermark) = state_clean_watermark.as_ref() {
492+
state_table.update_watermark(watermark.clone())
493+
};
494+
state_table.commit(epoch).await
495+
}))
489496
.await?;
497+
if let Some(watermark) = state_clean_watermark.as_ref() {
498+
result_table.update_watermark(watermark.clone());
499+
};
490500
result_table.commit(epoch).await?;
491501

492502
// Evict cache to target capacity.
@@ -495,8 +505,14 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
495505
// Nothing to flush.
496506
// Call commit on state table to increment the epoch.
497507
iter_table_storage(storages).for_each(|state_table| {
508+
if let Some(watermark) = state_clean_watermark.as_ref() {
509+
state_table.update_watermark(watermark.clone())
510+
};
498511
state_table.commit_no_data_expected(epoch);
499512
});
513+
if let Some(watermark) = state_clean_watermark.as_ref() {
514+
result_table.update_watermark(watermark.clone());
515+
};
500516
result_table.commit_no_data_expected(epoch);
501517
return Ok(());
502518
}

0 commit comments

Comments
 (0)