Skip to content

Commit 85e450d

Browse files
authored
fix(streaming): map watermark in dispatcher with output indices (risingwavelabs#8506)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent cdaa8cf commit 85e450d

File tree

6 files changed: 42 additions (+42) and 23 deletions (-23)

src/stream/src/executor/backfill.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -398,10 +398,7 @@ where
398398
}
399399

400400
fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
401-
upstream_indices
402-
.iter()
403-
.position(|&idx| idx == watermark.col_idx)
404-
.map(|idx| watermark.with_idx(idx))
401+
watermark.transform_with_indices(upstream_indices)
405402
}
406403

407404
fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option<Message> {

src/stream/src/executor/chain.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,7 @@ fn mapping_chunk(chunk: StreamChunk, upstream_indices: &[usize]) -> StreamChunk
5353
}
5454

5555
fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
56-
upstream_indices
57-
.iter()
58-
.position(|&idx| idx == watermark.col_idx)
59-
.map(|idx| watermark.with_idx(idx))
56+
watermark.transform_with_indices(upstream_indices)
6057
}
6158

6259
impl ChainExecutor {

src/stream/src/executor/dispatch.rs

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -446,14 +446,20 @@ pub trait Dispatcher: Debug + 'static {
446446
#[derive(Debug)]
447447
pub struct RoundRobinDataDispatcher {
448448
outputs: Vec<BoxedOutput>,
449+
output_indices: Vec<usize>,
449450
cur: usize,
450451
dispatcher_id: DispatcherId,
451452
}
452453

453454
impl RoundRobinDataDispatcher {
454-
pub fn new(outputs: Vec<BoxedOutput>, dispatcher_id: DispatcherId) -> Self {
455+
pub fn new(
456+
outputs: Vec<BoxedOutput>,
457+
output_indices: Vec<usize>,
458+
dispatcher_id: DispatcherId,
459+
) -> Self {
455460
Self {
456461
outputs,
462+
output_indices,
457463
cur: 0,
458464
dispatcher_id,
459465
}
@@ -465,6 +471,7 @@ impl Dispatcher for RoundRobinDataDispatcher {
465471

466472
fn dispatch_data(&mut self, chunk: StreamChunk) -> Self::DataFuture<'_> {
467473
async move {
474+
let chunk = chunk.reorder_columns(&self.output_indices);
468475
self.outputs[self.cur].send(Message::Chunk(chunk)).await?;
469476
self.cur += 1;
470477
self.cur %= self.outputs.len();
@@ -484,9 +491,11 @@ impl Dispatcher for RoundRobinDataDispatcher {
484491

485492
fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_> {
486493
async move {
487-
// always broadcast watermark
488-
for output in &mut self.outputs {
489-
output.send(Message::Watermark(watermark.clone())).await?;
494+
if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
495+
// always broadcast watermark
496+
for output in &mut self.outputs {
497+
output.send(Message::Watermark(watermark.clone())).await?;
498+
}
490499
}
491500
Ok(())
492501
}
@@ -569,9 +578,11 @@ impl Dispatcher for HashDataDispatcher {
569578

570579
fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_> {
571580
async move {
572-
// always broadcast watermark
573-
for output in &mut self.outputs {
574-
output.send(Message::Watermark(watermark.clone())).await?;
581+
if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
582+
// always broadcast watermark
583+
for output in &mut self.outputs {
584+
output.send(Message::Watermark(watermark.clone())).await?;
585+
}
575586
}
576587
Ok(())
577588
}
@@ -751,8 +762,11 @@ impl Dispatcher for BroadcastDispatcher {
751762

752763
fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_> {
753764
async move {
754-
for output in self.outputs.values_mut() {
755-
output.send(Message::Watermark(watermark.clone())).await?;
765+
if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
766+
// always broadcast watermark
767+
for output in self.outputs.values_mut() {
768+
output.send(Message::Watermark(watermark.clone())).await?;
769+
}
756770
}
757771
Ok(())
758772
}
@@ -851,7 +865,10 @@ impl Dispatcher for SimpleDispatcher {
851865
.exactly_one()
852866
.expect("expect exactly one output");
853867

854-
output.send(Message::Watermark(watermark)).await
868+
if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
869+
output.send(Message::Watermark(watermark)).await?;
870+
}
871+
Ok(())
855872
}
856873
}
857874

src/stream/src/executor/integration_tests.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,9 @@ async fn test_merger_sum_aggr() {
124124
let dispatcher = DispatchExecutor::new(
125125
receiver_op,
126126
vec![DispatcherImpl::RoundRobin(RoundRobinDataDispatcher::new(
127-
inputs, 0,
127+
inputs,
128+
vec![0],
129+
0,
128130
))],
129131
0,
130132
ctx,

src/stream/src/executor/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,15 @@ impl Watermark {
615615
})
616616
}
617617

618+
/// Transform the watermark with the given output indices. If this watermark is not in the
619+
/// output, return `None`.
620+
pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
621+
output_indices
622+
.iter()
623+
.position(|p| *p == self.col_idx)
624+
.map(|new_col_idx| self.with_idx(new_col_idx))
625+
}
626+
618627
pub fn to_protobuf(&self) -> ProstWatermark {
619628
ProstWatermark {
620629
column: Some(ProstInputRef {

src/stream/src/executor/rearranged_chain.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,7 @@ fn mapping(upstream_indices: &[usize], msg: Message) -> Option<Message> {
7171
}
7272

7373
fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
74-
upstream_indices
75-
.iter()
76-
.position(|&idx| idx == watermark.col_idx)
77-
.map(|idx| watermark.with_idx(idx))
74+
watermark.transform_with_indices(upstream_indices)
7875
}
7976

8077
#[derive(Debug)]

0 commit comments

Comments
 (0)