Skip to content

Commit d772092

Browse files
refactor(streaming): use LRU cache for group top-n (#6491)
* temp commit * top_n cache use lru manager * ready for review * call evict manually * fix * fix Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent bcdc717 commit d772092

File tree

4 files changed

+72
-10
lines changed

4 files changed

+72
-10
lines changed

src/stream/src/executor/mview/materialize.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ impl<S: StateStore> MaterializeExecutor<S> {
210210
if let Some(vnode_bitmap) = b.as_update_vnode_bitmap(self.actor_context.id) {
211211
let _ = self.state_table.update_vnode_bitmap(vnode_bitmap);
212212
}
213-
213+
self.materialize_cache.evict();
214214
Message::Barrier(b)
215215
}
216216
}
@@ -501,6 +501,10 @@ impl MaterializeCache {
501501
pub fn put(&mut self, key: Vec<u8>, value: Option<CompactedRow>) {
502502
self.data.push(key, value);
503503
}
504+
505+
fn evict(&mut self) {
506+
self.data.evict()
507+
}
504508
}
505509
#[cfg(test)]
506510
mod tests {

src/stream/src/executor/top_n/group_top_n.rs

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::hash_map::Entry::Vacant;
16-
use std::collections::{HashMap, HashSet};
15+
use std::collections::HashSet;
1716
use std::sync::Arc;
1817

1918
use async_trait::async_trait;
@@ -30,7 +29,7 @@ use risingwave_storage::StateStore;
3029
use super::top_n_cache::TopNCacheTrait;
3130
use super::utils::*;
3231
use super::TopNCache;
33-
use crate::cache::cache_may_stale;
32+
use crate::cache::{cache_may_stale, EvictableHashMap, ExecutorCache, LruManagerRef};
3433
use crate::common::table::state_table::StateTable;
3534
use crate::error::StreamResult;
3635
use crate::executor::error::StreamExecutorResult;
@@ -52,6 +51,8 @@ impl<S: StateStore> GroupTopNExecutor<S, false> {
5251
executor_id: u64,
5352
group_by: Vec<usize>,
5453
state_table: StateTable<S>,
54+
lru_manager: Option<LruManagerRef>,
55+
cache_size: usize,
5556
) -> StreamResult<Self> {
5657
let info = input.info();
5758
let schema = input.schema().clone();
@@ -68,6 +69,8 @@ impl<S: StateStore> GroupTopNExecutor<S, false> {
6869
executor_id,
6970
group_by,
7071
state_table,
72+
lru_manager,
73+
cache_size,
7174
)?,
7275
})
7376
}
@@ -85,6 +88,8 @@ impl<S: StateStore> GroupTopNExecutor<S, true> {
8588
executor_id: u64,
8689
group_by: Vec<usize>,
8790
state_table: StateTable<S>,
91+
lru_manager: Option<LruManagerRef>,
92+
cache_size: usize,
8893
) -> StreamResult<Self> {
8994
let info = input.info();
9095
let schema = input.schema().clone();
@@ -102,6 +107,8 @@ impl<S: StateStore> GroupTopNExecutor<S, true> {
102107
executor_id,
103108
group_by,
104109
state_table,
110+
lru_manager,
111+
cache_size,
105112
)?,
106113
})
107114
}
@@ -132,7 +139,7 @@ pub struct InnerGroupTopNExecutorNew<S: StateStore, const WITH_TIES: bool> {
132139
group_by: Vec<usize>,
133140

134141
/// group key -> cache for this group
135-
caches: HashMap<Vec<Datum>, TopNCache<WITH_TIES>>,
142+
caches: GroupTopNCache<WITH_TIES>,
136143

137144
/// The number of fields of the ORDER BY clause, and will be used to split key into `CacheKey`.
138145
order_by_len: usize,
@@ -153,6 +160,8 @@ impl<S: StateStore, const WITH_TIES: bool> InnerGroupTopNExecutorNew<S, WITH_TIE
153160
executor_id: u64,
154161
group_by: Vec<usize>,
155162
state_table: StateTable<S>,
163+
lru_manager: Option<LruManagerRef>,
164+
cache_size: usize,
156165
) -> StreamResult<Self> {
157166
// order_pairs is superset of pk
158167
assert!(order_pairs
@@ -196,13 +205,47 @@ impl<S: StateStore, const WITH_TIES: bool> InnerGroupTopNExecutorNew<S, WITH_TIE
196205
pk_indices,
197206
internal_key_indices,
198207
group_by,
199-
caches: HashMap::new(),
208+
caches: GroupTopNCache::new(lru_manager, cache_size),
200209
order_by_len,
201210
cache_key_serde,
202211
})
203212
}
204213
}
205214

215+
pub struct GroupTopNCache<const WITH_TIES: bool> {
216+
data: ExecutorCache<Vec<Datum>, TopNCache<WITH_TIES>>,
217+
}
218+
219+
impl<const WITH_TIES: bool> GroupTopNCache<WITH_TIES> {
220+
pub fn new(lru_manager: Option<LruManagerRef>, cache_size: usize) -> Self {
221+
let cache = if let Some(lru_manager) = lru_manager {
222+
ExecutorCache::Managed(lru_manager.create_cache())
223+
} else {
224+
ExecutorCache::Local(EvictableHashMap::new(cache_size))
225+
};
226+
Self { data: cache }
227+
}
228+
229+
fn clear(&mut self) {
230+
self.data.clear()
231+
}
232+
233+
fn get_mut(&mut self, key: &[Datum]) -> Option<&mut TopNCache<WITH_TIES>> {
234+
self.data.get_mut(key)
235+
}
236+
237+
fn contains(&mut self, key: &[Datum]) -> bool {
238+
self.data.contains(key)
239+
}
240+
241+
fn insert(&mut self, key: Vec<Datum>, value: TopNCache<WITH_TIES>) {
242+
self.data.push(key, value);
243+
}
244+
245+
fn evict(&mut self) {
246+
self.data.evict()
247+
}
248+
}
206249
#[async_trait]
207250
impl<S: StateStore, const WITH_TIES: bool> TopNExecutorBase
208251
for InnerGroupTopNExecutorNew<S, WITH_TIES>
@@ -229,12 +272,12 @@ where
229272

230273
// If 'self.caches' does not already have a cache for the current group, create a new
231274
// cache for it and insert it into `self.caches`
232-
if let Vacant(entry) = self.caches.entry(group_key) {
275+
if !self.caches.contains(&group_key) {
233276
let mut topn_cache = TopNCache::new(self.offset, self.limit, self.order_by_len);
234277
self.managed_state
235278
.init_topn_cache(Some(&pk_prefix), &mut topn_cache, self.order_by_len)
236279
.await?;
237-
entry.insert(topn_cache);
280+
self.caches.insert(group_key, topn_cache);
238281
}
239282
let cache = self.caches.get_mut(&pk_prefix.0).unwrap();
240283

@@ -291,6 +334,10 @@ where
291334
}
292335
}
293336

337+
fn evict(&mut self) {
338+
self.caches.evict()
339+
}
340+
294341
async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
295342
self.managed_state.state_table.init_epoch(epoch);
296343
Ok(())
@@ -406,6 +453,8 @@ mod tests {
406453
1,
407454
vec![1],
408455
state_table,
456+
None,
457+
0,
409458
)
410459
.unwrap(),
411460
);
@@ -504,6 +553,8 @@ mod tests {
504553
1,
505554
vec![1],
506555
state_table,
556+
None,
557+
0,
507558
)
508559
.unwrap(),
509560
);
@@ -594,6 +645,8 @@ mod tests {
594645
1,
595646
vec![1, 2],
596647
state_table,
648+
None,
649+
0,
597650
)
598651
.unwrap(),
599652
);

src/stream/src/executor/top_n/utils.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pub trait TopNExecutorBase: Send + 'static {
5858
unreachable!()
5959
}
6060

61+
fn evict(&mut self) {}
6162
async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()>;
6263
}
6364

@@ -120,7 +121,7 @@ where
120121
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
121122
self.inner.update_vnode_bitmap(vnode_bitmap);
122123
}
123-
124+
self.inner.evict();
124125
yield Message::Barrier(barrier)
125126
}
126127
};

src/stream/src/from_proto/group_top_n.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ impl ExecutorBuilder for GroupTopNExecutorBuilder {
3131
mut params: ExecutorParams,
3232
node: &Self::Node,
3333
store: impl StateStore,
34-
_stream: &mut LocalStreamManagerCore,
34+
stream: &mut LocalStreamManagerCore,
3535
) -> StreamResult<BoxedExecutor> {
3636
let group_by = node
3737
.get_group_key()
@@ -54,6 +54,8 @@ impl ExecutorBuilder for GroupTopNExecutorBuilder {
5454
params.executor_id,
5555
group_by,
5656
state_table,
57+
stream.context.lru_manager.clone(),
58+
1 << 16,
5759
)?
5860
.boxed())
5961
} else {
@@ -67,6 +69,8 @@ impl ExecutorBuilder for GroupTopNExecutorBuilder {
6769
params.executor_id,
6870
group_by,
6971
state_table,
72+
stream.context.lru_manager.clone(),
73+
1 << 16,
7074
)?
7175
.boxed())
7276
}

0 commit comments

Comments
 (0)