Skip to content

Commit a5ef008

Browse files
authored
feat(storage): refactor LocalStateStore::init and StateTable::init (risingwavelabs#12050)
1 parent ff623ee commit a5ef008

37 files changed

+355
-157
lines changed

src/java_binding/src/hummock_iterator.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,9 @@ impl HummockJavaBindingIterator {
102102
),
103103
read_plan.epoch,
104104
ReadOptions {
105-
prefix_hint: None,
106-
ignore_range_tombstone: false,
107-
retention_seconds: None,
108105
table_id: read_plan.table_id.into(),
109-
read_version_from_backup: false,
110-
prefetch_options: Default::default(),
111106
cache_policy: CachePolicy::NotFill,
107+
..Default::default()
112108
},
113109
(vec![], vec![], pin_version.clone()),
114110
)

src/storage/hummock_test/benches/bench_hummock_iter.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
2020
use futures::{pin_mut, TryStreamExt};
2121
use risingwave_common::cache::CachePriority;
2222
use risingwave_hummock_test::get_notification_client_for_test;
23+
use risingwave_hummock_test::local_state_store_test_utils::LocalStateStoreTestExt;
2324
use risingwave_hummock_test::test_utils::TestIngestBatch;
2425
use risingwave_meta::hummock::test_utils::setup_compute_env;
2526
use risingwave_meta::hummock::MockHummockMetaClient;
@@ -78,7 +79,9 @@ fn criterion_benchmark(c: &mut Criterion) {
7879
});
7980

8081
let epoch = 100;
81-
hummock_storage.init(epoch);
82+
runtime
83+
.block_on(hummock_storage.init_for_test(epoch))
84+
.unwrap();
8285

8386
for batch in batches {
8487
runtime

src/storage/hummock_test/src/bin/replay/replay_impl.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use risingwave_common_service::observer_manager::{Channel, NotificationClient};
2323
use risingwave_hummock_sdk::HummockReadEpoch;
2424
use risingwave_hummock_trace::{
2525
GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore,
26-
ReplayWrite, Result, TraceError, TracedBytes, TracedNewLocalOptions, TracedReadOptions,
27-
TracedSubResp,
26+
ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
27+
TracedReadOptions, TracedSubResp,
2828
};
2929
use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey};
3030
use risingwave_meta::storage::{MemStore, MetaStore};
@@ -38,6 +38,7 @@ use risingwave_storage::store::{
3838
};
3939
use risingwave_storage::{StateStore, StateStoreReadIterStream};
4040
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
41+
4142
pub(crate) struct GlobalReplayIter<S>
4243
where
4344
S: StateStoreReadIterStream,
@@ -199,8 +200,11 @@ pub(crate) struct LocalReplayImpl(LocalHummockStorage);
199200

200201
#[async_trait::async_trait]
201202
impl LocalReplay for LocalReplayImpl {
202-
fn init(&mut self, epoch: u64) {
203-
self.0.init(epoch);
203+
async fn init(&mut self, options: TracedInitOptions) -> Result<()> {
204+
self.0
205+
.init(options.into())
206+
.await
207+
.map_err(|_| TraceError::Other("init failed"))
204208
}
205209

206210
fn seal_current_epoch(&mut self, next_epoch: u64) {

src/storage/hummock_test/src/compactor_tests.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ pub(crate) mod tests {
6262
use risingwave_storage::store::*;
6363

6464
use crate::get_notification_client_for_test;
65+
use crate::local_state_store_test_utils::LocalStateStoreTestExt;
6566
use crate::test_utils::{register_tables_with_id_for_test, TestIngestBatch};
6667

6768
pub(crate) async fn get_hummock_storage<S: MetaStore>(
@@ -135,7 +136,7 @@ pub(crate) mod tests {
135136
let mut local = storage.new_local(Default::default()).await;
136137
// 1. add sstables
137138
let val = b"0"[..].repeat(value_size);
138-
local.init(epochs[0]);
139+
local.init_for_test(epochs[0]).await.unwrap();
139140
for (i, &epoch) in epochs.iter().enumerate() {
140141
let mut new_val = val.clone();
141142
new_val.extend_from_slice(&epoch.to_be_bytes());
@@ -551,7 +552,7 @@ pub(crate) mod tests {
551552
epoch += 1;
552553

553554
if idx == 0 {
554-
local.init(epoch);
555+
local.init_for_test(epoch).await.unwrap();
555556
}
556557

557558
for _ in 0..keys_per_epoch {
@@ -727,8 +728,8 @@ pub(crate) mod tests {
727728
epoch += 1;
728729
let next_epoch = epoch + 1;
729730
if index == 0 {
730-
storage_1.init(epoch);
731-
storage_2.init(epoch);
731+
storage_1.init_for_test(epoch).await.unwrap();
732+
storage_2.init_for_test(epoch).await.unwrap();
732733
}
733734

734735
let (storage, other) = if index % 2 == 0 {
@@ -919,7 +920,7 @@ pub(crate) mod tests {
919920
epoch += millisec_interval_epoch;
920921
let next_epoch = epoch + millisec_interval_epoch;
921922
if i == 0 {
922-
local.init(epoch);
923+
local.init_for_test(epoch).await.unwrap();
923924
}
924925
epoch_set.insert(epoch);
925926
let mut prefix = BytesMut::default();
@@ -1112,7 +1113,7 @@ pub(crate) mod tests {
11121113
for i in 0..kv_count {
11131114
epoch += millisec_interval_epoch;
11141115
if i == 0 {
1115-
local.init(epoch);
1116+
local.init_for_test(epoch).await.unwrap();
11161117
}
11171118
let next_epoch = epoch + millisec_interval_epoch;
11181119
epoch_set.insert(epoch);
@@ -1277,7 +1278,7 @@ pub(crate) mod tests {
12771278
let mut local = storage
12781279
.new_local(NewLocalOptions::for_test(existing_table_id.into()))
12791280
.await;
1280-
local.init(130);
1281+
local.init_for_test(130).await.unwrap();
12811282
let prefix_key_range = |k: u16| {
12821283
let key = k.to_be_bytes();
12831284
(

src/storage/hummock_test/src/failpoint_tests.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use risingwave_storage::store::{
3333
use risingwave_storage::StateStore;
3434

3535
use crate::get_notification_client_for_test;
36+
use crate::local_state_store_test_utils::LocalStateStoreTestExt;
3637
use crate::test_utils::TestIngestBatch;
3738

3839
#[tokio::test]
@@ -74,7 +75,7 @@ async fn test_failpoints_state_store_read_upload() {
7475
];
7576
// Make sure the batch is sorted.
7677
batch2.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
77-
local.init(1);
78+
local.init_for_test(1).await.unwrap();
7879
local
7980
.ingest_batch(
8081
batch1,

src/storage/hummock_test/src/hummock_storage_tests.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
14-
1514
use std::ops::Bound::{Excluded, Included, Unbounded};
1615
use std::sync::Arc;
1716

@@ -29,6 +28,7 @@ use risingwave_storage::storage_value::StorageValue;
2928
use risingwave_storage::store::*;
3029
use risingwave_storage::StateStore;
3130

31+
use crate::local_state_store_test_utils::LocalStateStoreTestExt;
3232
use crate::test_utils::{prepare_hummock_test_env, TestIngestBatch};
3333

3434
#[tokio::test]
@@ -92,7 +92,7 @@ async fn test_storage_basic() {
9292

9393
// epoch 0 is reserved by storage service
9494
let epoch1: u64 = 1;
95-
hummock_storage.init(epoch1);
95+
hummock_storage.init_for_test(epoch1).await.unwrap();
9696

9797
// Write the first batch.
9898
hummock_storage
@@ -471,7 +471,7 @@ async fn test_state_store_sync() {
471471
let read_version = hummock_storage.read_version();
472472

473473
let epoch1 = read_version.read().committed().max_committed_epoch() + 1;
474-
hummock_storage.init(epoch1);
474+
hummock_storage.init_for_test(epoch1).await.unwrap();
475475

476476
// ingest 16B batch
477477
let mut batch1 = vec![
@@ -789,7 +789,7 @@ async fn test_delete_get() {
789789

790790
let epoch1 = initial_epoch + 1;
791791

792-
hummock_storage.init(epoch1);
792+
hummock_storage.init_for_test(epoch1).await.unwrap();
793793
let batch1 = vec![
794794
(Bytes::from("aa"), StorageValue::new_put("111")),
795795
(Bytes::from("bb"), StorageValue::new_put("222")),
@@ -866,7 +866,7 @@ async fn test_multiple_epoch_sync() {
866866
.max_committed_epoch();
867867

868868
let epoch1 = initial_epoch + 1;
869-
hummock_storage.init(epoch1);
869+
hummock_storage.init_for_test(epoch1).await.unwrap();
870870
let batch1 = vec![
871871
(Bytes::from("aa"), StorageValue::new_put("111")),
872872
(Bytes::from("bb"), StorageValue::new_put("222")),
@@ -1014,7 +1014,7 @@ async fn test_iter_with_min_epoch() {
10141014
})
10151015
.collect();
10161016

1017-
hummock_storage.init(epoch1);
1017+
hummock_storage.init_for_test(epoch1).await.unwrap();
10181018

10191019
hummock_storage
10201020
.ingest_batch(
@@ -1255,7 +1255,7 @@ async fn test_hummock_version_reader() {
12551255
})
12561256
.collect();
12571257
{
1258-
hummock_storage.init(epoch1);
1258+
hummock_storage.init_for_test(epoch1).await.unwrap();
12591259
hummock_storage
12601260
.ingest_batch(
12611261
batch_epoch1,
@@ -1657,7 +1657,7 @@ async fn test_get_with_min_epoch() {
16571657
.await;
16581658

16591659
let epoch1 = (31 * 1000) << 16;
1660-
hummock_storage.init(epoch1);
1660+
hummock_storage.init_for_test(epoch1).await.unwrap();
16611661

16621662
let gen_key = |index: usize| -> Vec<u8> {
16631663
UserKey::for_test(TEST_TABLE_ID, format!("key_{}", index)).encode()

src/storage/hummock_test/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#![feature(bound_map)]
1818
#![feature(type_alias_impl_trait)]
1919
#![feature(associated_type_bounds)]
20+
#![feature(return_position_impl_trait_in_trait)]
2021

2122
#[cfg(test)]
2223
mod compactor_tests;
@@ -39,4 +40,9 @@ mod hummock_storage_tests;
3940
mod mock_notification_client;
4041
#[cfg(all(test, feature = "sync_point"))]
4142
mod sync_point_tests;
43+
44+
// Not feature gated by #[cfg(test)] because it is used by test binaries e.g. compaction_test,
45+
// not just tests.
46+
pub mod local_state_store_test_utils;
47+
4248
pub use mock_notification_client::get_notification_client_for_test;
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::future::Future;
16+
17+
use risingwave_common::util::epoch::EpochPair;
18+
use risingwave_storage::error::StorageResult;
19+
use risingwave_storage::store::{InitOptions, LocalStateStore};
20+
21+
pub trait LocalStateStoreTestExt: LocalStateStore {
22+
fn init_for_test(&mut self, epoch: u64) -> impl Future<Output = StorageResult<()>> + Send + '_ {
23+
self.init(InitOptions::new_with_epoch(EpochPair::new_test_epoch(
24+
epoch,
25+
)))
26+
}
27+
}
28+
impl<T: LocalStateStore> LocalStateStoreTestExt for T {}

src/storage/hummock_test/src/snapshot_tests.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
14-
1514
use std::ops::Bound;
1615
use std::sync::Arc;
1716

@@ -27,6 +26,7 @@ use risingwave_storage::store::{
2726
LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, WriteOptions,
2827
};
2928

29+
use crate::local_state_store_test_utils::LocalStateStoreTestExt;
3030
use crate::test_utils::{with_hummock_storage_v2, HummockStateStoreTestTrait, TestIngestBatch};
3131

3232
macro_rules! assert_count_range_scan {
@@ -42,13 +42,9 @@ macro_rules! assert_count_range_scan {
4242
bounds,
4343
$epoch,
4444
ReadOptions {
45-
ignore_range_tombstone: false,
46-
prefix_hint: None,
47-
table_id: Default::default(),
48-
retention_seconds: None,
49-
read_version_from_backup: false,
5045
prefetch_options: PrefetchOptions::new_for_exhaust_iter(),
5146
cache_policy: CachePolicy::Fill(CachePriority::High),
47+
..Default::default()
5248
},
5349
)
5450
.await
@@ -110,7 +106,7 @@ async fn test_snapshot_inner(
110106
.await;
111107

112108
let epoch1: u64 = 1;
113-
local.init(epoch1);
109+
local.init_for_test(epoch1).await.unwrap();
114110
local
115111
.ingest_batch(
116112
vec![
@@ -231,7 +227,7 @@ async fn test_snapshot_range_scan_inner(
231227
let mut local = hummock_storage
232228
.new_local(NewLocalOptions::for_test(Default::default()))
233229
.await;
234-
local.init(epoch);
230+
local.init_for_test(epoch).await.unwrap();
235231

236232
local
237233
.ingest_batch(

0 commit comments

Comments
 (0)