Skip to content

Commit c68a581

Browse files
authored
feat(streaming): handle watermark for mv on mv (#7790)
- Ignore watermark during snapshot consumption and forward watermark messages to downstream after that. Approved-By: BugenZhao
1 parent 8fb2d3f commit c68a581

File tree

3 files changed

+82
-39
lines changed

3 files changed

+82
-39
lines changed

src/stream/src/executor/backfill.rs

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use risingwave_storage::StateStore;
3333

3434
use super::error::StreamExecutorError;
3535
use super::{expect_first_barrier, BoxedExecutor, Executor, ExecutorInfo, Message, PkIndicesRef};
36-
use crate::executor::PkIndices;
36+
use crate::executor::{PkIndices, Watermark};
3737
use crate::task::{ActorId, CreateMviewProgress};
3838

3939
/// An implementation of the RFC "Use Backfill To Let Mv On Mv Stream Again" (https://github.com/risingwavelabs/rfcs/pull/13).
@@ -136,11 +136,11 @@ where
136136

137137
if !to_backfill {
138138
// Forward messages directly to the downstream.
139-
let upstream = upstream
140-
.map(move |result| result.map(|msg| Self::mapping_message(msg, &upstream_indices)));
141139
#[for_await]
142140
for message in upstream {
143-
yield message?;
141+
if let Some(message) = Self::mapping_message(message?, &upstream_indices) {
142+
yield message;
143+
}
144144
}
145145

146146
return Ok(());
@@ -232,7 +232,7 @@ where
232232
upstream_chunk_buffer.push(chunk.compact());
233233
}
234234
Message::Watermark(_) => {
235-
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
235+
// Ignore watermark during backfill.
236236
}
237237
}
238238
}
@@ -283,15 +283,14 @@ where
283283

284284
// Backfill has already finished.
285285
// Forward messages directly to the downstream.
286-
let upstream = upstream
287-
.map(move |result| result.map(|msg| Self::mapping_message(msg, &upstream_indices)));
288286
#[for_await]
289287
for msg in upstream {
290-
let msg: Message = msg?;
291-
if let Some(barrier) = msg.as_barrier() {
292-
self.progress.finish(barrier.epoch.curr);
288+
if let Some(msg) = Self::mapping_message(msg?, &upstream_indices) {
289+
if let Some(barrier) = msg.as_barrier() {
290+
self.progress.finish(barrier.epoch.curr);
291+
}
292+
yield msg;
293293
}
294-
yield msg;
295294
}
296295
}
297296

@@ -379,10 +378,22 @@ where
379378
StreamChunk::new(ops, mapped_columns, visibility)
380379
}
381380

382-
fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Message {
381+
/// Translates a watermark's column index into its position within `upstream_indices`.
///
/// Searches `upstream_indices` for the first entry equal to `watermark.col_idx`. When one
/// is found, the watermark is rebuilt through `with_idx` using that position; otherwise
/// `None` is returned and the caller is expected to drop the watermark.
///
/// NOTE(review): `with_idx` presumably returns the same watermark with only `col_idx`
/// replaced — confirm against the `Watermark` definition.
fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
382+
upstream_indices
383+
.iter()
384+
.position(|&idx| idx == watermark.col_idx)
385+
.map(|idx| watermark.with_idx(idx))
386+
}
387+
388+
/// Applies the `upstream_indices` projection to a single upstream message.
///
/// - Barriers are returned unchanged.
/// - Watermarks go through `Self::mapping_watermark`; the result is `None` when the
///   watermark's column is not part of `upstream_indices`.
/// - Chunks are projected with `Self::mapping_chunk`.
///
/// Returns `None` only for a watermark that could not be mapped, so callers must skip
/// such messages instead of forwarding them.
fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option<Message> {
383389
match msg {
384-
Message::Barrier(_) | Message::Watermark(_) => msg,
385-
Message::Chunk(chunk) => Message::Chunk(Self::mapping_chunk(chunk, upstream_indices)),
390+
Message::Barrier(_) => Some(msg),
391+
Message::Watermark(watermark) => {
392+
Self::mapping_watermark(watermark, upstream_indices).map(Message::Watermark)
393+
}
394+
Message::Chunk(chunk) => {
395+
Some(Message::Chunk(Self::mapping_chunk(chunk, upstream_indices)))
396+
}
386397
}
387398
}
388399
}

src/stream/src/executor/chain.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use risingwave_common::catalog::Schema;
1919

2020
use super::error::StreamExecutorError;
2121
use super::{expect_first_barrier, BoxedExecutor, Executor, ExecutorInfo, Message};
22-
use crate::executor::PkIndices;
22+
use crate::executor::{PkIndices, Watermark};
2323
use crate::task::{ActorId, CreateMviewProgress};
2424

2525
/// [`ChainExecutor`] is an executor that enables synchronization between the existing stream and
@@ -43,7 +43,7 @@ pub struct ChainExecutor {
4343
upstream_only: bool,
4444
}
4545

46-
fn mapping(upstream_indices: &[usize], chunk: StreamChunk) -> StreamChunk {
46+
fn mapping_chunk(chunk: StreamChunk, upstream_indices: &[usize]) -> StreamChunk {
4747
let (ops, columns, visibility) = chunk.into_inner();
4848
let mapped_columns = upstream_indices
4949
.iter()
@@ -52,6 +52,13 @@ fn mapping(upstream_indices: &[usize], chunk: StreamChunk) -> StreamChunk {
5252
StreamChunk::new(ops, mapped_columns, visibility)
5353
}
5454

55+
/// Re-indexes `watermark` to match the column order given by `upstream_indices`.
///
/// Returns `Some` with the watermark rebuilt via `with_idx`, using the position of the
/// first entry in `upstream_indices` equal to `watermark.col_idx`. Returns `None` when
/// no entry matches.
///
/// NOTE(review): `with_idx` is assumed to change only the column index — confirm.
fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
56+
upstream_indices
57+
.iter()
58+
.position(|&idx| idx == watermark.col_idx)
59+
.map(|idx| watermark.with_idx(idx))
60+
}
61+
5562
impl ChainExecutor {
5663
pub fn new(
5764
snapshot: BoxedExecutor,
@@ -114,11 +121,14 @@ impl ChainExecutor {
114121
#[for_await]
115122
for msg in upstream {
116123
match msg? {
117-
Message::Watermark(_) => {
118-
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
124+
Message::Watermark(watermark) => {
125+
match mapping_watermark(watermark, &self.upstream_indices) {
126+
Some(mapped_watermark) => yield Message::Watermark(mapped_watermark),
127+
None => continue,
128+
}
119129
}
120130
Message::Chunk(chunk) => {
121-
yield Message::Chunk(mapping(&self.upstream_indices, chunk));
131+
yield Message::Chunk(mapping_chunk(chunk, &self.upstream_indices));
122132
}
123133
Message::Barrier(barrier) => {
124134
self.progress.finish(barrier.epoch.curr);

src/stream/src/executor/rearranged_chain.rs

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use super::error::StreamExecutorError;
2525
use super::{
2626
expect_first_barrier, Barrier, BoxedExecutor, Executor, ExecutorInfo, Message, MessageStream,
2727
};
28-
use crate::executor::PkIndices;
28+
use crate::executor::{BoxedMessageStream, PkIndices, Watermark};
2929
use crate::task::{ActorId, CreateMviewProgress};
3030

3131
/// `ChainExecutor` is an executor that enables synchronization between the existing stream and
@@ -49,35 +49,47 @@ pub struct RearrangedChainExecutor {
4949
info: ExecutorInfo,
5050
}
5151

52-
fn mapping(upstream_indices: &[usize], msg: Message) -> Message {
52+
/// Projects one upstream message according to `upstream_indices`.
///
/// - Watermarks are re-indexed with `mapping_watermark`; the result is `None` when the
///   watermark's column is not listed in `upstream_indices`.
/// - Chunks are rebuilt with one column per entry of `upstream_indices`, each cloned from
///   `columns[i]`; `ops` and `visibility` are passed through unchanged.
/// - Barriers are returned as-is.
///
/// Indexing `columns[i]` panics if an entry of `upstream_indices` is out of range for the
/// chunk's columns.
fn mapping(upstream_indices: &[usize], msg: Message) -> Option<Message> {
5353
match msg {
54-
Message::Watermark(_) => {
55-
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
54+
Message::Watermark(watermark) => {
55+
mapping_watermark(watermark, upstream_indices).map(Message::Watermark)
5656
}
57-
5857
Message::Chunk(chunk) => {
5958
let (ops, columns, visibility) = chunk.into_inner();
6059
let mapped_columns = upstream_indices
6160
.iter()
6261
.map(|&i| columns[i].clone())
6362
.collect();
64-
Message::Chunk(StreamChunk::new(ops, mapped_columns, visibility))
63+
Some(Message::Chunk(StreamChunk::new(
64+
ops,
65+
mapped_columns,
66+
visibility,
67+
)))
6568
}
66-
_ => msg,
69+
Message::Barrier(_) => Some(msg),
6970
}
6971
}
6972

73+
/// Maps the column index of `watermark` to its position in `upstream_indices`.
///
/// The first entry of `upstream_indices` equal to `watermark.col_idx` determines the new
/// index, which is applied through `with_idx`. `None` is returned when there is no such
/// entry.
///
/// NOTE(review): the exact semantics of `with_idx` are not visible here — confirm it
/// only swaps the column index.
fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
74+
upstream_indices
75+
.iter()
76+
.position(|&idx| idx == watermark.col_idx)
77+
.map(|idx| watermark.with_idx(idx))
78+
}
79+
7080
/// Internal message representation built from a `Message` by `rearranged_from` or
/// `phantom_from`.
///
/// Barriers become `RearrangedBarrier` via `rearranged_from` and `PhantomBarrier` via
/// `phantom_from`; chunks are carried over unchanged by both.
#[derive(Debug)]
7181
enum RearrangedMessage {
7282
RearrangedBarrier(Barrier),
7383
PhantomBarrier(Barrier),
7484
Chunk(StreamChunk),
85+
// This watermark is just a placeholder: it carries no payload, and `phantom_into`
// turns it into `None`.
86+
Watermark,
7587
}
7688

7789
impl RearrangedMessage {
7890
fn phantom_into(self) -> Option<Message> {
7991
match self {
80-
RearrangedMessage::RearrangedBarrier(_) => None,
92+
RearrangedMessage::RearrangedBarrier(_) | RearrangedMessage::Watermark => None,
8193
RearrangedMessage::PhantomBarrier(barrier) => Message::Barrier(barrier).into(),
8294
RearrangedMessage::Chunk(chunk) => Message::Chunk(chunk).into(),
8395
}
@@ -87,21 +99,15 @@ impl RearrangedMessage {
8799
impl RearrangedMessage {
88100
/// Converts a `Message` into a `RearrangedMessage`, tagging barriers as
/// `RearrangedBarrier`.
///
/// Chunks are carried over unchanged. A watermark becomes the payload-free
/// `RearrangedMessage::Watermark` variant, so its contents are discarded here.
fn rearranged_from(msg: Message) -> Self {
89101
match msg {
90-
Message::Watermark(_) => {
91-
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
92-
}
93-
102+
Message::Watermark(_) => RearrangedMessage::Watermark,
94103
Message::Chunk(chunk) => RearrangedMessage::Chunk(chunk),
95104
Message::Barrier(barrier) => RearrangedMessage::RearrangedBarrier(barrier),
96105
}
97106
}
98107

99108
fn phantom_from(msg: Message) -> Self {
100109
match msg {
101-
Message::Watermark(_) => {
102-
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
103-
}
104-
110+
Message::Watermark(_) => RearrangedMessage::Watermark,
105111
Message::Chunk(chunk) => RearrangedMessage::Chunk(chunk),
106112
Message::Barrier(barrier) => RearrangedMessage::PhantomBarrier(barrier),
107113
}
@@ -135,10 +141,11 @@ impl RearrangedChainExecutor {
135141
async fn execute_inner(mut self) {
136142
// 0. Project the upstream with `upstream_indices`.
137143
let upstream_indices = self.upstream_indices.clone();
138-
let mut upstream = self
139-
.upstream
140-
.execute()
141-
.map(move |result| result.map(|msg| mapping(&upstream_indices, msg)));
144+
145+
let mut upstream = Box::pin(Self::mapping_stream(
146+
self.upstream.execute(),
147+
&upstream_indices,
148+
));
142149

143150
// 1. Poll the upstream to get the first barrier.
144151
let first_barrier = expect_first_barrier(&mut upstream).await?;
@@ -219,6 +226,9 @@ impl RearrangedChainExecutor {
219226
yield Message::Barrier(barrier);
220227
}
221228
RearrangedMessage::Chunk(chunk) => yield Message::Chunk(chunk),
229+
RearrangedMessage::Watermark => {
230+
// Ignore watermark during snapshot consumption.
231+
}
222232
}
223233
}
224234

@@ -310,6 +320,18 @@ impl RearrangedChainExecutor {
310320
}
311321
}
312322
}
323+
324+
/// Adapts `stream` so that every message is passed through `mapping` with
/// `upstream_indices`.
///
/// Errors from `stream` are propagated by `?`. Messages for which `mapping` returns
/// `None` are skipped rather than yielded.
#[expect(clippy::needless_lifetimes, reason = "code generated by try_stream")]
325+
#[try_stream(ok = Message, error = StreamExecutorError)]
326+
async fn mapping_stream(stream: BoxedMessageStream, upstream_indices: &[usize]) {
327+
#[for_await]
328+
for msg in stream {
329+
match mapping(upstream_indices, msg?) {
330+
Some(msg) => yield msg,
331+
// Nothing to forward for this message; move on to the next one.
None => continue,
332+
}
333+
}
334+
}
313335
}
314336

315337
impl Executor for RearrangedChainExecutor {

0 commit comments

Comments
 (0)