-
Notifications
You must be signed in to change notification settings - Fork 637
refactor: replace more GAT-based async trait with RPITIT #12271
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Bugen Zhao <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
P.S. Today I learned "hide whitespace".
type SnapshotStream<'a> = impl Stream<Item = StreamExecutorResult<Option<StreamChunk>>> + 'a; | ||
|
||
fn snapshot_read(&self, args: SnapshotReadArgs) -> Self::SnapshotStream<'_> { | ||
#[try_stream] | ||
async move { | ||
let primary_keys = self | ||
.inner | ||
.pk_indices() | ||
.iter() | ||
.map(|idx| { | ||
let f = &self.inner.schema().fields[*idx]; | ||
f.name.clone() | ||
}) | ||
.collect_vec(); | ||
|
||
tracing::debug!( | ||
"snapshot_read primary keys: {:?}, current_pos: {:?}", | ||
primary_keys, | ||
args.current_pos | ||
); | ||
|
||
let row_stream = self.inner.table_reader().snapshot_read( | ||
self.inner.schema_table_name(), | ||
args.current_pos, | ||
primary_keys, | ||
); | ||
|
||
pin_mut!(row_stream); | ||
|
||
let mut builder = | ||
DataChunkBuilder::new(self.inner.schema().data_types(), args.chunk_size); | ||
let chunk_stream = iter_chunks(row_stream, &mut builder); | ||
#[for_await] | ||
for chunk in chunk_stream { | ||
yield chunk?; | ||
} | ||
#[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)] | ||
async fn snapshot_read(&self, args: SnapshotReadArgs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we can also use #[try_stream]
in impl Executor
to avoid the indirect execute_inner
. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I believe it's true.
Codecov Report
@@ Coverage Diff @@
## main #12271 +/- ##
=======================================
Coverage 69.72% 69.72%
=======================================
Files 1411 1411
Lines 236365 236326 -39
=======================================
- Hits 164807 164782 -25
+ Misses 71558 71544 -14
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 4 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
Signed-off-by: Bugen Zhao <[email protected]>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Hide the whitespaces and you'll find few changes.
As
return-position-impl-trait-in-trait
is now complete, we can replace more GAT-based async trait with that.async-fn-in-trait
andreturn-position-impl-trait-in-trait
#10554Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.