Skip to content

Commit 9f02036

Browse files
wenym1Li0k
authored andcommitted
refactor(meta): introduce meta store ref and avoid pass generic parameter (#12114)
1 parent 23ac976 commit 9f02036

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+673
-918
lines changed

src/meta/src/backup_restore/backup_manager.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use crate::backup_restore::metrics::BackupManagerMetrics;
3333
use crate::hummock::{HummockManagerRef, HummockVersionSafePoint};
3434
use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv};
3535
use crate::rpc::metrics::MetaMetrics;
36-
use crate::storage::MetaStore;
3736
use crate::MetaResult;
3837

3938
pub enum BackupJobResult {
@@ -59,25 +58,25 @@ impl BackupJobHandle {
5958
}
6059
}
6160

62-
pub type BackupManagerRef<S> = Arc<BackupManager<S>>;
61+
pub type BackupManagerRef = Arc<BackupManager>;
6362
/// (url, dir)
6463
type StoreConfig = (String, String);
6564

6665
/// `BackupManager` manages lifecycle of all existent backups and the running backup job.
67-
pub struct BackupManager<S: MetaStore> {
68-
env: MetaSrvEnv<S>,
69-
hummock_manager: HummockManagerRef<S>,
66+
pub struct BackupManager {
67+
env: MetaSrvEnv,
68+
hummock_manager: HummockManagerRef,
7069
backup_store: ArcSwap<(BoxedMetaSnapshotStorage, StoreConfig)>,
7170
/// Tracks the running backup job. Concurrent jobs is not supported.
7271
running_backup_job: tokio::sync::Mutex<Option<BackupJobHandle>>,
7372
metrics: BackupManagerMetrics,
7473
meta_metrics: Arc<MetaMetrics>,
7574
}
7675

77-
impl<S: MetaStore> BackupManager<S> {
76+
impl BackupManager {
7877
pub async fn new(
79-
env: MetaSrvEnv<S>,
80-
hummock_manager: HummockManagerRef<S>,
78+
env: MetaSrvEnv,
79+
hummock_manager: HummockManagerRef,
8180
metrics: Arc<MetaMetrics>,
8281
store_url: &str,
8382
store_dir: &str,
@@ -139,8 +138,8 @@ impl<S: MetaStore> BackupManager<S> {
139138
}
140139

141140
fn with_store(
142-
env: MetaSrvEnv<S>,
143-
hummock_manager: HummockManagerRef<S>,
141+
env: MetaSrvEnv,
142+
hummock_manager: HummockManagerRef,
144143
meta_metrics: Arc<MetaMetrics>,
145144
backup_store: (BoxedMetaSnapshotStorage, StoreConfig),
146145
) -> Self {
@@ -167,7 +166,7 @@ impl<S: MetaStore> BackupManager<S> {
167166
}
168167

169168
#[cfg(test)]
170-
pub fn for_test(env: MetaSrvEnv<S>, hummock_manager: HummockManagerRef<S>) -> Self {
169+
pub fn for_test(env: MetaSrvEnv, hummock_manager: HummockManagerRef) -> Self {
171170
Self::with_store(
172171
env,
173172
hummock_manager,
@@ -326,12 +325,12 @@ impl<S: MetaStore> BackupManager<S> {
326325
}
327326

328327
/// `BackupWorker` creates a database snapshot.
329-
struct BackupWorker<S: MetaStore> {
330-
backup_manager: BackupManagerRef<S>,
328+
struct BackupWorker {
329+
backup_manager: BackupManagerRef,
331330
}
332331

333-
impl<S: MetaStore> BackupWorker<S> {
334-
fn new(backup_manager: BackupManagerRef<S>) -> Self {
332+
impl BackupWorker {
333+
fn new(backup_manager: BackupManagerRef) -> Self {
335334
Self { backup_manager }
336335
}
337336

src/meta/src/backup_restore/meta_snapshot_builder.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
use std::collections::HashMap;
1616
use std::future::Future;
17-
use std::sync::Arc;
1817

1918
use anyhow::anyhow;
2019
use risingwave_backup::error::{BackupError, BackupResult};
@@ -36,11 +35,11 @@ const VERSION: u32 = 1;
3635

3736
pub struct MetaSnapshotBuilder<S> {
3837
snapshot: MetaSnapshot,
39-
meta_store: Arc<S>,
38+
meta_store: S,
4039
}
4140

4241
impl<S: MetaStore> MetaSnapshotBuilder<S> {
43-
pub fn new(meta_store: Arc<S>) -> Self {
42+
pub fn new(meta_store: S) -> Self {
4443
Self {
4544
snapshot: MetaSnapshot::default(),
4645
meta_store,
@@ -161,8 +160,6 @@ impl<S: MetaStore> MetaSnapshotBuilder<S> {
161160

162161
#[cfg(test)]
163162
mod tests {
164-
use std::ops::Deref;
165-
use std::sync::Arc;
166163

167164
use assert_matches::assert_matches;
168165
use itertools::Itertools;
@@ -179,7 +176,7 @@ mod tests {
179176

180177
#[tokio::test]
181178
async fn test_snapshot_builder() {
182-
let meta_store = Arc::new(MemStore::new());
179+
let meta_store = MemStore::new();
183180

184181
let mut builder = MetaSnapshotBuilder::new(meta_store.clone());
185182
let hummock_version = HummockVersion {
@@ -190,7 +187,7 @@ mod tests {
190187
let v_ = v.clone();
191188
async move { v_ }
192189
};
193-
hummock_version.insert(meta_store.deref()).await.unwrap();
190+
hummock_version.insert(&meta_store).await.unwrap();
194191
let err = builder
195192
.build(1, get_ckpt_builder(&hummock_version))
196193
.await
@@ -205,21 +202,15 @@ mod tests {
205202
hummock_version_id: hummock_version.id,
206203
..Default::default()
207204
};
208-
hummock_version_stats
209-
.insert(meta_store.deref())
210-
.await
211-
.unwrap();
205+
hummock_version_stats.insert(&meta_store).await.unwrap();
212206
let err = builder
213207
.build(1, get_ckpt_builder(&hummock_version))
214208
.await
215209
.unwrap_err();
216210
let err = assert_matches!(err, BackupError::Other(e) => e);
217211
assert_eq!("system params not found in meta store", err.to_error_str());
218212

219-
system_params_for_test()
220-
.insert(meta_store.deref())
221-
.await
222-
.unwrap();
213+
system_params_for_test().insert(&meta_store).await.unwrap();
223214

224215
let err = builder
225216
.build(1, get_ckpt_builder(&hummock_version))
@@ -229,7 +220,7 @@ mod tests {
229220
assert_eq!("cluster id not found in meta store", err.to_error_str());
230221

231222
ClusterId::new()
232-
.put_at_meta_store(meta_store.deref())
223+
.put_at_meta_store(&meta_store)
233224
.await
234225
.unwrap();
235226

src/meta/src/barrier/command.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use super::trace::TracedEpoch;
3737
use crate::barrier::CommandChanges;
3838
use crate::manager::{FragmentManagerRef, WorkerId};
3939
use crate::model::{ActorId, DispatcherId, FragmentId, PausedReason, TableFragments};
40-
use crate::storage::MetaStore;
4140
use crate::stream::{build_actor_connector_splits, SourceManagerRef, SplitAssignment};
4241
use crate::MetaResult;
4342

@@ -214,8 +213,8 @@ impl Command {
214213

215214
/// [`CommandContext`] is used for generating barrier and doing post stuffs according to the given
216215
/// [`Command`].
217-
pub struct CommandContext<S: MetaStore> {
218-
fragment_manager: FragmentManagerRef<S>,
216+
pub struct CommandContext {
217+
fragment_manager: FragmentManagerRef,
219218

220219
client_pool: StreamClientPoolRef,
221220

@@ -232,7 +231,7 @@ pub struct CommandContext<S: MetaStore> {
232231

233232
pub kind: BarrierKind,
234233

235-
source_manager: SourceManagerRef<S>,
234+
source_manager: SourceManagerRef,
236235

237236
/// The tracing span of this command.
238237
///
@@ -242,18 +241,18 @@ pub struct CommandContext<S: MetaStore> {
242241
pub span: tracing::Span,
243242
}
244243

245-
impl<S: MetaStore> CommandContext<S> {
244+
impl CommandContext {
246245
#[allow(clippy::too_many_arguments)]
247246
pub(super) fn new(
248-
fragment_manager: FragmentManagerRef<S>,
247+
fragment_manager: FragmentManagerRef,
249248
client_pool: StreamClientPoolRef,
250249
info: BarrierActorInfo,
251250
prev_epoch: TracedEpoch,
252251
curr_epoch: TracedEpoch,
253252
current_paused_reason: Option<PausedReason>,
254253
command: Command,
255254
kind: BarrierKind,
256-
source_manager: SourceManagerRef<S>,
255+
source_manager: SourceManagerRef,
257256
span: tracing::Span,
258257
) -> Self {
259258
Self {
@@ -271,10 +270,7 @@ impl<S: MetaStore> CommandContext<S> {
271270
}
272271
}
273272

274-
impl<S> CommandContext<S>
275-
where
276-
S: MetaStore,
277-
{
273+
impl CommandContext {
278274
/// Generate a mutation for the given command.
279275
pub async fn to_mutation(&self) -> MetaResult<Option<Mutation>> {
280276
let mutation = match &self.command {

0 commit comments

Comments
 (0)