
Commit 7bac239

feat(meta): support scaling delta join (risingwavelabs#8694)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 837d43a commit 7bac239

13 files changed: +209 −53 lines changed

src/meta/src/barrier/command.rs

Lines changed: 1 addition & 1 deletion

@@ -443,7 +443,7 @@ where
                 dropped_actors,
                 actor_splits,
             });
-            tracing::trace!("update mutation: {mutation:#?}");
+            tracing::debug!("update mutation: {mutation:#?}");
             Some(mutation)
         }
     };

src/meta/src/stream/scale.rs

Lines changed: 25 additions & 34 deletions

@@ -432,10 +432,6 @@ where
         // treatment because the upstream and downstream of NoShuffle are always 1-1
         // correspondence, so we need to clone the reschedule plan to the downstream of all
         // cascading relations.
-        //
-        // Delta join will introduce a `NoShuffle` edge between index chain node and lookup node
-        // (index_mv --NoShuffle--> index_chain --NoShuffle--> lookup) which will break current
-        // `NoShuffle` scaling assumption. Currently we detect this case and forbid it to scale.
         if no_shuffle_source_fragment_ids.contains(fragment_id) {
             let mut queue: VecDeque<_> = fragment_dispatcher_map
                 .get(fragment_id)
@@ -451,21 +447,12 @@ where

                 if let Some(downstream_fragments) = fragment_dispatcher_map.get(&downstream_id)
                 {
-                    // If `NoShuffle` used by other fragment type rather than `ChainNode`, bail.
-                    for downstream_fragment_id in downstream_fragments.keys() {
-                        let downstream_fragment = fragment_map
-                            .get(downstream_fragment_id)
-                            .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;
-                        if (downstream_fragment.get_fragment_type_mask()
-                            & (FragmentTypeFlag::ChainNode as u32
-                                | FragmentTypeFlag::Mview as u32))
-                            == 0
-                        {
-                            bail!("Rescheduling NoShuffle edge only supports ChainNode and Mview. Other usage for e.g. delta join is forbidden currently.");
-                        }
-                    }
+                    let no_shuffle_downstreams = downstream_fragments
+                        .iter()
+                        .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
+                        .map(|(fragment_id, _)| fragment_id);

-                    queue.extend(downstream_fragments.keys().cloned());
+                    queue.extend(no_shuffle_downstreams.copied());
                 }

                 no_shuffle_reschedule.insert(
@@ -743,7 +730,12 @@ where
             .unwrap();

         if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
-            for downstream_fragment_id in downstream_fragments.keys() {
+            let no_shuffle_downstreams = downstream_fragments
+                .iter()
+                .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
+                .map(|(fragment_id, _)| fragment_id);
+
+            for downstream_fragment_id in no_shuffle_downstreams {
                 arrange_no_shuffle_relation(
                     ctx,
                     downstream_fragment_id,
@@ -1014,20 +1006,19 @@ where
             }
         }

-        let downstream_fragment_ids =
-            if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(&fragment_id) {
-                // Skip NoShuffle fragments' downstream
-                if ctx
-                    .no_shuffle_source_fragment_ids
-                    .contains(&fragment.fragment_id)
-                {
-                    vec![]
-                } else {
-                    downstream_fragments.keys().copied().collect_vec()
-                }
-            } else {
-                vec![]
-            };
+        let downstream_fragment_ids = if let Some(downstream_fragments) =
+            ctx.fragment_dispatcher_map.get(&fragment_id)
+        {
+            // Skip fragments' no-shuffle downstream, as there's no need to update the merger
+            // (receiver) of a no-shuffle downstream
+            downstream_fragments
+                .iter()
+                .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
+                .map(|(fragment_id, _)| *fragment_id)
+                .collect_vec()
+        } else {
+            vec![]
+        };

         let vnode_bitmap_updates = match fragment.distribution_type() {
             FragmentDistributionType::Hash => {
@@ -1123,7 +1114,7 @@ where

         let _source_pause_guard = self.source_manager.paused.lock().await;

-        tracing::trace!("reschedule plan: {:#?}", reschedule_fragment);
+        tracing::debug!("reschedule plan: {:#?}", reschedule_fragment);

         self.barrier_scheduler
             .run_command_with_paused(Command::RescheduleFragment(reschedule_fragment))
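
The crux of the change is in the first two hunks above: instead of bailing out whenever a `NoShuffle` edge points at anything other than a `ChainNode` or `Mview` fragment (which ruled out delta joins entirely), the traversal now simply follows `NoShuffle` edges and ignores all others when cloning the reschedule plan downstream. A minimal, self-contained sketch of that cascade, with simplified stand-in types (`FragmentId` as `u32` and the local `DispatcherType` enum are assumptions of this sketch, not the real meta-service definitions):

```rust
use std::collections::{HashMap, HashSet, VecDeque};

// Simplified stand-ins for the meta service's types.
type FragmentId = u32;

#[derive(Clone, Copy, PartialEq, Eq)]
enum DispatcherType {
    Hash,
    NoShuffle,
}

/// Collect every fragment reachable from `source` through NoShuffle edges only.
/// This mirrors the cascading propagation in the diff: a reschedule plan for a
/// NoShuffle source is cloned to all its NoShuffle descendants (e.g. the
/// `index_mv -> index_chain -> lookup` chain of a delta join), while
/// downstreams behind other dispatcher types are no longer traversed.
fn no_shuffle_cascade(
    dispatchers: &HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    source: FragmentId,
) -> HashSet<FragmentId> {
    let mut visited = HashSet::new();
    let mut queue = VecDeque::new();
    queue.push_back(source);

    while let Some(id) = queue.pop_front() {
        if !visited.insert(id) {
            continue;
        }
        if let Some(downstreams) = dispatchers.get(&id) {
            queue.extend(
                downstreams
                    .iter()
                    .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
                    .map(|(fragment_id, _)| *fragment_id),
            );
        }
    }

    visited.remove(&source);
    visited
}

fn main() {
    // index_mv (1) --NoShuffle--> index_chain (2) --NoShuffle--> lookup (3),
    // plus a hash-shuffled downstream (4) that must not be visited.
    let dispatchers = HashMap::from([
        (1, HashMap::from([(2, DispatcherType::NoShuffle), (4, DispatcherType::Hash)])),
        (2, HashMap::from([(3, DispatcherType::NoShuffle)])),
    ]);
    assert_eq!(no_shuffle_cascade(&dispatchers, 1), HashSet::from([2, 3]));
}
```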

src/storage/src/table/batch_table/storage_table.rs

Lines changed: 7 additions & 0 deletions

@@ -379,6 +379,13 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
             Ok(None)
         }
     }
+
+    /// Update the vnode bitmap of the storage table, returns the previous vnode bitmap.
+    #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
+    pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
+        assert_eq!(self.vnodes.len(), new_vnodes.len());
+        std::mem::replace(&mut self.vnodes, new_vnodes)
+    }
 }

 pub trait PkAndRowStream = Stream<Item = StorageResult<(Vec<u8>, OwnedRow)>> + Send;
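
Returning the previous bitmap instead of `()`, combined with `#[must_use]`, forces every caller to at least consider comparing old and new vnode ownership before scaling takes effect. A sketch of the same swap-and-return pattern, with the bitmap simplified to a `Vec<bool>` (all names here are illustrative, not the real `StorageTableInner` API):

```rust
use std::sync::Arc;

/// Minimal stand-in for the storage table, just enough to show the pattern.
struct Table {
    vnodes: Arc<Vec<bool>>, // simplified vnode bitmap
}

impl Table {
    /// Same shape as the new `update_vnode_bitmap`: install the new bitmap and
    /// hand the previous one back. `#[must_use]` turns a silently dropped
    /// return value into a compiler warning.
    #[must_use]
    fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Vec<bool>>) -> Arc<Vec<bool>> {
        assert_eq!(self.vnodes.len(), new_vnodes.len());
        std::mem::replace(&mut self.vnodes, new_vnodes)
    }
}

fn main() {
    let mut table = Table { vnodes: Arc::new(vec![true, false]) };
    let new = Arc::new(vec![true, true]);
    let previous = table.update_vnode_bitmap(new.clone());
    // The caller inspects the delta, e.g. to invalidate caches when a vnode
    // was gained.
    let gained = previous
        .iter()
        .zip(new.iter())
        .any(|(owned_before, owned_now)| *owned_now && !*owned_before);
    assert!(gained);
}
```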

src/stream/src/executor/chain.rs

Lines changed: 4 additions & 2 deletions

@@ -76,7 +76,9 @@ impl ChainExecutor {
         // Otherwise, it means we've recovered and the snapshot is already consumed.
         let to_consume_snapshot = barrier.is_add_dispatcher(self.actor_id) && !self.upstream_only;

-        if self.upstream_only {
+        // If the barrier is a conf change of creating this mview, and the snapshot is not to be
+        // consumed, we can finish the progress immediately.
+        if barrier.is_add_dispatcher(self.actor_id) && self.upstream_only {
             self.progress.finish(barrier.epoch.curr);
         }

@@ -100,7 +102,7 @@ impl ChainExecutor {
         #[for_await]
         for msg in upstream {
             let msg = msg?;
-            if let Message::Barrier(barrier) = &msg {
+            if to_consume_snapshot && let Message::Barrier(barrier) = &msg {
                 self.progress.finish(barrier.epoch.curr);
             }
             yield msg;
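
The first hunk narrows when progress may be finished immediately: previously `upstream_only` alone triggered it, which also fired on barriers seen after recovery. Reduced to plain booleans, the fixed gating looks like the sketch below (the real code reads these flags from `Barrier` and `ChainExecutor`):

```rust
/// `finish` must fire immediately only on the barrier that creates this mview
/// (a conf change), not on every barrier an `upstream_only` chain sees.
fn finish_immediately(is_add_dispatcher: bool, upstream_only: bool) -> bool {
    // Before this commit the check was `upstream_only` alone.
    is_add_dispatcher && upstream_only
}

fn main() {
    assert!(finish_immediately(true, true)); // creating an upstream-only chain
    assert!(!finish_immediately(false, true)); // recovered: progress already reported
    assert!(!finish_immediately(true, false)); // snapshot to consume: finish later
}
```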

src/stream/src/executor/lookup.rs

Lines changed: 4 additions & 0 deletions

@@ -28,6 +28,8 @@ mod impl_;

 pub use impl_::LookupExecutorParams;

+use super::ActorContextRef;
+
 #[cfg(test)]
 mod tests;

@@ -38,6 +40,8 @@ mod tests;
 /// The output schema is `| stream columns | arrangement columns |`.
 /// The input is required to be first stream and then arrangement.
 pub struct LookupExecutor<S: StateStore> {
+    ctx: ActorContextRef,
+
     /// the data types of the produced data chunk inside lookup (before reordering)
     chunk_data_types: Vec<DataType>,

src/stream/src/executor/lookup/cache.rs

Lines changed: 5 additions & 0 deletions

@@ -64,6 +64,11 @@ impl LookupCache {
         self.data.update_epoch(epoch);
     }

+    /// Clear the cache.
+    pub fn clear(&mut self) {
+        self.data.clear();
+    }
+
     pub fn new(watermark_epoch: AtomicU64Ref) -> Self {
         let cache = ExecutorCache::new(new_unbounded(watermark_epoch));
         Self { data: cache }

src/stream/src/executor/lookup/impl_.rs

Lines changed: 24 additions & 9 deletions

@@ -28,16 +28,19 @@ use risingwave_storage::table::TableIter;
 use risingwave_storage::StateStore;

 use super::sides::{stream_lookup_arrange_prev_epoch, stream_lookup_arrange_this_epoch};
+use crate::cache::cache_may_stale;
 use crate::common::StreamChunkBuilder;
 use crate::executor::error::{StreamExecutorError, StreamExecutorResult};
 use crate::executor::lookup::cache::LookupCache;
 use crate::executor::lookup::sides::{ArrangeJoinSide, ArrangeMessage, StreamJoinSide};
 use crate::executor::lookup::LookupExecutor;
-use crate::executor::{Barrier, Executor, Message, PkIndices};
+use crate::executor::{ActorContextRef, Barrier, Executor, Message, PkIndices};
 use crate::task::AtomicU64Ref;

 /// Parameters for [`LookupExecutor`].
 pub struct LookupExecutorParams<S: StateStore> {
+    pub ctx: ActorContextRef,
+
     /// The side for arrangement. Currently, it should be a
     /// `MaterializeExecutor`.
     pub arrangement: Box<dyn Executor>,
@@ -116,6 +119,7 @@ pub struct LookupExecutorParams<S: StateStore> {
 impl<S: StateStore> LookupExecutor<S> {
     pub fn new(params: LookupExecutorParams<S>) -> Self {
         let LookupExecutorParams {
+            ctx,
             arrangement,
             stream,
             arrangement_col_descs,
@@ -202,6 +206,7 @@
         );

         Self {
+            ctx,
             chunk_data_types,
             schema: output_schema,
             pk_indices,
@@ -273,10 +278,8 @@
             self.lookup_cache.flush();
         }

-        // Use the new stream barrier epoch as new cache epoch
-        self.lookup_cache.update_epoch(barrier.epoch.curr);
+        self.process_barrier(&barrier);

-        self.process_barrier(barrier.clone()).await?;
         if self.arrangement.use_current_epoch {
             // When lookup this epoch, stream side barrier always come after arrangement
             // ready, so we can forward barrier now.
@@ -336,11 +339,23 @@
         }
     }

-    /// Store the barrier.
-    #[expect(clippy::unused_async)]
-    async fn process_barrier(&mut self, barrier: Barrier) -> StreamExecutorResult<()> {
-        self.last_barrier = Some(barrier);
-        Ok(())
+    /// Process the barrier and apply changes if necessary.
+    fn process_barrier(&mut self, barrier: &Barrier) {
+        if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
+            let previous_vnode_bitmap = self
+                .arrangement
+                .storage_table
+                .update_vnode_bitmap(vnode_bitmap.clone());
+
+            // Manipulate the cache if necessary.
+            if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
+                self.lookup_cache.clear();
+            }
+        }
+
+        // Use the new stream barrier epoch as new cache epoch
+        self.lookup_cache.update_epoch(barrier.epoch.curr);
+        self.last_barrier = Some(barrier.clone());
     }

     /// Lookup all rows corresponding to a join key in shared buffer.
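
`process_barrier` is now where a scaling event takes effect for lookup: it swaps the arrangement's vnode bitmap and clears the lookup cache whenever the cache may have gone stale. The sketch below shows one plausible staleness criterion, with the bitmap simplified to a `bool` slice; the real `crate::cache::cache_may_stale` helper may differ in detail, so treat this as an illustration of the idea rather than its exact implementation:

```rust
/// If the actor *gains* any vnode it did not own before, cached lookup results
/// can no longer be trusted: the cache may claim a key is absent while rows
/// for the newly owned vnodes exist in storage. Merely *losing* vnodes keeps
/// the remaining entries valid.
fn cache_may_stale(previous: &[bool], current: &[bool]) -> bool {
    previous
        .iter()
        .zip(current.iter())
        .any(|(owned_before, owned_now)| *owned_now && !*owned_before)
}

fn main() {
    assert!(cache_may_stale(&[true, false], &[true, true])); // gained vnode 1: clear
    assert!(!cache_may_stale(&[true, true], &[true, false])); // only lost one: keep
}
```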

src/stream/src/executor/lookup/tests.rs

Lines changed: 3 additions & 3 deletions

@@ -30,7 +30,7 @@ use crate::executor::lookup::impl_::LookupExecutorParams;
 use crate::executor::lookup::LookupExecutor;
 use crate::executor::test_utils::*;
 use crate::executor::{
-    Barrier, BoxedMessageStream, Executor, MaterializeExecutor, Message, PkIndices,
+    ActorContext, Barrier, BoxedMessageStream, Executor, MaterializeExecutor, Message, PkIndices,
 };

 fn arrangement_col_descs() -> Vec<ColumnDesc> {
@@ -218,6 +218,7 @@ async fn test_lookup_this_epoch() {
     let arrangement = create_arrangement(table_id, store.clone()).await;
     let stream = create_source();
     let lookup_executor = Box::new(LookupExecutor::new(LookupExecutorParams {
+        ctx: ActorContext::create(0),
         arrangement,
         stream,
         arrangement_col_descs: arrangement_col_descs(),
@@ -281,14 +282,13 @@
 }

 #[tokio::test]
-#[ignore]
-// Deprecated because the ability to read from prev epoch has been deprecated.
 async fn test_lookup_last_epoch() {
     let store = MemoryStateStore::new();
     let table_id = TableId::new(1);
     let arrangement = create_arrangement(table_id, store.clone()).await;
     let stream = create_source();
     let lookup_executor = Box::new(LookupExecutor::new(LookupExecutorParams {
+        ctx: ActorContext::create(0),
         arrangement,
         stream,
         arrangement_col_descs: arrangement_col_descs(),

src/stream/src/from_proto/lookup.rs

Lines changed: 1 addition & 0 deletions

@@ -125,6 +125,7 @@ impl ExecutorBuilder for LookupExecutorBuilder {
         );

         Ok(Box::new(LookupExecutor::new(LookupExecutorParams {
+            ctx: params.actor_context,
             schema: params.schema,
             arrangement,
             stream,

src/tests/simulation/src/ctl_ext.rs

Lines changed: 5 additions & 3 deletions

@@ -96,9 +96,11 @@ pub mod predicate {

     /// The fragment is able to be rescheduled. Used for locating random fragment.
     pub fn can_reschedule() -> BoxedPredicate {
-        // The rescheduling of `Chain` must be derived from the upstream `Materialize`, not
-        // specified by the user.
-        no_identity_contains("StreamTableScan")
+        // The rescheduling of no-shuffle downstreams must be derived from the upstream
+        // `Materialize`, not specified by the user.
+        let p =
+            |f: &PbFragment| no_identity_contains("Chain")(f) && no_identity_contains("Lookup")(f);
+        Box::new(p)
     }

     /// The fragment with the given id.
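
Because `Chain` and `Lookup` fragments both sit behind `NoShuffle` edges, the simulation harness must not pick them as the fragment to reschedule directly. A sketch of the predicate composition with `PbFragment` reduced to a list of identity strings (a simplification for this sketch; the real type is a generated protobuf message):

```rust
/// Stand-in fragment: just the actor identity strings we match against.
struct Fragment {
    identities: Vec<String>,
}

type BoxedPredicate = Box<dyn Fn(&Fragment) -> bool>;

/// True iff no actor identity in the fragment contains the given substring.
fn no_identity_contains(s: &'static str) -> BoxedPredicate {
    Box::new(move |f: &Fragment| f.identities.iter().all(|id| !id.contains(s)))
}

fn can_reschedule() -> BoxedPredicate {
    // Both `Chain` and `Lookup` are no-shuffle downstreams whose distribution
    // follows the upstream `Materialize`, so neither may be picked directly.
    let p = |f: &Fragment| no_identity_contains("Chain")(f) && no_identity_contains("Lookup")(f);
    Box::new(p)
}

fn main() {
    let lookup = Fragment { identities: vec!["LookupExecutor".into()] };
    let hash_join = Fragment { identities: vec!["HashJoin".into()] };
    assert!(!can_reschedule()(&lookup));
    assert!(can_reschedule()(&hash_join));
}
```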

src/tests/simulation/src/risingwave.toml

Lines changed: 4 additions & 1 deletion

@@ -4,4 +4,7 @@

 [system]
 barrier_interval_ms = 250
-checkpoint_frequency = 4
+checkpoint_frequency = 4
+
+[server]
+telemetry_enabled = false
