Skip to content

Commit 4a094be

Browse files
committed
[ISSUE #1239]🔥Optimize KVConfigManager code⚡️
1 parent 3998417 commit 4a094be

File tree

6 files changed

+175
-109
lines changed

6 files changed

+175
-109
lines changed

rocketmq-namesrv/src/bootstrap.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ pub struct Builder {
4949
}
5050

5151
struct NameServerRuntime {
52-
name_server_config: Arc<NamesrvConfig>,
52+
name_server_config: ArcMut<NamesrvConfig>,
5353
tokio_client_config: Arc<TokioClientConfig>,
5454
server_config: Arc<ServerConfig>,
5555
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
56-
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
56+
kvconfig_manager: KVConfigManager,
5757
name_server_runtime: Option<RocketMQRuntime>,
5858
remoting_client: ArcMut<RocketmqDefaultClient>,
5959
}
@@ -159,7 +159,7 @@ impl Builder {
159159
}
160160

161161
pub fn build(self) -> NameServerBootstrap {
162-
let name_server_config = Arc::new(self.name_server_config.unwrap());
162+
let name_server_config = ArcMut::new(self.name_server_config.unwrap_or_default());
163163
let runtime = RocketMQRuntime::new_multi(10, "namesrv-thread");
164164
let tokio_client_config = Arc::new(TokioClientConfig::default());
165165
let remoting_client = ArcMut::new(RocketmqDefaultClient::new(
@@ -176,9 +176,7 @@ impl Builder {
176176
name_server_config.clone(),
177177
remoting_client.clone(),
178178
))),
179-
kvconfig_manager: Arc::new(parking_lot::RwLock::new(KVConfigManager::new(
180-
name_server_config,
181-
))),
179+
kvconfig_manager: KVConfigManager::new(name_server_config),
182180
name_server_runtime: Some(runtime),
183181
remoting_client,
184182
},

rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs

+132-66
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,29 @@ use std::collections::HashMap;
1818
use std::sync::Arc;
1919

2020
use cheetah_string::CheetahString;
21+
use parking_lot::RwLock;
2122
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
2223
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
2324
use rocketmq_common::FileUtils;
2425
use rocketmq_remoting::protocol::body::kv_table::KVTable;
2526
use rocketmq_remoting::protocol::RemotingSerializable;
27+
use rocketmq_rust::ArcMut;
2628
use tracing::error;
2729
use tracing::info;
2830

2931
use crate::kvconfig::KVConfigSerializeWrapper;
3032

31-
#[derive(Debug, Clone)]
33+
#[derive(Clone)]
3234
pub struct KVConfigManager {
33-
pub(crate) config_table: HashMap<
34-
CheetahString, /* Namespace */
35-
HashMap<CheetahString /* Key */, CheetahString /* Value */>,
35+
pub(crate) config_table: Arc<
36+
RwLock<
37+
HashMap<
38+
CheetahString, /* Namespace */
39+
HashMap<CheetahString /* Key */, CheetahString /* Value */>,
40+
>,
41+
>,
3642
>,
37-
38-
pub(crate) namesrv_config: Arc<NamesrvConfig>,
43+
pub(crate) namesrv_config: ArcMut<NamesrvConfig>,
3944
}
4045

4146
impl KVConfigManager {
@@ -48,9 +53,9 @@ impl KVConfigManager {
4853
/// # Returns
4954
///
5055
/// A new `KVConfigManager` instance.
51-
pub fn new(namesrv_config: Arc<NamesrvConfig>) -> KVConfigManager {
56+
pub fn new(namesrv_config: ArcMut<NamesrvConfig>) -> KVConfigManager {
5257
KVConfigManager {
53-
config_table: HashMap::new(),
58+
config_table: Arc::new(RwLock::new(HashMap::new())),
5459
namesrv_config,
5560
}
5661
}
@@ -62,8 +67,8 @@ impl KVConfigManager {
6267
/// A reference to the configuration table.
6368
pub fn get_config_table(
6469
&self,
65-
) -> &HashMap<CheetahString, HashMap<CheetahString, CheetahString>> {
66-
&self.config_table
70+
) -> HashMap<CheetahString, HashMap<CheetahString, CheetahString>> {
71+
self.config_table.read().clone()
6772
}
6873

6974
/// Gets a reference to the Namesrv configuration.
@@ -83,18 +88,18 @@ impl KVConfigManager {
8388
if let Ok(content) = result {
8489
let wrapper =
8590
SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()).unwrap();
86-
if let Some(ref config_table) = wrapper.config_table {
87-
for (namespace, config) in config_table {
88-
self.config_table.insert(namespace.clone(), config.clone());
89-
}
91+
if let Some(config_table) = wrapper.config_table {
92+
let mut table = self.config_table.write();
93+
table.extend(config_table);
9094
info!("load KV config success");
9195
}
9296
}
9397
}
9498

9599
/// Persists the current key-value configurations to a file.
96100
pub fn persist(&mut self) {
97-
let wrapper = KVConfigSerializeWrapper::new_with_config_table(self.config_table.clone());
101+
let wrapper =
102+
KVConfigSerializeWrapper::new_with_config_table(self.config_table.write().clone());
98103
let content = serde_json::to_string(&wrapper).unwrap();
99104

100105
let result = FileUtils::string_to_file(
@@ -109,96 +114,157 @@ impl KVConfigManager {
109114
/// Adds or updates a key-value configuration.
110115
pub fn put_kv_config(
111116
&mut self,
112-
namespace: impl Into<CheetahString>,
113-
key: impl Into<CheetahString>,
114-
value: impl Into<CheetahString>,
117+
namespace: CheetahString,
118+
key: CheetahString,
119+
value: CheetahString,
115120
) {
116-
let namespace_inner = namespace.into();
117-
if !self.config_table.contains_key(namespace_inner.as_str()) {
118-
self.config_table
119-
.insert(namespace_inner.clone(), HashMap::new());
120-
}
121-
122-
let key = key.into();
123-
let value = value.into();
124-
let pre_value = self
125-
.config_table
126-
.get_mut(namespace_inner.as_str())
127-
.unwrap()
128-
.insert(key.clone(), value.clone());
121+
let mut config_table = self.config_table.write();
122+
let namespace_entry = config_table.entry(namespace.clone()).or_default();
123+
let pre_value = namespace_entry.insert(key.clone(), value.clone());
129124
match pre_value {
130125
None => {
131126
info!(
132127
"putKVConfig create new config item, Namespace: {} Key: {} Value: {}",
133-
namespace_inner, key, value
128+
namespace, key, value
134129
)
135130
}
136131
Some(_) => {
137132
info!(
138133
"putKVConfig update config item, Namespace: {} Key: {} Value: {}",
139-
namespace_inner, key, value
134+
namespace, key, value
140135
)
141136
}
142137
}
138+
drop(config_table);
143139
self.persist();
144140
}
145141

146142
/// Deletes a key-value configuration.
147-
pub fn delete_kv_config(&mut self, namespace: impl Into<String>, key: impl Into<String>) {
148-
let namespace_inner = namespace.into();
149-
if !self.config_table.contains_key(namespace_inner.as_str()) {
143+
pub fn delete_kv_config(&mut self, namespace: &CheetahString, key: &CheetahString) {
144+
let mut config_table = self.config_table.write();
145+
if !config_table.contains_key(namespace) {
150146
return;
151147
}
152148

153-
let key = key.into();
154-
let pre_value = self
155-
.config_table
156-
.get_mut(namespace_inner.as_str())
157-
.unwrap()
158-
.remove(key.as_str());
149+
let pre_value = config_table.get_mut(namespace).unwrap().remove(key);
159150
match pre_value {
160151
None => {}
161152
Some(value) => {
162153
info!(
163154
"deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}",
164-
namespace_inner, key, value
155+
namespace, key, value
165156
)
166157
}
167158
}
159+
drop(config_table);
168160
self.persist();
169161
}
170162

171163
/// Gets the key-value list for a specific namespace.
172-
pub fn get_kv_list_by_namespace(
173-
&mut self,
174-
namespace: impl Into<CheetahString>,
175-
) -> Option<Vec<u8>> {
176-
let namespace_inner = namespace.into();
177-
match self.config_table.get(namespace_inner.as_str()) {
178-
None => None,
179-
Some(kv_table) => {
180-
let table = KVTable {
181-
table: kv_table.clone(),
182-
};
183-
Some(table.encode())
184-
}
185-
}
164+
pub fn get_kv_list_by_namespace(&self, namespace: &CheetahString) -> Option<Vec<u8>> {
165+
let config_table = self.config_table.read();
166+
config_table.get(namespace).map(|kv_table| {
167+
let table = KVTable {
168+
table: kv_table.clone(),
169+
};
170+
table.encode()
171+
})
186172
}
187173

188174
// Gets the value for a specific key in a namespace.
189175
pub fn get_kvconfig(
190176
&self,
191-
namespace: impl Into<CheetahString>,
192-
key: impl Into<CheetahString>,
177+
namespace: &CheetahString,
178+
key: &CheetahString,
193179
) -> Option<CheetahString> {
194-
match self.config_table.get(namespace.into().as_str()) {
180+
let config_table = self.config_table.read();
181+
match config_table.get(namespace) {
195182
None => None,
196-
Some(kv_table) => {
197-
if let Some(value) = kv_table.get(key.into().as_str()) {
198-
return Some(value.clone());
199-
}
200-
None
201-
}
183+
Some(kv_table) => kv_table.get(key).cloned(),
202184
}
203185
}
204186
}
187+
188+
#[cfg(test)]
189+
mod tests {
190+
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
191+
use rocketmq_rust::ArcMut;
192+
193+
use super::*;
194+
195+
fn create_kv_config_manager() -> KVConfigManager {
196+
let namesrv_config = ArcMut::new(NamesrvConfig::default());
197+
KVConfigManager::new(namesrv_config)
198+
}
199+
200+
#[test]
201+
fn new_kv_config_manager_initializes_empty_config_table() {
202+
let manager = create_kv_config_manager();
203+
assert!(manager.get_config_table().is_empty());
204+
}
205+
206+
#[test]
207+
fn put_kv_config_creates_new_entry() {
208+
let mut manager = create_kv_config_manager();
209+
manager.put_kv_config("namespace".into(), "key".into(), "value".into());
210+
let config_table = manager.get_config_table();
211+
assert_eq!(config_table["namespace"]["key"], "value");
212+
}
213+
214+
#[test]
215+
fn put_kv_config_updates_existing_entry() {
216+
let mut manager = create_kv_config_manager();
217+
manager.put_kv_config("namespace".into(), "key".into(), "value".into());
218+
manager.put_kv_config("namespace".into(), "key".into(), "new_value".into());
219+
let config_table = manager.get_config_table();
220+
assert_eq!(config_table["namespace"]["key"], "new_value");
221+
}
222+
223+
#[test]
224+
fn delete_kv_config_removes_entry() {
225+
let mut manager = create_kv_config_manager();
226+
manager.put_kv_config("namespace".into(), "key".into(), "value".into());
227+
manager.delete_kv_config(&"namespace".into(), &"key".into());
228+
let config_table = manager.get_config_table();
229+
assert!(config_table["namespace"].get("key").is_none());
230+
}
231+
232+
#[test]
233+
fn delete_kv_config_does_nothing_if_key_does_not_exist() {
234+
let mut manager = create_kv_config_manager();
235+
manager.put_kv_config("namespace".into(), "key".into(), "value".into());
236+
manager.delete_kv_config(&"namespace".into(), &"non_existent_key".into());
237+
let config_table = manager.get_config_table();
238+
assert_eq!(config_table["namespace"]["key"], "value");
239+
}
240+
241+
#[test]
242+
fn get_kv_list_by_namespace_returns_encoded_list() {
243+
let mut manager = create_kv_config_manager();
244+
manager.put_kv_config("namespace".into(), "key".into(), "value".into());
245+
let kv_list = manager.get_kv_list_by_namespace(&"namespace".into());
246+
assert!(kv_list.is_some());
247+
}
248+
249+
#[test]
250+
fn get_kv_list_by_namespace_returns_none_if_namespace_does_not_exist() {
251+
let manager = create_kv_config_manager();
252+
let kv_list = manager.get_kv_list_by_namespace(&"non_existent_namespace".into());
253+
assert!(kv_list.is_none());
254+
}
255+
256+
#[test]
257+
fn get_kvconfig_returns_value_if_key_exists() {
258+
let mut manager = create_kv_config_manager();
259+
manager.put_kv_config("namespace".into(), "key".into(), "value".into());
260+
let value = manager.get_kvconfig(&"namespace".into(), &"key".into());
261+
assert_eq!(value, Some("value".into()));
262+
}
263+
264+
#[test]
265+
fn get_kvconfig_returns_none_if_key_does_not_exist() {
266+
let manager = create_kv_config_manager();
267+
let value = manager.get_kvconfig(&"namespace".into(), &"non_existent_key".into());
268+
assert!(value.is_none());
269+
}
270+
}

rocketmq-namesrv/src/processor.rs

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ use crate::processor::default_request_processor::DefaultRequestProcessor;
3030
mod client_request_processor;
3131
pub mod default_request_processor;
3232

33+
const NAMESPACE_ORDER_TOPIC_CONFIG: &str = "ORDER_TOPIC_CONFIG";
34+
3335
#[derive(Clone)]
3436
pub struct NameServerRequestProcessor {
3537
pub(crate) client_request_processor: ArcMut<ClientRequestProcessor>,

rocketmq-namesrv/src/processor/client_request_processor.rs

+11-8
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::sync::atomic::Ordering;
2020
use std::sync::Arc;
2121
use std::time::Duration;
2222

23+
use cheetah_string::CheetahString;
2324
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
2425
use rocketmq_common::common::FAQUrl;
2526
use rocketmq_common::TimeUtils;
@@ -31,24 +32,26 @@ use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequ
3132
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
3233
use rocketmq_remoting::protocol::RemotingSerializable;
3334
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
35+
use rocketmq_rust::ArcMut;
3436
use tracing::warn;
3537

3638
use crate::kvconfig::kvconfig_mananger::KVConfigManager;
39+
use crate::processor::NAMESPACE_ORDER_TOPIC_CONFIG;
3740
use crate::route::route_info_manager::RouteInfoManager;
3841

3942
pub struct ClientRequestProcessor {
4043
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
41-
namesrv_config: Arc<NamesrvConfig>,
44+
namesrv_config: ArcMut<NamesrvConfig>,
4245
need_check_namesrv_ready: AtomicBool,
4346
startup_time_millis: u64,
44-
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
47+
kvconfig_manager: KVConfigManager,
4548
}
4649

4750
impl ClientRequestProcessor {
4851
pub fn new(
4952
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
50-
namesrv_config: Arc<NamesrvConfig>,
51-
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
53+
namesrv_config: ArcMut<NamesrvConfig>,
54+
kvconfig_manager: KVConfigManager,
5255
) -> Self {
5356
Self {
5457
route_info_manager,
@@ -94,10 +97,10 @@ impl ClientRequestProcessor {
9497
}
9598
if self.namesrv_config.order_message_enable {
9699
//get kv config
97-
let order_topic_config = self
98-
.kvconfig_manager
99-
.read()
100-
.get_kvconfig("ORDER_TOPIC_CONFIG", request_header.topic.clone());
100+
let order_topic_config = self.kvconfig_manager.get_kvconfig(
101+
&CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG),
102+
&request_header.topic,
103+
);
101104
topic_route_data.order_topic_conf = order_topic_config;
102105
};
103106
/*let standard_json_only = request_header.accept_standard_json_only.unwrap_or(false);

0 commit comments

Comments
 (0)