Skip to content

Commit 428354d

Browse files
zwang28Gun9niR
andauthored
feat(backup): support mutating backup config (risingwavelabs#8505)
Co-authored-by: Zhidong Guo <[email protected]>
1 parent 85e450d commit 428354d

File tree

11 files changed

+341
-80
lines changed

11 files changed

+341
-80
lines changed

src/common/src/system_param/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,16 @@ impl ValidateOnSet for OverrideValidateOnSet {
281281
fn checkpoint_frequency(v: &u64) -> Result<()> {
282282
Self::expect_range(*v, 1..)
283283
}
284+
285+
fn backup_storage_directory(_v: &String) -> Result<()> {
286+
// TODO
287+
Ok(())
288+
}
289+
290+
fn backup_storage_url(_v: &String) -> Result<()> {
291+
// TODO
292+
Ok(())
293+
}
284294
}
285295

286296
for_all_undeprecated_params!(impl_default_from_other_params);

src/compute/src/server.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,13 @@ pub async fn compute_node_serve(
170170
.await
171171
.unwrap();
172172

173+
// Initialize observer manager.
174+
let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params.clone()));
175+
let compute_observer_node = ComputeObserverNode::new(system_params_manager.clone());
176+
let observer_manager =
177+
ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
178+
observer_manager.start().await;
179+
173180
let mut extra_info_sources: Vec<ExtraInfoSourceRef> = vec![];
174181
if let Some(storage) = state_store.as_hummock_trait() {
175182
extra_info_sources.push(storage.sstable_id_manager().clone());
@@ -206,6 +213,12 @@ pub async fn compute_node_serve(
206213
memory_limiter,
207214
));
208215
monitor_cache(memory_collector, &registry).unwrap();
216+
let backup_reader = storage.backup_reader();
217+
tokio::spawn(async move {
218+
backup_reader
219+
.watch_config_change(system_params_manager.watch_params())
220+
.await;
221+
});
209222
}
210223

211224
sub_tasks.push(MetaClient::start_heartbeat_loop(
@@ -253,13 +266,6 @@ pub async fn compute_node_serve(
253266
// of lru manager.
254267
stream_mgr.set_watermark_epoch(watermark_epoch).await;
255268

256-
// Initialize observer manager.
257-
let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params));
258-
let compute_observer_node = ComputeObserverNode::new(system_params_manager.clone());
259-
let observer_manager =
260-
ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
261-
observer_manager.start().await;
262-
263269
let grpc_await_tree_reg = await_tree_config
264270
.map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into()));
265271
let dml_mgr = Arc::new(DmlManager::default());

src/meta/src/backup_restore/backup_manager.rs

Lines changed: 136 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,24 @@
1515
use std::sync::Arc;
1616
use std::time::Instant;
1717

18+
use arc_swap::ArcSwap;
1819
use itertools::Itertools;
1920
use prometheus::Registry;
2021
use risingwave_backup::error::BackupError;
21-
use risingwave_backup::storage::MetaSnapshotStorageRef;
22+
use risingwave_backup::storage::{BoxedMetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
2223
use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest};
2324
use risingwave_common::bail;
2425
use risingwave_hummock_sdk::HummockSstableId;
26+
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
27+
use risingwave_object_store::object::parse_remote_object_store;
2528
use risingwave_pb::backup_service::{BackupJobStatus, MetaBackupManifestId};
2629
use risingwave_pb::meta::subscribe_response::{Info, Operation};
2730
use tokio::task::JoinHandle;
2831

2932
use crate::backup_restore::meta_snapshot_builder::MetaSnapshotBuilder;
3033
use crate::backup_restore::metrics::BackupManagerMetrics;
3134
use crate::hummock::{HummockManagerRef, HummockVersionSafePoint};
32-
use crate::manager::{IdCategory, MetaSrvEnv};
35+
use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv};
3336
use crate::storage::MetaStore;
3437
use crate::MetaResult;
3538

@@ -57,40 +60,118 @@ impl BackupJobHandle {
5760
}
5861

5962
pub type BackupManagerRef<S> = Arc<BackupManager<S>>;
63+
/// (url, dir)
64+
type StoreConfig = (String, String);
6065

6166
/// `BackupManager` manages lifecycle of all existent backups and the running backup job.
6267
pub struct BackupManager<S: MetaStore> {
6368
env: MetaSrvEnv<S>,
6469
hummock_manager: HummockManagerRef<S>,
65-
backup_store: MetaSnapshotStorageRef,
70+
backup_store: ArcSwap<(BoxedMetaSnapshotStorage, StoreConfig)>,
6671
/// Tracks the running backup job. Concurrent jobs is not supported.
6772
running_backup_job: tokio::sync::Mutex<Option<BackupJobHandle>>,
6873
metrics: BackupManagerMetrics,
6974
}
7075

7176
impl<S: MetaStore> BackupManager<S> {
72-
pub fn new(
77+
pub async fn new(
7378
env: MetaSrvEnv<S>,
7479
hummock_manager: HummockManagerRef<S>,
75-
backup_store: MetaSnapshotStorageRef,
7680
registry: Registry,
81+
store_url: &str,
82+
store_dir: &str,
83+
) -> MetaResult<Arc<Self>> {
84+
let store_config = (store_url.to_string(), store_dir.to_string());
85+
let store = create_snapshot_store(&store_config).await?;
86+
tracing::info!(
87+
"backup manager initialized: url={}, dir={}",
88+
store_config.0,
89+
store_config.1
90+
);
91+
let instance = Arc::new(Self::with_store(
92+
env.clone(),
93+
hummock_manager,
94+
registry,
95+
(store, store_config),
96+
));
97+
let (local_notification_tx, mut local_notification_rx) =
98+
tokio::sync::mpsc::unbounded_channel();
99+
env.notification_manager()
100+
.insert_local_sender(local_notification_tx)
101+
.await;
102+
let this = instance.clone();
103+
tokio::spawn(async move {
104+
loop {
105+
match local_notification_rx.recv().await {
106+
Some(notification) => {
107+
if let LocalNotification::SystemParamsChange(p) = notification {
108+
let new_config = (
109+
p.backup_storage_url().to_string(),
110+
p.backup_storage_directory().to_string(),
111+
);
112+
this.handle_new_config(new_config).await;
113+
}
114+
}
115+
None => {
116+
return;
117+
}
118+
}
119+
}
120+
});
121+
Ok(instance)
122+
}
123+
124+
async fn handle_new_config(&self, new_config: StoreConfig) {
125+
if self.backup_store.load().1 == new_config {
126+
return;
127+
}
128+
if let Err(e) = self.set_store(new_config.clone()).await {
129+
// Retry is driven by periodic system params notification.
130+
tracing::warn!(
131+
"failed to apply new backup config: url={}, dir={}, {:#?}",
132+
new_config.0,
133+
new_config.1,
134+
e
135+
);
136+
}
137+
}
138+
139+
fn with_store(
140+
env: MetaSrvEnv<S>,
141+
hummock_manager: HummockManagerRef<S>,
142+
registry: Registry,
143+
backup_store: (BoxedMetaSnapshotStorage, StoreConfig),
77144
) -> Self {
78145
Self {
79146
env,
80147
hummock_manager,
81-
backup_store,
148+
backup_store: ArcSwap::from_pointee(backup_store),
82149
running_backup_job: tokio::sync::Mutex::new(None),
83150
metrics: BackupManagerMetrics::new(registry),
84151
}
85152
}
86153

154+
pub async fn set_store(&self, config: StoreConfig) -> MetaResult<()> {
155+
let new_store = create_snapshot_store(&config).await?;
156+
tracing::info!(
157+
"new backup config is applied: url={}, dir={}",
158+
config.0,
159+
config.1
160+
);
161+
self.backup_store.store(Arc::new((new_store, config)));
162+
Ok(())
163+
}
164+
87165
#[cfg(test)]
88166
pub fn for_test(env: MetaSrvEnv<S>, hummock_manager: HummockManagerRef<S>) -> Self {
89-
Self::new(
167+
Self::with_store(
90168
env,
91169
hummock_manager,
92-
Arc::new(risingwave_backup::storage::DummyMetaSnapshotStorage::default()),
93170
Registry::new(),
171+
(
172+
Box::<risingwave_backup::storage::DummyMetaSnapshotStorage>::default(),
173+
StoreConfig::default(),
174+
),
94175
)
95176
}
96177

@@ -104,6 +185,26 @@ impl<S: MetaStore> BackupManager<S> {
104185
job.job_id
105186
));
106187
}
188+
// The reasons to limit number of meta snapshot are:
189+
// 1. limit size of `MetaSnapshotManifest`, which is kept in memory by
190+
// `ObjectStoreMetaSnapshotStorage`.
191+
// 2. limit number of pinned SSTs returned by
192+
// `list_pinned_ssts`, which subsequently is used by GC.
193+
const MAX_META_SNAPSHOT_NUM: usize = 100;
194+
let current_number = self
195+
.backup_store
196+
.load()
197+
.0
198+
.manifest()
199+
.snapshot_metadata
200+
.len();
201+
if current_number > MAX_META_SNAPSHOT_NUM {
202+
bail!(format!(
203+
"too many existent meta snapshots, expect at most {}",
204+
MAX_META_SNAPSHOT_NUM
205+
))
206+
}
207+
107208
let job_id = self
108209
.env
109210
.id_gen_manager()
@@ -134,6 +235,8 @@ impl<S: MetaStore> BackupManager<S> {
134235
}
135236
if self
136237
.backup_store
238+
.load()
239+
.0
137240
.manifest()
138241
.snapshot_metadata
139242
.iter()
@@ -160,7 +263,7 @@ impl<S: MetaStore> BackupManager<S> {
160263
.notify_hummock_without_version(
161264
Operation::Update,
162265
Info::MetaBackupManifestId(MetaBackupManifestId {
163-
id: self.backup_store.manifest().manifest_id,
266+
id: self.backup_store.load().0.manifest().manifest_id,
164267
}),
165268
);
166269
}
@@ -188,13 +291,13 @@ impl<S: MetaStore> BackupManager<S> {
188291

189292
/// Deletes existent backups from backup storage.
190293
pub async fn delete_backups(&self, ids: &[MetaSnapshotId]) -> MetaResult<()> {
191-
self.backup_store.delete(ids).await?;
294+
self.backup_store.load().0.delete(ids).await?;
192295
self.env
193296
.notification_manager()
194297
.notify_hummock_without_version(
195298
Operation::Update,
196299
Info::MetaBackupManifestId(MetaBackupManifestId {
197-
id: self.backup_store.manifest().manifest_id,
300+
id: self.backup_store.load().0.manifest().manifest_id,
198301
}),
199302
);
200303
Ok(())
@@ -203,6 +306,8 @@ impl<S: MetaStore> BackupManager<S> {
203306
/// List all `SSTables` required by backups.
204307
pub fn list_pinned_ssts(&self) -> Vec<HummockSstableId> {
205308
self.backup_store
309+
.load()
310+
.0
206311
.manifest()
207312
.snapshot_metadata
208313
.iter()
@@ -212,7 +317,7 @@ impl<S: MetaStore> BackupManager<S> {
212317
}
213318

214319
pub fn manifest(&self) -> Arc<MetaSnapshotManifest> {
215-
self.backup_store.manifest()
320+
self.backup_store.load().0.manifest()
216321
}
217322
}
218323

@@ -234,7 +339,12 @@ impl<S: MetaStore> BackupWorker<S> {
234339
// Reuse job id as snapshot id.
235340
snapshot_builder.build(job_id).await?;
236341
let snapshot = snapshot_builder.finish()?;
237-
backup_manager_clone.backup_store.create(&snapshot).await?;
342+
backup_manager_clone
343+
.backup_store
344+
.load()
345+
.0
346+
.create(&snapshot)
347+
.await?;
238348
Ok(BackupJobResult::Succeeded)
239349
};
240350
tokio::spawn(async move {
@@ -245,3 +355,16 @@ impl<S: MetaStore> BackupWorker<S> {
245355
})
246356
}
247357
}
358+
359+
async fn create_snapshot_store(config: &StoreConfig) -> MetaResult<BoxedMetaSnapshotStorage> {
360+
let object_store = Arc::new(
361+
parse_remote_object_store(
362+
&config.0,
363+
Arc::new(ObjectStoreMetrics::unused()),
364+
"Meta Backup",
365+
)
366+
.await,
367+
);
368+
let store = ObjectStoreMetaSnapshotStorage::new(&config.1, object_store).await?;
369+
Ok(Box::new(store))
370+
}

src/meta/src/rpc/server.rs

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@ use std::time::Duration;
1818

1919
use either::Either;
2020
use etcd_client::ConnectOptions;
21-
use risingwave_backup::storage::ObjectStoreMetaSnapshotStorage;
2221
use risingwave_common::monitor::process_linux::monitor_process;
2322
use risingwave_common_service::metrics_manager::MetricsManager;
24-
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
25-
use risingwave_object_store::object::parse_remote_object_store;
2623
use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
2724
use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
2825
use risingwave_pb::health::health_server::HealthServer;
@@ -426,31 +423,17 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
426423
.await
427424
.expect("list_table_fragments"),
428425
)
429-
.await
430-
.unwrap();
426+
.await?;
431427

432428
// Initialize services.
433-
let backup_object_store = Arc::new(
434-
parse_remote_object_store(
435-
system_params_reader.backup_storage_url(),
436-
Arc::new(ObjectStoreMetrics::unused()),
437-
"Meta Backup",
438-
)
439-
.await,
440-
);
441-
let backup_storage = Arc::new(
442-
ObjectStoreMetaSnapshotStorage::new(
443-
system_params_reader.backup_storage_directory(),
444-
backup_object_store,
445-
)
446-
.await?,
447-
);
448-
let backup_manager = Arc::new(BackupManager::new(
429+
let backup_manager = BackupManager::new(
449430
env.clone(),
450431
hummock_manager.clone(),
451-
backup_storage,
452432
meta_metrics.registry().clone(),
453-
));
433+
system_params_reader.backup_storage_url(),
434+
system_params_reader.backup_storage_directory(),
435+
)
436+
.await?;
454437
let vacuum_manager = Arc::new(hummock::VacuumManager::new(
455438
env.clone(),
456439
hummock_manager.clone(),

src/storage/backup/integration_tests/run_all.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ tests=( \
66
"test_basic.sh" \
77
"test_pin_sst.sh" \
88
"test_query_backup.sh" \
9+
"test_set_config.sh" \
910
)
1011
for t in "${tests[@]}"
1112
do

0 commit comments

Comments
 (0)