Skip to content

Commit 3c8ae2f

Browse files
author
Liang Zhao
committed
feat(watermark): Clean state in DynamicFilter by watermark (close #6472)
1 parent d772092 commit 3c8ae2f

14 files changed

+151
-37
lines changed

src/stream/src/common/table/state_table.rs

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use risingwave_common::array::{Op, StreamChunk, Vis};
2828
use risingwave_common::buffer::Bitmap;
2929
use risingwave_common::catalog::{ColumnDesc, TableId, TableOption};
3030
use risingwave_common::row::{self, CompactedRow, Row, Row2, RowDeserializer, RowExt};
31-
use risingwave_common::types::VirtualNode;
31+
use risingwave_common::types::{ScalarImpl, VirtualNode, VIRTUAL_NODE_SIZE};
3232
use risingwave_common::util::epoch::EpochPair;
3333
use risingwave_common::util::ordered::OrderedRowSerde;
3434
use risingwave_common::util::sort_util::OrderType;
@@ -106,6 +106,9 @@ pub struct StateTable<S: StateStore> {
106106

107107
/// The epoch that was last flushed to the state store.
108108
epoch: Option<EpochPair>,
109+
110+
/// The last watermark used to construct delete ranges in `ingest`.
111+
last_watermark: Option<ScalarImpl>,
109112
}
110113

111114
// initialize
@@ -219,6 +222,7 @@ impl<S: StateStore> StateTable<S> {
219222
vnode_col_idx_in_pk,
220223
value_indices,
221224
epoch: None,
225+
last_watermark: None,
222226
}
223227
}
224228

@@ -321,6 +325,7 @@ impl<S: StateStore> StateTable<S> {
321325
vnode_col_idx_in_pk: None,
322326
value_indices,
323327
epoch: None,
328+
last_watermark: None,
324329
}
325330
}
326331

@@ -489,6 +494,8 @@ impl<S: StateStore> StateTable<S> {
489494
}
490495
assert_eq!(self.vnodes.len(), new_vnodes.len());
491496

497+
self.last_watermark = None;
498+
492499
std::mem::replace(&mut self.vnodes, new_vnodes)
493500
}
494501
}
@@ -630,18 +637,24 @@ impl<S: StateStore> StateTable<S> {
630637
self.epoch = Some(new_epoch);
631638
}
632639

633-
pub async fn commit(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
640+
pub async fn commit(
641+
&mut self,
642+
new_epoch: EpochPair,
643+
watermark: Option<&ScalarImpl>,
644+
) -> StreamExecutorResult<()> {
634645
assert_eq!(self.epoch(), new_epoch.prev);
635646
let mem_table = std::mem::take(&mut self.mem_table).into_parts();
636-
self.batch_write_rows(mem_table, new_epoch.prev).await?;
647+
self.batch_write_rows(mem_table, new_epoch.prev, watermark)
648+
.await?;
637649
self.update_epoch(new_epoch);
638650
Ok(())
639651
}
640652

641653
/// Used for unit tests; does not need to assert the epoch.
642654
pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
643655
let mem_table = std::mem::take(&mut self.mem_table).into_parts();
644-
self.batch_write_rows(mem_table, new_epoch.prev).await?;
656+
self.batch_write_rows(mem_table, new_epoch.prev, None)
657+
.await?;
645658
self.update_epoch(new_epoch);
646659
Ok(())
647660
}
@@ -660,12 +673,28 @@ impl<S: StateStore> StateTable<S> {
660673
&mut self,
661674
buffer: BTreeMap<Vec<u8>, RowOp>,
662675
epoch: u64,
676+
watermark: Option<&ScalarImpl>,
663677
) -> StreamExecutorResult<()> {
664678
let mut write_batch = self.local_store.start_write_batch(WriteOptions {
665679
epoch,
666680
table_id: self.table_id(),
667681
});
682+
683+
let prefix_serializer = if self.pk_indices().is_empty() {
684+
None
685+
} else {
686+
Some(self.pk_serde.prefix(1))
687+
};
688+
let range_end_suffix = watermark.map(|watermark| {
689+
serialize_pk(
690+
&Row::new(vec![Some(watermark.clone())]),
691+
prefix_serializer.as_ref().unwrap(),
692+
)
693+
});
668694
for (pk, row_op) in buffer {
695+
if let Some(ref range_end) = range_end_suffix && &pk[VIRTUAL_NODE_SIZE..] < range_end.as_slice() {
696+
continue;
697+
}
669698
match row_op {
670699
// Currently, some executors do not strictly comply with these semantics. As a
671700
// workaround you may disable the check by calling `.disable_sanity_check()` on
@@ -691,7 +720,27 @@ impl<S: StateStore> StateTable<S> {
691720
}
692721
}
693722
}
723+
if let Some(range_end_suffix) = range_end_suffix {
724+
let range_begin_suffix = if let Some(ref last_watermark) = self.last_watermark {
725+
serialize_pk(
726+
&Row::new(vec![Some(last_watermark.clone())]),
727+
prefix_serializer.as_ref().unwrap(),
728+
)
729+
} else {
730+
vec![]
731+
};
732+
for vnode in self.vnodes.ones() {
733+
let mut range_begin = vnode.to_be_bytes().to_vec();
734+
let mut range_end = range_begin.clone();
735+
range_begin.extend(&range_begin_suffix);
736+
range_end.extend(&range_end_suffix);
737+
write_batch.delete_range(range_begin, range_end);
738+
}
739+
}
694740
write_batch.ingest().await?;
741+
if let Some(watermark) = watermark {
742+
self.last_watermark = Some(watermark.clone());
743+
}
695744
Ok(())
696745
}
697746

src/stream/src/executor/barrier_align.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use futures_async_stream::try_stream;
2323
use risingwave_common::bail;
2424

2525
use super::error::StreamExecutorError;
26-
use super::{Barrier, BoxedMessageStream, Message, StreamChunk, StreamExecutorResult};
26+
use super::{Barrier, BoxedMessageStream, Message, StreamChunk, StreamExecutorResult, Watermark};
2727
use crate::executor::monitor::StreamingMetrics;
2828
use crate::task::ActorId;
2929

@@ -33,6 +33,8 @@ pub trait AlignedMessageStream = futures::Stream<Item = AlignedMessageStreamItem
3333
#[derive(Debug, EnumAsInner, PartialEq)]
3434
pub enum AlignedMessage {
3535
Barrier(Barrier),
36+
WatermarkLeft(Watermark),
37+
WatermarkRight(Watermark),
3638
Left(StreamChunk),
3739
Right(StreamChunk),
3840
}
@@ -60,8 +62,8 @@ pub async fn barrier_align(
6062
// left stream end, passthrough right chunks
6163
while let Some(msg) = right.next().await {
6264
match msg? {
63-
Message::Watermark(_) => {
64-
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
65+
Message::Watermark(watermark) => {
66+
yield AlignedMessage::WatermarkRight(watermark)
6567
}
6668
Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
6769
Message::Barrier(_) => {
@@ -75,8 +77,8 @@ pub async fn barrier_align(
7577
// right stream end, passthrough left chunks
7678
while let Some(msg) = left.next().await {
7779
match msg? {
78-
Message::Watermark(_) => {
79-
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
80+
Message::Watermark(watermark) => {
81+
yield AlignedMessage::WatermarkLeft(watermark)
8082
}
8183
Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
8284
Message::Barrier(_) => {
@@ -87,9 +89,7 @@ pub async fn barrier_align(
8789
break;
8890
}
8991
Either::Left((Some(msg), _)) => match msg? {
90-
Message::Watermark(_) => {
91-
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
92-
}
92+
Message::Watermark(watermark) => yield AlignedMessage::WatermarkLeft(watermark),
9393
Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
9494
Message::Barrier(_) => loop {
9595
let start_time = Instant::now();
@@ -99,8 +99,8 @@ pub async fn barrier_align(
9999
.await
100100
.context("failed to poll right message, stream closed unexpectedly")??
101101
{
102-
Message::Watermark(_) => {
103-
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
102+
Message::Watermark(watermark) => {
103+
yield AlignedMessage::WatermarkRight(watermark)
104104
}
105105
Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
106106
Message::Barrier(barrier) => {
@@ -115,9 +115,7 @@ pub async fn barrier_align(
115115
},
116116
},
117117
Either::Right((Some(msg), _)) => match msg? {
118-
Message::Watermark(_) => {
119-
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
120-
}
118+
Message::Watermark(watermark) => yield AlignedMessage::WatermarkRight(watermark),
121119
Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
122120
Message::Barrier(_) => loop {
123121
let start_time = Instant::now();
@@ -127,8 +125,8 @@ pub async fn barrier_align(
127125
.await
128126
.context("failed to poll left message, stream closed unexpectedly")??
129127
{
130-
Message::Watermark(_) => {
131-
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
128+
Message::Watermark(watermark) => {
129+
yield AlignedMessage::WatermarkLeft(watermark)
132130
}
133131
Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
134132
Message::Barrier(barrier) => {

src/stream/src/executor/dynamic_filter.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,9 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
300300
left_to_output,
301301
)?;
302302

303+
let is_watermark_direct_accord = !matches!(self.comparator, LessThan | LessThanOrEqual);
304+
let mut unused_clean_hint = None;
305+
303306
#[for_await]
304307
for msg in aligned_stream {
305308
match msg? {
@@ -354,6 +357,14 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
354357
}
355358
}
356359
}
360+
AlignedMessage::WatermarkLeft(_) => {
361+
// Do nothing.
362+
}
363+
AlignedMessage::WatermarkRight(watermark) => {
364+
if is_watermark_direct_accord {
365+
unused_clean_hint = Some(watermark);
366+
}
367+
}
357368
AlignedMessage::Barrier(barrier) => {
358369
// Flush the difference between the `prev_value` and `current_value`
359370
//
@@ -383,6 +394,14 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
383394
}
384395
}
385396

397+
let mut flush_watermark = None;
398+
if let Some(mut watermark) = unused_clean_hint.take() {
399+
self.range_cache.shrink(&watermark.val);
400+
flush_watermark = Some(watermark.val.clone());
401+
watermark.col_idx = self.key_l;
402+
yield Message::Watermark(watermark);
403+
};
404+
386405
// Update the committed value on RHS if it has changed.
387406
if last_committed_epoch_row != current_epoch_row {
388407
// Only write the RHS value if this actor is in charge of vnode 0
@@ -396,7 +415,7 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
396415
if let Some(row) = &current_epoch_row {
397416
self.right_table.insert(row.clone());
398417
}
399-
self.right_table.commit(barrier.epoch).await?;
418+
self.right_table.commit(barrier.epoch, None).await?;
400419
} else {
401420
self.right_table.commit_no_data_expected(barrier.epoch);
402421
}
@@ -406,7 +425,9 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
406425
self.right_table.commit_no_data_expected(barrier.epoch);
407426
}
408427

409-
self.range_cache.flush(barrier.epoch).await?;
428+
self.range_cache
429+
.flush(barrier.epoch, flush_watermark.as_ref())
430+
.await?;
410431

411432
prev_epoch_value = Some(curr);
412433

src/stream/src/executor/global_simple_agg.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
215215

216216
// Commit all state tables except for result table.
217217
futures::future::try_join_all(
218-
iter_table_storage(storages).map(|state_table| state_table.commit(epoch)),
218+
iter_table_storage(storages).map(|state_table| state_table.commit(epoch, None)),
219219
)
220220
.await?;
221221

@@ -249,7 +249,7 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
249249
} else {
250250
result_table.insert(result_row);
251251
}
252-
result_table.commit(epoch).await?;
252+
result_table.commit(epoch, None).await?;
253253

254254
let columns = builders
255255
.into_iter()

src/stream/src/executor/hash_agg.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,10 +467,10 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
467467

468468
// Commit all state tables.
469469
futures::future::try_join_all(
470-
iter_table_storage(storages).map(|state_table| state_table.commit(epoch)),
470+
iter_table_storage(storages).map(|state_table| state_table.commit(epoch, None)),
471471
)
472472
.await?;
473-
result_table.commit(epoch).await?;
473+
result_table.commit(epoch, None).await?;
474474

475475
// Evict cache to target capacity.
476476
agg_groups.evict();

src/stream/src/executor/hash_join.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,9 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
636636
.with_label_values(&[&actor_id_str])
637637
.inc_by(start_time.elapsed().as_nanos() as u64);
638638
match msg? {
639+
AlignedMessage::WatermarkLeft(_) | AlignedMessage::WatermarkRight(_) => {
640+
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
641+
}
639642
AlignedMessage::Left(chunk) => {
640643
#[for_await]
641644
for chunk in Self::eq_join_oneside::<{ SideType::Left }>(

src/stream/src/executor/managed_state/dynamic_filter.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use risingwave_common::buffer::Bitmap;
2525
use risingwave_common::row::{CompactedRow, Row, Row2};
2626
use risingwave_common::types::{ScalarImpl, VIRTUAL_NODE_SIZE};
2727
use risingwave_common::util::epoch::EpochPair;
28+
use risingwave_storage::row_serde::row_serde_util::serialize_pk;
2829
use risingwave_storage::StateStore;
2930

3031
use crate::common::table::state_table::{prefix_range_to_memcomparable, StateTable};
@@ -223,10 +224,43 @@ impl<S: StateStore> RangeCache<S> {
223224
Ok(old_vnodes)
224225
}
225226

227+
pub fn shrink(&mut self, watermark: &ScalarImpl) {
228+
if let Some((range_lower, range_upper)) = self.range.as_mut() {
229+
let delete_old = match range_upper.as_ref() {
230+
Bound::Excluded(x) => x <= watermark,
231+
Bound::Included(x) => x < watermark,
232+
Bound::Unbounded => false,
233+
};
234+
if delete_old {
235+
self.cache = HashMap::new();
236+
self.range = None;
237+
} else {
238+
let need_cut = match range_lower.as_ref() {
239+
Bound::Excluded(x) | Bound::Included(x) => x < watermark,
240+
Bound::Unbounded => true,
241+
};
242+
if need_cut {
243+
let watermark_pk = serialize_pk(
244+
[Some(watermark.as_scalar_ref_impl())],
245+
self.state_table.pk_serde().prefix(1).as_ref(),
246+
);
247+
for cache in self.cache.values_mut() {
248+
*cache = cache.split_off(&watermark_pk);
249+
}
250+
*range_lower = Bound::Included(watermark.clone());
251+
}
252+
}
253+
}
254+
}
255+
226256
/// Flush writes to the `StateTable` from the in-memory buffer.
227-
pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
257+
pub async fn flush(
258+
&mut self,
259+
epoch: EpochPair,
260+
watermark: Option<&ScalarImpl>,
261+
) -> StreamExecutorResult<()> {
228262
// self.metrics.flush();
229-
self.state_table.commit(epoch).await?;
263+
self.state_table.commit(epoch, watermark).await?;
230264
Ok(())
231265
}
232266
}

src/stream/src/executor/managed_state/join/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,8 +465,8 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
465465

466466
pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
467467
self.metrics.flush();
468-
self.state.table.commit(epoch).await?;
469-
self.degree_state.table.commit(epoch).await?;
468+
self.state.table.commit(epoch, None).await?;
469+
self.degree_state.table.commit(epoch, None).await?;
470470
Ok(())
471471
}
472472

src/stream/src/executor/managed_state/top_n/top_n_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ impl<S: StateStore> ManagedTopNState<S> {
280280
}
281281

282282
pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
283-
self.state_table.commit(epoch).await?;
283+
self.state_table.commit(epoch, None).await?;
284284
Ok(())
285285
}
286286
}

src/stream/src/executor/mview/materialize.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ impl<S: StateStore> MaterializeExecutor<S> {
204204
}
205205
}
206206
Message::Barrier(b) => {
207-
self.state_table.commit(b.epoch).await?;
207+
self.state_table.commit(b.epoch, None).await?;
208208

209209
// Update the vnode bitmap for the state table if asked.
210210
if let Some(vnode_bitmap) = b.as_update_vnode_bitmap(self.actor_context.id) {

0 commit comments

Comments
 (0)