Skip to content

Commit 3a06226

Browse files
xxchan and st1page
authored
fix(stream): careful cache invalidation for TopN (risingwavelabs#8659)
Co-authored-by: st1page <[email protected]>
1 parent 9abe5dc commit 3a06226

File tree

11 files changed

+245
-91
lines changed

11 files changed

+245
-91
lines changed

Makefile.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,7 @@ else
759759
fi
760760
761761
cd "${JAVA_DIR}"
762-
"${MAVEN_PATH}" spotless:check
762+
"${MAVEN_PATH}" spotless:check -q
763763
"""
764764

765765
[tasks.check-java-fix]
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# https://github.com/risingwavelabs/risingwave/issues/8570
2+
# TopN cache invalidation issue
3+
4+
statement ok
5+
SET RW_IMPLICIT_FLUSH TO true;
6+
7+
statement ok
8+
create table t(x int);
9+
10+
statement ok
11+
create materialized view t_singleton as select * from t order by x limit 100;
12+
13+
statement ok
14+
create materialized view mv as select * from t_singleton order by x limit 1;
15+
16+
statement ok
17+
insert into t values (1), (2), (3), (4);
18+
19+
statement ok
20+
delete from t where x = 2;
21+
22+
statement ok
23+
insert into t values (5);
24+
25+
statement ok
26+
delete from t where x = 1;
27+
28+
statement ok
29+
insert into t values (6);
30+
31+
statement ok
32+
delete from t where x = 3;
33+
34+
# Shouldn't be 5
35+
query I
36+
select * from mv;
37+
----
38+
4
39+
40+
statement ok
41+
delete from t where x = 4;
42+
43+
# Shouldn't panic
44+
statement ok
45+
insert into t values (1);
46+
47+
statement ok
48+
drop materialized view mv;
49+
50+
statement ok
51+
drop materialized view t_singleton;
52+
53+
statement ok
54+
drop table t;

src/common/src/row/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@
1313
// limitations under the License.
1414

1515
use std::borrow::Cow;
16+
use std::fmt::Display;
1617
use std::hash::{BuildHasher, Hasher};
1718

1819
use bytes::{BufMut, Bytes, BytesMut};
20+
use itertools::Itertools;
1921

2022
use self::empty::EMPTY;
2123
use crate::hash::HashCode;
24+
use crate::types::to_text::ToText;
2225
use crate::types::{hash_datum, DatumRef, ToDatumRef, ToOwnedDatum};
2326
use crate::util::ordered::OrderedRowSerde;
2427
use crate::util::value_encoding;
@@ -145,6 +148,25 @@ pub trait RowExt: Row {
145148
{
146149
assert_row(Project::new(self, indices))
147150
}
151+
152+
/// Returns a [`Display`] adaptor that borrows this row and renders it as text.
///
/// Datums are written in iteration order, separated by `" | "`. A `None` datum
/// is printed as `NULL`; a `Some` datum is printed via its `to_text()` form.
fn display(&self) -> impl Display + '_ {
153+
// Local wrapper that only holds a reference to the row; the text is produced in `fmt`.
struct D<'a, T: Row>(&'a T);
154+
impl<'a, T: Row> Display for D<'a, T> {
155+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156+
write!(
157+
f,
158+
"{}",
159+
// `format_with` (itertools) joins the per-datum output with the " | " separator.
self.0.iter().format_with(" | ", |datum, f| {
160+
match datum {
161+
// A missing value is shown as the literal text NULL.
None => f(&"NULL"),
162+
Some(scalar) => f(&format_args!("{}", scalar.to_text())),
163+
}
164+
})
165+
)
166+
}
167+
}
168+
D(self)
169+
}
148170
}
149171

150172
impl<R: Row> RowExt for R {}

src/stream/src/executor/managed_state/top_n/top_n_state.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,9 +354,10 @@ mod tests {
354354

355355
#[tokio::test]
356356
async fn test_managed_top_n_state_fill_cache() {
357+
let data_types = vec![DataType::Varchar, DataType::Int64];
357358
let state_table = {
358359
let mut tb = create_in_memory_state_table(
359-
&[DataType::Varchar, DataType::Int64],
360+
&data_types,
360361
&[OrderType::ascending(), OrderType::ascending()],
361362
&[0, 1],
362363
)
@@ -382,7 +383,7 @@ mod tests {
382383
let rows = vec![row1, row2, row3, row4, row5];
383384
let ordered_rows = vec![row1_bytes, row2_bytes, row3_bytes, row4_bytes, row5_bytes];
384385

385-
let mut cache = TopNCache::<false>::new(1, 1);
386+
let mut cache = TopNCache::<false>::new(1, 1, data_types);
386387

387388
managed_state.insert(rows[3].clone());
388389
managed_state.insert(rows[1].clone());

src/stream/src/executor/top_n/group_top_n.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::executor::{ActorContextRef, Executor, ExecutorInfo, PkIndices, Waterm
3737
use crate::task::AtomicU64Ref;
3838

3939
pub type GroupTopNExecutor<K, S, const WITH_TIES: bool> =
40-
TopNExecutorWrapper<InnerGroupTopNExecutorNew<K, S, WITH_TIES>>;
40+
TopNExecutorWrapper<InnerGroupTopNExecutor<K, S, WITH_TIES>>;
4141

4242
impl<K: HashKey, S: StateStore, const WITH_TIES: bool> GroupTopNExecutor<K, S, WITH_TIES> {
4343
#[allow(clippy::too_many_arguments)]
@@ -56,7 +56,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool> GroupTopNExecutor<K, S, W
5656
Ok(TopNExecutorWrapper {
5757
input,
5858
ctx,
59-
inner: InnerGroupTopNExecutorNew::new(
59+
inner: InnerGroupTopNExecutor::new(
6060
info,
6161
storage_key,
6262
offset_and_limit,
@@ -70,7 +70,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool> GroupTopNExecutor<K, S, W
7070
}
7171
}
7272

73-
pub struct InnerGroupTopNExecutorNew<K: HashKey, S: StateStore, const WITH_TIES: bool> {
73+
pub struct InnerGroupTopNExecutor<K: HashKey, S: StateStore, const WITH_TIES: bool> {
7474
info: ExecutorInfo,
7575

7676
/// `LIMIT XXX`. None means no limit.
@@ -94,7 +94,7 @@ pub struct InnerGroupTopNExecutorNew<K: HashKey, S: StateStore, const WITH_TIES:
9494
cache_key_serde: CacheKeySerde,
9595
}
9696

97-
impl<K: HashKey, S: StateStore, const WITH_TIES: bool> InnerGroupTopNExecutorNew<K, S, WITH_TIES> {
97+
impl<K: HashKey, S: StateStore, const WITH_TIES: bool> InnerGroupTopNExecutor<K, S, WITH_TIES> {
9898
#[allow(clippy::too_many_arguments)]
9999
pub fn new(
100100
input_info: ExecutorInfo,
@@ -158,7 +158,7 @@ impl<K: HashKey, const WITH_TIES: bool> DerefMut for GroupTopNCache<K, WITH_TIES
158158

159159
#[async_trait]
160160
impl<K: HashKey, S: StateStore, const WITH_TIES: bool> TopNExecutorBase
161-
for InnerGroupTopNExecutorNew<K, S, WITH_TIES>
161+
for InnerGroupTopNExecutor<K, S, WITH_TIES>
162162
where
163163
TopNCache<WITH_TIES>: TopNCacheTrait,
164164
{
@@ -178,7 +178,8 @@ where
178178
// If 'self.caches' does not already have a cache for the current group, create a new
179179
// cache for it and insert it into `self.caches`
180180
if !self.caches.contains(group_cache_key) {
181-
let mut topn_cache = TopNCache::new(self.offset, self.limit);
181+
let mut topn_cache =
182+
TopNCache::new(self.offset, self.limit, self.schema().data_types());
182183
self.managed_state
183184
.init_topn_cache(Some(group_key), &mut topn_cache)
184185
.await?;

src/stream/src/executor/top_n/group_top_n_appendonly.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ use crate::task::AtomicU64Ref;
5454
/// to keep all the data records/rows that have been seen. As long as a record
5555
/// is no longer being in the result set, it can be deleted.
5656
pub type AppendOnlyGroupTopNExecutor<K, S, const WITH_TIES: bool> =
57-
TopNExecutorWrapper<InnerAppendOnlyGroupTopNExecutorNew<K, S, WITH_TIES>>;
57+
TopNExecutorWrapper<InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>>;
5858

5959
impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
6060
AppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
@@ -75,7 +75,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
7575
Ok(TopNExecutorWrapper {
7676
input,
7777
ctx,
78-
inner: InnerAppendOnlyGroupTopNExecutorNew::new(
78+
inner: InnerAppendOnlyGroupTopNExecutor::new(
7979
info,
8080
storage_key,
8181
offset_and_limit,
@@ -89,7 +89,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
8989
}
9090
}
9191

92-
pub struct InnerAppendOnlyGroupTopNExecutorNew<K: HashKey, S: StateStore, const WITH_TIES: bool> {
92+
pub struct InnerAppendOnlyGroupTopNExecutor<K: HashKey, S: StateStore, const WITH_TIES: bool> {
9393
info: ExecutorInfo,
9494

9595
/// `LIMIT XXX`. None means no limit.
@@ -114,7 +114,7 @@ pub struct InnerAppendOnlyGroupTopNExecutorNew<K: HashKey, S: StateStore, const
114114
}
115115

116116
impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
117-
InnerAppendOnlyGroupTopNExecutorNew<K, S, WITH_TIES>
117+
InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
118118
{
119119
#[allow(clippy::too_many_arguments)]
120120
pub fn new(
@@ -153,7 +153,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
153153
}
154154
#[async_trait]
155155
impl<K: HashKey, S: StateStore, const WITH_TIES: bool> TopNExecutorBase
156-
for InnerAppendOnlyGroupTopNExecutorNew<K, S, WITH_TIES>
156+
for InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
157157
where
158158
TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
159159
{
@@ -164,7 +164,7 @@ where
164164
let keys = K::build(&self.group_by, chunk.data_chunk())?;
165165

166166
let data_types = self.schema().data_types();
167-
let row_deserializer = RowDeserializer::new(data_types);
167+
let row_deserializer = RowDeserializer::new(data_types.clone());
168168

169169
for ((op, row_ref), group_cache_key) in chunk.rows().zip_eq_debug(keys.iter()) {
170170
// The pk without group by
@@ -176,7 +176,7 @@ where
176176
// If 'self.caches' does not already have a cache for the current group, create a new
177177
// cache for it and insert it into `self.caches`
178178
if !self.caches.contains(group_cache_key) {
179-
let mut topn_cache = TopNCache::new(self.offset, self.limit);
179+
let mut topn_cache = TopNCache::new(self.offset, self.limit, data_types.clone());
180180
self.managed_state
181181
.init_topn_cache(Some(group_key), &mut topn_cache)
182182
.await?;

src/stream/src/executor/top_n/top_n_appendonly.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ impl<S: StateStore, const WITH_TIES: bool> InnerAppendOnlyTopNExecutor<S, WITH_T
129129
let cache_key_serde =
130130
create_cache_key_serde(&storage_key, &pk_indices, &schema, &order_by, &[]);
131131
let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());
132+
let data_types = schema.data_types();
132133

133134
Ok(Self {
134135
info: ExecutorInfo {
@@ -138,7 +139,7 @@ impl<S: StateStore, const WITH_TIES: bool> InnerAppendOnlyTopNExecutor<S, WITH_T
138139
},
139140
managed_state,
140141
storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
141-
cache: TopNCache::new(num_offset, num_limit),
142+
cache: TopNCache::new(num_offset, num_limit, data_types),
142143
cache_key_serde,
143144
})
144145
}

0 commit comments

Comments
 (0)