Skip to content

Commit f483f1c

Browse files
authored
[ISSUE #90]Support update namesrv config (#1316)
* feat: add initial update implementation * fix: clippy * fix: clippy body reference
1 parent e0ba433 commit f483f1c

File tree

5 files changed

+356
-0
lines changed

5 files changed

+356
-0
lines changed

rocketmq-common/src/common/mix_all.rs

+53
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18+
use std::collections::HashMap;
1819
use std::env;
1920

2021
use cheetah_string::CheetahString;
@@ -159,6 +160,29 @@ pub fn human_readable_byte_count(bytes: i64, si: bool) -> String {
159160
format!("{:.1} {}B", bytes / unit.powi(exp), pre)
160161
}
161162

163+
pub fn string_to_properties(input: &str) -> Option<HashMap<CheetahString, CheetahString>> {
164+
let mut properties = HashMap::new();
165+
166+
for line in input.lines() {
167+
let line = line.trim();
168+
if line.is_empty() || line.starts_with('#') {
169+
// Skip empty lines or comments
170+
continue;
171+
}
172+
173+
if let Some((key, value)) = line.split_once('=') {
174+
// Convert key and value to CheetahString
175+
let key = CheetahString::from(key.trim());
176+
let value = CheetahString::from(value.trim());
177+
properties.insert(key, value);
178+
} else {
179+
return None; // Return None if the line isn't in `key=value` format
180+
}
181+
}
182+
183+
Some(properties)
184+
}
185+
162186
#[cfg(test)]
163187
mod tests {
164188
use super::*;
@@ -241,4 +265,33 @@ mod tests {
241265
fn returns_false_for_none_metadata() {
242266
assert!(!is_lmq(None));
243267
}
268+
269+
#[test]
270+
fn test_string_to_properties_valid_input() {
271+
let input = r#"
272+
# This is a comment
273+
key1=value1
274+
key2 = value2
275+
key3=value3
276+
"#;
277+
278+
let result = string_to_properties(input).expect("Parsing should succeed");
279+
let mut expected = HashMap::new();
280+
expected.insert(CheetahString::from("key1"), CheetahString::from("value1"));
281+
expected.insert(CheetahString::from("key2"), CheetahString::from("value2"));
282+
expected.insert(CheetahString::from("key3"), CheetahString::from("value3"));
283+
284+
assert_eq!(result, expected);
285+
}
286+
287+
#[test]
288+
fn test_string_to_properties_invalid_line() {
289+
let input = r#"
290+
key1=value1
291+
invalid_line
292+
"#;
293+
294+
let result = string_to_properties(input);
295+
assert!(result.is_none(), "Parsing should fail for invalid input");
296+
}
244297
}

rocketmq-common/src/common/namesrv/namesrv_config.rs

+230
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
* limitations under the License.
1616
*/
1717

18+
use std::collections::HashMap;
1819
use std::env;
1920

21+
use cheetah_string::CheetahString;
2022
use serde::Deserialize;
2123

2224
use crate::common::mix_all::ROCKETMQ_HOME_ENV;
@@ -144,6 +146,118 @@ impl NamesrvConfig {
144146
pub fn new() -> NamesrvConfig {
145147
Self::default()
146148
}
149+
150+
/// Splits the `config_black_list` into a `Vec<CheetahString>` for easier usage.
151+
pub fn get_config_blacklist(&self) -> Vec<CheetahString> {
152+
self.config_black_list
153+
.split(';')
154+
.map(|s| CheetahString::from(s.trim()))
155+
.collect()
156+
}
157+
158+
pub fn update(
159+
&mut self,
160+
properties: HashMap<CheetahString, CheetahString>,
161+
) -> Result<(), String> {
162+
for (key, value) in properties {
163+
match key.as_str() {
164+
"rocketmqHome" => self.rocketmq_home = value.to_string(),
165+
"kvConfigPath" => self.kv_config_path = value.to_string(),
166+
"configStorePath" => self.config_store_path = value.to_string(),
167+
"productEnvName" => self.product_env_name = value.to_string(),
168+
"clusterTest" => {
169+
self.cluster_test = value
170+
.parse()
171+
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
172+
}
173+
"orderMessageEnable" => {
174+
self.order_message_enable = value
175+
.parse()
176+
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
177+
}
178+
"clientRequestThreadPoolNums" => {
179+
self.client_request_thread_pool_nums = value
180+
.parse()
181+
.map_err(|_| format!("Invalid integer value for key '{}'", key))?
182+
}
183+
"defaultThreadPoolNums" => {
184+
self.default_thread_pool_nums = value
185+
.parse()
186+
.map_err(|_| format!("Invalid integer value for key '{}'", key))?
187+
}
188+
"clientRequestThreadPoolQueueCapacity" => {
189+
self.client_request_thread_pool_queue_capacity = value
190+
.parse()
191+
.map_err(|_| format!("Invalid integer value for key '{}'", key))?
192+
}
193+
"defaultThreadPoolQueueCapacity" => {
194+
self.default_thread_pool_queue_capacity = value
195+
.parse()
196+
.map_err(|_| format!("Invalid integer value for key '{}'", key))?
197+
}
198+
"scanNotActiveBrokerInterval" => {
199+
self.scan_not_active_broker_interval = value
200+
.parse()
201+
.map_err(|_| format!("Invalid value for key '{}'", key))?
202+
}
203+
"unRegisterBrokerQueueCapacity" => {
204+
self.unregister_broker_queue_capacity = value
205+
.parse()
206+
.map_err(|_| format!("Invalid integer value for key '{}'", key))?
207+
}
208+
"supportActingMaster" => {
209+
self.support_acting_master = value
210+
.parse()
211+
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
212+
}
213+
"enableAllTopicList" => {
214+
self.enable_all_topic_list = value
215+
.parse()
216+
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
217+
}
218+
"enableTopicList" => {
219+
self.enable_topic_list = value
220+
.parse()
221+
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
222+
}
223+
"notifyMinBrokerIdChanged" => {
224+
self.notify_min_broker_id_changed = value
225+
.parse()
226+
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
227+
}
228+
"enableControllerInNamesrv" => {
229+
self.enable_controller_in_namesrv = value
230+
.parse()
231+
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
232+
}
233+
"needWaitForService" => {
234+
self.need_wait_for_service = value
235+
.parse()
236+
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
237+
}
238+
"waitSecondsForService" => {
239+
self.wait_seconds_for_service = value
240+
.parse()
241+
.map_err(|_| format!("Invalid integer value for key '{}'", key))?
242+
}
243+
"deleteTopicWithBrokerRegistration" => {
244+
self.delete_topic_with_broker_registration = value
245+
.parse()
246+
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
247+
}
248+
"configBlackList" => {
249+
self.config_black_list = value
250+
.parse()
251+
.map_err(|_| format!("Invalid string value for key '{}'", key))?
252+
}
253+
_ => {
254+
return Err(format!("Unknown configuration key: '{}'", key));
255+
}
256+
}
257+
}
258+
259+
Ok(())
260+
}
147261
}
148262

149263
#[cfg(test)]
@@ -204,4 +318,120 @@ mod tests {
204318
"configBlackList;configStorePath;kvConfigPath".to_string()
205319
);
206320
}
321+
322+
#[test]
323+
fn test_namesrv_config_update() {
324+
let mut config = NamesrvConfig::new();
325+
326+
let mut properties = HashMap::new();
327+
properties.insert(
328+
CheetahString::from("rocketmqHome"),
329+
CheetahString::from("/new/path"),
330+
);
331+
properties.insert(
332+
CheetahString::from("kvConfigPath"),
333+
CheetahString::from("/new/kvConfigPath"),
334+
);
335+
properties.insert(
336+
CheetahString::from("configStorePath"),
337+
CheetahString::from("/new/configStorePath"),
338+
);
339+
properties.insert(
340+
CheetahString::from("productEnvName"),
341+
CheetahString::from("new_env"),
342+
);
343+
properties.insert(
344+
CheetahString::from("clusterTest"),
345+
CheetahString::from("true"),
346+
);
347+
properties.insert(
348+
CheetahString::from("orderMessageEnable"),
349+
CheetahString::from("true"),
350+
);
351+
properties.insert(
352+
CheetahString::from("clientRequestThreadPoolNums"),
353+
CheetahString::from("10"),
354+
);
355+
properties.insert(
356+
CheetahString::from("defaultThreadPoolNums"),
357+
CheetahString::from("20"),
358+
);
359+
properties.insert(
360+
CheetahString::from("clientRequestThreadPoolQueueCapacity"),
361+
CheetahString::from("10000"),
362+
);
363+
properties.insert(
364+
CheetahString::from("defaultThreadPoolQueueCapacity"),
365+
CheetahString::from("20000"),
366+
);
367+
properties.insert(
368+
CheetahString::from("scanNotActiveBrokerInterval"),
369+
CheetahString::from("15000"),
370+
);
371+
properties.insert(
372+
CheetahString::from("unRegisterBrokerQueueCapacity"),
373+
CheetahString::from("4000"),
374+
);
375+
properties.insert(
376+
CheetahString::from("supportActingMaster"),
377+
CheetahString::from("true"),
378+
);
379+
properties.insert(
380+
CheetahString::from("enableAllTopicList"),
381+
CheetahString::from("false"),
382+
);
383+
properties.insert(
384+
CheetahString::from("enableTopicList"),
385+
CheetahString::from("false"),
386+
);
387+
properties.insert(
388+
CheetahString::from("notifyMinBrokerIdChanged"),
389+
CheetahString::from("true"),
390+
);
391+
properties.insert(
392+
CheetahString::from("enableControllerInNamesrv"),
393+
CheetahString::from("true"),
394+
);
395+
properties.insert(
396+
CheetahString::from("needWaitForService"),
397+
CheetahString::from("true"),
398+
);
399+
properties.insert(
400+
CheetahString::from("waitSecondsForService"),
401+
CheetahString::from("30"),
402+
);
403+
properties.insert(
404+
CheetahString::from("deleteTopicWithBrokerRegistration"),
405+
CheetahString::from("true"),
406+
);
407+
properties.insert(
408+
CheetahString::from("configBlackList"),
409+
CheetahString::from("newBlackList"),
410+
);
411+
412+
let result = config.update(properties);
413+
assert!(result.is_ok());
414+
415+
assert_eq!(config.rocketmq_home, "/new/path");
416+
assert_eq!(config.kv_config_path, "/new/kvConfigPath");
417+
assert_eq!(config.config_store_path, "/new/configStorePath");
418+
assert_eq!(config.product_env_name, "new_env");
419+
assert_eq!(config.cluster_test, true);
420+
assert_eq!(config.order_message_enable, true);
421+
assert_eq!(config.client_request_thread_pool_nums, 10);
422+
assert_eq!(config.default_thread_pool_nums, 20);
423+
assert_eq!(config.client_request_thread_pool_queue_capacity, 10000);
424+
assert_eq!(config.default_thread_pool_queue_capacity, 20000);
425+
assert_eq!(config.scan_not_active_broker_interval, 15000);
426+
assert_eq!(config.unregister_broker_queue_capacity, 4000);
427+
assert_eq!(config.support_acting_master, true);
428+
assert_eq!(config.enable_all_topic_list, false);
429+
assert_eq!(config.enable_topic_list, false);
430+
assert_eq!(config.notify_min_broker_id_changed, true);
431+
assert_eq!(config.enable_controller_in_namesrv, true);
432+
assert_eq!(config.need_wait_for_service, true);
433+
assert_eq!(config.wait_seconds_for_service, 30);
434+
assert_eq!(config.delete_topic_with_broker_registration, true);
435+
assert_eq!(config.config_black_list, "newBlackList");
436+
}
207437
}

rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs

+8
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,14 @@ impl KVConfigManager {
9696
}
9797
}
9898

99+
/// Updates the Namesrv configuration.
100+
pub fn update_namesrv_config(
101+
&mut self,
102+
updates: HashMap<CheetahString, CheetahString>,
103+
) -> Result<(), String> {
104+
self.namesrv_config.update(updates)
105+
}
106+
99107
/// Persists the current key-value configurations to a file.
100108
pub fn persist(&mut self) {
101109
let wrapper =

0 commit comments

Comments
 (0)