Skip to content

[ISSUE #1249]🍻Optimize RouteInfoManager logic #1250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions rocketmq-namesrv/src/processor/default_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,14 @@
);
let mut rim_write = self.route_info_manager.write();
rim_write.update_broker_info_update_timestamp(
request_header.cluster_name.as_str(),
request_header.broker_addr.as_str(),
request_header.cluster_name.clone(),
request_header.broker_addr.clone(),

Check warning on line 184 in rocketmq-namesrv/src/processor/default_request_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/processor/default_request_processor.rs#L183-L184

Added lines #L183 - L184 were not covered by tests
);
let mut command = RemotingCommand::create_response_command()
.set_command_custom_header(QueryDataVersionResponseHeader::new(changed));
if let Some(value) = rim_write.query_broker_topic_config(
request_header.cluster_name.as_str(),
request_header.broker_addr.as_str(),
) {
if let Some(value) = rim_write
.query_broker_topic_config(request_header.cluster_name, request_header.broker_addr)
{

Check warning on line 190 in rocketmq-namesrv/src/processor/default_request_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/processor/default_request_processor.rs#L188-L190

Added lines #L188 - L190 were not covered by tests
command = command.set_body(value.encode());
}
drop(rim_write);
Expand Down Expand Up @@ -311,8 +310,8 @@
self.route_info_manager
.write()
.update_broker_info_update_timestamp(
request_header.cluster_name.as_str(),
request_header.broker_addr.as_str(),
request_header.cluster_name,
request_header.broker_addr,

Check warning on line 314 in rocketmq-namesrv/src/processor/default_request_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/processor/default_request_processor.rs#L313-L314

Added lines #L313 - L314 were not covered by tests
Comment on lines +313 to +314
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Missing test coverage for broker heartbeat functionality

The verification reveals:

  • No dedicated test files exist in the namesrv module
  • The broker heartbeat functionality is critical for broker liveness detection (used in scan_not_active_broker)
  • The string optimization changes in process_broker_heartbeat affect core broker registration functionality

Action items:

  • Add unit tests for process_broker_heartbeat to verify the string handling changes
  • Include integration tests covering the broker heartbeat lifecycle with the route info manager
🔗 Analysis chain

Consistent string handling optimization, but needs test coverage.

The string handling changes are consistent with the optimization pattern seen elsewhere in the file. However, these changes are not covered by tests.

Let's verify the broker heartbeat functionality:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for broker heartbeat tests
rg -A 5 "test.*broker.*heartbeat" --type rust

Length of output: 47


Script:

#!/bin/bash
# Let's first check if there are any test files related to the default request processor
fd "test.*\.rs" rocketmq-namesrv/src

# Then let's look for any broker heartbeat related tests with a broader search
rg -l "broker.*heart|heart.*broker" --type rust

# Let's also check the implementation to understand the context better
ast-grep --pattern 'fn process_broker_heartbeat($$$)'

Length of output: 669


Script:

#!/bin/bash
# Let's examine the test coverage in the namesrv module
fd "test" rocketmq-namesrv/src -t f -x cat {}

# Let's also check how broker heartbeat is used in the route info manager
rg -C 5 "broker.*heart|heart.*broker" rocketmq-namesrv/src/route/route_info_manager.rs

# And check the actual implementation of process_broker_heartbeat
rg -C 10 "process_broker_heartbeat" rocketmq-namesrv/src/processor/default_request_processor.rs

Length of output: 2841

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 313-314: rocketmq-namesrv/src/processor/default_request_processor.rs#L313-L314
Added lines #L313 - L314 were not covered by tests

);
RemotingCommand::create_response_command()
}
Expand All @@ -322,10 +321,10 @@
.decode_command_custom_header::<GetBrokerMemberGroupRequestHeader>()
.unwrap();

let broker_member_group = self.route_info_manager.write().get_broker_member_group(
request_header.cluster_name.as_str(),
request_header.broker_name.as_str(),
);
let broker_member_group = self
.route_info_manager
.write()
.get_broker_member_group(&request_header.cluster_name, &request_header.broker_name);

Check warning on line 327 in rocketmq-namesrv/src/processor/default_request_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/processor/default_request_processor.rs#L324-L327

Added lines #L324 - L327 were not covered by tests
Comment on lines +324 to +327
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Add tests for broker member group functionality

The optimization to use references instead of cloned values in get_broker_member_group is good for performance, but there are no tests covering this functionality. Please add unit tests to verify:

  • Retrieving broker member group with valid cluster and broker names
  • Handling of non-existent cluster/broker combinations
  • Proper behavior with concurrent read/write operations
🔗 Analysis chain

Good optimization using references, but needs test coverage.

Using references instead of cloned values is a good optimization that can improve performance by reducing unnecessary memory allocations.

Let's verify the broker member group functionality:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for broker member group tests
rg -A 5 "test.*broker.*member.*group" --type rust

Length of output: 51


Script:

#!/bin/bash
# Let's search for any tests related to the RouteInfoManager and broker functionality
rg -l "RouteInfoManager" --type rust

# Also search for any test modules in the namesrv directory
fd "test" -e rs --type f rocketmq-namesrv/

Length of output: 309


Script:

#!/bin/bash
# Let's check the route_info_manager.rs file for existing tests
rg "mod test" -A 10 rocketmq-namesrv/src/route/route_info_manager.rs

# Also check for any test functions related to broker operations
ast-grep --pattern 'fn test_$_broker$_() { $$$ }' rocketmq-namesrv/src/route/route_info_manager.rs

Length of output: 171

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 324-327: rocketmq-namesrv/src/processor/default_request_processor.rs#L324-L327
Added lines #L324 - L327 were not covered by tests

let response_body = GetBrokerMemberGroupResponseBody {
broker_member_group,
};
Expand Down Expand Up @@ -397,7 +396,7 @@
if !topic_route_data.queue_datas.is_empty() {
self.route_info_manager
.write()
.register_topic(request_header.topic.as_str(), topic_route_data.queue_datas)
.register_topic(request_header.topic, topic_route_data.queue_datas)

Check warning on line 399 in rocketmq-namesrv/src/processor/default_request_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/processor/default_request_processor.rs#L399

Added line #L399 was not covered by tests
}
}
RemotingCommand::create_response_command()
Expand Down
57 changes: 21 additions & 36 deletions rocketmq-namesrv/src/route/route_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,10 @@

pub(crate) fn is_topic_config_changed(
&mut self,
cluster_name: &str,
broker_addr: &str,
cluster_name: &CheetahString,
broker_addr: &CheetahString,

Check warning on line 484 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L483-L484

Added lines #L483 - L484 were not covered by tests
data_version: &DataVersion,
broker_name: &str,
broker_name: &CheetahString,

Check warning on line 486 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L486

Added line #L486 was not covered by tests
topic: &str,
) -> bool {
let is_change =
Expand All @@ -504,11 +504,11 @@

pub(crate) fn is_broker_topic_config_changed(
&self,
cluster_name: &str,
broker_addr: &str,
cluster_name: &CheetahString,
broker_addr: &CheetahString,

Check warning on line 508 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L507-L508

Added lines #L507 - L508 were not covered by tests
data_version: &DataVersion,
) -> bool {
let option = self.query_broker_topic_config(cluster_name, broker_addr);
let option = self.query_broker_topic_config(cluster_name.clone(), broker_addr.clone());

Check warning on line 511 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L511

Added line #L511 was not covered by tests
if let Some(pre) = option {
if pre != data_version {
return true;
Expand All @@ -519,10 +519,10 @@

pub(crate) fn query_broker_topic_config(
&self,
cluster_name: &str,
broker_addr: &str,
cluster_name: CheetahString,
broker_addr: CheetahString,

Check warning on line 523 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L522-L523

Added lines #L522 - L523 were not covered by tests
) -> Option<&DataVersion> {
let info = BrokerAddrInfo::new(cluster_name.to_string(), broker_addr.to_string());
let info = BrokerAddrInfo::new(cluster_name, broker_addr);

Check warning on line 525 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L525

Added line #L525 was not covered by tests
let pre = self.broker_live_table.get(info.as_ref());
if let Some(live_info) = pre {
return Some(live_info.data_version());
Expand Down Expand Up @@ -638,8 +638,8 @@

pub(crate) fn update_broker_info_update_timestamp(
&mut self,
cluster_name: impl Into<CheetahString>,
broker_addr: impl Into<CheetahString>,
cluster_name: CheetahString,
broker_addr: CheetahString,

Check warning on line 642 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L641-L642

Added lines #L641 - L642 were not covered by tests
) {
let broker_addr_info = BrokerAddrInfo::new(cluster_name, broker_addr);
if let Some(value) = self.broker_live_table.get_mut(broker_addr_info.as_ref()) {
Expand All @@ -649,13 +649,10 @@

pub(crate) fn get_broker_member_group(
&mut self,
cluster_name: &str,
broker_name: &str,
cluster_name: &CheetahString,
broker_name: &CheetahString,

Check warning on line 653 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L652-L653

Added lines #L652 - L653 were not covered by tests
) -> Option<BrokerMemberGroup> {
let mut group_member = BrokerMemberGroup::new(
CheetahString::from_slice(cluster_name),
CheetahString::from_slice(broker_name),
);
let mut group_member = BrokerMemberGroup::new(cluster_name.clone(), broker_name.clone());

Check warning on line 655 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L655

Added line #L655 was not covered by tests
if let Some(broker_data) = self.broker_addr_table.get(broker_name) {
let map = broker_data.broker_addrs().clone();
for (key, value) in map {
Expand Down Expand Up @@ -737,21 +734,15 @@
}
}

pub(crate) fn register_topic(
&mut self,
topic: impl Into<CheetahString>,
queue_data_vec: Vec<QueueData>,
) {
pub(crate) fn register_topic(&mut self, topic: CheetahString, queue_data_vec: Vec<QueueData>) {

Check warning on line 737 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L737

Added line #L737 was not covered by tests
if queue_data_vec.is_empty() {
return;
}
let topic_inner = topic.into();

if !self.topic_queue_table.contains_key(topic_inner.as_str()) {
self.topic_queue_table
.insert(topic_inner.clone(), HashMap::new());
if !self.topic_queue_table.contains_key(&topic) {
self.topic_queue_table.insert(topic.clone(), HashMap::new());

Check warning on line 743 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L742-L743

Added lines #L742 - L743 were not covered by tests
}
let queue_data_map = self.topic_queue_table.get_mut(&topic_inner).unwrap();
let queue_data_map = self.topic_queue_table.get_mut(&topic).unwrap();

Check warning on line 745 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L745

Added line #L745 was not covered by tests
let vec_length = queue_data_vec.len();
for queue_data in queue_data_vec {
if !self
Expand All @@ -760,23 +751,17 @@
{
warn!(
"Register topic contains illegal broker, {}, {:?}",
topic_inner, queue_data
topic, queue_data
);
return;
}
queue_data_map.insert(queue_data.broker_name().clone(), queue_data);
}

if queue_data_map.len() > vec_length {
info!(
"Topic route already exist.{}, {:?}",
&topic_inner, queue_data_map
)
info!("Topic route already exist.{}, {:?}", topic, queue_data_map)

Check warning on line 762 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L762

Added line #L762 was not covered by tests
} else {
info!(
"Register topic route:{}, {:?}",
&topic_inner, queue_data_map
)
info!("Register topic route:{}, {:?}", topic, queue_data_map)

Check warning on line 764 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L764

Added line #L764 was not covered by tests
}
}

Expand Down
Loading