Skip to content

Commit 52fcc55

Browse files
authored
[ISSUE #1249]🍻Optimize RouteInfoManager logic (#1250)
1 parent b9b7d88 commit 52fcc55

File tree

2 files changed

+33
-49
lines changed

2 files changed

+33
-49
lines changed

rocketmq-namesrv/src/processor/default_request_processor.rs

+12-13
Original file line numberDiff line numberDiff line change
@@ -180,15 +180,14 @@ impl DefaultRequestProcessor {
180180
);
181181
let mut rim_write = self.route_info_manager.write();
182182
rim_write.update_broker_info_update_timestamp(
183-
request_header.cluster_name.as_str(),
184-
request_header.broker_addr.as_str(),
183+
request_header.cluster_name.clone(),
184+
request_header.broker_addr.clone(),
185185
);
186186
let mut command = RemotingCommand::create_response_command()
187187
.set_command_custom_header(QueryDataVersionResponseHeader::new(changed));
188-
if let Some(value) = rim_write.query_broker_topic_config(
189-
request_header.cluster_name.as_str(),
190-
request_header.broker_addr.as_str(),
191-
) {
188+
if let Some(value) = rim_write
189+
.query_broker_topic_config(request_header.cluster_name, request_header.broker_addr)
190+
{
192191
command = command.set_body(value.encode());
193192
}
194193
drop(rim_write);
@@ -311,8 +310,8 @@ impl DefaultRequestProcessor {
311310
self.route_info_manager
312311
.write()
313312
.update_broker_info_update_timestamp(
314-
request_header.cluster_name.as_str(),
315-
request_header.broker_addr.as_str(),
313+
request_header.cluster_name,
314+
request_header.broker_addr,
316315
);
317316
RemotingCommand::create_response_command()
318317
}
@@ -322,10 +321,10 @@ impl DefaultRequestProcessor {
322321
.decode_command_custom_header::<GetBrokerMemberGroupRequestHeader>()
323322
.unwrap();
324323

325-
let broker_member_group = self.route_info_manager.write().get_broker_member_group(
326-
request_header.cluster_name.as_str(),
327-
request_header.broker_name.as_str(),
328-
);
324+
let broker_member_group = self
325+
.route_info_manager
326+
.write()
327+
.get_broker_member_group(&request_header.cluster_name, &request_header.broker_name);
329328
let response_body = GetBrokerMemberGroupResponseBody {
330329
broker_member_group,
331330
};
@@ -397,7 +396,7 @@ impl DefaultRequestProcessor {
397396
if !topic_route_data.queue_datas.is_empty() {
398397
self.route_info_manager
399398
.write()
400-
.register_topic(request_header.topic.as_str(), topic_route_data.queue_datas)
399+
.register_topic(request_header.topic, topic_route_data.queue_datas)
401400
}
402401
}
403402
RemotingCommand::create_response_command()

rocketmq-namesrv/src/route/route_info_manager.rs

+21-36
Original file line numberDiff line numberDiff line change
@@ -480,10 +480,10 @@ impl RouteInfoManager {
480480

481481
pub(crate) fn is_topic_config_changed(
482482
&mut self,
483-
cluster_name: &str,
484-
broker_addr: &str,
483+
cluster_name: &CheetahString,
484+
broker_addr: &CheetahString,
485485
data_version: &DataVersion,
486-
broker_name: &str,
486+
broker_name: &CheetahString,
487487
topic: &str,
488488
) -> bool {
489489
let is_change =
@@ -504,11 +504,11 @@ impl RouteInfoManager {
504504

505505
pub(crate) fn is_broker_topic_config_changed(
506506
&self,
507-
cluster_name: &str,
508-
broker_addr: &str,
507+
cluster_name: &CheetahString,
508+
broker_addr: &CheetahString,
509509
data_version: &DataVersion,
510510
) -> bool {
511-
let option = self.query_broker_topic_config(cluster_name, broker_addr);
511+
let option = self.query_broker_topic_config(cluster_name.clone(), broker_addr.clone());
512512
if let Some(pre) = option {
513513
if pre != data_version {
514514
return true;
@@ -519,10 +519,10 @@ impl RouteInfoManager {
519519

520520
pub(crate) fn query_broker_topic_config(
521521
&self,
522-
cluster_name: &str,
523-
broker_addr: &str,
522+
cluster_name: CheetahString,
523+
broker_addr: CheetahString,
524524
) -> Option<&DataVersion> {
525-
let info = BrokerAddrInfo::new(cluster_name.to_string(), broker_addr.to_string());
525+
let info = BrokerAddrInfo::new(cluster_name, broker_addr);
526526
let pre = self.broker_live_table.get(info.as_ref());
527527
if let Some(live_info) = pre {
528528
return Some(live_info.data_version());
@@ -638,8 +638,8 @@ impl RouteInfoManager {
638638

639639
pub(crate) fn update_broker_info_update_timestamp(
640640
&mut self,
641-
cluster_name: impl Into<CheetahString>,
642-
broker_addr: impl Into<CheetahString>,
641+
cluster_name: CheetahString,
642+
broker_addr: CheetahString,
643643
) {
644644
let broker_addr_info = BrokerAddrInfo::new(cluster_name, broker_addr);
645645
if let Some(value) = self.broker_live_table.get_mut(broker_addr_info.as_ref()) {
@@ -649,13 +649,10 @@ impl RouteInfoManager {
649649

650650
pub(crate) fn get_broker_member_group(
651651
&mut self,
652-
cluster_name: &str,
653-
broker_name: &str,
652+
cluster_name: &CheetahString,
653+
broker_name: &CheetahString,
654654
) -> Option<BrokerMemberGroup> {
655-
let mut group_member = BrokerMemberGroup::new(
656-
CheetahString::from_slice(cluster_name),
657-
CheetahString::from_slice(broker_name),
658-
);
655+
let mut group_member = BrokerMemberGroup::new(cluster_name.clone(), broker_name.clone());
659656
if let Some(broker_data) = self.broker_addr_table.get(broker_name) {
660657
let map = broker_data.broker_addrs().clone();
661658
for (key, value) in map {
@@ -737,21 +734,15 @@ impl RouteInfoManager {
737734
}
738735
}
739736

740-
pub(crate) fn register_topic(
741-
&mut self,
742-
topic: impl Into<CheetahString>,
743-
queue_data_vec: Vec<QueueData>,
744-
) {
737+
pub(crate) fn register_topic(&mut self, topic: CheetahString, queue_data_vec: Vec<QueueData>) {
745738
if queue_data_vec.is_empty() {
746739
return;
747740
}
748-
let topic_inner = topic.into();
749741

750-
if !self.topic_queue_table.contains_key(topic_inner.as_str()) {
751-
self.topic_queue_table
752-
.insert(topic_inner.clone(), HashMap::new());
742+
if !self.topic_queue_table.contains_key(&topic) {
743+
self.topic_queue_table.insert(topic.clone(), HashMap::new());
753744
}
754-
let queue_data_map = self.topic_queue_table.get_mut(&topic_inner).unwrap();
745+
let queue_data_map = self.topic_queue_table.get_mut(&topic).unwrap();
755746
let vec_length = queue_data_vec.len();
756747
for queue_data in queue_data_vec {
757748
if !self
@@ -760,23 +751,17 @@ impl RouteInfoManager {
760751
{
761752
warn!(
762753
"Register topic contains illegal broker, {}, {:?}",
763-
topic_inner, queue_data
754+
topic, queue_data
764755
);
765756
return;
766757
}
767758
queue_data_map.insert(queue_data.broker_name().clone(), queue_data);
768759
}
769760

770761
if queue_data_map.len() > vec_length {
771-
info!(
772-
"Topic route already exist.{}, {:?}",
773-
&topic_inner, queue_data_map
774-
)
762+
info!("Topic route already exist.{}, {:?}", topic, queue_data_map)
775763
} else {
776-
info!(
777-
"Register topic route:{}, {:?}",
778-
&topic_inner, queue_data_map
779-
)
764+
info!("Register topic route:{}, {:?}", topic, queue_data_map)
780765
}
781766
}
782767

0 commit comments

Comments
 (0)