-
Notifications
You must be signed in to change notification settings - Fork 640
feat(hash agg): do state cleaning by watermarks in hash agg #6330
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report
@@ Coverage Diff @@
## main #6330 +/- ##
==========================================
- Coverage 73.23% 73.23% -0.01%
==========================================
Files 1023 1023
Lines 163534 163543 +9
==========================================
+ Hits 119760 119765 +5
- Misses 43774 43778 +4
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
Is this a sequel of #6299? |
c6b80ed
to
9bb5b1d
Compare
4664a38
to
f55f8d1
Compare
f55f8d1
to
5f34a84
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
others LGTM
@@ -495,8 +505,14 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> { | |||
// Nothing to flush. | |||
// Call commit on state table to increment the epoch. | |||
iter_table_storage(storages).for_each(|state_table| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can only apply on MaterializedInput
but not on AggStateStorage::Table
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can only apply on
MaterializedInput
but not onAggStateStorage::Table
?
why? register state table also starts with group key
futures::future::try_join_all( | ||
iter_table_storage(storages).map(|state_table| state_table.commit(epoch)), | ||
) | ||
futures::future::try_join_all(iter_table_storage(storages).map(|state_table| async { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.
What's changed and what's your intention?
do state cleaning by watermarks in hash agg
leverage
delete_range
interface in storageenumerate vnodes that belong to the table
need to enumerate too many vnodes
Checklist
./risedev check
(or alias,./risedev c
)Documentation
If your pull request contains user-facing changes, please specify the types of the changes, and create a release note. Otherwise, please feel free to remove this section.
Types of user-facing changes
Please keep the types that apply to your changes, and remove those that do not apply.
Release note
Please create a release note for your changes. In the release note, focus on the impact on users, and mention the environment or conditions where the impact may occur.
Refer to a related PR or issue link (optional)
#6112