Skip to content

Commit 1c8d3e2

Browse files
committed
[ISSUE #1362]🎨Refactor QueryAssignmentProcessor
1 parent fcee468 commit 1c8d3e2

File tree

4 files changed

+92
-27
lines changed

4 files changed

+92
-27
lines changed

rocketmq-broker/src/broker_runtime.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ use crate::processor::default_pull_message_result_handler::DefaultPullMessageRes
7272
use crate::processor::end_transaction_processor::EndTransactionProcessor;
7373
use crate::processor::pull_message_processor::PullMessageProcessor;
7474
use crate::processor::pull_message_result_handler::PullMessageResultHandler;
75+
use crate::processor::query_assignment_processor::QueryAssignmentProcessor;
7576
use crate::processor::query_message_processor::QueryMessageProcessor;
7677
use crate::processor::reply_message_processor::ReplyMessageProcessor;
7778
use crate::processor::send_message_processor::SendMessageProcessor;
@@ -537,7 +538,9 @@ impl BrokerRuntime {
537538
self.subscription_group_manager.clone(),
538539
)),
539540
consumer_manage_processor: ArcMut::new(consumer_manage_processor),
540-
query_assignment_processor: Default::default(),
541+
query_assignment_processor: ArcMut::new(QueryAssignmentProcessor::new(
542+
self.message_store_config.clone(),
543+
)),
541544
query_message_processor: ArcMut::new(query_message_processor),
542545
end_transaction_processor: ArcMut::new(EndTransactionProcessor::new(
543546
self.message_store_config.clone(),

rocketmq-broker/src/load_balance/message_request_mode_manager.rs

+15-17
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,13 @@ use cheetah_string::CheetahString;
2121
use rocketmq_common::common::config_manager::ConfigManager;
2222
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
2323
use rocketmq_remoting::protocol::body::set_message_request_mode_request_body::SetMessageRequestModeRequestBody;
24-
use rocketmq_rust::ArcMut;
2524
use rocketmq_store::config::message_store_config::MessageStoreConfig;
2625
use tracing::info;
2726

2827
use crate::broker_path_config_helper;
2928

3029
pub(crate) struct MessageRequestModeManager {
31-
message_store_config: ArcMut<MessageStoreConfig>,
30+
message_store_config: Arc<MessageStoreConfig>,
3231
message_request_mode_map: Arc<
3332
parking_lot::Mutex<
3433
HashMap<
@@ -40,7 +39,7 @@ pub(crate) struct MessageRequestModeManager {
4039
}
4140

4241
impl MessageRequestModeManager {
43-
pub fn new(message_store_config: ArcMut<MessageStoreConfig>) -> Self {
42+
pub fn new(message_store_config: Arc<MessageStoreConfig>) -> Self {
4443
Self {
4544
message_store_config,
4645
message_request_mode_map: Arc::new(parking_lot::Mutex::new(HashMap::new())),
@@ -113,14 +112,13 @@ mod tests {
113112
use cheetah_string::CheetahString;
114113
use rocketmq_common::common::message::message_enum::MessageRequestMode;
115114
use rocketmq_remoting::protocol::body::set_message_request_mode_request_body::SetMessageRequestModeRequestBody;
116-
use rocketmq_rust::ArcMut;
117115
use rocketmq_store::config::message_store_config::MessageStoreConfig;
118116

119117
use super::*;
120118

121119
#[test]
122120
fn set_message_request_mode_adds_entry() {
123-
let message_store_config = ArcMut::new(MessageStoreConfig::default());
121+
let message_store_config = Arc::new(MessageStoreConfig::default());
124122
let manager = MessageRequestModeManager::new(message_store_config);
125123
let topic = CheetahString::from("test_topic");
126124
let consumer_group = CheetahString::from("test_group");
@@ -138,7 +136,7 @@ mod tests {
138136

139137
#[test]
140138
fn get_message_request_mode_returns_none_for_nonexistent_entry() {
141-
let message_store_config = ArcMut::new(MessageStoreConfig::default());
139+
let message_store_config = Arc::new(MessageStoreConfig::default());
142140
let manager = MessageRequestModeManager::new(message_store_config);
143141
let topic = CheetahString::from("nonexistent_topic");
144142
let consumer_group = CheetahString::from("nonexistent_group");
@@ -150,7 +148,7 @@ mod tests {
150148

151149
#[test]
152150
fn encode_pretty_returns_pretty_json() {
153-
let message_store_config = ArcMut::new(MessageStoreConfig::default());
151+
let message_store_config = Arc::new(MessageStoreConfig::default());
154152
let manager = MessageRequestModeManager::new(message_store_config);
155153
let topic = CheetahString::from("test_topic");
156154
let consumer_group = CheetahString::from("test_group");
@@ -169,18 +167,18 @@ mod tests {
169167

170168
#[test]
171169
fn decode_populates_message_request_mode_map() {
172-
let message_store_config = ArcMut::new(MessageStoreConfig::default());
170+
let message_store_config = Arc::new(MessageStoreConfig::default());
173171
let manager = MessageRequestModeManager::new(message_store_config);
174172
let json = r#"{
175-
"test_topic": {
176-
"test_group": {
177-
"topic": "test_topic",
178-
"consumerGroup": "test_group",
179-
"mode": "PULL",
180-
"popShareQueueNum": 0
181-
}
182-
}
183-
}"#;
173+
"test_topic": {
174+
"test_group": {
175+
"topic": "test_topic",
176+
"consumerGroup": "test_group",
177+
"mode": "PULL",
178+
"popShareQueueNum": 0
179+
}
180+
}
181+
}"#;
184182

185183
manager.decode(json);
186184
let result = manager.get_message_request_mode(

rocketmq-broker/src/processor/query_assignment_processor.rs

+72-8
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,83 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use crate::load_balance::message_request_mode_manager::MessageRequestModeManager;
18+
use cheetah_string::CheetahString;
19+
use rocketmq_client_rust::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
20+
use rocketmq_client_rust::consumer::rebalance_strategy::allocate_message_queue_averagely::AllocateMessageQueueAveragely;
21+
use rocketmq_client_rust::consumer::rebalance_strategy::allocate_message_queue_averagely_by_circle::AllocateMessageQueueAveragelyByCircle;
22+
use rocketmq_common::common::config_manager::ConfigManager;
23+
use rocketmq_remoting::code::request_code::RequestCode;
24+
use rocketmq_remoting::net::channel::Channel;
25+
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
26+
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
27+
use rocketmq_store::config::message_store_config::MessageStoreConfig;
28+
use std::collections::HashMap;
29+
use std::sync::Arc;
1730

18-
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
19-
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
31+
pub struct QueryAssignmentProcessor {
32+
message_request_mode_manager: MessageRequestModeManager,
33+
load_strategy: HashMap<CheetahString, Arc<dyn AllocateMessageQueueStrategy>>,
34+
message_store_config: Arc<MessageStoreConfig>,
35+
}
2036

21-
#[derive(Default)]
22-
pub struct QueryAssignmentProcessor {}
37+
impl QueryAssignmentProcessor {
38+
pub fn new(message_store_config: Arc<MessageStoreConfig>) -> Self {
39+
let allocate_message_queue_averagely: Arc<dyn AllocateMessageQueueStrategy> =
40+
Arc::new(AllocateMessageQueueAveragely);
41+
let allocate_message_queue_averagely_by_circle: Arc<dyn AllocateMessageQueueStrategy> =
42+
Arc::new(AllocateMessageQueueAveragelyByCircle);
43+
let mut load_strategy = HashMap::new();
44+
load_strategy.insert(
45+
CheetahString::from_static_str(allocate_message_queue_averagely.get_name()),
46+
allocate_message_queue_averagely,
47+
);
48+
load_strategy.insert(
49+
CheetahString::from_static_str(allocate_message_queue_averagely_by_circle.get_name()),
50+
allocate_message_queue_averagely_by_circle,
51+
);
52+
let manager = MessageRequestModeManager::new(message_store_config.clone());
53+
let _ = manager.load();
54+
Self {
55+
message_request_mode_manager: manager,
56+
load_strategy,
57+
message_store_config,
58+
}
59+
}
60+
}
2361

2462
impl QueryAssignmentProcessor {
25-
fn process_request(
26-
&self,
63+
pub async fn process_request(
64+
&mut self,
65+
channel: Channel,
66+
ctx: ConnectionHandlerContext,
67+
request_code: RequestCode,
68+
request: RemotingCommand,
69+
) -> Option<RemotingCommand> {
70+
match request_code {
71+
RequestCode::QueryAssignment => self.query_assignment(channel, ctx, request).await,
72+
RequestCode::SetMessageRequestMode => {
73+
self.set_message_request_mode(channel, ctx, request).await
74+
}
75+
_ => None,
76+
}
77+
}
78+
79+
async fn query_assignment(
80+
&mut self,
81+
_channel: Channel,
82+
_ctx: ConnectionHandlerContext,
83+
_request: RemotingCommand,
84+
) -> Option<RemotingCommand> {
85+
unimplemented!()
86+
}
87+
88+
async fn set_message_request_mode(
89+
&mut self,
90+
_channel: Channel,
2791
_ctx: ConnectionHandlerContext,
2892
_request: RemotingCommand,
29-
) -> RemotingCommand {
30-
todo!()
93+
) -> Option<RemotingCommand> {
94+
unimplemented!()
3195
}
3296
}

rocketmq-client/src/consumer/rebalance_strategy.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717
pub mod allocate_message_queue_averagely;
18-
mod allocate_message_queue_averagely_by_circle;
18+
pub mod allocate_message_queue_averagely_by_circle;
1919

2020
use std::collections::HashSet;
2121

0 commit comments

Comments
 (0)