Skip to content

feat(storage): refactor LocalStateStore::init and StateTable::init #12050

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Sep 5, 2023
Merged
6 changes: 1 addition & 5 deletions src/java_binding/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,9 @@ impl HummockJavaBindingIterator {
),
read_plan.epoch,
ReadOptions {
prefix_hint: None,
ignore_range_tombstone: false,
retention_seconds: None,
table_id: read_plan.table_id.into(),
read_version_from_backup: false,
prefetch_options: Default::default(),
cache_policy: CachePolicy::NotFill,
..Default::default()
},
(vec![], vec![], pin_version.clone()),
)
Expand Down
5 changes: 4 additions & 1 deletion src/storage/hummock_test/benches/bench_hummock_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use futures::{pin_mut, TryStreamExt};
use risingwave_common::cache::CachePriority;
use risingwave_hummock_test::get_notification_client_for_test;
use risingwave_hummock_test::local_state_store_test_utils::LocalStateStoreTestExt;
use risingwave_hummock_test::test_utils::TestIngestBatch;
use risingwave_meta::hummock::test_utils::setup_compute_env;
use risingwave_meta::hummock::MockHummockMetaClient;
Expand Down Expand Up @@ -78,7 +79,9 @@ fn criterion_benchmark(c: &mut Criterion) {
});

let epoch = 100;
hummock_storage.init(epoch);
runtime
.block_on(hummock_storage.init_for_test(epoch))
.unwrap();

for batch in batches {
runtime
Expand Down
12 changes: 8 additions & 4 deletions src/storage/hummock_test/src/bin/replay/replay_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use risingwave_common_service::observer_manager::{Channel, NotificationClient};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_trace::{
GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore,
ReplayWrite, Result, TraceError, TracedBytes, TracedNewLocalOptions, TracedReadOptions,
TracedSubResp,
ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
TracedReadOptions, TracedSubResp,
};
use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey};
use risingwave_meta::storage::{MemStore, MetaStore};
Expand All @@ -38,6 +38,7 @@ use risingwave_storage::store::{
};
use risingwave_storage::{StateStore, StateStoreReadIterStream};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

pub(crate) struct GlobalReplayIter<S>
where
S: StateStoreReadIterStream,
Expand Down Expand Up @@ -199,8 +200,11 @@ pub(crate) struct LocalReplayImpl(LocalHummockStorage);

#[async_trait::async_trait]
impl LocalReplay for LocalReplayImpl {
fn init(&mut self, epoch: u64) {
self.0.init(epoch);
async fn init(&mut self, options: TracedInitOptions) -> Result<()> {
self.0
.init(options.into())
.await
.map_err(|_| TraceError::Other("init failed"))
}

fn seal_current_epoch(&mut self, next_epoch: u64) {
Expand Down
15 changes: 8 additions & 7 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub(crate) mod tests {
use risingwave_storage::store::*;

use crate::get_notification_client_for_test;
use crate::local_state_store_test_utils::LocalStateStoreTestExt;
use crate::test_utils::{register_tables_with_id_for_test, TestIngestBatch};

pub(crate) async fn get_hummock_storage<S: MetaStore>(
Expand Down Expand Up @@ -135,7 +136,7 @@ pub(crate) mod tests {
let mut local = storage.new_local(Default::default()).await;
// 1. add sstables
let val = b"0"[..].repeat(value_size);
local.init(epochs[0]);
local.init_for_test(epochs[0]).await.unwrap();
for (i, &epoch) in epochs.iter().enumerate() {
let mut new_val = val.clone();
new_val.extend_from_slice(&epoch.to_be_bytes());
Expand Down Expand Up @@ -551,7 +552,7 @@ pub(crate) mod tests {
epoch += 1;

if idx == 0 {
local.init(epoch);
local.init_for_test(epoch).await.unwrap();
}

for _ in 0..keys_per_epoch {
Expand Down Expand Up @@ -727,8 +728,8 @@ pub(crate) mod tests {
epoch += 1;
let next_epoch = epoch + 1;
if index == 0 {
storage_1.init(epoch);
storage_2.init(epoch);
storage_1.init_for_test(epoch).await.unwrap();
storage_2.init_for_test(epoch).await.unwrap();
}

let (storage, other) = if index % 2 == 0 {
Expand Down Expand Up @@ -919,7 +920,7 @@ pub(crate) mod tests {
epoch += millisec_interval_epoch;
let next_epoch = epoch + millisec_interval_epoch;
if i == 0 {
local.init(epoch);
local.init_for_test(epoch).await.unwrap();
}
epoch_set.insert(epoch);
let mut prefix = BytesMut::default();
Expand Down Expand Up @@ -1112,7 +1113,7 @@ pub(crate) mod tests {
for i in 0..kv_count {
epoch += millisec_interval_epoch;
if i == 0 {
local.init(epoch);
local.init_for_test(epoch).await.unwrap();
}
let next_epoch = epoch + millisec_interval_epoch;
epoch_set.insert(epoch);
Expand Down Expand Up @@ -1277,7 +1278,7 @@ pub(crate) mod tests {
let mut local = storage
.new_local(NewLocalOptions::for_test(existing_table_id.into()))
.await;
local.init(130);
local.init_for_test(130).await.unwrap();
let prefix_key_range = |k: u16| {
let key = k.to_be_bytes();
(
Expand Down
3 changes: 2 additions & 1 deletion src/storage/hummock_test/src/failpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use risingwave_storage::store::{
use risingwave_storage::StateStore;

use crate::get_notification_client_for_test;
use crate::local_state_store_test_utils::LocalStateStoreTestExt;
use crate::test_utils::TestIngestBatch;

#[tokio::test]
Expand Down Expand Up @@ -74,7 +75,7 @@ async fn test_failpoints_state_store_read_upload() {
];
// Make sure the batch is sorted.
batch2.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
local.init(1);
local.init_for_test(1).await.unwrap();
local
.ingest_batch(
batch1,
Expand Down
16 changes: 8 additions & 8 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound::{Excluded, Included, Unbounded};
use std::sync::Arc;

Expand All @@ -29,6 +28,7 @@ use risingwave_storage::storage_value::StorageValue;
use risingwave_storage::store::*;
use risingwave_storage::StateStore;

use crate::local_state_store_test_utils::LocalStateStoreTestExt;
use crate::test_utils::{prepare_hummock_test_env, TestIngestBatch};

#[tokio::test]
Expand Down Expand Up @@ -92,7 +92,7 @@ async fn test_storage_basic() {

// epoch 0 is reserved by storage service
let epoch1: u64 = 1;
hummock_storage.init(epoch1);
hummock_storage.init_for_test(epoch1).await.unwrap();

// Write the first batch.
hummock_storage
Expand Down Expand Up @@ -471,7 +471,7 @@ async fn test_state_store_sync() {
let read_version = hummock_storage.read_version();

let epoch1 = read_version.read().committed().max_committed_epoch() + 1;
hummock_storage.init(epoch1);
hummock_storage.init_for_test(epoch1).await.unwrap();

// ingest 16B batch
let mut batch1 = vec![
Expand Down Expand Up @@ -789,7 +789,7 @@ async fn test_delete_get() {

let epoch1 = initial_epoch + 1;

hummock_storage.init(epoch1);
hummock_storage.init_for_test(epoch1).await.unwrap();
let batch1 = vec![
(Bytes::from("aa"), StorageValue::new_put("111")),
(Bytes::from("bb"), StorageValue::new_put("222")),
Expand Down Expand Up @@ -866,7 +866,7 @@ async fn test_multiple_epoch_sync() {
.max_committed_epoch();

let epoch1 = initial_epoch + 1;
hummock_storage.init(epoch1);
hummock_storage.init_for_test(epoch1).await.unwrap();
let batch1 = vec![
(Bytes::from("aa"), StorageValue::new_put("111")),
(Bytes::from("bb"), StorageValue::new_put("222")),
Expand Down Expand Up @@ -1014,7 +1014,7 @@ async fn test_iter_with_min_epoch() {
})
.collect();

hummock_storage.init(epoch1);
hummock_storage.init_for_test(epoch1).await.unwrap();

hummock_storage
.ingest_batch(
Expand Down Expand Up @@ -1255,7 +1255,7 @@ async fn test_hummock_version_reader() {
})
.collect();
{
hummock_storage.init(epoch1);
hummock_storage.init_for_test(epoch1).await.unwrap();
hummock_storage
.ingest_batch(
batch_epoch1,
Expand Down Expand Up @@ -1657,7 +1657,7 @@ async fn test_get_with_min_epoch() {
.await;

let epoch1 = (31 * 1000) << 16;
hummock_storage.init(epoch1);
hummock_storage.init_for_test(epoch1).await.unwrap();

let gen_key = |index: usize| -> Vec<u8> {
UserKey::for_test(TEST_TABLE_ID, format!("key_{}", index)).encode()
Expand Down
6 changes: 6 additions & 0 deletions src/storage/hummock_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#![feature(bound_map)]
#![feature(type_alias_impl_trait)]
#![feature(associated_type_bounds)]
#![feature(return_position_impl_trait_in_trait)]

#[cfg(test)]
mod compactor_tests;
Expand All @@ -39,4 +40,9 @@ mod hummock_storage_tests;
mod mock_notification_client;
#[cfg(all(test, feature = "sync_point"))]
mod sync_point_tests;

// Not feature gated by #[cfg(test)] because it is used by test binaries e.g. compaction_test,
// not just tests.
pub mod local_state_store_test_utils;

pub use mock_notification_client::get_notification_client_for_test;
28 changes: 28 additions & 0 deletions src/storage/hummock_test/src/local_state_store_test_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;

use risingwave_common::util::epoch::EpochPair;
use risingwave_storage::error::StorageResult;
use risingwave_storage::store::{InitOptions, LocalStateStore};

pub trait LocalStateStoreTestExt: LocalStateStore {
fn init_for_test(&mut self, epoch: u64) -> impl Future<Output = StorageResult<()>> + Send + '_ {
self.init(InitOptions::new_with_epoch(EpochPair::new_test_epoch(
epoch,
)))
}
}
impl<T: LocalStateStore> LocalStateStoreTestExt for T {}
12 changes: 4 additions & 8 deletions src/storage/hummock_test/src/snapshot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;
use std::sync::Arc;

Expand All @@ -27,6 +26,7 @@ use risingwave_storage::store::{
LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, WriteOptions,
};

use crate::local_state_store_test_utils::LocalStateStoreTestExt;
use crate::test_utils::{with_hummock_storage_v2, HummockStateStoreTestTrait, TestIngestBatch};

macro_rules! assert_count_range_scan {
Expand All @@ -42,13 +42,9 @@ macro_rules! assert_count_range_scan {
bounds,
$epoch,
ReadOptions {
ignore_range_tombstone: false,
prefix_hint: None,
table_id: Default::default(),
retention_seconds: None,
read_version_from_backup: false,
prefetch_options: PrefetchOptions::new_for_exhaust_iter(),
cache_policy: CachePolicy::Fill(CachePriority::High),
..Default::default()
},
)
.await
Expand Down Expand Up @@ -110,7 +106,7 @@ async fn test_snapshot_inner(
.await;

let epoch1: u64 = 1;
local.init(epoch1);
local.init_for_test(epoch1).await.unwrap();
local
.ingest_batch(
vec![
Expand Down Expand Up @@ -231,7 +227,7 @@ async fn test_snapshot_range_scan_inner(
let mut local = hummock_storage
.new_local(NewLocalOptions::for_test(Default::default()))
.await;
local.init(epoch);
local.init_for_test(epoch).await.unwrap();

local
.ingest_batch(
Expand Down
Loading