@@ -51,6 +51,7 @@ use risingwave_storage::table::{compute_chunk_vnode, compute_vnode, Distribution
51
51
use risingwave_storage:: StateStore ;
52
52
use tracing:: trace;
53
53
54
+ use super :: watermark:: { WatermarkBufferByEpoch , WatermarkBufferStrategy } ;
54
55
use crate :: executor:: { StreamExecutorError , StreamExecutorResult } ;
55
56
56
57
/// This num is arbitrary and we may want to improve this choice in the future.
@@ -59,7 +60,10 @@ const STATE_CLEANING_PERIOD_EPOCH: usize = 5;
59
60
/// `StateTable` is the interface accessing relational data in KV(`StateStore`) with
60
61
/// row-based encoding.
61
62
#[ derive( Clone ) ]
62
- pub struct StateTable < S : StateStore > {
63
+ pub struct StateTable <
64
+ S : StateStore ,
65
+ W : WatermarkBufferStrategy = WatermarkBufferByEpoch < STATE_CLEANING_PERIOD_EPOCH > ,
66
+ > {
63
67
/// Id for this table.
64
68
table_id : TableId ,
65
69
@@ -119,12 +123,11 @@ pub struct StateTable<S: StateStore> {
119
123
/// latest watermark
120
124
cur_watermark : Option < ScalarImpl > ,
121
125
122
- /// number of commits with watermark since the last time we did state cleaning by watermark.
123
- num_wmked_commits_since_last_clean : usize ,
126
+ watermark_buffer_strategy : W ,
124
127
}
125
128
126
129
// initialize
127
- impl < S : StateStore > StateTable < S > {
130
+ impl < S : StateStore , W : WatermarkBufferStrategy > StateTable < S , W > {
128
131
/// Create state table from table catalog and store.
129
132
pub async fn from_table_catalog (
130
133
table_catalog : & Table ,
@@ -243,7 +246,7 @@ impl<S: StateStore> StateTable<S> {
243
246
epoch : None ,
244
247
last_watermark : None ,
245
248
cur_watermark : None ,
246
- num_wmked_commits_since_last_clean : 0 ,
249
+ watermark_buffer_strategy : W :: default ( ) ,
247
250
}
248
251
}
249
252
@@ -424,7 +427,7 @@ impl<S: StateStore> StateTable<S> {
424
427
epoch : None ,
425
428
last_watermark : None ,
426
429
cur_watermark : None ,
427
- num_wmked_commits_since_last_clean : 0 ,
430
+ watermark_buffer_strategy : W :: default ( ) ,
428
431
}
429
432
}
430
433
@@ -595,6 +598,7 @@ impl<S: StateStore> StateTable<S> {
595
598
std:: mem:: replace ( & mut self . vnodes , new_vnodes)
596
599
}
597
600
}
601
+
598
602
// write
599
603
impl < S : StateStore > StateTable < S > {
600
604
#[ expect( clippy:: boxed_local) ]
@@ -751,7 +755,7 @@ impl<S: StateStore> StateTable<S> {
751
755
assert_eq ! ( self . epoch( ) , new_epoch. prev) ;
752
756
assert ! ( !self . is_dirty( ) ) ;
753
757
if self . cur_watermark . is_some ( ) {
754
- self . num_wmked_commits_since_last_clean += 1 ;
758
+ self . watermark_buffer_strategy . tick ( ) ;
755
759
}
756
760
self . update_epoch ( new_epoch) ;
757
761
}
@@ -762,15 +766,19 @@ impl<S: StateStore> StateTable<S> {
762
766
buffer : BTreeMap < Bytes , KeyOp > ,
763
767
epoch : u64 ,
764
768
) -> StreamExecutorResult < ( ) > {
765
- let watermark = self . cur_watermark . as_ref ( ) . and_then ( |cur_watermark_ref| {
766
- self . num_wmked_commits_since_last_clean += 1 ;
767
-
768
- if self . num_wmked_commits_since_last_clean >= STATE_CLEANING_PERIOD_EPOCH {
769
- Some ( cur_watermark_ref)
769
+ let watermark = {
770
+ if let Some ( watermark) = self . cur_watermark . take ( ) {
771
+ self . watermark_buffer_strategy . tick ( ) ;
772
+ if !self . watermark_buffer_strategy . apply ( ) {
773
+ self . cur_watermark = Some ( watermark) ;
774
+ None
775
+ } else {
776
+ Some ( watermark)
777
+ }
770
778
} else {
771
779
None
772
780
}
773
- } ) ;
781
+ } ;
774
782
775
783
let mut write_batch = self . local_store . start_write_batch ( WriteOptions {
776
784
epoch,
@@ -784,7 +792,7 @@ impl<S: StateStore> StateTable<S> {
784
792
} ;
785
793
let range_end_suffix = watermark. map ( |watermark| {
786
794
serialize_pk (
787
- row:: once ( Some ( watermark. clone ( ) ) ) ,
795
+ row:: once ( Some ( watermark) ) ,
788
796
prefix_serializer. as_ref ( ) . unwrap ( ) ,
789
797
)
790
798
} ) ;
@@ -835,10 +843,6 @@ impl<S: StateStore> StateTable<S> {
835
843
}
836
844
}
837
845
write_batch. ingest ( ) . await ?;
838
- if watermark. is_some ( ) {
839
- self . last_watermark = self . cur_watermark . take ( ) ;
840
- self . num_wmked_commits_since_last_clean = 0 ;
841
- }
842
846
Ok ( ( ) )
843
847
}
844
848
@@ -950,7 +954,7 @@ fn get_second<T, U>(arg: StreamExecutorResult<(T, U)>) -> StreamExecutorResult<U
950
954
}
951
955
952
956
// Iterator functions
953
- impl < S : StateStore > StateTable < S > {
957
+ impl < S : StateStore , W : WatermarkBufferStrategy > StateTable < S , W > {
954
958
/// This function scans rows from the relational table.
955
959
pub async fn iter ( & self ) -> StreamExecutorResult < RowStream < ' _ , S > > {
956
960
self . iter_with_pk_prefix ( row:: empty ( ) ) . await
0 commit comments