Skip to content

Commit b42e591

Browse files
committed
use strategy
Signed-off-by: TennyZhuang <[email protected]>
1 parent 3322544 commit b42e591

31 files changed

+139
-105
lines changed

src/compute/tests/integration_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ async fn test_row_seq_scan() -> Result<()> {
427427
ColumnDesc::unnamed(ColumnId::from(2), schema[2].data_type.clone()),
428428
];
429429

430-
let mut state = StateTable::new_without_distribution(
430+
let mut state = StateTable::<_>::new_without_distribution(
431431
memory_state_store.clone(),
432432
TableId::from(0x42),
433433
column_descs.clone(),

src/stream/src/common/table/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
pub mod state_table;
16-
mod watermark;
16+
pub mod watermark;
1717

1818
#[cfg(test)]
1919
pub mod test_state_table;

src/stream/src/common/table/state_table.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,15 @@ use risingwave_storage::table::{compute_chunk_vnode, compute_vnode, Distribution
5151
use risingwave_storage::StateStore;
5252
use tracing::trace;
5353

54-
use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy};
54+
use super::watermark::{WatermarkBufferStrategy, WatermarkBufferStrategyByEpochDefault};
5555
use crate::executor::{StreamExecutorError, StreamExecutorResult};
5656

57-
/// This num is arbitrary and we may want to improve this choice in the future.
58-
const STATE_CLEANING_PERIOD_EPOCH: usize = 5;
59-
6057
/// `StateTable` is the interface accessing relational data in KV(`StateStore`) with
6158
/// row-based encoding.
6259
#[derive(Clone)]
6360
pub struct StateTable<
6461
S: StateStore,
65-
W: WatermarkBufferStrategy = WatermarkBufferByEpoch<STATE_CLEANING_PERIOD_EPOCH>,
62+
W: WatermarkBufferStrategy = WatermarkBufferStrategyByEpochDefault,
6663
> {
6764
/// Id for this table.
6865
table_id: TableId,
@@ -508,7 +505,7 @@ impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
508505
const ENABLE_SANITY_CHECK: bool = cfg!(debug_assertions);
509506

510507
// point get
511-
impl<S: StateStore> StateTable<S> {
508+
impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
512509
/// Get a single row from state table.
513510
pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
514511
let compacted_row: Option<CompactedRow> = self.get_compacted_row(pk).await?;
@@ -600,7 +597,7 @@ impl<S: StateStore> StateTable<S> {
600597
}
601598

602599
// write
603-
impl<S: StateStore> StateTable<S> {
600+
impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
604601
#[expect(clippy::boxed_local)]
605602
fn handle_mem_table_error(&self, e: Box<MemTableError>) {
606603
match *e {

src/stream/src/common/table/test_state_table.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async fn test_state_table_update_insert() {
5050

5151
test_env.register_table(table.clone()).await;
5252
let mut state_table =
53-
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
53+
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
5454
.await;
5555

5656
let mut epoch = EpochPair::new_test_epoch(1);
@@ -228,7 +228,7 @@ async fn test_state_table_iter_with_prefix() {
228228

229229
test_env.register_table(table.clone()).await;
230230
let mut state_table =
231-
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
231+
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
232232
.await;
233233

234234
let mut epoch = EpochPair::new_test_epoch(1);
@@ -353,7 +353,7 @@ async fn test_state_table_iter_with_pk_range() {
353353

354354
test_env.register_table(table.clone()).await;
355355
let mut state_table =
356-
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
356+
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
357357
.await;
358358

359359
let mut epoch = EpochPair::new_test_epoch(1);
@@ -487,7 +487,7 @@ async fn test_mem_table_assertion() {
487487

488488
test_env.register_table(table.clone()).await;
489489
let mut state_table =
490-
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
490+
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
491491
.await;
492492

493493
let epoch = EpochPair::new_test_epoch(1);
@@ -530,7 +530,7 @@ async fn test_state_table_iter_with_value_indices() {
530530

531531
test_env.register_table(table.clone()).await;
532532
let mut state_table =
533-
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
533+
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
534534
.await;
535535

536536
let mut epoch = EpochPair::new_test_epoch(1);
@@ -691,7 +691,7 @@ async fn test_state_table_iter_with_shuffle_value_indices() {
691691

692692
test_env.register_table(table.clone()).await;
693693
let mut state_table =
694-
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
694+
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
695695
.await;
696696

697697
let mut epoch = EpochPair::new_test_epoch(1);
@@ -926,7 +926,7 @@ async fn test_state_table_write_chunk() {
926926

927927
test_env.register_table(table.clone()).await;
928928
let mut state_table =
929-
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
929+
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
930930
.await;
931931

932932
let epoch = EpochPair::new_test_epoch(1);
@@ -1055,7 +1055,7 @@ async fn test_state_table_write_chunk_visibility() {
10551055

10561056
test_env.register_table(table.clone()).await;
10571057
let mut state_table =
1058-
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
1058+
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
10591059
.await;
10601060

10611061
let epoch = EpochPair::new_test_epoch(1);
@@ -1182,7 +1182,7 @@ async fn test_state_table_write_chunk_value_indices() {
11821182

11831183
test_env.register_table(table.clone()).await;
11841184
let mut state_table =
1185-
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
1185+
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
11861186
.await;
11871187

11881188
let epoch = EpochPair::new_test_epoch(1);

src/stream/src/common/table/test_storage_table.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ async fn test_storage_table_value_indices() {
6464

6565
test_env.register_table(table.clone()).await;
6666
let mut state =
67-
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
67+
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
6868
.await;
6969

7070
let table = StorageTable::for_test(
@@ -192,7 +192,7 @@ async fn test_shuffled_column_id_for_storage_table_get_row() {
192192

193193
test_env.register_table(table.clone()).await;
194194
let mut state =
195-
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
195+
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
196196
.await;
197197

198198
let mut epoch = EpochPair::new_test_epoch(1);
@@ -295,7 +295,7 @@ async fn test_row_based_storage_table_point_get_in_batch_mode() {
295295

296296
test_env.register_table(table.clone()).await;
297297
let mut state =
298-
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
298+
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
299299
.await;
300300

301301
let column_ids_partial = vec![ColumnId::from(1), ColumnId::from(2)];
@@ -402,7 +402,7 @@ async fn test_batch_scan_with_value_indices() {
402402

403403
test_env.register_table(table.clone()).await;
404404
let mut state =
405-
StateTable::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
405+
StateTable::<_>::from_table_catalog_no_sanity_check(&table, test_env.storage.clone(), None)
406406
.await;
407407

408408
let column_ids_partial = vec![ColumnId::from(1), ColumnId::from(2)];

src/stream/src/common/table/watermark.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
/// Strategy to decide how to buffer the watermarks, used for state cleaning.
16-
pub trait WatermarkBufferStrategy: Default {
16+
pub trait WatermarkBufferStrategy: Default + Send + Sync + 'static {
1717
/// Trigger when a epoch is committed.
1818
fn tick(&mut self);
1919

@@ -58,3 +58,9 @@ impl<const PERIOD: usize> WatermarkBufferStrategy for WatermarkBufferByEpoch<PER
5858
}
5959
}
6060
}
61+
62+
/// This num is arbitrary and we may want to improve this choice in the future.
63+
const STATE_CLEANING_PERIOD_EPOCH: usize = 5;
64+
65+
pub type WatermarkBufferStrategyByEpochDefault =
66+
WatermarkBufferByEpoch<STATE_CLEANING_PERIOD_EPOCH>;

src/stream/src/executor/aggregation/agg_group.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use risingwave_storage::StateStore;
2727
use super::agg_state::{AggState, AggStateStorage};
2828
use super::AggCall;
2929
use crate::common::table::state_table::StateTable;
30+
use crate::common::table::watermark::WatermarkBufferStrategy;
3031
use crate::executor::error::StreamExecutorResult;
3132
use crate::executor::PkIndices;
3233

@@ -67,11 +68,11 @@ pub struct AggChangesInfo {
6768
impl<S: StateStore> AggGroup<S> {
6869
/// Create [`AggGroup`] for the given [`AggCall`]s and `group_key`.
6970
/// For [`crate::executor::GlobalSimpleAggExecutor`], the `group_key` should be `None`.
70-
pub async fn create(
71+
pub async fn create<W: WatermarkBufferStrategy>(
7172
group_key: Option<OwnedRow>,
7273
agg_calls: &[AggCall],
7374
storages: &[AggStateStorage<S>],
74-
result_table: &StateTable<S>,
75+
result_table: &StateTable<S, W>,
7576
pk_indices: &PkIndices,
7677
extreme_cache_size: usize,
7778
input_schema: &Schema,

src/stream/src/executor/aggregation/minput.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ mod tests {
325325
.collect_vec();
326326
let mapping = StateTableColumnMapping::new(upstream_columns, None);
327327
let pk_len = order_types.len();
328-
let table = StateTable::new_without_distribution(
328+
let table = StateTable::<_>::new_without_distribution(
329329
MemoryStateStore::new(),
330330
table_id,
331331
columns,

src/stream/src/executor/dynamic_filter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -492,15 +492,15 @@ mod tests {
492492
) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
493493
let column_descs = ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64);
494494
// TODO: enable sanity check for dynamic filter <https://github.com/risingwavelabs/risingwave/issues/3893>
495-
let state_table_l = StateTable::new_without_distribution_no_sanity_check(
495+
let state_table_l = StateTable::<_>::new_without_distribution_no_sanity_check(
496496
mem_state.clone(),
497497
TableId::new(0),
498498
vec![column_descs.clone()],
499499
vec![OrderType::Ascending],
500500
vec![0],
501501
)
502502
.await;
503-
let state_table_r = StateTable::new_without_distribution_no_sanity_check(
503+
let state_table_r = StateTable::<_>::new_without_distribution_no_sanity_check(
504504
mem_state,
505505
TableId::new(1),
506506
vec![column_descs],

0 commit comments

Comments
 (0)