Skip to content

feat(streaming): handle watermark for mv on mv #7790

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use risingwave_storage::StateStore;

use super::error::StreamExecutorError;
use super::{expect_first_barrier, BoxedExecutor, Executor, ExecutorInfo, Message, PkIndicesRef};
use crate::executor::PkIndices;
use crate::executor::{PkIndices, Watermark};
use crate::task::{ActorId, CreateMviewProgress};

/// An implementation of the RFC: Use Backfill To Let Mv On Mv Stream Again.(https://github.com/risingwavelabs/rfcs/pull/13)
Expand Down Expand Up @@ -136,11 +136,11 @@ where

if !to_backfill {
// Forward messages directly to the downstream.
let upstream = upstream
.map(move |result| result.map(|msg| Self::mapping_message(msg, &upstream_indices)));
#[for_await]
for message in upstream {
yield message?;
if let Some(message) = Self::mapping_message(message?, &upstream_indices) {
yield message;
}
}

return Ok(());
Expand Down Expand Up @@ -232,7 +232,7 @@ where
upstream_chunk_buffer.push(chunk.compact());
}
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
// Ignore watermark during backfill.
}
}
}
Expand Down Expand Up @@ -283,15 +283,14 @@ where

// Backfill has already finished.
// Forward messages directly to the downstream.
let upstream = upstream
.map(move |result| result.map(|msg| Self::mapping_message(msg, &upstream_indices)));
#[for_await]
for msg in upstream {
let msg: Message = msg?;
if let Some(barrier) = msg.as_barrier() {
self.progress.finish(barrier.epoch.curr);
if let Some(msg) = Self::mapping_message(msg?, &upstream_indices) {
if let Some(barrier) = msg.as_barrier() {
self.progress.finish(barrier.epoch.curr);
}
yield msg;
}
yield msg;
}
}

Expand Down Expand Up @@ -379,10 +378,22 @@ where
StreamChunk::new(ops, mapped_columns, visibility)
}

fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Message {
/// Translates an upstream watermark into the output column space described
/// by `upstream_indices`.
///
/// `upstream_indices[i]` is the upstream column that feeds output column `i`.
/// The first output position whose upstream column equals
/// `watermark.col_idx` is handed to `Watermark::with_idx` (presumably yielding
/// the same watermark re-targeted at that column; confirm against
/// `Watermark`).
///
/// Returns `None` when the watermark's column is not selected by
/// `upstream_indices`, i.e. the watermark has no counterpart downstream.
fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
    // Bail out with `None` if the watermark column is not part of the output.
    let output_idx = upstream_indices
        .iter()
        .position(|&upstream_idx| upstream_idx == watermark.col_idx)?;
    Some(watermark.with_idx(output_idx))
}

fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option<Message> {
match msg {
Message::Barrier(_) | Message::Watermark(_) => msg,
Message::Chunk(chunk) => Message::Chunk(Self::mapping_chunk(chunk, upstream_indices)),
Message::Barrier(_) => Some(msg),
Message::Watermark(watermark) => {
Self::mapping_watermark(watermark, upstream_indices).map(Message::Watermark)
}
Message::Chunk(chunk) => {
Some(Message::Chunk(Self::mapping_chunk(chunk, upstream_indices)))
}
}
}
}
Expand Down
20 changes: 15 additions & 5 deletions src/stream/src/executor/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_common::catalog::Schema;

use super::error::StreamExecutorError;
use super::{expect_first_barrier, BoxedExecutor, Executor, ExecutorInfo, Message};
use crate::executor::PkIndices;
use crate::executor::{PkIndices, Watermark};
use crate::task::{ActorId, CreateMviewProgress};

/// [`ChainExecutor`] is an executor that enables synchronization between the existing stream and
Expand All @@ -43,7 +43,7 @@ pub struct ChainExecutor {
upstream_only: bool,
}

fn mapping(upstream_indices: &[usize], chunk: StreamChunk) -> StreamChunk {
fn mapping_chunk(chunk: StreamChunk, upstream_indices: &[usize]) -> StreamChunk {
let (ops, columns, visibility) = chunk.into_inner();
let mapped_columns = upstream_indices
.iter()
Expand All @@ -52,6 +52,13 @@ fn mapping(upstream_indices: &[usize], chunk: StreamChunk) -> StreamChunk {
StreamChunk::new(ops, mapped_columns, visibility)
}

/// Maps `watermark` from the upstream schema onto the columns selected by
/// `upstream_indices`.
///
/// Scans `upstream_indices` in order and, at the first entry equal to
/// `watermark.col_idx`, returns the result of `watermark.with_idx(position)`.
/// Returns `None` if no entry matches, meaning the watermark column is not
/// forwarded downstream.
fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
    for (output_idx, &upstream_idx) in upstream_indices.iter().enumerate() {
        if upstream_idx == watermark.col_idx {
            // Only the first matching output column receives the watermark.
            return Some(watermark.with_idx(output_idx));
        }
    }
    None
}

impl ChainExecutor {
pub fn new(
snapshot: BoxedExecutor,
Expand Down Expand Up @@ -114,11 +121,14 @@ impl ChainExecutor {
#[for_await]
for msg in upstream {
match msg? {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
Message::Watermark(watermark) => {
match mapping_watermark(watermark, &self.upstream_indices) {
Some(mapped_watermark) => yield Message::Watermark(mapped_watermark),
None => continue,
}
}
Message::Chunk(chunk) => {
yield Message::Chunk(mapping(&self.upstream_indices, chunk));
yield Message::Chunk(mapping_chunk(chunk, &self.upstream_indices));
}
Message::Barrier(barrier) => {
self.progress.finish(barrier.epoch.curr);
Expand Down
62 changes: 42 additions & 20 deletions src/stream/src/executor/rearranged_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::error::StreamExecutorError;
use super::{
expect_first_barrier, Barrier, BoxedExecutor, Executor, ExecutorInfo, Message, MessageStream,
};
use crate::executor::PkIndices;
use crate::executor::{BoxedMessageStream, PkIndices, Watermark};
use crate::task::{ActorId, CreateMviewProgress};

/// `ChainExecutor` is an executor that enables synchronization between the existing stream and
Expand All @@ -49,35 +49,47 @@ pub struct RearrangedChainExecutor {
info: ExecutorInfo,
}

fn mapping(upstream_indices: &[usize], msg: Message) -> Message {
fn mapping(upstream_indices: &[usize], msg: Message) -> Option<Message> {
match msg {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
Message::Watermark(watermark) => {
mapping_watermark(watermark, upstream_indices).map(Message::Watermark)
}

Message::Chunk(chunk) => {
let (ops, columns, visibility) = chunk.into_inner();
let mapped_columns = upstream_indices
.iter()
.map(|&i| columns[i].clone())
.collect();
Message::Chunk(StreamChunk::new(ops, mapped_columns, visibility))
Some(Message::Chunk(StreamChunk::new(
ops,
mapped_columns,
visibility,
)))
}
_ => msg,
Message::Barrier(_) => Some(msg),
}
}

/// Re-indexes `watermark` according to the projection `upstream_indices`.
///
/// Looks up the first position in `upstream_indices` whose value equals
/// `watermark.col_idx` and passes that position to `Watermark::with_idx`.
/// Yields `None` when the watermark's column is not projected at all.
fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
    let Some(output_idx) = upstream_indices
        .iter()
        .position(|&upstream_idx| upstream_idx == watermark.col_idx)
    else {
        // The watermark column is dropped by the projection.
        return None;
    };
    Some(watermark.with_idx(output_idx))
}

/// Internal message form built from a [`Message`] by `rearranged_from` or
/// `phantom_from`, and turned back into an optional [`Message`] by
/// `phantom_into`.
#[derive(Debug)]
enum RearrangedMessage {
/// A barrier wrapped by `rearranged_from`; `phantom_into` maps it to `None`.
RearrangedBarrier(Barrier),
/// A barrier wrapped by `phantom_from`; `phantom_into` turns it back into
/// `Message::Barrier`.
PhantomBarrier(Barrier),
/// A data chunk, carried through unchanged by both constructors.
Chunk(StreamChunk),
// This watermark is just a placeholder: it carries no payload, both
// constructors map `Message::Watermark` to it, and `phantom_into` maps it
// to `None`.
Watermark,
}

impl RearrangedMessage {
fn phantom_into(self) -> Option<Message> {
match self {
RearrangedMessage::RearrangedBarrier(_) => None,
RearrangedMessage::RearrangedBarrier(_) | RearrangedMessage::Watermark => None,
RearrangedMessage::PhantomBarrier(barrier) => Message::Barrier(barrier).into(),
RearrangedMessage::Chunk(chunk) => Message::Chunk(chunk).into(),
}
Expand All @@ -87,21 +99,15 @@ impl RearrangedMessage {
impl RearrangedMessage {
fn rearranged_from(msg: Message) -> Self {
match msg {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}

Message::Watermark(_) => RearrangedMessage::Watermark,
Message::Chunk(chunk) => RearrangedMessage::Chunk(chunk),
Message::Barrier(barrier) => RearrangedMessage::RearrangedBarrier(barrier),
}
}

fn phantom_from(msg: Message) -> Self {
match msg {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}

Message::Watermark(_) => RearrangedMessage::Watermark,
Message::Chunk(chunk) => RearrangedMessage::Chunk(chunk),
Message::Barrier(barrier) => RearrangedMessage::PhantomBarrier(barrier),
}
Expand Down Expand Up @@ -135,10 +141,11 @@ impl RearrangedChainExecutor {
async fn execute_inner(mut self) {
// 0. Project the upstream with `upstream_indices`.
let upstream_indices = self.upstream_indices.clone();
let mut upstream = self
.upstream
.execute()
.map(move |result| result.map(|msg| mapping(&upstream_indices, msg)));

let mut upstream = Box::pin(Self::mapping_stream(
self.upstream.execute(),
&upstream_indices,
));

// 1. Poll the upstream to get the first barrier.
let first_barrier = expect_first_barrier(&mut upstream).await?;
Expand Down Expand Up @@ -219,6 +226,9 @@ impl RearrangedChainExecutor {
yield Message::Barrier(barrier);
}
RearrangedMessage::Chunk(chunk) => yield Message::Chunk(chunk),
RearrangedMessage::Watermark => {
// Ignore watermark during snapshot consumption.
}
}
}

Expand Down Expand Up @@ -310,6 +320,18 @@ impl RearrangedChainExecutor {
}
}
}

/// Adapts `stream` by running every message through `mapping` with
/// `upstream_indices`.
///
/// Messages for which `mapping` returns `Some` are yielded in their original
/// order; messages mapped to `None` are silently skipped. An error item from
/// `stream` is propagated by `?`.
#[expect(clippy::needless_lifetimes, reason = "code generated by try_stream")]
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn mapping_stream(stream: BoxedMessageStream, upstream_indices: &[usize]) {
    #[for_await]
    for item in stream {
        // `mapping` yields `None` for messages with no downstream counterpart.
        if let Some(mapped) = mapping(upstream_indices, item?) {
            yield mapped;
        }
    }
}
}

impl Executor for RearrangedChainExecutor {
Expand Down