Skip to content

Commit b0f276b

Browse files
authored
fix(config): remove system params from config file (risingwavelabs#8366)
1 parent 24fe1e8 commit b0f276b

File tree

15 files changed

+112
-198
lines changed

15 files changed

+112
-198
lines changed

src/common/src/config.rs

Lines changed: 0 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,6 @@ pub struct RwConfig {
112112
#[serde(default)]
113113
pub storage: StorageConfig,
114114

115-
#[serde(default)]
116-
pub backup: BackupConfig,
117-
118115
#[serde(flatten)]
119116
pub unrecognized: HashMap<String, Value>,
120117
}
@@ -252,18 +249,10 @@ impl Default for BatchConfig {
252249
/// The section `[streaming]` in `risingwave.toml`.
253250
#[derive(Clone, Debug, Serialize, Deserialize)]
254251
pub struct StreamingConfig {
255-
/// The interval of periodic barrier.
256-
#[serde(default = "default::streaming::barrier_interval_ms")]
257-
pub barrier_interval_ms: u32,
258-
259252
/// The maximum number of barriers in-flight in the compute nodes.
260253
#[serde(default = "default::streaming::in_flight_barrier_nums")]
261254
pub in_flight_barrier_nums: usize,
262255

263-
/// There will be a checkpoint for every n barriers
264-
#[serde(default = "default::streaming::checkpoint_frequency")]
265-
pub checkpoint_frequency: usize,
266-
267256
/// The thread number of the streaming actor runtime in the compute node. The default value is
268257
/// decided by `tokio`.
269258
#[serde(default)]
@@ -297,24 +286,6 @@ impl Default for StreamingConfig {
297286
/// The section `[storage]` in `risingwave.toml`.
298287
#[derive(Clone, Debug, Serialize, Deserialize)]
299288
pub struct StorageConfig {
300-
// TODO(zhidong): Remove in 0.1.18 release
301-
// NOTE: It is now a system parameter and should not be used directly.
302-
/// Target size of the Sstable.
303-
#[serde(default = "default::storage::sst_size_mb")]
304-
pub sstable_size_mb: u32,
305-
306-
// TODO(zhidong): Remove in 0.1.18 release
307-
// NOTE: It is now a system parameter and should not be used directly.
308-
/// Size of each block in bytes in SST.
309-
#[serde(default = "default::storage::block_size_kb")]
310-
pub block_size_kb: u32,
311-
312-
// TODO(zhidong): Remove in 0.1.18 release
313-
// NOTE: It is now a system parameter and should not be used directly.
314-
/// False positive probability of bloom filter.
315-
#[serde(default = "default::storage::bloom_false_positive")]
316-
pub bloom_false_positive: f64,
317-
318289
/// parallelism while syncing share buffers into L0 SST. Should NOT be 0.
319290
#[serde(default = "default::storage::share_buffers_sync_parallelism")]
320291
pub share_buffers_sync_parallelism: u32,
@@ -329,12 +300,6 @@ pub struct StorageConfig {
329300
#[serde(default = "default::storage::shared_buffer_capacity_mb")]
330301
pub shared_buffer_capacity_mb: usize,
331302

332-
// TODO(zhidong): Remove in 0.1.18 release
333-
// NOTE: It is now a system parameter and should not be used directly.
334-
/// Remote directory for storing data and metadata objects.
335-
#[serde(default = "default::storage::data_directory")]
336-
pub data_directory: String,
337-
338303
/// Whether to enable write conflict detection
339304
#[serde(default = "default::storage::write_conflict_detection_enabled")]
340305
pub write_conflict_detection_enabled: bool,
@@ -486,30 +451,6 @@ impl Default for DeveloperConfig {
486451
}
487452
}
488453

489-
/// Configs for meta node backup
490-
#[derive(Clone, Debug, Serialize, Deserialize)]
491-
pub struct BackupConfig {
492-
// TODO: Remove in 0.1.18 release
493-
// NOTE: It is now a system parameter and should not be used directly.
494-
/// Remote storage url for storing snapshots.
495-
#[serde(default = "default::backup::storage_url")]
496-
pub storage_url: String,
497-
// TODO: Remove in 0.1.18 release
498-
// NOTE: It is now a system parameter and should not be used directly.
499-
/// Remote directory for storing snapshots.
500-
#[serde(default = "default::backup::storage_directory")]
501-
pub storage_directory: String,
502-
503-
#[serde(flatten)]
504-
pub unrecognized: HashMap<String, Value>,
505-
}
506-
507-
impl Default for BackupConfig {
508-
fn default() -> Self {
509-
toml::from_str("").unwrap()
510-
}
511-
}
512-
513454
mod default {
514455
pub mod meta {
515456
use crate::config::MetaBackend;
@@ -576,18 +517,6 @@ mod default {
576517

577518
pub mod storage {
578519

579-
pub fn sst_size_mb() -> u32 {
580-
256
581-
}
582-
583-
pub fn block_size_kb() -> u32 {
584-
64
585-
}
586-
587-
pub fn bloom_false_positive() -> f64 {
588-
0.001
589-
}
590-
591520
pub fn share_buffers_sync_parallelism() -> u32 {
592521
1
593522
}
@@ -600,10 +529,6 @@ mod default {
600529
1024
601530
}
602531

603-
pub fn data_directory() -> String {
604-
"hummock_001".to_string()
605-
}
606-
607532
pub fn write_conflict_detection_enabled() -> bool {
608533
cfg!(debug_assertions)
609534
}
@@ -657,20 +582,12 @@ mod default {
657582
pub mod streaming {
658583
use crate::config::AsyncStackTraceOption;
659584

660-
pub fn barrier_interval_ms() -> u32 {
661-
1000
662-
}
663-
664585
pub fn in_flight_barrier_nums() -> usize {
665586
// quick fix
666587
// TODO: remove this limitation from code
667588
10000
668589
}
669590

670-
pub fn checkpoint_frequency() -> usize {
671-
10
672-
}
673-
674591
pub fn enable_jaegar_tracing() -> bool {
675592
false
676593
}
@@ -745,14 +662,4 @@ mod default {
745662
1024
746663
}
747664
}
748-
749-
pub mod backup {
750-
pub fn storage_url() -> String {
751-
"memory".to_string()
752-
}
753-
754-
pub fn storage_directory() -> String {
755-
"backup".to_string()
756-
}
757-
}
758665
}

src/common/src/system_param/local_manager.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use arc_swap::ArcSwap;
1919
use risingwave_pb::meta::SystemParams;
2020
use tokio::sync::watch::{channel, Receiver, Sender};
2121

22+
use super::default_system_params;
2223
use super::reader::SystemParamsReader;
2324

2425
pub type SystemParamsReaderRef = Arc<ArcSwap<SystemParamsReader>>;
@@ -29,6 +30,7 @@ pub type LocalSystemParamsManagerRef = Arc<LocalSystemParamsManager>;
2930
/// - `get_params` returns a reference to the latest parameters that is atomically updated.
3031
/// - `watch_params` returns a channel on which calling `recv` will get the latest parameters.
3132
/// Compared with `get_params`, the caller can be explicitly notified of parameter change.
33+
#[derive(Debug)]
3234
pub struct LocalSystemParamsManager {
3335
/// The latest parameters.
3436
params: SystemParamsReaderRef,
@@ -44,6 +46,10 @@ impl LocalSystemParamsManager {
4446
Self { params, tx }
4547
}
4648

49+
pub fn for_test() -> Self {
50+
Self::new(default_system_params().into())
51+
}
52+
4753
pub fn get_params(&self) -> SystemParamsReaderRef {
4854
self.params.clone()
4955
}

src/compute/src/server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ pub async fn compute_node_serve(
214214
));
215215
monitor_cache(memory_collector, &registry).unwrap();
216216
let backup_reader = storage.backup_reader();
217+
let system_params_manager = system_params_manager.clone();
217218
tokio::spawn(async move {
218219
backup_reader
219220
.watch_config_change(system_params_manager.watch_params())
@@ -295,6 +296,7 @@ pub async fn compute_node_serve(
295296
worker_id,
296297
state_store,
297298
dml_mgr,
299+
system_params_manager,
298300
source_metrics,
299301
);
300302

src/config/ci-compaction-test-meta.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,6 @@ checkpoint_frequency = 99999999
1111

1212
[storage]
1313
shared_buffer_capacity_mb = 4096
14-
sstable_size_mb = 256
15-
block_size_kb = 1024
16-
bloom_false_positive = 0.001
17-
data_directory = "hummock_001"
1814
block_cache_capacity_mb = 4096
1915
meta_cache_capacity_mb = 1024
2016
compactor_memory_limit_mb = 5120

src/config/ci-compaction-test.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,6 @@ checkpoint_frequency = 10
99

1010
[storage]
1111
shared_buffer_capacity_mb = 4096
12-
sstable_size_mb = 256
13-
block_size_kb = 1024
14-
bloom_false_positive = 0.001
15-
data_directory = "hummock_001"
1612
block_cache_capacity_mb = 4096
1713
meta_cache_capacity_mb = 1024
1814
compactor_memory_limit_mb = 5120

src/config/example.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@ checkpoint_frequency = 10
1414

1515
[storage]
1616
shared_buffer_capacity_mb = 4096
17-
sstable_size_mb = 256
18-
block_size_kb = 1024
19-
bloom_false_positive = 0.001
20-
data_directory = "hummock_001"
2117
block_cache_capacity_mb = 4096
2218
meta_cache_capacity_mb = 1024
2319
compactor_memory_limit_mb = 5120

src/meta/src/lib.rs

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use std::time::Duration;
5151

5252
use clap::Parser;
5353
pub use error::{MetaError, MetaResult};
54+
use risingwave_common::system_param::default;
5455
use risingwave_common::{GIT_SHA, RW_VERSION};
5556
use risingwave_common_proc_macro::OverrideConfig;
5657
use risingwave_pb::meta::SystemParams;
@@ -106,11 +107,42 @@ pub struct MetaNodeOpts {
106107
#[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
107108
prometheus_endpoint: Option<String>,
108109

109-
// TODO(zhidong): Make it required in v0.1.18
110110
/// State store url.
111111
#[clap(long, env = "RW_STATE_STORE")]
112112
state_store: Option<String>,
113113

114+
/// The interval of periodic barrier.
115+
#[clap(long, env = "RW_BARRIER_INTERVAL_MS", default_value_t = default::barrier_interval_ms())]
116+
barrier_interval_ms: u32,
117+
118+
/// There will be a checkpoint for every n barriers
119+
#[clap(long, env = "RW_CHECKPOINT_FREQUENCY", default_value_t = default::checkpoint_frequency())]
120+
pub checkpoint_frequency: u64,
121+
122+
/// Target size of the Sstable.
123+
#[clap(long, env = "RW_SSTABLE_SIZE_MB", default_value_t = default::sstable_size_mb())]
124+
sstable_size_mb: u32,
125+
126+
/// Size of each block in bytes in SST.
127+
#[clap(long, env = "RW_BLOCK_SIZE_KB", default_value_t = default::block_size_kb())]
128+
block_size_kb: u32,
129+
130+
/// False positive probability of bloom filter.
131+
#[clap(long, env = "RW_BLOOM_FALSE_POSITIVE", default_value_t = default::bloom_false_positive())]
132+
bloom_false_positive: f64,
133+
134+
/// Remote directory for storing data and metadata objects.
135+
#[clap(long, env = "RW_DATA_DIRECTORY", default_value_t = default::data_directory())]
136+
data_directory: String,
137+
138+
/// Remote storage url for storing snapshots.
139+
#[clap(long, env = "RW_BACKUP_STORAGE_URL", default_value_t = default::backup_storage_url())]
140+
backup_storage_url: String,
141+
142+
/// Remote directory for storing snapshots.
143+
#[clap(long, env = "RW_STORAGE_DIRECTORY", default_value_t = default::backup_storage_directory())]
144+
backup_storage_directory: String,
145+
114146
/// Endpoint of the connector node, there will be a sidecar connector node
115147
/// colocated with Meta node in the cloud environment
116148
#[clap(long, env = "RW_CONNECTOR_RPC_ENDPOINT")]
@@ -132,36 +164,6 @@ pub struct OverrideConfigOpts {
132164
#[clap(long, env = "RW_BACKEND", value_enum)]
133165
#[override_opts(path = meta.backend)]
134166
backend: Option<MetaBackend>,
135-
136-
/// Target size of the Sstable.
137-
#[clap(long, env = "RW_SSTABLE_SIZE_MB")]
138-
#[override_opts(path = storage.sstable_size_mb)]
139-
sstable_size_mb: Option<u32>,
140-
141-
/// Size of each block in bytes in SST.
142-
#[clap(long, env = "RW_BLOCK_SIZE_KB")]
143-
#[override_opts(path = storage.block_size_kb)]
144-
block_size_kb: Option<u32>,
145-
146-
/// False positive probability of bloom filter.
147-
#[clap(long, env = "RW_BLOOM_FALSE_POSITIVE")]
148-
#[override_opts(path = storage.bloom_false_positive)]
149-
bloom_false_positive: Option<f64>,
150-
151-
/// Remote directory for storing data and metadata objects.
152-
#[clap(long, env = "RW_DATA_DIRECTORY")]
153-
#[override_opts(path = storage.data_directory)]
154-
data_directory: Option<String>,
155-
156-
/// Remote storage url for storing snapshots.
157-
#[clap(long, env = "RW_BACKUP_STORAGE_URL")]
158-
#[override_opts(path = backup.storage_url)]
159-
backup_storage_url: Option<String>,
160-
161-
/// Remote directory for storing snapshots.
162-
#[clap(long, env = "RW_STORAGE_DIRECTORY")]
163-
#[override_opts(path = backup.storage_directory)]
164-
backup_storage_directory: Option<String>,
165167
}
166168

167169
use std::future::Future;
@@ -246,15 +248,15 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
246248
.periodic_ttl_reclaim_compaction_interval_sec,
247249
},
248250
SystemParams {
249-
barrier_interval_ms: Some(config.streaming.barrier_interval_ms),
250-
checkpoint_frequency: Some(config.streaming.checkpoint_frequency as u64),
251-
sstable_size_mb: Some(config.storage.sstable_size_mb),
252-
block_size_kb: Some(config.storage.block_size_kb),
253-
bloom_false_positive: Some(config.storage.bloom_false_positive),
251+
barrier_interval_ms: Some(opts.barrier_interval_ms),
252+
checkpoint_frequency: Some(opts.checkpoint_frequency),
253+
sstable_size_mb: Some(opts.sstable_size_mb),
254+
block_size_kb: Some(opts.block_size_kb),
255+
bloom_false_positive: Some(opts.bloom_false_positive),
254256
state_store: Some(opts.state_store.unwrap_or_default()),
255-
data_directory: Some(config.storage.data_directory),
256-
backup_storage_url: Some(config.backup.storage_url),
257-
backup_storage_directory: Some(config.backup.storage_directory),
257+
data_directory: Some(opts.data_directory),
258+
backup_storage_url: Some(opts.backup_storage_url),
259+
backup_storage_directory: Some(opts.backup_storage_directory),
258260
},
259261
)
260262
.await

src/meta/src/manager/system_param/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::sync::Arc;
1919
use std::time::Duration;
2020

2121
use risingwave_common::system_param::reader::SystemParamsReader;
22-
use risingwave_common::system_param::{default, set_system_param};
22+
use risingwave_common::system_param::set_system_param;
2323
use risingwave_common::{for_all_undeprecated_params, key_of};
2424
use risingwave_pb::meta::subscribe_response::{Info, Operation};
2525
use risingwave_pb::meta::SystemParams;
@@ -145,7 +145,7 @@ impl<S: MetaStore> SystemParamsManager<S> {
145145
// 2. Some, Some: Check equality and warn if they differ.
146146
// 3. None, Some: A new version of RW cluster is launched for the first time and newly introduced
147147
// params are not set. Use init value.
148-
// 4. None, None: Same as 3, but the init param is not from CLI. Use default value.
148+
// 4. None, None: Impossible.
149149
macro_rules! impl_merge_params {
150150
($({ $field:ident, $type:ty, $default:expr },)*) => {
151151
fn merge_params(mut persisted: SystemParams, init: SystemParams) -> SystemParams {
@@ -157,7 +157,7 @@ macro_rules! impl_merge_params {
157157
}
158158
},
159159
(None, Some(init)) => persisted.$field = Some(init),
160-
(None, None) => { persisted.$field = Some(default::$field()) },
160+
(None, None) => unreachable!(),
161161
_ => {},
162162
}
163163
)*

0 commit comments

Comments
 (0)