-
Notifications
You must be signed in to change notification settings - Fork 643
feat(streaming): introduce watermark filter #6224
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…troduce_watermark_filter
…troduce_watermark_filter
src/common/src/types/mod.rs
Outdated
pub fn min(&self) -> Datum { | ||
match self { | ||
DataType::Int16 => Some(ScalarImpl::Int16(i16::MIN)), | ||
DataType::Int32 => Some(ScalarImpl::Int32(i32::MIN)), | ||
DataType::Int64 => Some(ScalarImpl::Int64(i64::MIN)), | ||
DataType::Float32 => Some(ScalarImpl::Float32(OrderedF32::neg_infinity())), | ||
DataType::Float64 => Some(ScalarImpl::Float64(OrderedF64::neg_infinity())), | ||
DataType::Boolean => Some(ScalarImpl::Bool(false)), | ||
DataType::Varchar => Some(ScalarImpl::Utf8("".to_string())), | ||
DataType::Date => Some(ScalarImpl::NaiveDate(NaiveDateWrapper(NaiveDate::MIN))), | ||
DataType::Time => Some(ScalarImpl::NaiveTime(NaiveTimeWrapper( | ||
NaiveTime::from_hms(0, 0, 0), | ||
))), | ||
DataType::Timestamp => Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper( | ||
NaiveDateTime::MIN, | ||
))), | ||
// FIXME(yuhao): Add a timestampz scalar. | ||
DataType::Timestampz => Some(ScalarImpl::Int64(i64::MIN)), | ||
DataType::Decimal => Some(ScalarImpl::Decimal(Decimal::NegativeInf)), | ||
DataType::Interval => Some(ScalarImpl::Interval(IntervalUnit::min())), | ||
DataType::Struct { .. } => Some(ScalarImpl::Struct(StructValue::new(vec![]))), | ||
DataType::List { .. } => Some(ScalarImpl::List(ListValue::new(vec![]))), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PTAL @xiangjinwu
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may need to recursively call this function to match the schema of Struct
.
Codecov Report
@@ Coverage Diff @@
## main #6224 +/- ##
==========================================
+ Coverage 74.56% 74.60% +0.03%
==========================================
Files 945 947 +2
Lines 152769 153125 +356
==========================================
+ Hits 113914 114238 +324
- Misses 38855 38887 +32
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
async fn get_global_max_watermark( | ||
table: &StateTable<S>, | ||
watermark_type: DataType, | ||
) -> StreamExecutorResult<Datum> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use ScalarImpl
here, or what's the semantic for watermark being NULL
? cc @st1page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, in the currently watermark filter design it should not be Null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But maybe do it in later PRs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for vnode in vnodes.iter_pos() { | ||
let pk = Some(ScalarImpl::Int16(vnode as _)); | ||
let row = Row::new(vec![pk, current_watermark.clone()]); | ||
table.insert(row); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may need upsert
here since we're unsure about the prev value after taking the maximum on fail-over & scaling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now, we may simply disable_sanity_check
to make it run. Could you please also make the unit test case cover this?
.enumerate() | ||
.map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone())) | ||
.collect_vec(); | ||
StateTable::new_without_distribution_partial( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better if we use the from_catalog
to create a state table here, as...
- the distribution should be
Hash
in most cases, - with the
vnode_col_idx_in_pk
set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't find any other executor (except SourceExecutor) using from_catalog
in tests. NTFS!
params | ||
.vnode_bitmap | ||
.expect("vnodes not set for watermark filter"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will it be possible that the watermark filter is a singleton? cc @st1page @TennyZhuang
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM. Great work!!
@@ -276,6 +276,12 @@ impl Bitmap { | |||
} | |||
} | |||
|
|||
pub fn iter_pos(&self) -> impl Iterator<Item = usize> + '_ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may reuse this in other places as well.
I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.
What's changed and what's your intention?
The executor will generate a
Watermark
after each chunk.This will also guarantee all later rows with event time less than the watermark will be filtered.
Checklist
./risedev check
(or alias,./risedev c
)Refer to a related PR or issue link (optional)
close #6046