Skip to content

Commit 6e8aaac

Browse files
authored
[ISSUE ##1307]🚀Implement MessageRequestModeManager (#1308)
1 parent 5a5312a commit 6e8aaac

File tree

3 files changed

+212
-0
lines changed

3 files changed

+212
-0
lines changed

rocketmq-broker/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub(crate) mod controller;
3636
pub(crate) mod error;
3737
pub(crate) mod filter;
3838
pub(crate) mod hook;
39+
pub(crate) mod load_balance;
3940
pub(crate) mod long_polling;
4041
pub(crate) mod mqtrace;
4142
pub(crate) mod offset;

rocketmq-broker/src/load_balance.rs

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
pub(crate) mod message_request_mode_manager;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::collections::HashMap;
18+
use std::sync::Arc;
19+
20+
use cheetah_string::CheetahString;
21+
use rocketmq_common::common::config_manager::ConfigManager;
22+
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
23+
use rocketmq_remoting::protocol::body::set_message_request_mode_request_body::SetMessageRequestModeRequestBody;
24+
use rocketmq_rust::ArcMut;
25+
use rocketmq_store::config::message_store_config::MessageStoreConfig;
26+
use tracing::info;
27+
28+
use crate::broker_path_config_helper;
29+
30+
pub(crate) struct MessageRequestModeManager {
31+
message_store_config: ArcMut<MessageStoreConfig>,
32+
message_request_mode_map: Arc<
33+
parking_lot::Mutex<
34+
HashMap<
35+
CheetahString, /* topic */
36+
HashMap<CheetahString /* consumerGroup */, SetMessageRequestModeRequestBody>,
37+
>,
38+
>,
39+
>,
40+
}
41+
42+
impl MessageRequestModeManager {
43+
pub fn new(message_store_config: ArcMut<MessageStoreConfig>) -> Self {
44+
Self {
45+
message_store_config,
46+
message_request_mode_map: Arc::new(parking_lot::Mutex::new(HashMap::new())),
47+
}
48+
}
49+
50+
pub fn set_message_request_mode(
51+
&self,
52+
topic: CheetahString,
53+
consumer_group: CheetahString,
54+
request_body: SetMessageRequestModeRequestBody,
55+
) {
56+
let mut message_request_mode_map = self.message_request_mode_map.lock();
57+
let consumer_group_map = message_request_mode_map.entry(topic.clone()).or_default();
58+
consumer_group_map.insert(consumer_group, request_body);
59+
}
60+
61+
pub fn get_message_request_mode(
62+
&self,
63+
topic: &CheetahString,
64+
consumer_group: &CheetahString,
65+
) -> Option<SetMessageRequestModeRequestBody> {
66+
let message_request_mode_map = self.message_request_mode_map.lock();
67+
if let Some(consumer_group_map) = message_request_mode_map.get(topic) {
68+
if let Some(message_request_mode) = consumer_group_map.get(consumer_group) {
69+
return Some(message_request_mode.clone());
70+
}
71+
}
72+
None
73+
}
74+
}
75+
76+
impl ConfigManager for MessageRequestModeManager {
77+
fn config_file_path(&self) -> String {
78+
broker_path_config_helper::get_message_request_mode_path(
79+
self.message_store_config.store_path_root_dir.as_str(),
80+
)
81+
}
82+
83+
fn encode_pretty(&self, pretty_format: bool) -> String {
84+
if pretty_format {
85+
SerdeJsonUtils::to_json_pretty(&*self.message_request_mode_map.lock())
86+
.expect("encode failed")
87+
} else {
88+
SerdeJsonUtils::to_json(&*self.message_request_mode_map.lock()).expect("encode failed")
89+
}
90+
}
91+
92+
fn decode(&self, json_string: &str) {
93+
info!(
94+
"decode MessageRequestModeManager from json string:{}",
95+
json_string
96+
);
97+
if json_string.is_empty() {
98+
return;
99+
}
100+
let message_request_mode_map: HashMap<
101+
CheetahString,
102+
HashMap<CheetahString, SetMessageRequestModeRequestBody>,
103+
> = SerdeJsonUtils::from_json_str(json_string).expect("decode failed");
104+
let mut message_request_mode_map_ = self.message_request_mode_map.lock();
105+
*message_request_mode_map_ = message_request_mode_map;
106+
}
107+
}
108+
109+
#[cfg(test)]
110+
mod tests {
111+
use std::sync::Arc;
112+
113+
use cheetah_string::CheetahString;
114+
use rocketmq_common::common::message::message_enum::MessageRequestMode;
115+
use rocketmq_remoting::protocol::body::set_message_request_mode_request_body::SetMessageRequestModeRequestBody;
116+
use rocketmq_rust::ArcMut;
117+
use rocketmq_store::config::message_store_config::MessageStoreConfig;
118+
119+
use super::*;
120+
121+
#[test]
122+
fn set_message_request_mode_adds_entry() {
123+
let message_store_config = ArcMut::new(MessageStoreConfig::default());
124+
let manager = MessageRequestModeManager::new(message_store_config);
125+
let topic = CheetahString::from("test_topic");
126+
let consumer_group = CheetahString::from("test_group");
127+
let request_body = SetMessageRequestModeRequestBody::default();
128+
129+
manager.set_message_request_mode(
130+
topic.clone(),
131+
consumer_group.clone(),
132+
request_body.clone(),
133+
);
134+
let result = manager.get_message_request_mode(&topic, &consumer_group);
135+
136+
//assert_eq!(result, Some(request_body));
137+
}
138+
139+
#[test]
140+
fn get_message_request_mode_returns_none_for_nonexistent_entry() {
141+
let message_store_config = ArcMut::new(MessageStoreConfig::default());
142+
let manager = MessageRequestModeManager::new(message_store_config);
143+
let topic = CheetahString::from("nonexistent_topic");
144+
let consumer_group = CheetahString::from("nonexistent_group");
145+
146+
let result = manager.get_message_request_mode(&topic, &consumer_group);
147+
148+
assert!(result.is_none());
149+
}
150+
151+
#[test]
152+
fn encode_pretty_returns_pretty_json() {
153+
let message_store_config = ArcMut::new(MessageStoreConfig::default());
154+
let manager = MessageRequestModeManager::new(message_store_config);
155+
let topic = CheetahString::from("test_topic");
156+
let consumer_group = CheetahString::from("test_group");
157+
let request_body = SetMessageRequestModeRequestBody::default();
158+
159+
manager.set_message_request_mode(
160+
topic.clone(),
161+
consumer_group.clone(),
162+
request_body.clone(),
163+
);
164+
let json = manager.encode_pretty(true);
165+
166+
assert!(json.contains("\n"));
167+
assert!(json.contains("\"test_topic\""));
168+
}
169+
170+
#[test]
171+
fn decode_populates_message_request_mode_map() {
172+
let message_store_config = ArcMut::new(MessageStoreConfig::default());
173+
let manager = MessageRequestModeManager::new(message_store_config);
174+
let json = r#"{
175+
"test_topic": {
176+
"test_group": {
177+
"topic": "test_topic",
178+
"consumerGroup": "test_group",
179+
"mode": "PULL",
180+
"popShareQueueNum": 0
181+
}
182+
}
183+
}"#;
184+
185+
manager.decode(json);
186+
let result = manager.get_message_request_mode(
187+
&CheetahString::from("test_topic"),
188+
&CheetahString::from("test_group"),
189+
);
190+
191+
assert!(result.is_some());
192+
assert_eq!(result.unwrap().mode, MessageRequestMode::Pull);
193+
}
194+
}

0 commit comments

Comments
 (0)