Skip to content

Commit 0e8d81f

Browse files
authored
fix: evict hash join cache every n messages. (risingwavelabs#8731)
1 parent 8261a30 commit 0e8d81f

File tree

1 file changed

+26
-5
lines changed

1 file changed

+26
-5
lines changed

src/stream/src/executor/hash_join.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ use crate::task::AtomicU64Ref;
5252
/// enum is not supported in const generic.
5353
// TODO: Use enum to replace this once [feature(adt_const_params)](https://github.com/rust-lang/rust/issues/95174) get completed.
5454
pub type JoinTypePrimitive = u8;
55+
56+
/// Evict the cache every n rows.
57+
const EVICT_EVERY_N_ROWS: u32 = 1024;
58+
5559
#[allow(non_snake_case, non_upper_case_globals)]
5660
pub mod JoinType {
5761
use super::JoinTypePrimitive;
@@ -242,6 +246,8 @@ pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitiv
242246
metrics: Arc<StreamingMetrics>,
243247
/// The maximum size of the chunk produced by executor at a time
244248
chunk_size: usize,
249+
/// Count the messages received, clear to 0 when counted to `EVICT_EVERY_N_MESSAGES`
250+
cnt_rows_received: u32,
245251

246252
/// watermark column index -> `BufferedWatermarks`
247253
watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,
@@ -603,6 +609,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
603609
append_only_optimize,
604610
metrics,
605611
chunk_size,
612+
cnt_rows_received: 0,
606613
watermark_buffers,
607614
}
608615
}
@@ -662,6 +669,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
662669
chunk,
663670
self.append_only_optimize,
664671
self.chunk_size,
672+
&mut self.cnt_rows_received,
665673
) {
666674
left_time += left_start_time.elapsed();
667675
yield Message::Chunk(chunk?);
@@ -687,6 +695,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
687695
chunk,
688696
self.append_only_optimize,
689697
self.chunk_size,
698+
&mut self.cnt_rows_received,
690699
) {
691700
right_time += right_start_time.elapsed();
692701
yield Message::Chunk(chunk?);
@@ -752,14 +761,23 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
752761
// `commit` them here.
753762
self.side_l.ht.flush(epoch).await?;
754763
self.side_r.ht.flush(epoch).await?;
755-
756-
// We need to manually evict the cache to the target capacity.
757-
self.side_l.ht.evict();
758-
self.side_r.ht.evict();
759-
760764
Ok(())
761765
}
762766

767+
// We need to manually evict the cache.
768+
fn evict_cache(
769+
side_update: &mut JoinSide<K, S>,
770+
side_match: &mut JoinSide<K, S>,
771+
cnt_rows_received: &mut u32,
772+
) {
773+
*cnt_rows_received += 1;
774+
if *cnt_rows_received == EVICT_EVERY_N_ROWS {
775+
side_update.ht.evict();
776+
side_match.ht.evict();
777+
*cnt_rows_received = 0;
778+
}
779+
}
780+
763781
fn handle_watermark(
764782
&mut self,
765783
side: SideTypePrimitive,
@@ -850,6 +868,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
850868
chunk: StreamChunk,
851869
append_only_optimize: bool,
852870
chunk_size: usize,
871+
cnt_rows_received: &'a mut u32,
853872
) {
854873
let chunk = chunk.compact();
855874

@@ -870,6 +889,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
870889

871890
let keys = K::build(&side_update.join_key_indices, chunk.data_chunk())?;
872891
for ((op, row), key) in chunk.rows().zip_eq_debug(keys.iter()) {
892+
Self::evict_cache(side_update, side_match, cnt_rows_received);
893+
873894
let matched_rows: Option<HashValueType> =
874895
Self::hash_eq_match(key, &mut side_match.ht).await?;
875896
match op {

0 commit comments

Comments
 (0)