Skip to content

feat: introduce watermark on source #7750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Feb 9, 2023
55 changes: 54 additions & 1 deletion dashboard/proto/gen/catalog.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 10 additions & 15 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ message ColumnIndex {
uint64 index = 1;
}

message WatermarkDesc {
// The column idx the watermark is on
ColumnIndex watermark_idx = 1;
// The expression to calculate the watermark value.
expr.ExprNode expr = 2;
}

message StreamSourceInfo {
plan_common.RowFormatType row_format = 1;
string row_schema_location = 2;
Expand Down Expand Up @@ -44,6 +51,8 @@ message Source {
uint32 owner = 9;

StreamSourceInfo info = 10;
// Define watermarks on source.
repeated WatermarkDesc watermark_descs = 11;
}

message Sink {
Expand Down
8 changes: 3 additions & 5 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,10 @@ message LookupNode {

// WatermarkFilter needs to filter the upstream data by the water mark.
message WatermarkFilterNode {
// The expression to calculate the watermark value.
expr.ExprNode watermark_expr = 1;
// The column the event time belongs.
uint64 event_time_col_idx = 2;
// The watermark desc
catalog.WatermarkDesc watermark_desc = 1;
// The table used to persist watermark, the key is vnode.
catalog.Table table = 3;
catalog.Table table = 2;
}

// Acts like a merger, but on different inputs.
Expand Down
44 changes: 44 additions & 0 deletions src/frontend/src/binder/create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::catalog::Field;
use risingwave_common::error::Result;

use crate::catalog::column_catalog::ColumnCatalog;
use crate::Binder;

impl Binder {
pub fn bind_column_defs(
&mut self,
name: String,
column_catalogs: Vec<ColumnCatalog>,
) -> Result<()> {
let columns = column_catalogs
.iter()
.map(|c| (c.is_hidden, Field::from(&c.column_desc)))
.collect_vec();
self.bind_table_to_context(columns, name, None)
}

pub fn get_column_binding_index(
&mut self,
table_name: String,
column_name: &String,
) -> Result<usize> {
Ok(self
.context
.get_column_binding_index(&Some(table_name), column_name)?)
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/binder/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mod subquery;
mod value;

impl Binder {
pub(super) fn bind_expr(&mut self, expr: Expr) -> Result<ExprImpl> {
pub fn bind_expr(&mut self, expr: Expr) -> Result<ExprImpl> {
match expr {
// literal
Expr::Value(v) => Ok(ExprImpl::Literal(Box::new(self.bind_value(v)?))),
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_common::session_config::SearchPath;
use risingwave_sqlparser::ast::Statement;

mod bind_context;
mod create;
mod delete;
mod expr;
mod insert;
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/catalog/source_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashMap;

use risingwave_pb::catalog::{Source as ProstSource, StreamSourceInfo};
use risingwave_pb::catalog::{Source as ProstSource, StreamSourceInfo, WatermarkDesc};

use super::column_catalog::ColumnCatalog;
use super::{ColumnId, RelationCatalog, SourceId};
Expand All @@ -34,6 +34,7 @@ pub struct SourceCatalog {
pub info: StreamSourceInfo,
pub row_id_index: Option<usize>,
pub properties: HashMap<String, String>,
pub watermark_descs: Vec<WatermarkDesc>,
}

impl From<&ProstSource> for SourceCatalog {
Expand All @@ -56,6 +57,7 @@ impl From<&ProstSource> for SourceCatalog {

let append_only = row_id_index.is_some();
let owner = prost.owner;
let watermark_descs = prost.get_watermark_descs().clone();

Self {
id,
Expand All @@ -67,6 +69,7 @@ impl From<&ProstSource> for SourceCatalog {
info: prost.info.clone().unwrap(),
row_id_index,
properties: with_options.into_inner(),
watermark_descs,
}
}
}
Expand Down
Loading