@@ -396,13 +396,17 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
396
396
ref total_lookup_count,
397
397
ref metrics,
398
398
ref chunk_size,
399
- buffered_watermarks : ref _buffered_watermarks ,
399
+ ref buffered_watermarks ,
400
400
..
401
401
} : & ' a mut HashAggExecutorExtra < K , S > ,
402
402
agg_groups : & ' a mut AggGroupMap < K , S > ,
403
403
epoch : EpochPair ,
404
404
) {
405
- // TODO("https://github.com/risingwavelabs/risingwave/issues/6112"): use buffered_watermarks[0] do some state cleaning
405
+ let state_clean_watermark = buffered_watermarks
406
+ . first ( )
407
+ . and_then ( |opt_watermark| opt_watermark. as_ref ( ) )
408
+ . map ( |watermark| watermark. val . clone ( ) ) ;
409
+
406
410
let actor_id_str = ctx. id . to_string ( ) ;
407
411
metrics
408
412
. agg_lookup_miss_count
@@ -483,10 +487,16 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
483
487
}
484
488
485
489
// Commit all state tables.
486
- futures:: future:: try_join_all (
487
- iter_table_storage ( storages) . map ( |state_table| state_table. commit ( epoch) ) ,
488
- )
490
+ futures:: future:: try_join_all ( iter_table_storage ( storages) . map ( |state_table| async {
491
+ if let Some ( watermark) = state_clean_watermark. as_ref ( ) {
492
+ state_table. update_watermark ( watermark. clone ( ) )
493
+ } ;
494
+ state_table. commit ( epoch) . await
495
+ } ) )
489
496
. await ?;
497
+ if let Some ( watermark) = state_clean_watermark. as_ref ( ) {
498
+ result_table. update_watermark ( watermark. clone ( ) ) ;
499
+ } ;
490
500
result_table. commit ( epoch) . await ?;
491
501
492
502
// Evict cache to target capacity.
@@ -495,8 +505,14 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
495
505
// Nothing to flush.
496
506
// Call commit on state table to increment the epoch.
497
507
iter_table_storage ( storages) . for_each ( |state_table| {
508
+ if let Some ( watermark) = state_clean_watermark. as_ref ( ) {
509
+ state_table. update_watermark ( watermark. clone ( ) )
510
+ } ;
498
511
state_table. commit_no_data_expected ( epoch) ;
499
512
} ) ;
513
+ if let Some ( watermark) = state_clean_watermark. as_ref ( ) {
514
+ result_table. update_watermark ( watermark. clone ( ) ) ;
515
+ } ;
500
516
result_table. commit_no_data_expected ( epoch) ;
501
517
return Ok ( ( ) ) ;
502
518
}
0 commit comments