Skip to content

Commit ea72ed7

Browse files
fix(watermark): fix watermark derivation in stream dynamic filter in … (risingwavelabs#8719)
1 parent 1b008f4 commit ea72ed7

File tree

10 files changed

+52
-25
lines changed

10 files changed

+52
-25
lines changed

src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ impl StreamDynamicFilter {
4444
let mut watermark_columns = FixedBitSet::with_capacity(left.schema().len());
4545
if right.watermark_columns()[0] {
4646
match comparator {
47-
ExprType::GreaterThan | ExprType::GreaterThanOrEqual => {
47+
ExprType::Equal | ExprType::GreaterThan | ExprType::GreaterThanOrEqual => {
4848
watermark_columns.set(left_index, true)
4949
}
5050
_ => {}

src/stream/src/common/table/state_table.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use risingwave_storage::StateStore;
4747
use tracing::trace;
4848

4949
use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy};
50+
use crate::cache::cache_may_stale;
5051
use crate::executor::{StreamExecutorError, StreamExecutorResult};
5152

5253
/// This num is arbitrary and we may want to improve this choice in the future.
@@ -577,7 +578,7 @@ where
577578

578579
/// Update the vnode bitmap of the state table, returns the previous vnode bitmap.
579580
#[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
580-
pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
581+
pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> (Arc<Bitmap>, bool) {
581582
assert!(
582583
!self.is_dirty(),
583584
"vnode bitmap should only be updated when state table is clean"
@@ -590,9 +591,16 @@ where
590591
}
591592
assert_eq!(self.vnodes.len(), new_vnodes.len());
592593

593-
self.cur_watermark = None;
594+
let cache_may_stale = cache_may_stale(&self.vnodes, &new_vnodes);
594595

595-
std::mem::replace(&mut self.vnodes, new_vnodes)
596+
if cache_may_stale {
597+
self.cur_watermark = None;
598+
}
599+
600+
(
601+
std::mem::replace(&mut self.vnodes, new_vnodes),
602+
cache_may_stale,
603+
)
596604
}
597605
}
598606

src/stream/src/executor/dynamic_filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ impl<S: StateStore> DynamicFilterExecutor<S> {
456456

457457
// Update the vnode bitmap for the left state table if asked.
458458
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
459-
let _previous_vnode_bitmap =
459+
let (_previous_vnode_bitmap, _cache_may_stale) =
460460
self.left_table.update_vnode_bitmap(vnode_bitmap);
461461
}
462462

src/stream/src/executor/hash_join.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -704,7 +704,11 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
704704

705705
// Update the vnode bitmap for state tables of both sides if asked.
706706
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
707-
self.side_l.ht.update_vnode_bitmap(vnode_bitmap.clone());
707+
if self.side_l.ht.update_vnode_bitmap(vnode_bitmap.clone()) {
708+
self.watermark_buffers
709+
.values_mut()
710+
.for_each(|buffers| buffers.clear());
711+
}
708712
self.side_r.ht.update_vnode_bitmap(vnode_bitmap);
709713
}
710714

src/stream/src/executor/managed_state/join/mod.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use risingwave_common::util::sort_util::OrderType;
3636
use risingwave_storage::store::PrefetchOptions;
3737
use risingwave_storage::StateStore;
3838

39-
use crate::cache::{cache_may_stale, new_with_hasher_in, ExecutorCache};
39+
use crate::cache::{new_with_hasher_in, ExecutorCache};
4040
use crate::common::table::state_table::StateTable;
4141
use crate::executor::error::StreamExecutorResult;
4242
use crate::executor::monitor::StreamingMetrics;
@@ -311,16 +311,16 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
311311
}
312312

313313
/// Update the vnode bitmap and manipulate the cache if necessary.
314-
pub fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
315-
let previous_vnode_bitmap = self.state.table.update_vnode_bitmap(vnode_bitmap.clone());
316-
let _ = self
317-
.degree_state
318-
.table
319-
.update_vnode_bitmap(vnode_bitmap.clone());
320-
321-
if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
314+
pub fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) -> bool {
315+
let (_previous_vnode_bitmap, cache_may_stale) =
316+
self.state.table.update_vnode_bitmap(vnode_bitmap.clone());
317+
let _ = self.degree_state.table.update_vnode_bitmap(vnode_bitmap);
318+
319+
if cache_may_stale {
322320
self.inner.clear();
323321
}
322+
323+
cache_may_stale
324324
}
325325

326326
pub fn update_watermark(&mut self, watermark: ScalarImpl) {

src/stream/src/executor/merge.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,21 @@ impl MergeExecutor {
144144
);
145145
barrier.passed_actors.push(actor_id);
146146

147+
if let Some(Mutation::Update { dispatchers, .. }) = barrier.mutation.as_deref()
148+
{
149+
if select_all
150+
.upstream_actor_ids()
151+
.iter()
152+
.any(|actor_id| dispatchers.contains_key(actor_id))
153+
{
154+
// `Watermark` of upstream may become stale after downstream scaling.
155+
select_all
156+
.buffered_watermarks
157+
.values_mut()
158+
.for_each(|buffers| buffers.clear());
159+
}
160+
}
161+
147162
if let Some(update) =
148163
barrier.as_update_merge(self.actor_context.id, self.upstream_fragment_id)
149164
{

src/stream/src/executor/sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ impl<S: StateStore> SortExecutor<S> {
217217

218218
// Update the vnode bitmap for the state table if asked. Also update the buffer.
219219
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.context.id) {
220-
let prev_vnode_bitmap =
220+
let (prev_vnode_bitmap, _cache_may_stale) =
221221
self.state_table.update_vnode_bitmap(vnode_bitmap.clone());
222222
self.fill_buffer(Some(&prev_vnode_bitmap), &vnode_bitmap)
223223
.await?;

src/stream/src/executor/top_n/group_top_n.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use risingwave_storage::StateStore;
2828
use super::top_n_cache::TopNCacheTrait;
2929
use super::utils::*;
3030
use super::TopNCache;
31-
use crate::cache::{cache_may_stale, new_unbounded, ExecutorCache};
31+
use crate::cache::{new_unbounded, ExecutorCache};
3232
use crate::common::table::state_table::StateTable;
3333
use crate::error::StreamResult;
3434
use crate::executor::error::StreamExecutorResult;
@@ -222,12 +222,12 @@ where
222222
}
223223

224224
fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
225-
let previous_vnode_bitmap = self
225+
let (_previous_vnode_bitmap, cache_may_stale) = self
226226
.managed_state
227227
.state_table
228-
.update_vnode_bitmap(vnode_bitmap.clone());
228+
.update_vnode_bitmap(vnode_bitmap);
229229

230-
if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
230+
if cache_may_stale {
231231
self.caches.clear();
232232
}
233233
}

src/stream/src/executor/top_n/group_top_n_appendonly.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ use super::group_top_n::GroupTopNCache;
4242
use super::top_n_cache::AppendOnlyTopNCacheTrait;
4343
use super::utils::*;
4444
use super::TopNCache;
45-
use crate::cache::cache_may_stale;
4645
use crate::common::table::state_table::StateTable;
4746
use crate::error::StreamResult;
4847
use crate::executor::error::StreamExecutorResult;
@@ -207,12 +206,12 @@ where
207206
}
208207

209208
fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
210-
let previous_vnode_bitmap = self
209+
let (_previous_vnode_bitmap, cache_may_stale) = self
211210
.managed_state
212211
.state_table
213-
.update_vnode_bitmap(vnode_bitmap.clone());
212+
.update_vnode_bitmap(vnode_bitmap);
214213

215-
if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
214+
if cache_may_stale {
216215
self.caches.clear();
217216
}
218217
}

src/stream/src/executor/watermark_filter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
199199
Message::Barrier(barrier) => {
200200
// Update the vnode bitmap for state tables of all agg calls if asked.
201201
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(ctx.id) {
202-
let previous_vnode_bitmap = table.update_vnode_bitmap(vnode_bitmap.clone());
202+
let (previous_vnode_bitmap, _cache_may_stale) =
203+
table.update_vnode_bitmap(vnode_bitmap.clone());
203204

204205
// Take the global max watermark when scaling happens.
205206
if previous_vnode_bitmap != vnode_bitmap {

0 commit comments

Comments
 (0)