From 72e4899e1b93308ca986ed94116704b994d33069 Mon Sep 17 00:00:00 2001 From: mxsm Date: Thu, 21 Nov 2024 00:00:33 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#1249]=F0=9F=8D=BBOptimize=20RouteInfo?= =?UTF-8?q?Manager=20logic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../processor/default_request_processor.rs | 25 ++++---- .../src/route/route_info_manager.rs | 57 +++++++------------ 2 files changed, 33 insertions(+), 49 deletions(-) diff --git a/rocketmq-namesrv/src/processor/default_request_processor.rs b/rocketmq-namesrv/src/processor/default_request_processor.rs index 39895054..4051d525 100644 --- a/rocketmq-namesrv/src/processor/default_request_processor.rs +++ b/rocketmq-namesrv/src/processor/default_request_processor.rs @@ -180,15 +180,14 @@ impl DefaultRequestProcessor { ); let mut rim_write = self.route_info_manager.write(); rim_write.update_broker_info_update_timestamp( - request_header.cluster_name.as_str(), - request_header.broker_addr.as_str(), + request_header.cluster_name.clone(), + request_header.broker_addr.clone(), ); let mut command = RemotingCommand::create_response_command() .set_command_custom_header(QueryDataVersionResponseHeader::new(changed)); - if let Some(value) = rim_write.query_broker_topic_config( - request_header.cluster_name.as_str(), - request_header.broker_addr.as_str(), - ) { + if let Some(value) = rim_write + .query_broker_topic_config(request_header.cluster_name, request_header.broker_addr) + { command = command.set_body(value.encode()); } drop(rim_write); @@ -311,8 +310,8 @@ impl DefaultRequestProcessor { self.route_info_manager .write() .update_broker_info_update_timestamp( - request_header.cluster_name.as_str(), - request_header.broker_addr.as_str(), + request_header.cluster_name, + request_header.broker_addr, ); RemotingCommand::create_response_command() } @@ -322,10 +321,10 @@ impl DefaultRequestProcessor { .decode_command_custom_header::() .unwrap(); - let broker_member_group = self.route_info_manager.write().get_broker_member_group( - request_header.cluster_name.as_str(), - request_header.broker_name.as_str(), - ); + let broker_member_group = self + .route_info_manager + .write() + .get_broker_member_group(&request_header.cluster_name, &request_header.broker_name); let response_body = GetBrokerMemberGroupResponseBody { broker_member_group, }; @@ -397,7 +396,7 @@ impl DefaultRequestProcessor { if !topic_route_data.queue_datas.is_empty() { self.route_info_manager .write() - .register_topic(request_header.topic.as_str(), topic_route_data.queue_datas) + .register_topic(request_header.topic, topic_route_data.queue_datas) } } RemotingCommand::create_response_command() diff --git a/rocketmq-namesrv/src/route/route_info_manager.rs b/rocketmq-namesrv/src/route/route_info_manager.rs index 56f70d0a..fa06277c 100644 --- a/rocketmq-namesrv/src/route/route_info_manager.rs +++ b/rocketmq-namesrv/src/route/route_info_manager.rs @@ -480,10 +480,10 @@ impl RouteInfoManager { pub(crate) fn is_topic_config_changed( &mut self, - cluster_name: &str, - broker_addr: &str, + cluster_name: &CheetahString, + broker_addr: &CheetahString, data_version: &DataVersion, - broker_name: &str, + broker_name: &CheetahString, topic: &str, ) -> bool { let is_change = @@ -504,11 +504,11 @@ impl RouteInfoManager { pub(crate) fn is_broker_topic_config_changed( &self, - cluster_name: &str, - broker_addr: &str, + cluster_name: &CheetahString, + broker_addr: &CheetahString, data_version: &DataVersion, ) -> bool { - let option = self.query_broker_topic_config(cluster_name, broker_addr); + let option = self.query_broker_topic_config(cluster_name.clone(), broker_addr.clone()); if let Some(pre) = option { if pre != data_version { return true; @@ -519,10 +519,10 @@ impl RouteInfoManager { pub(crate) fn query_broker_topic_config( &self, - cluster_name: &str, - broker_addr: &str, + cluster_name: CheetahString, + broker_addr: CheetahString, ) -> Option<&DataVersion> { - let info = BrokerAddrInfo::new(cluster_name.to_string(), broker_addr.to_string()); + let info = BrokerAddrInfo::new(cluster_name, broker_addr); let pre = self.broker_live_table.get(info.as_ref()); if let Some(live_info) = pre { return Some(live_info.data_version()); @@ -638,8 +638,8 @@ impl RouteInfoManager { pub(crate) fn update_broker_info_update_timestamp( &mut self, - cluster_name: impl Into, - broker_addr: impl Into, + cluster_name: CheetahString, + broker_addr: CheetahString, ) { let broker_addr_info = BrokerAddrInfo::new(cluster_name, broker_addr); if let Some(value) = self.broker_live_table.get_mut(broker_addr_info.as_ref()) { @@ -649,13 +649,10 @@ impl RouteInfoManager { pub(crate) fn get_broker_member_group( &mut self, - cluster_name: &str, - broker_name: &str, + cluster_name: &CheetahString, + broker_name: &CheetahString, ) -> Option { - let mut group_member = BrokerMemberGroup::new( - CheetahString::from_slice(cluster_name), - CheetahString::from_slice(broker_name), - ); + let mut group_member = BrokerMemberGroup::new(cluster_name.clone(), broker_name.clone()); if let Some(broker_data) = self.broker_addr_table.get(broker_name) { let map = broker_data.broker_addrs().clone(); for (key, value) in map { @@ -737,21 +734,15 @@ impl RouteInfoManager { } } - pub(crate) fn register_topic( - &mut self, - topic: impl Into, - queue_data_vec: Vec, - ) { + pub(crate) fn register_topic(&mut self, topic: CheetahString, queue_data_vec: Vec) { if queue_data_vec.is_empty() { return; } - let topic_inner = topic.into(); - if !self.topic_queue_table.contains_key(topic_inner.as_str()) { - self.topic_queue_table - .insert(topic_inner.clone(), HashMap::new()); + if !self.topic_queue_table.contains_key(&topic) { + self.topic_queue_table.insert(topic.clone(), HashMap::new()); } - let queue_data_map = self.topic_queue_table.get_mut(&topic_inner).unwrap(); + let queue_data_map = self.topic_queue_table.get_mut(&topic).unwrap(); let vec_length = queue_data_vec.len(); for queue_data in queue_data_vec { if !self @@ -760,7 +751,7 @@ impl RouteInfoManager { { warn!( "Register topic contains illegal broker, {}, {:?}", - topic_inner, queue_data + topic, queue_data ); return; } @@ -768,15 +759,9 @@ impl RouteInfoManager { } if queue_data_map.len() > vec_length { - info!( - "Topic route already exist.{}, {:?}", - &topic_inner, queue_data_map - ) + info!("Topic route already exist.{}, {:?}", topic, queue_data_map) } else { - info!( - "Register topic route:{}, {:?}", - &topic_inner, queue_data_map - ) + info!("Register topic route:{}, {:?}", topic, queue_data_map) } }