Skip to content

Commit 03cc2ae

Browse files
refactor(expr): make evaluation async (risingwavelabs#8229)
Signed-off-by: Runji Wang <[email protected]>
1 parent 1a11c3f commit 03cc2ae

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+1099
-1013
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/batch/src/executor/filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl FilterExecutor {
5858
#[for_await]
5959
for data_chunk in self.child.execute() {
6060
let data_chunk = data_chunk?.compact();
61-
let vis_array = self.expr.eval(&data_chunk)?;
61+
let vis_array = self.expr.eval(&data_chunk).await?;
6262

6363
if let Bool(vis) = vis_array.as_ref() {
6464
// TODO: should we yield masked data chunk directly?

src/batch/src/executor/hash_agg.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
213213

214214
// TODO: currently not a vectorized implementation
215215
for state in states {
216-
state.update_single(&chunk, row_id)?
216+
state.update_single(&chunk, row_id).await?
217217
}
218218
}
219219
}

src/batch/src/executor/hop_window.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,12 +178,12 @@ impl HopWindowExecutor {
178178
let len = data_chunk.cardinality();
179179
for i in 0..units {
180180
let window_start_col = if output_indices.contains(&window_start_col_index) {
181-
Some(self.window_start_exprs[i].eval(&data_chunk)?)
181+
Some(self.window_start_exprs[i].eval(&data_chunk).await?)
182182
} else {
183183
None
184184
};
185185
let window_end_col = if output_indices.contains(&window_end_col_index) {
186-
Some(self.window_end_exprs[i].eval(&data_chunk)?)
186+
Some(self.window_end_exprs[i].eval(&data_chunk).await?)
187187
} else {
188188
None
189189
};

src/batch/src/executor/join/hash_join.rs

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
366366
#[for_await]
367367
for chunk in Self::do_inner_join(params) {
368368
let mut chunk = chunk?;
369-
chunk.set_visibility(cond.eval(&chunk)?.as_bool().iter().collect());
369+
chunk.set_visibility(cond.eval(&chunk).await?.as_bool().iter().collect());
370370
yield chunk
371371
}
372372
}
@@ -473,7 +473,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
473473
spilled,
474474
cond.as_ref(),
475475
&mut non_equi_state,
476-
)?
476+
)
477+
.await?
477478
}
478479
}
479480
} else {
@@ -494,7 +495,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
494495
spilled,
495496
cond.as_ref(),
496497
&mut non_equi_state,
497-
)?
498+
)
499+
.await?
498500
}
499501
}
500502

@@ -593,7 +595,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
593595
spilled,
594596
cond.as_ref(),
595597
&mut non_equi_state,
596-
)?
598+
)
599+
.await?
597600
}
598601
}
599602
}
@@ -606,7 +609,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
606609
spilled,
607610
cond.as_ref(),
608611
&mut non_equi_state,
609-
)?
612+
)
613+
.await?
610614
}
611615
}
612616

@@ -657,7 +661,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
657661
spilled,
658662
cond.as_ref(),
659663
&mut non_equi_state,
660-
)?
664+
)
665+
.await?
661666
}
662667
}
663668
} else if let Some(spilled) = Self::append_one_probe_row(
@@ -675,7 +680,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
675680
spilled,
676681
cond.as_ref(),
677682
&mut non_equi_state,
678-
)?
683+
)
684+
.await?
679685
}
680686
if let Some(spilled) = remaining_chunk_builder.consume_all() {
681687
yield spilled
@@ -777,7 +783,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
777783
spilled,
778784
cond.as_ref(),
779785
&mut non_equi_state,
780-
)?
786+
)
787+
.await?
781788
}
782789
}
783790
}
@@ -787,7 +794,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
787794
spilled,
788795
cond.as_ref(),
789796
&mut non_equi_state,
790-
)?
797+
)
798+
.await?
791799
}
792800
#[for_await]
793801
for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
@@ -884,7 +892,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
884892
spilled,
885893
cond.as_ref(),
886894
&mut non_equi_state,
887-
)?
895+
)
896+
.await?
888897
}
889898
}
890899
}
@@ -894,7 +903,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
894903
spilled,
895904
cond.as_ref(),
896905
&mut non_equi_state,
897-
)?
906+
)
907+
.await?
898908
}
899909
#[for_await]
900910
for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
@@ -1028,7 +1038,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
10281038
cond.as_ref(),
10291039
&mut left_non_equi_state,
10301040
&mut right_non_equi_state,
1031-
)?
1041+
)
1042+
.await?
10321043
}
10331044
}
10341045
} else {
@@ -1050,7 +1061,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
10501061
cond.as_ref(),
10511062
&mut left_non_equi_state,
10521063
&mut right_non_equi_state,
1053-
)?
1064+
)
1065+
.await?
10541066
}
10551067
#[for_await]
10561068
for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
@@ -1199,7 +1211,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
11991211
///
12001212
/// For more information about how `process_*_join_non_equi_condition` work, see their unit
12011213
/// tests.
1202-
fn process_left_outer_join_non_equi_condition(
1214+
async fn process_left_outer_join_non_equi_condition(
12031215
chunk: DataChunk,
12041216
cond: &dyn Expression,
12051217
LeftNonEquiJoinState {
@@ -1209,7 +1221,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
12091221
found_matched,
12101222
}: &mut LeftNonEquiJoinState,
12111223
) -> Result<DataChunk> {
1212-
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
1224+
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
12131225
Ok(DataChunkMutator(chunk)
12141226
.nullify_build_side_for_non_equi_condition(&filter, *probe_column_count)
12151227
.remove_duplicate_rows_for_left_outer_join(
@@ -1223,7 +1235,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
12231235

12241236
/// Filters for candidate rows which satisfy `non_equi` predicate.
12251237
/// Removes duplicate rows.
1226-
fn process_left_semi_anti_join_non_equi_condition<const ANTI_JOIN: bool>(
1238+
async fn process_left_semi_anti_join_non_equi_condition<const ANTI_JOIN: bool>(
12271239
chunk: DataChunk,
12281240
cond: &dyn Expression,
12291241
LeftNonEquiJoinState {
@@ -1233,7 +1245,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
12331245
..
12341246
}: &mut LeftNonEquiJoinState,
12351247
) -> Result<DataChunk> {
1236-
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
1248+
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
12371249
Ok(DataChunkMutator(chunk)
12381250
.remove_duplicate_rows_for_left_semi_anti_join::<ANTI_JOIN>(
12391251
&filter,
@@ -1244,29 +1256,29 @@ impl<K: HashKey> HashJoinExecutor<K> {
12441256
.take())
12451257
}
12461258

1247-
fn process_right_outer_join_non_equi_condition(
1259+
async fn process_right_outer_join_non_equi_condition(
12481260
chunk: DataChunk,
12491261
cond: &dyn Expression,
12501262
RightNonEquiJoinState {
12511263
build_row_ids,
12521264
build_row_matched,
12531265
}: &mut RightNonEquiJoinState,
12541266
) -> Result<DataChunk> {
1255-
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
1267+
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
12561268
Ok(DataChunkMutator(chunk)
12571269
.remove_duplicate_rows_for_right_outer_join(&filter, build_row_ids, build_row_matched)
12581270
.take())
12591271
}
12601272

1261-
fn process_right_semi_anti_join_non_equi_condition(
1273+
async fn process_right_semi_anti_join_non_equi_condition(
12621274
chunk: DataChunk,
12631275
cond: &dyn Expression,
12641276
RightNonEquiJoinState {
12651277
build_row_ids,
12661278
build_row_matched,
12671279
}: &mut RightNonEquiJoinState,
12681280
) -> Result<()> {
1269-
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
1281+
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
12701282
DataChunkMutator(chunk).remove_duplicate_rows_for_right_semi_anti_join(
12711283
&filter,
12721284
build_row_ids,
@@ -1275,13 +1287,13 @@ impl<K: HashKey> HashJoinExecutor<K> {
12751287
Ok(())
12761288
}
12771289

1278-
fn process_full_outer_join_non_equi_condition(
1290+
async fn process_full_outer_join_non_equi_condition(
12791291
chunk: DataChunk,
12801292
cond: &dyn Expression,
12811293
left_non_equi_state: &mut LeftNonEquiJoinState,
12821294
right_non_equi_state: &mut RightNonEquiJoinState,
12831295
) -> Result<DataChunk> {
1284-
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
1296+
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
12851297
Ok(DataChunkMutator(chunk)
12861298
.nullify_build_side_for_non_equi_condition(
12871299
&filter,
@@ -2609,6 +2621,7 @@ mod tests {
26092621
cond.as_ref(),
26102622
&mut state
26112623
)
2624+
.await
26122625
.unwrap()
26132626
.compact(),
26142627
&expect
@@ -2638,6 +2651,7 @@ mod tests {
26382651
cond.as_ref(),
26392652
&mut state
26402653
)
2654+
.await
26412655
.unwrap()
26422656
.compact(),
26432657
&expect
@@ -2667,6 +2681,7 @@ mod tests {
26672681
cond.as_ref(),
26682682
&mut state
26692683
)
2684+
.await
26702685
.unwrap()
26712686
.compact(),
26722687
&expect
@@ -2706,6 +2721,7 @@ mod tests {
27062721
cond.as_ref(),
27072722
&mut state
27082723
)
2724+
.await
27092725
.unwrap()
27102726
.compact(),
27112727
&expect
@@ -2732,6 +2748,7 @@ mod tests {
27322748
cond.as_ref(),
27332749
&mut state
27342750
)
2751+
.await
27352752
.unwrap()
27362753
.compact(),
27372754
&expect
@@ -2758,6 +2775,7 @@ mod tests {
27582775
cond.as_ref(),
27592776
&mut state
27602777
)
2778+
.await
27612779
.unwrap()
27622780
.compact(),
27632781
&expect
@@ -2799,6 +2817,7 @@ mod tests {
27992817
cond.as_ref(),
28002818
&mut state
28012819
)
2820+
.await
28022821
.unwrap()
28032822
.compact(),
28042823
&expect
@@ -2827,6 +2846,7 @@ mod tests {
28272846
cond.as_ref(),
28282847
&mut state
28292848
)
2849+
.await
28302850
.unwrap()
28312851
.compact(),
28322852
&expect
@@ -2855,6 +2875,7 @@ mod tests {
28552875
cond.as_ref(),
28562876
&mut state
28572877
)
2878+
.await
28582879
.unwrap()
28592880
.compact(),
28602881
&expect
@@ -2918,6 +2939,7 @@ mod tests {
29182939
cond.as_ref(),
29192940
&mut state
29202941
)
2942+
.await
29212943
.unwrap()
29222944
.compact(),
29232945
&expect
@@ -2958,6 +2980,7 @@ mod tests {
29582980
cond.as_ref(),
29592981
&mut state
29602982
)
2983+
.await
29612984
.unwrap()
29622985
.compact(),
29632986
&expect
@@ -3010,6 +3033,7 @@ mod tests {
30103033
cond.as_ref(),
30113034
&mut state
30123035
)
3036+
.await
30133037
.is_ok()
30143038
);
30153039
assert_eq!(state.build_row_ids, Vec::new());
@@ -3044,6 +3068,7 @@ mod tests {
30443068
cond.as_ref(),
30453069
&mut state
30463070
)
3071+
.await
30473072
.is_ok()
30483073
);
30493074
assert_eq!(state.build_row_ids, Vec::new());
@@ -3105,6 +3130,7 @@ mod tests {
31053130
&mut left_state,
31063131
&mut right_state,
31073132
)
3133+
.await
31083134
.unwrap()
31093135
.compact(),
31103136
&expect
@@ -3152,6 +3178,7 @@ mod tests {
31523178
&mut left_state,
31533179
&mut right_state,
31543180
)
3181+
.await
31553182
.unwrap()
31563183
.compact(),
31573184
&expect

0 commit comments

Comments
 (0)