Skip to content

[ISSUE #1269]⚡️Optimize name server DefaultRequestProcessor#process_register_broker #1274

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions rocketmq-namesrv/src/processor/default_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,7 @@
topic_config_wrapper = register_broker_body
.topic_config_serialize_wrapper()
.clone();
register_broker_body
.filter_server_list()
.iter()
.for_each(|s| {
filter_server_list.push(s.clone());
});
filter_server_list = register_broker_body.filter_server_list().clone();

Check warning on line 228 in rocketmq-namesrv/src/processor/default_request_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/processor/default_request_processor.rs#L228

Added line #L228 was not covered by tests
} else {
topic_config_wrapper = extract_register_topic_config_from_request(&request);
}
Expand Down
102 changes: 57 additions & 45 deletions rocketmq-namesrv/src/route/route_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::time::SystemTime;
use std::sync::Arc;

use cheetah_string::CheetahString;
use rocketmq_common::common::config::TopicConfig;
Expand All @@ -28,6 +28,7 @@
use rocketmq_common::common::topic::TopicValidator;
use rocketmq_common::common::TopicSysFlag;
use rocketmq_common::TimeUtils;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient;
use rocketmq_remoting::clients::RemotingClient;
use rocketmq_remoting::code::request_code::RequestCode;
Expand Down Expand Up @@ -82,6 +83,7 @@
pub(crate) topic_queue_mapping_info_table: TopicQueueMappingInfoTable,
pub(crate) namesrv_config: ArcMut<NamesrvConfig>,
pub(crate) remoting_client: ArcMut<RocketmqDefaultClient>,
lock: Arc<parking_lot::RwLock<()>>,
}

#[allow(private_interfaces)]
Expand All @@ -99,14 +101,15 @@
topic_queue_mapping_info_table: ArcMut::new(HashMap::new()),
namesrv_config,
remoting_client,
lock: Arc::new(Default::default()),

Check warning on line 104 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L104

Added line #L104 was not covered by tests
}
}
}

//impl register broker
impl RouteInfoManager {
pub fn register_broker(
&mut self,
&self,

Check warning on line 112 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L112

Added line #L112 was not covered by tests
cluster_name: CheetahString,
broker_addr: CheetahString,
broker_name: CheetahString,
Expand All @@ -120,35 +123,39 @@
remote_addr: SocketAddr,
) -> Option<RegisterBrokerResult> {
let mut result = RegisterBrokerResult::default();
let _write = self.lock.write();

Check warning on line 126 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L126

Added line #L126 was not covered by tests
//init or update cluster information
if !self.cluster_addr_table.contains_key(&cluster_name) {
self.cluster_addr_table
.insert(cluster_name.clone(), HashSet::new());
}
self.cluster_addr_table
.get_mut(&cluster_name)
.unwrap()
.mut_from_ref()
.entry(cluster_name.clone())
.or_default()

Check warning on line 131 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L129-L131

Added lines #L129 - L131 were not covered by tests
.insert(broker_name.clone());

let enable_acting_master_inner = enable_acting_master.unwrap_or_default();
let mut register_first =
if let Some(broker_data) = self.broker_addr_table.get_mut(&broker_name) {
broker_data.set_enable_acting_master(enable_acting_master_inner);
broker_data.set_zone_name(zone_name.clone());
false
} else {
let mut broker_data = BrokerData::new(
cluster_name.clone(),
broker_name.clone(),
HashMap::new(),
zone_name,
);
broker_data.set_enable_acting_master(enable_acting_master_inner);
self.broker_addr_table
.insert(broker_name.clone(), broker_data);
true
};
let broker_data = self.broker_addr_table.get_mut(&broker_name).unwrap();
let mut register_first = if let Some(broker_data) =
self.broker_addr_table.mut_from_ref().get_mut(&broker_name)

Check warning on line 136 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L135-L136

Added lines #L135 - L136 were not covered by tests
{
broker_data.set_enable_acting_master(enable_acting_master_inner);
broker_data.set_zone_name(zone_name.clone());
false

Check warning on line 140 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L138-L140

Added lines #L138 - L140 were not covered by tests
} else {
let mut broker_data = BrokerData::new(
cluster_name.clone(),
broker_name.clone(),
HashMap::new(),
zone_name,
);
broker_data.set_enable_acting_master(enable_acting_master_inner);
self.broker_addr_table
.mut_from_ref()
.insert(broker_name.clone(), broker_data);
true

Check warning on line 152 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L142-L152

Added lines #L142 - L152 were not covered by tests
};
let broker_data = self
.broker_addr_table
.mut_from_ref()
.get_mut(&broker_name)
Comment on lines +129 to +157
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Use of mut_from_ref() undermines Rust's safety guarantees

Throughout the code (e.g., lines 129-157, 254-269, 550-587), mut_from_ref() is used to obtain mutable references from shared references. This practice violates Rust's mutability and borrowing principles, potentially causing undefined behavior and concurrency issues.

Refactor the code to use safe interior mutability patterns. Wrap each mutable data structure in Arc<RwLock<...>> and acquire the necessary locks when accessing them:

-self.broker_addr_table
-    .mut_from_ref()
+let mut broker_addr_table = self.broker_addr_table.write();
+broker_addr_table
    .get_mut(&broker_name)
    .unwrap();

Ensure that all mutations occur while holding the appropriate lock to maintain thread safety.

Also applies to: 254-269, 550-587

.unwrap();

Check warning on line 158 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L154-L158

Added lines #L154 - L158 were not covered by tests
let mut prev_min_broker_id = 0i64;
if !broker_data.broker_addrs().is_empty() {
prev_min_broker_id = broker_data.broker_addrs().keys().min().copied().unwrap();
Expand Down Expand Up @@ -186,7 +193,7 @@
&broker_addr,
new_state_version
);
self.broker_live_table.remove(
self.broker_live_table.mut_from_ref().remove(

Check warning on line 196 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L196

Added line #L196 was not covered by tests
BrokerAddrInfo::new(cluster_name.clone(), broker_addr.clone()).as_ref(),
);
return Some(result);
Expand Down Expand Up @@ -244,7 +251,10 @@
.map(|item| item.to_string())
.collect::<HashSet<String>>();
for to_delete_topic in to_delete_topics {
let queue_data_map = self.topic_queue_table.get_mut(to_delete_topic.as_str());
let queue_data_map = self
.topic_queue_table
.mut_from_ref()
.get_mut(to_delete_topic.as_str());

Check warning on line 257 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L254-L257

Added lines #L254 - L257 were not covered by tests
if let Some(queue_data) = queue_data_map {
let removed_qd = queue_data.remove(&broker_name);
if let Some(ref removed_qd_inner) = removed_qd {
Expand All @@ -254,7 +264,9 @@
);
}
if queue_data.is_empty() {
self.topic_queue_table.remove(to_delete_topic.as_str());
self.topic_queue_table
.mut_from_ref()
.remove(to_delete_topic.as_str());

Check warning on line 269 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L267-L269

Added lines #L267 - L269 were not covered by tests
}
}
}
Expand Down Expand Up @@ -285,9 +297,11 @@
for (topic, vtq_info) in topic_queue_mapping_info_map {
if !self.topic_queue_mapping_info_table.contains_key(topic) {
self.topic_queue_mapping_info_table
.mut_from_ref()

Check warning on line 300 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L300

Added line #L300 was not covered by tests
.insert(topic.clone(), HashMap::new());
}
self.topic_queue_mapping_info_table
.mut_from_ref()

Check warning on line 304 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L304

Added line #L304 was not covered by tests
.get_mut(topic)
.unwrap()
.insert(vtq_info.bname.as_ref().unwrap().clone(), vtq_info.clone());
Expand All @@ -297,13 +311,10 @@

let broker_addr_info = BrokerAddrInfo::new(cluster_name.clone(), broker_addr.clone());

self.broker_live_table.insert(
self.broker_live_table.mut_from_ref().insert(

Check warning on line 314 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L314

Added line #L314 was not covered by tests
broker_addr_info.clone(),
BrokerLiveInfo::new(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as i64,
get_current_millis() as i64,

Check warning on line 317 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L317

Added line #L317 was not covered by tests
DEFAULT_BROKER_CHANNEL_EXPIRED_TIME,
topic_config_serialize_wrapper
.topic_config_serialize_wrapper
Expand All @@ -314,9 +325,12 @@
),
);
if filter_server_list.is_empty() {
self.filter_server_table.remove(&broker_addr_info);
self.filter_server_table
.mut_from_ref()
.remove(&broker_addr_info);

Check warning on line 330 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L328-L330

Added lines #L328 - L330 were not covered by tests
} else {
self.filter_server_table
.mut_from_ref()

Check warning on line 333 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L333

Added line #L333 was not covered by tests
.insert(broker_addr_info.clone(), filter_server_list);
}

Expand Down Expand Up @@ -345,6 +359,7 @@
),
)
}
drop(_write);

Check warning on line 362 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L362

Added line #L362 was not covered by tests
Some(result)
}
}
Expand Down Expand Up @@ -471,7 +486,7 @@
}

impl RouteInfoManager {
fn topic_set_of_broker_name(&mut self, broker_name: &str) -> HashSet<String> {
fn topic_set_of_broker_name(&self, broker_name: &str) -> HashSet<String> {

Check warning on line 489 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L489

Added line #L489 was not covered by tests
let mut topic_of_broker = HashSet::new();
for (key, value) in self.topic_queue_table.iter() {
if value.contains_key(broker_name) {
Expand All @@ -482,7 +497,7 @@
}

pub(crate) fn is_topic_config_changed(
&mut self,
&self,

Check warning on line 500 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L500

Added line #L500 was not covered by tests
cluster_name: &CheetahString,
broker_addr: &CheetahString,
data_version: &DataVersion,
Expand Down Expand Up @@ -532,11 +547,7 @@
None
}

fn create_and_update_queue_data(
&mut self,
broker_name: &CheetahString,
topic_config: TopicConfig,
) {
fn create_and_update_queue_data(&self, broker_name: &CheetahString, topic_config: TopicConfig) {

Check warning on line 550 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L550

Added line #L550 was not covered by tests
let queue_data = QueueData::new(
broker_name.clone(),
topic_config.write_queue_nums,
Expand All @@ -547,6 +558,7 @@

let queue_data_map = self
.topic_queue_table
.mut_from_ref()

Check warning on line 561 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L561

Added line #L561 was not covered by tests
.get_mut(topic_config.topic_name.as_ref().unwrap().as_str());
if let Some(queue_data_map_inner) = queue_data_map {
let existed_qd = queue_data_map_inner.get(broker_name);
Expand All @@ -572,15 +584,15 @@
&queue_data
);
queue_data_map_inner.insert(broker_name.clone(), queue_data);
self.topic_queue_table.insert(
self.topic_queue_table.mut_from_ref().insert(

Check warning on line 587 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L587

Added line #L587 was not covered by tests
topic_config.topic_name.as_ref().unwrap().clone(),
queue_data_map_inner,
);
}
}

fn notify_min_broker_id_changed(
&mut self,
&self,

Check warning on line 595 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L595

Added line #L595 was not covered by tests
broker_addr_map: &HashMap<i64, CheetahString>,
offline_broker_addr: Option<CheetahString>,
ha_broker_addr: Option<CheetahString>,
Expand Down Expand Up @@ -622,7 +634,7 @@
}

fn choose_broker_addrs_to_notify(
&mut self,
&self,

Check warning on line 637 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L637

Added line #L637 was not covered by tests
broker_addr_map: &HashMap<i64, CheetahString>,
offline_broker_addr: Option<CheetahString>,
) -> Option<Vec<CheetahString>> {
Expand Down
1 change: 1 addition & 0 deletions rocketmq-remoting/src/protocol/route/route_data_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl BrokerData {
self.zone_name = zone_name;
}

#[inline]
pub fn set_enable_acting_master(&mut self, enable_acting_master: bool) {
self.enable_acting_master = enable_acting_master;
}
Expand Down
Loading