Skip to content

Commit fa34964

Browse files
authored
feat(storage): add may_exist interface in state table (#7489)
This PR adds the `may_exist` interface in both local state store and state table, which will be used by the streaming executor to check the existence of a given key range in storage efficiently. A typical use case is for hash join to check existence of a join key before refilling its operator cache on write (#7393 (comment)) Note that there can be false positive when using this interface but the false positive rate can be improved by providing a `prefix_hint` in ReadOptions to check SST bloom filters. In the current implementation, state table will always provide the `prefix_hint` in its `may_exist` implementation. LocalStateStore: ```rust /// Check existence of a given `key_range`. /// It is better to provide `prefix_hint` in `read_options`, which will be used /// for checking bloom filter if hummock is used. If `prefix_hint` is not provided, /// the false positive rate can be significantly higher because bloom filter cannot /// be used. /// /// Returns: /// - false: `key_range` is guaranteed to be absent in storage. /// - true: `key_range` may or may not exist in storage. fn may_exist( &self, key_range: (Bound<Vec<u8>>, Bound<Vec<u8>>), read_options: ReadOptions, ) -> Self::MayExistFuture<'_>; ``` StateTable: ```rust /// Returns: /// false: the provided pk prefix is absent in state store. /// true: the provided pk prefix may or may not be present in state store. pub async fn may_exist(&self, pk_prefix: impl Row) -> StreamExecutorResult<bool> { ``` Approved-By: Li0k Approved-By: wenym1 Approved-By: KeXiangWang Approved-By: wcy-fdu Approved-By: Little-Wallace
1 parent 9932864 commit fa34964

16 files changed

+763
-99
lines changed

grafana/risingwave-dashboard.dashboard.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1696,6 +1696,23 @@ def section_hummock(panels):
16961696
),
16971697
],
16981698
),
1699+
panels.timeseries_latency(
1700+
"Read Duration - MayExist",
1701+
"",
1702+
[
1703+
*quantile(
1704+
lambda quantile, legend: panels.target(
1705+
f"histogram_quantile({quantile}, sum(rate({metric('state_store_may_exist_duration_bucket')}[$__rate_interval])) by (le, job, instance, table_id))",
1706+
f"p{legend}" + " - {{table_id}} @ {{job}} @ {{instance}}",
1707+
),
1708+
[50, 90, 99, "max"],
1709+
),
1710+
panels.target(
1711+
f"sum by(le, job, instance, table_id)(rate({metric('state_store_may_exist_duration_sum')}[$__rate_interval])) / sum by(le, job, instance, table_id) (rate({metric('state_store_may_exist_duration_count')}[$__rate_interval]))",
1712+
"avg - {{table_id}} {{job}} @ {{instance}}",
1713+
),
1714+
],
1715+
),
16991716
panels.timeseries_ops(
17001717
"Read Bloom Filter",
17011718
"",

grafana/risingwave-dashboard.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::cmp::Ordering;
1516
use std::fmt::Debug;
1617
use std::future::Future;
1718
use std::marker::PhantomData;
@@ -23,7 +24,7 @@ use std::sync::{Arc, LazyLock};
2324
use bytes::Bytes;
2425
use itertools::Itertools;
2526
use risingwave_common::catalog::TableId;
26-
use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
27+
use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
2728

2829
use crate::hummock::iterator::{
2930
Backward, DeleteRangeIterator, DirectionEnum, Forward, HummockIterator,
@@ -194,6 +195,33 @@ impl SharedBufferBatch {
194195
.le(table_key.as_ref())
195196
}
196197

198+
pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
199+
self.inner
200+
.binary_search_by(|m| {
201+
let key = &m.0;
202+
let too_left = match &table_key_range.0 {
203+
std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
204+
std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
205+
std::ops::Bound::Unbounded => false,
206+
};
207+
if too_left {
208+
return Ordering::Less;
209+
}
210+
211+
let too_right = match &table_key_range.1 {
212+
std::ops::Bound::Included(range_end) => range_end.as_ref() < key.as_ref(),
213+
std::ops::Bound::Excluded(range_end) => range_end.as_ref() <= key.as_ref(),
214+
std::ops::Bound::Unbounded => false,
215+
};
216+
if too_right {
217+
return Ordering::Greater;
218+
}
219+
220+
Ordering::Equal
221+
})
222+
.is_ok()
223+
}
224+
197225
pub fn into_directed_iter<D: HummockIteratorDirection>(self) -> SharedBufferBatchIterator<D> {
198226
SharedBufferBatchIterator::<D>::new(self.inner, self.table_id, self.epoch)
199227
}
@@ -498,7 +526,10 @@ impl DeleteRangeIterator for SharedBufferDeleteRangeIterator {
498526

499527
#[cfg(test)]
500528
mod tests {
529+
use std::ops::Bound::{Excluded, Included};
530+
501531
use itertools::Itertools;
532+
use risingwave_hummock_sdk::key::map_table_key_range;
502533

503534
use super::*;
504535
use crate::hummock::iterator::test_utils::{
@@ -764,4 +795,48 @@ mod tests {
764795
.await
765796
.unwrap();
766797
}
798+
799+
#[tokio::test]
800+
async fn test_shared_buffer_batch_range_existx() {
801+
let epoch = 1;
802+
let shared_buffer_items = vec![
803+
(Vec::from("a_1"), HummockValue::put(Bytes::from("value1"))),
804+
(Vec::from("a_3"), HummockValue::put(Bytes::from("value2"))),
805+
(Vec::from("a_5"), HummockValue::put(Bytes::from("value3"))),
806+
(Vec::from("b_2"), HummockValue::put(Bytes::from("value3"))),
807+
];
808+
let shared_buffer_batch = SharedBufferBatch::for_test(
809+
transform_shared_buffer(shared_buffer_items),
810+
epoch,
811+
Default::default(),
812+
);
813+
814+
let range = (Included(Vec::from("a")), Excluded(Vec::from("b")));
815+
assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
816+
let range = (Included(Vec::from("a_")), Excluded(Vec::from("b_")));
817+
assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
818+
let range = (Included(Vec::from("a_1")), Included(Vec::from("a_1")));
819+
assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
820+
let range = (Included(Vec::from("a_1")), Included(Vec::from("a_2")));
821+
assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
822+
let range = (Included(Vec::from("a_0x")), Included(Vec::from("a_2x")));
823+
assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
824+
let range = (Included(Vec::from("a_")), Excluded(Vec::from("c_")));
825+
assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
826+
let range = (Included(Vec::from("b_0x")), Included(Vec::from("b_2x")));
827+
assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
828+
let range = (Included(Vec::from("b_2")), Excluded(Vec::from("c_1x")));
829+
assert!(shared_buffer_batch.range_exists(&map_table_key_range(range)));
830+
831+
let range = (Included(Vec::from("a_0")), Excluded(Vec::from("a_1")));
832+
assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
833+
let range = (Included(Vec::from("a__0")), Excluded(Vec::from("a__5")));
834+
assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
835+
let range = (Included(Vec::from("b_1")), Excluded(Vec::from("b_2")));
836+
assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
837+
let range = (Included(Vec::from("b_3")), Excluded(Vec::from("c_1")));
838+
assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
839+
let range = (Included(Vec::from("b__x")), Excluded(Vec::from("c__x")));
840+
assert!(!shared_buffer_batch.range_exists(&map_table_key_range(range)));
841+
}
767842
}

src/storage/src/hummock/state_store_v1.rs

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
use std::cmp::Ordering;
1616
use std::future::Future;
17-
use std::ops::Bound::{Excluded, Included};
1817
use std::ops::{Bound, RangeBounds};
1918
use std::sync::Arc;
2019
use std::time::Duration;
@@ -30,7 +29,7 @@ use risingwave_hummock_sdk::key::{
3029
bound_table_key_range, map_table_key_range, user_key, FullKey, TableKey, TableKeyRange, UserKey,
3130
};
3231
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
33-
use risingwave_hummock_sdk::{can_concat, HummockReadEpoch};
32+
use risingwave_hummock_sdk::HummockReadEpoch;
3433
use risingwave_pb::hummock::LevelType;
3534
use tokio::sync::oneshot;
3635
use tracing::log::warn;
@@ -51,7 +50,7 @@ use crate::hummock::iterator::{
5150
use crate::hummock::local_version::ReadVersion;
5251
use crate::hummock::shared_buffer::build_ordered_merge_iter;
5352
use crate::hummock::sstable::SstableIteratorReadOptions;
54-
use crate::hummock::utils::{prune_ssts, search_sst_idx};
53+
use crate::hummock::utils::{prune_nonoverlapping_ssts, prune_overlapping_ssts};
5554
use crate::hummock::{
5655
DeleteRangeAggregator, ForwardIter, HummockEpoch, HummockError, HummockIteratorType,
5756
HummockResult, Sstable,
@@ -62,8 +61,8 @@ use crate::monitor::{
6261
use crate::storage_value::StorageValue;
6362
use crate::store::*;
6463
use crate::{
65-
define_state_store_associated_type, define_state_store_read_associated_type,
66-
define_state_store_write_associated_type,
64+
define_local_state_store_associated_type, define_state_store_associated_type,
65+
define_state_store_read_associated_type, define_state_store_write_associated_type,
6766
};
6867

6968
impl HummockStorageV1 {
@@ -139,8 +138,12 @@ impl HummockStorageV1 {
139138
}
140139
match level.level_type() {
141140
LevelType::Overlapping | LevelType::Unspecified => {
142-
let sstable_infos =
143-
prune_ssts(level.table_infos.iter(), table_id, &(table_key..=table_key));
141+
let single_table_key_range = table_key..=table_key;
142+
let sstable_infos = prune_overlapping_ssts(
143+
&level.table_infos,
144+
table_id,
145+
&single_table_key_range,
146+
);
144147
for sstable_info in sstable_infos {
145148
stats_guard.local_stats.sub_iter_count += 1;
146149
if let Some(v) = get_from_sstable_info(
@@ -300,24 +303,12 @@ impl HummockStorageV1 {
300303
continue;
301304
}
302305
if level.level_type == LevelType::Nonoverlapping as i32 {
303-
debug_assert!(can_concat(&level.table_infos));
304-
let start_table_idx = match encoded_user_key_range.start_bound() {
305-
Included(key) | Excluded(key) => search_sst_idx(&level.table_infos, key),
306-
_ => 0,
307-
};
308-
let end_table_idx = match encoded_user_key_range.end_bound() {
309-
Included(key) | Excluded(key) => search_sst_idx(&level.table_infos, key),
310-
_ => level.table_infos.len().saturating_sub(1),
311-
};
312-
assert!(
313-
start_table_idx < level.table_infos.len()
314-
&& end_table_idx < level.table_infos.len()
315-
);
316-
let matched_table_infos = &level.table_infos[start_table_idx..=end_table_idx];
306+
let matched_table_infos =
307+
prune_nonoverlapping_ssts(&level.table_infos, &encoded_user_key_range);
317308

318309
let pruned_sstables = match T::Direction::direction() {
319-
DirectionEnum::Backward => matched_table_infos.iter().rev().collect_vec(),
320-
DirectionEnum::Forward => matched_table_infos.iter().collect_vec(),
310+
DirectionEnum::Backward => matched_table_infos.rev().collect_vec(),
311+
DirectionEnum::Forward => matched_table_infos.collect_vec(),
321312
};
322313

323314
let mut sstables = vec![];
@@ -346,7 +337,8 @@ impl HummockStorageV1 {
346337
iter_read_options.clone(),
347338
)));
348339
} else {
349-
let table_infos = prune_ssts(level.table_infos.iter(), table_id, &table_key_range);
340+
let table_infos =
341+
prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
350342
for table_info in table_infos.into_iter().rev() {
351343
let sstable = self
352344
.sstable_store
@@ -463,7 +455,17 @@ impl StateStoreWrite for HummockStorageV1 {
463455
}
464456
}
465457

466-
impl LocalStateStore for HummockStorageV1 {}
458+
impl LocalStateStore for HummockStorageV1 {
459+
define_local_state_store_associated_type!();
460+
461+
fn may_exist(
462+
&self,
463+
_key_range: (Bound<Vec<u8>>, Bound<Vec<u8>>),
464+
_read_options: ReadOptions,
465+
) -> Self::MayExistFuture<'_> {
466+
async move { Ok(true) }
467+
}
468+
}
467469

468470
impl StateStore for HummockStorageV1 {
469471
type Local = Self;

src/storage/src/hummock/store/state_store.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use minitrace::future::FutureExt;
2020
use parking_lot::RwLock;
2121
use risingwave_common::catalog::TableId;
2222
use risingwave_hummock_sdk::key::{map_table_key_range, TableKey, TableKeyRange};
23+
use risingwave_hummock_sdk::HummockEpoch;
2324
use tokio::sync::mpsc;
2425
use tracing::warn;
2526

@@ -39,8 +40,8 @@ use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocal
3940
use crate::storage_value::StorageValue;
4041
use crate::store::*;
4142
use crate::{
42-
define_state_store_read_associated_type, define_state_store_write_associated_type,
43-
StateStoreIter,
43+
define_local_state_store_associated_type, define_state_store_read_associated_type,
44+
define_state_store_write_associated_type, StateStoreIter,
4445
};
4546

4647
pub struct LocalHummockStorage {
@@ -107,6 +108,25 @@ impl LocalHummockStorage {
107108
.iter(table_key_range, epoch, read_options, read_snapshot)
108109
.await
109110
}
111+
112+
pub async fn may_exist_inner(
113+
&self,
114+
key_range: (Bound<Vec<u8>>, Bound<Vec<u8>>),
115+
read_options: ReadOptions,
116+
) -> StorageResult<bool> {
117+
let table_key_range = map_table_key_range(key_range);
118+
119+
let read_snapshot = read_filter_for_local(
120+
HummockEpoch::MAX, // Use MAX epoch to make sure we read from latest
121+
read_options.table_id,
122+
&table_key_range,
123+
self.read_version.clone(),
124+
)?;
125+
126+
self.hummock_version_reader
127+
.may_exist(table_key_range, read_options, read_snapshot)
128+
.await
129+
}
110130
}
111131

112132
impl StateStoreRead for LocalHummockStorage {
@@ -195,7 +215,17 @@ impl StateStoreWrite for LocalHummockStorage {
195215
}
196216
}
197217

198-
impl LocalStateStore for LocalHummockStorage {}
218+
impl LocalStateStore for LocalHummockStorage {
219+
define_local_state_store_associated_type!();
220+
221+
fn may_exist(
222+
&self,
223+
key_range: (Bound<Vec<u8>>, Bound<Vec<u8>>),
224+
read_options: ReadOptions,
225+
) -> Self::MayExistFuture<'_> {
226+
self.may_exist_inner(key_range, read_options)
227+
}
228+
}
199229

200230
impl LocalHummockStorage {
201231
pub fn new(

0 commit comments

Comments
 (0)