feat(streaming): introduce watermark filter #6224

Merged Nov 8, 2022 (12 commits)

Conversation

yuhao-su
Contributor

@yuhao-su yuhao-su commented Nov 7, 2022

I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.

What's changed and what's your intention?

The executor emits a Watermark after each chunk.
It also guarantees that any later row whose event time is less than the watermark is filtered out.
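
A minimal sketch of the behaviour described above, not the executor from this PR. It assumes `i64` event times and derives the watermark as the largest event time seen minus a fixed `delay`; the `WatermarkFilter` struct here, `process_chunk`, and `delay` are all hypothetical names chosen for illustration.

```rust
/// Hypothetical, simplified watermark filter over `i64` event times.
pub struct WatermarkFilter {
    /// Largest watermark emitted so far.
    watermark: i64,
    /// Assumed allowed lateness; the PR description does not specify how
    /// the watermark is derived.
    delay: i64,
}

impl WatermarkFilter {
    pub fn new(delay: i64) -> Self {
        Self { watermark: i64::MIN, delay }
    }

    /// Drops rows older than the current watermark, then advances the
    /// watermark and returns it together with the surviving rows.
    pub fn process_chunk(&mut self, event_times: &[i64]) -> (Vec<i64>, i64) {
        let kept: Vec<i64> = event_times
            .iter()
            .copied()
            .filter(|&t| t >= self.watermark)
            .collect();
        if let Some(max_time) = kept.iter().copied().max() {
            // The watermark never moves backwards.
            self.watermark = self.watermark.max(max_time.saturating_sub(self.delay));
        }
        (kept, self.watermark)
    }
}
```

With `delay = 5`, a first chunk `[10, 20, 12]` passes through unchanged and yields watermark 15; a following chunk `[14, 15, 16]` then loses the row with event time 14.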

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

Refer to a related PR or issue link (optional)

close #6046

@github-actions github-actions bot added the type/feature Type: New feature. label Nov 7, 2022
Comment on lines 329 to 352
pub fn min(&self) -> Datum {
    match self {
        DataType::Int16 => Some(ScalarImpl::Int16(i16::MIN)),
        DataType::Int32 => Some(ScalarImpl::Int32(i32::MIN)),
        DataType::Int64 => Some(ScalarImpl::Int64(i64::MIN)),
        DataType::Float32 => Some(ScalarImpl::Float32(OrderedF32::neg_infinity())),
        DataType::Float64 => Some(ScalarImpl::Float64(OrderedF64::neg_infinity())),
        DataType::Boolean => Some(ScalarImpl::Bool(false)),
        DataType::Varchar => Some(ScalarImpl::Utf8("".to_string())),
        DataType::Date => Some(ScalarImpl::NaiveDate(NaiveDateWrapper(NaiveDate::MIN))),
        DataType::Time => Some(ScalarImpl::NaiveTime(NaiveTimeWrapper(
            NaiveTime::from_hms(0, 0, 0),
        ))),
        DataType::Timestamp => Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper(
            NaiveDateTime::MIN,
        ))),
        // FIXME(yuhao): Add a timestampz scalar.
        DataType::Timestampz => Some(ScalarImpl::Int64(i64::MIN)),
        DataType::Decimal => Some(ScalarImpl::Decimal(Decimal::NegativeInf)),
        DataType::Interval => Some(ScalarImpl::Interval(IntervalUnit::min())),
        DataType::Struct { .. } => Some(ScalarImpl::Struct(StructValue::new(vec![]))),
        DataType::List { .. } => Some(ScalarImpl::List(ListValue::new(vec![]))),
    }
}
Member

We may need to recursively call this function to match the schema of Struct.
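
A rough sketch of what that recursion could look like. The `DataType` and `Scalar` enums below are trimmed-down, hypothetical stand-ins rather than the real `DataType` and `ScalarImpl`, and the `fields` layout of the struct variant is an assumption.

```rust
/// Hypothetical, trimmed-down stand-in for the real `DataType`.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int32,
    Int64,
    Boolean,
    Struct { fields: Vec<DataType> },
}

/// Hypothetical stand-in for `ScalarImpl`.
#[derive(Debug, Clone, PartialEq)]
pub enum Scalar {
    Int32(i32),
    Int64(i64),
    Bool(bool),
    Struct(Vec<Scalar>),
}

impl DataType {
    /// Minimum value of the type. For a struct, recurse into every field so
    /// the result has the same shape as the schema instead of being empty.
    pub fn min(&self) -> Scalar {
        match self {
            DataType::Int32 => Scalar::Int32(i32::MIN),
            DataType::Int64 => Scalar::Int64(i64::MIN),
            DataType::Boolean => Scalar::Bool(false),
            DataType::Struct { fields } => {
                Scalar::Struct(fields.iter().map(DataType::min).collect())
            }
        }
    }
}
```

Nested structs fall out of the same arm, since each field's `min` is computed by the same function.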

@codecov
codecov bot commented Nov 7, 2022

Codecov Report

Merging #6224 (85a833a) into main (7b57fbf) will increase coverage by 0.03%.
The diff coverage is 78.94%.

@@            Coverage Diff             @@
##             main    #6224      +/-   ##
==========================================
+ Coverage   74.56%   74.60%   +0.03%     
==========================================
  Files         945      947       +2     
  Lines      152769   153125     +356     
==========================================
+ Hits       113914   114238     +324     
- Misses      38855    38887      +32     
Flag Coverage Δ
rust 74.60% <78.94%> (+0.03%) ⬆️

Impacted Files Coverage Δ
src/common/src/array/data_chunk_iter.rs 83.89% <0.00%> (-0.86%) ⬇️
src/frontend/src/handler/explain.rs 52.42% <0.00%> (-2.68%) ⬇️
src/meta/src/barrier/recovery.rs 70.33% <ø> (-0.13%) ⬇️
src/meta/src/hummock/compaction_group/mod.rs 100.00% <ø> (ø)
src/meta/src/hummock/compaction_scheduler.rs 82.08% <ø> (ø)
src/meta/src/hummock/manager/compaction.rs 91.27% <0.00%> (+0.60%) ⬆️
src/meta/src/rpc/service/hummock_service.rs 3.20% <0.00%> (-0.44%) ⬇️
src/meta/src/rpc/service/notification_service.rs 15.15% <ø> (+0.15%) ⬆️
src/stream/src/executor/simple.rs 30.76% <ø> (ø)
src/stream/src/from_proto/mod.rs 0.00% <ø> (ø)
... and 31 more

async fn get_global_max_watermark(
    table: &StateTable<S>,
    watermark_type: DataType,
) -> StreamExecutorResult<Datum> {
Member

Should we use ScalarImpl here? Otherwise, what are the semantics of a NULL watermark? cc @st1page.

Contributor Author

Agreed. In the current watermark filter design it should not be NULL.

Contributor Author

But maybe we can do that in a later PR.
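
For illustration only, a non-nullable version of the lookup discussed in this thread might fall back to the type's minimum when nothing has been stored yet. This sketch assumes an in-memory map of per-vnode watermarks with `i64` values; `global_max_watermark` and `type_min` are hypothetical names, not the signature in this PR.

```rust
use std::collections::BTreeMap;

/// Hypothetical: `per_vnode` maps a vnode index to its stored watermark.
/// Returns the largest stored watermark, or `type_min` when the table is
/// empty, so callers never have to handle a NULL watermark.
pub fn global_max_watermark(per_vnode: &BTreeMap<u16, i64>, type_min: i64) -> i64 {
    per_vnode.values().copied().max().unwrap_or(type_min)
}
```

Returning a plain value instead of an `Option` encodes the "should not be NULL" rule in the type.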

for vnode in vnodes.iter_pos() {
    let pk = Some(ScalarImpl::Int16(vnode as _));
    let row = Row::new(vec![pk, current_watermark.clone()]);
    table.insert(row);
Member

We may need an upsert here, since we can't be sure of the previous value after taking the maximum on failover and scaling.

Member

@BugenZhao BugenZhao Nov 8, 2022

For now, we may simply disable_sanity_check to make it run. Could you please also make the unit test case cover this?
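
To make the difference in this thread concrete, here is a small sketch that contrasts a strict insert with an upsert. It uses a hypothetical in-memory `WatermarkTable`, not the real `StateTable` API, and models the sanity check as a simple "key must not already exist" rule.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the per-vnode watermark table.
#[derive(Default)]
pub struct WatermarkTable {
    rows: HashMap<u16, i64>,
}

impl WatermarkTable {
    /// Strict insert: refuses to overwrite an existing row, similar in
    /// spirit to an insert guarded by a sanity check.
    pub fn insert(&mut self, vnode: u16, watermark: i64) -> Result<(), String> {
        if self.rows.contains_key(&vnode) {
            return Err(format!("vnode {vnode} already has a watermark"));
        }
        self.rows.insert(vnode, watermark);
        Ok(())
    }

    /// Upsert: writes the row whether or not one already exists.
    pub fn upsert(&mut self, vnode: u16, watermark: i64) {
        self.rows.insert(vnode, watermark);
    }

    pub fn get(&self, vnode: u16) -> Option<i64> {
        self.rows.get(&vnode).copied()
    }
}
```

After recovery or scaling, a vnode may already hold a row from a previous owner, which is the case where the strict insert fails and the upsert succeeds.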

    .enumerate()
    .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
    .collect_vec();
StateTable::new_without_distribution_partial(
Member

It would be better to use from_catalog to create the state table here, as...

  • the distribution should be Hash in most cases,
  • with the vnode_col_idx_in_pk set.

Contributor Author

I can't find any other executor (except SourceExecutor) using from_catalog in tests. NTFS!

Comment on lines +38 to +40
params
    .vnode_bitmap
    .expect("vnodes not set for watermark filter"),
Member

Will it be possible that the watermark filter is a singleton? cc @st1page @TennyZhuang

Member

@BugenZhao BugenZhao left a comment

Rest LGTM. Great work!!

@@ -276,6 +276,12 @@ impl Bitmap {
        }
    }

    pub fn iter_pos(&self) -> impl Iterator<Item = usize> + '_ {
Member

We may reuse this in other places as well.
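
As a sketch of what a helper with this signature does, the version below iterates over the positions of set bits. The `Bitmap` here is a hypothetical `Vec<bool>`-backed stand-in with an assumed `from_bools` constructor; the real type's storage and the body of its `iter_pos` are not shown in this diff.

```rust
/// Hypothetical `Vec<bool>`-backed bitmap, for illustration only.
pub struct Bitmap {
    bits: Vec<bool>,
}

impl Bitmap {
    pub fn from_bools(bits: &[bool]) -> Self {
        Self { bits: bits.to_vec() }
    }

    /// Yields the index of every set bit, in ascending order.
    pub fn iter_pos(&self) -> impl Iterator<Item = usize> + '_ {
        self.bits
            .iter()
            .enumerate()
            .filter_map(|(pos, &bit)| bit.then_some(pos))
    }
}
```

A caller can then loop over owned vnodes with `for vnode in vnodes.iter_pos()` without checking each bit by hand.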

@mergify mergify bot merged commit b0dfdb8 into main Nov 8, 2022
@mergify mergify bot deleted the introduce_watermark_filter branch November 8, 2022 11:56
Labels
type/feature Type: New feature.
Development

Successfully merging this pull request may close these issues.

Introduce WatermarkFilter operator
6 participants