Skip to content

Commit b0d2ea4

Browse files
committed
[ISSUE #1232]Optimize DefaultRequestProcessor code
1 parent bd2b04f commit b0d2ea4

File tree

4 files changed

+41
-56
lines changed

4 files changed

+41
-56
lines changed

rocketmq-namesrv/src/bootstrap.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ impl NameServerRuntime {
101101
Duration::from_secs(5),
102102
);
103103
NameServerRequestProcessor {
104-
client_request_processor: Arc::new(client_request_processor),
105-
default_request_processor: Arc::new(default_request_processor),
104+
client_request_processor: ArcMut::new(client_request_processor),
105+
default_request_processor: ArcMut::new(default_request_processor),
106106
}
107107
}
108108
}

rocketmq-namesrv/src/processor.rs

+13-20
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,13 @@
1515
* limitations under the License.
1616
*/
1717

18-
use std::sync::Arc;
19-
2018
use rocketmq_remoting::code::request_code::RequestCode;
2119
use rocketmq_remoting::net::channel::Channel;
2220
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
2321
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
2422
use rocketmq_remoting::runtime::processor::RequestProcessor;
2523
use rocketmq_remoting::Result;
24+
use rocketmq_rust::ArcMut;
2625
use tracing::info;
2726

2827
pub use self::client_request_processor::ClientRequestProcessor;
@@ -31,18 +30,10 @@ use crate::processor::default_request_processor::DefaultRequestProcessor;
3130
mod client_request_processor;
3231
pub mod default_request_processor;
3332

33+
#[derive(Clone)]
3434
pub struct NameServerRequestProcessor {
35-
pub(crate) client_request_processor: Arc<ClientRequestProcessor>,
36-
pub(crate) default_request_processor: Arc<DefaultRequestProcessor>,
37-
}
38-
39-
impl Clone for NameServerRequestProcessor {
40-
fn clone(&self) -> Self {
41-
Self {
42-
client_request_processor: self.client_request_processor.clone(),
43-
default_request_processor: self.default_request_processor.clone(),
44-
}
45-
}
35+
pub(crate) client_request_processor: ArcMut<ClientRequestProcessor>,
36+
pub(crate) default_request_processor: ArcMut<DefaultRequestProcessor>,
4637
}
4738

4839
impl RequestProcessor for NameServerRequestProcessor {
@@ -53,14 +44,16 @@ impl RequestProcessor for NameServerRequestProcessor {
5344
request: RemotingCommand,
5445
) -> Result<Option<RemotingCommand>> {
5546
let request_code = RequestCode::from(request.code());
56-
info!("process_request: {:?}", request_code);
47+
info!("Name server Received request code: {:?}", request_code);
5748
let result = match request_code {
58-
RequestCode::GetRouteinfoByTopic => self
59-
.client_request_processor
60-
.process_request(channel, ctx, request),
61-
_ => self
62-
.default_request_processor
63-
.process_request(channel, ctx, request),
49+
RequestCode::GetRouteinfoByTopic => {
50+
self.client_request_processor
51+
.process_request(channel, ctx, request_code, request)
52+
}
53+
_ => {
54+
self.default_request_processor
55+
.process_request(channel, ctx, request_code, request)
56+
}
6457
};
6558
Ok(result)
6659
}

rocketmq-namesrv/src/processor/client_request_processor.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use std::time::Duration;
2323
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
2424
use rocketmq_common::common::FAQUrl;
2525
use rocketmq_common::TimeUtils;
26+
use rocketmq_remoting::code::request_code::RequestCode;
2627
use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
2728
use rocketmq_remoting::code::response_code::ResponseCode;
2829
use rocketmq_remoting::net::channel::Channel;
@@ -118,9 +119,10 @@ impl ClientRequestProcessor {
118119

119120
impl ClientRequestProcessor {
120121
pub fn process_request(
121-
&self,
122+
&mut self,
122123
_channel: Channel,
123124
_ctx: ConnectionHandlerContext,
125+
_request_code: RequestCode,
124126
request: RemotingCommand,
125127
) -> Option<RemotingCommand> {
126128
Some(self.get_route_info_by_topic(request))

rocketmq-namesrv/src/processor/default_request_processor.rs

+23-33
Original file line numberDiff line numberDiff line change
@@ -54,61 +54,51 @@ use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
5454
use rocketmq_remoting::protocol::DataVersion;
5555
use rocketmq_remoting::protocol::RemotingSerializable;
5656
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
57-
use tracing::info;
5857
use tracing::warn;
5958

6059
use crate::route::route_info_manager::RouteInfoManager;
6160
use crate::KVConfigManager;
6261

63-
#[derive(Clone)]
6462
pub struct DefaultRequestProcessor {
6563
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
6664
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
6765
}
6866

6967
impl DefaultRequestProcessor {
7068
pub fn process_request(
71-
&self,
69+
&mut self,
7270
channel: Channel,
7371
_ctx: ConnectionHandlerContext,
72+
request_code: RequestCode,
7473
request: RemotingCommand,
7574
) -> Option<RemotingCommand> {
76-
let code = request.code();
77-
let broker_request_code = RequestCode::value_of(code);
78-
info!(
79-
"Received request code:{}-{:?}",
80-
code,
81-
broker_request_code.as_ref()
82-
);
83-
let response = match broker_request_code {
84-
Some(RequestCode::PutKvConfig) => self.put_kv_config(request),
85-
Some(RequestCode::GetKvConfig) => self.get_kv_config(request),
86-
Some(RequestCode::DeleteKvConfig) => self.delete_kv_config(request),
87-
Some(RequestCode::QueryDataVersion) => self.query_broker_topic_config(request),
75+
let response = match request_code {
76+
RequestCode::PutKvConfig => self.put_kv_config(request),
77+
RequestCode::GetKvConfig => self.get_kv_config(request),
78+
RequestCode::DeleteKvConfig => self.delete_kv_config(request),
79+
RequestCode::QueryDataVersion => self.query_broker_topic_config(request),
8880
//handle register broker
89-
Some(RequestCode::RegisterBroker) => {
81+
RequestCode::RegisterBroker => {
9082
self.process_register_broker(channel.remote_address(), request)
9183
}
92-
Some(RequestCode::UnregisterBroker) => self.process_unregister_broker(request),
93-
Some(RequestCode::BrokerHeartbeat) => self.process_broker_heartbeat(request),
94-
Some(RequestCode::GetBrokerMemberGroup) => self.get_broker_member_group(request),
84+
RequestCode::UnregisterBroker => self.process_unregister_broker(request),
85+
RequestCode::BrokerHeartbeat => self.process_broker_heartbeat(request),
86+
RequestCode::GetBrokerMemberGroup => self.get_broker_member_group(request),
9587
//handle get broker cluster info
96-
Some(RequestCode::GetBrokerClusterInfo) => self.get_broker_cluster_info(request),
97-
Some(RequestCode::WipeWritePermOfBroker) => self.wipe_write_perm_of_broker(request),
98-
Some(RequestCode::AddWritePermOfBroker) => self.add_write_perm_of_broker(request),
99-
Some(RequestCode::GetAllTopicListFromNameserver) => {
88+
RequestCode::GetBrokerClusterInfo => self.get_broker_cluster_info(request),
89+
RequestCode::WipeWritePermOfBroker => self.wipe_write_perm_of_broker(request),
90+
RequestCode::AddWritePermOfBroker => self.add_write_perm_of_broker(request),
91+
RequestCode::GetAllTopicListFromNameserver => {
10092
self.get_all_topic_list_from_nameserver(request)
10193
}
102-
Some(RequestCode::DeleteTopicInNamesrv) => self.delete_topic_in_name_srv(request),
103-
Some(RequestCode::RegisterTopicInNamesrv) => self.register_topic_to_name_srv(request),
104-
Some(RequestCode::GetKvlistByNamespace) => self.get_kv_list_by_namespace(request),
105-
Some(RequestCode::GetTopicsByCluster) => self.get_topics_by_cluster(request),
106-
Some(RequestCode::GetSystemTopicListFromNs) => {
107-
self.get_system_topic_list_from_ns(request)
108-
}
109-
Some(RequestCode::GetUnitTopicList) => self.get_unit_topic_list(request),
110-
Some(RequestCode::GetHasUnitSubTopicList) => self.get_has_unit_sub_topic_list(request),
111-
Some(RequestCode::GetHasUnitSubUnunitTopicList) => {
94+
RequestCode::DeleteTopicInNamesrv => self.delete_topic_in_name_srv(request),
95+
RequestCode::RegisterTopicInNamesrv => self.register_topic_to_name_srv(request),
96+
RequestCode::GetKvlistByNamespace => self.get_kv_list_by_namespace(request),
97+
RequestCode::GetTopicsByCluster => self.get_topics_by_cluster(request),
98+
RequestCode::GetSystemTopicListFromNs => self.get_system_topic_list_from_ns(request),
99+
RequestCode::GetUnitTopicList => self.get_unit_topic_list(request),
100+
RequestCode::GetHasUnitSubTopicList => self.get_has_unit_sub_topic_list(request),
101+
RequestCode::GetHasUnitSubUnunitTopicList => {
112102
self.get_has_unit_sub_un_unit_topic_list(request)
113103
}
114104
_ => RemotingCommand::create_response_command_with_code(

0 commit comments

Comments
 (0)