[ISSUE #1253]⚡️Optimize RouteInfoManager in NameServerBootstrap, remove RouteInfoManager parking_lot::RwLock #1259

Merged: 1 commit on Nov 21, 2024

Conversation

mxsm
Owner

@mxsm mxsm commented Nov 21, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #1253

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Enhanced broker registration process with improved error handling and logging.
    • Introduced a new method for creating request commands in the RemotingCommand struct.
  • Bug Fixes

    • Improved error reporting for various methods, ensuring clearer error messages.
  • Documentation

    • Updated method signatures and structures to reflect changes in data types and ownership models.
  • Tests

    • Added unit tests for critical functions to ensure correct handling of request body scenarios.

Contributor

coderabbitai bot commented Nov 21, 2024

Walkthrough

The pull request introduces several modifications across multiple files, primarily focusing on the BrokerOuterAPI, NameServerRuntime, and RouteInfoManager. Key changes include simplifying method logic for handling broker registration, enhancing error handling, and improving logging. Additionally, the RouteInfoManager and related structs have been updated to remove unnecessary locking mechanisms, transitioning to more efficient mutable ownership models. The RegisterBrokerResult struct has also been modified to utilize a new string type, CheetahString, enhancing type safety. Overall, the changes streamline the code and improve clarity and performance.

Changes

File Path | Change Summary
rocketmq-broker/src/out_api/broker_outer_api.rs | Updated register_broker method logic for clarity; improved error handling in lock_batch_mq_async and unlock_batch_mq_async; enhanced logging in dns_lookup_address_by_domain.
rocketmq-namesrv/src/bootstrap.rs | Changed route_info_manager type from Arc<RwLock<RouteInfoManager>> to RouteInfoManager, simplifying access and management.
rocketmq-namesrv/src/processor/client_request_processor.rs | Updated route_info_manager type; removed locking in get_route_info_by_topic method for direct access.
rocketmq-namesrv/src/processor/default_request_processor.rs | Changed route_info_manager type; updated several method signatures to allow mutable access and improved error handling.
rocketmq-namesrv/src/route/route_info_manager.rs | Modified internal data structures to use ArcMut for shared mutable ownership; updated methods for better concurrency handling.
rocketmq-remoting/src/protocol/header/namesrv/register_broker_header.rs | Added Default trait to RegisterBrokerRequestHeader and RegisterBrokerResponseHeader structs for easier initialization.
rocketmq-remoting/src/protocol/namesrv.rs | Changed ha_server_addr and master_addr types from String to CheetahString in RegisterBrokerResult.
rocketmq-remoting/src/protocol/remoting_command.rs | Added new method new_request to RemotingCommand for convenient request creation.
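
To picture the bootstrap.rs change, here is a minimal sketch of the two access shapes, using only the standard library. `Registry`, `register_locked`, and `register_owned` are hypothetical names made up for illustration; the PR itself uses parking_lot's RwLock before the change and the project's ArcMut wrapper afterwards, neither of which is reproduced here.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for RouteInfoManager: broker name -> address.
#[derive(Default)]
struct Registry {
    brokers: HashMap<String, String>,
}

impl Registry {
    fn register(&mut self, name: &str, addr: &str) {
        self.brokers.insert(name.to_owned(), addr.to_owned());
    }

    fn lookup(&self, name: &str) -> Option<&str> {
        self.brokers.get(name).map(String::as_str)
    }
}

/// "Before" shape: every caller goes through a shared lock.
/// (std's RwLock returns a Result; parking_lot's does not.)
fn register_locked(shared: &Arc<RwLock<Registry>>, name: &str, addr: &str) {
    // write() blocks until all other readers and writers have released the lock.
    shared.write().unwrap().register(name, addr);
}

/// "After" shape: the owner holds the value directly and mutates it via &mut.
fn register_owned(owned: &mut Registry, name: &str, addr: &str) {
    owned.register(name, addr);
}
```

In the owned shape the borrow checker, not a runtime lock, enforces exclusive access, which is why the method signatures in the processors had to change to allow mutable access.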

Assessment against linked issues

Objective Addressed Explanation
Optimize RouteInfoManager in NameServerBootstrap (1253)

Possibly related PRs

Suggested labels

enhancement, approved, auto merge, AI review first

Suggested reviewers

  • TeslaRustor
  • SpaceXCN

Poem

In the code where rabbits hop,
Changes made, we’ll never stop.
With CheetahString swift and bright,
Our logic shines, a pure delight!
Error handling, clear as day,
Hopping forward, we code away! 🐇✨


@rocketmq-rust-bot rocketmq-rust-bot added this to the v0.4.0 milestone Nov 21, 2024
@rocketmq-rust-bot
Collaborator

🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥

@rocketmq-rust-bot added the labels AI review first, ready to review, waiting-review, and enhancement⚡️ on Nov 21, 2024

codecov bot commented Nov 21, 2024

Codecov Report

Attention: Patch coverage is 38.73239% with 87 lines in your changes missing coverage. Please review.

Project coverage is 18.41%. Comparing base (2c86e0e) to head (e87414c).
Report is 1 commit behind head on main.

Files with missing lines | Patch % | Lines
...namesrv/src/processor/default_request_processor.rs | 44.82% | 64 Missing ⚠️
rocketmq-namesrv/src/route/route_info_manager.rs | 0.00% | 13 Missing ⚠️
rocketmq-namesrv/src/bootstrap.rs | 0.00% | 5 Missing ⚠️
rocketmq-broker/src/out_api/broker_outer_api.rs | 0.00% | 3 Missing ⚠️
...-namesrv/src/processor/client_request_processor.rs | 0.00% | 1 Missing ⚠️
.../protocol/header/namesrv/register_broker_header.rs | 0.00% | 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1259      +/-   ##
==========================================
+ Coverage   18.25%   18.41%   +0.15%     
==========================================
  Files         428      428              
  Lines       53609    53609              
==========================================
+ Hits         9789     9874      +85     
+ Misses      43820    43735      -85     

☔ View full report in Codecov by Sentry.


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 8

🧹 Outside diff range and nitpick comments (10)
rocketmq-remoting/src/protocol/namesrv.rs (1)

Line range hint 24-33: Consider documenting CheetahString usage

The struct fields using CheetahString would benefit from documentation comments explaining:

  1. Why CheetahString is preferred over String
  2. Any performance characteristics or constraints

Add documentation like this:

 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
 pub struct RegisterBrokerResult {
     #[serde(rename = "haServerAddr")]
+    /// Uses CheetahString for improved performance in broker address handling
     pub ha_server_addr: CheetahString,
 
     #[serde(rename = "masterAddr")]
+    /// Uses CheetahString for improved performance in master address handling
     pub master_addr: CheetahString,
rocketmq-namesrv/src/processor/client_request_processor.rs (1)

Line range hint 82-98: Clean up commented-out version-specific JSON handling code

The implementation looks good, but there's commented-out code for version-specific JSON handling. Either:

  1. Remove it if it's no longer needed
  2. Implement it if it's still required
  3. Add a TODO comment explaining why it's preserved
rocketmq-broker/src/out_api/broker_outer_api.rs (1)

279-281: Consider reducing duplication in default value handling

The current implementation repeats the empty string creation. Consider extracting it to improve maintainability.

-                        result.ha_server_addr = header
-                            .ha_server_addr
-                            .clone()
-                            .unwrap_or(CheetahString::empty());
-                        result.master_addr =
-                            header.master_addr.clone().unwrap_or(CheetahString::empty());
+                        let default_addr = CheetahString::empty();
+                        result.ha_server_addr = header.ha_server_addr.clone().unwrap_or_else(|| default_addr.clone());
+                        result.master_addr = header.master_addr.clone().unwrap_or_else(|| default_addr.clone());
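
Another option, sketched here with `String` standing in for CheetahString, is `Option::unwrap_or_default`. Since RegisterBrokerResult derives Default with CheetahString fields, CheetahString presumably implements Default as well, but that is an assumption to confirm against the crate. `Header` and `addrs_or_empty` are hypothetical names.

```rust
/// Hypothetical stand-in for the response header; String replaces CheetahString.
#[derive(Default)]
struct Header {
    ha_server_addr: Option<String>,
    master_addr: Option<String>,
}

/// Returns (ha_server_addr, master_addr), falling back to empty strings.
fn addrs_or_empty(header: &Header) -> (String, String) {
    // unwrap_or_default() builds the fallback value only when the option is None,
    // so no shared `default_addr` variable is needed.
    (
        header.ha_server_addr.clone().unwrap_or_default(),
        header.master_addr.clone().unwrap_or_default(),
    )
}
```

This keeps each assignment on one line without introducing an extra binding that is cloned twice.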
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 279-281: rocketmq-broker/src/out_api/broker_outer_api.rs#L279-L281
Added lines #L279 - L281 were not covered by tests

rocketmq-namesrv/src/processor/default_request_processor.rs (6)

216-216: Provide a more specific error code for CRC32 mismatch

When the CRC32 checksum does not match, returning a generic SystemError may not provide enough information for debugging. Consider using a more specific error code or defining a new one to indicate a checksum validation failure.
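
A rough sketch of what a dedicated checksum error could look like follows. `RegisterError`, `check_body`, and the bitwise `crc32` helper are hypothetical and not taken from rocketmq-rust; the real code would map such a variant onto whatever response codes the protocol defines.

```rust
use std::fmt;

/// Hypothetical error type; the variant name is not from rocketmq-rust.
#[derive(Debug, PartialEq)]
enum RegisterError {
    /// Dedicated variant carrying both checksums for diagnostics.
    Crc32Mismatch { expected: u32, actual: u32 },
}

impl fmt::Display for RegisterError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RegisterError::Crc32Mismatch { expected, actual } => write!(
                f,
                "crc32 mismatch: expected {expected:#010x}, got {actual:#010x}"
            ),
        }
    }
}

/// Bitwise CRC-32 (IEEE 802.3, reflected polynomial 0xEDB88320).
fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= u32::from(byte);
        for _ in 0..8 {
            crc = if crc & 1 != 0 {
                (crc >> 1) ^ 0xEDB8_8320
            } else {
                crc >> 1
            };
        }
    }
    !crc
}

/// Verifies the body against the checksum announced by the sender.
fn check_body(body: &[u8], expected: u32) -> Result<(), RegisterError> {
    let actual = crc32(body);
    if actual == expected {
        Ok(())
    } else {
        Err(RegisterError::Crc32Mismatch { expected, actual })
    }
}
```

Carrying both values in the error makes the log line self-explanatory, which a generic SystemError cannot do.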

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 216-216: rocketmq-namesrv/src/processor/default_request_processor.rs#L216
Added line #L216 was not covered by tests


238-246: Avoid cloning if unnecessary in register_broker parameters

Passing owned String values like cluster_name, broker_addr, and broker_name may lead to unnecessary cloning if the register_broker method does not require ownership. Consider passing references (&str) to these parameters if possible to improve performance.
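
As a small illustration of the borrowing variant, assuming the callee only needs to read the values: `RequestHeader`, `broker_key`, and `key_for` below are hypothetical and use `String` fields rather than the project's own types.

```rust
/// Hypothetical request header; field names mirror the review comment.
struct RequestHeader {
    cluster_name: String,
    broker_addr: String,
    broker_name: String,
}

/// Borrowing callee: reads the three values without taking ownership.
fn broker_key(cluster_name: &str, broker_name: &str, broker_addr: &str) -> String {
    format!("{cluster_name}/{broker_name}@{broker_addr}")
}

/// Caller: &String coerces to &str, so nothing is cloned at the call site.
fn key_for(header: &RequestHeader) -> String {
    broker_key(&header.cluster_name, &header.broker_name, &header.broker_addr)
}
```

If the callee must store the values (for example in a table), ownership is needed somewhere, and the clone then only moves from the caller into the callee.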

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 238-241: rocketmq-namesrv/src/processor/default_request_processor.rs#L238-L241
Added lines #L238 - L241 were not covered by tests


[warning] 243-246: rocketmq-namesrv/src/processor/default_request_processor.rs#L243-L246
Added lines #L243 - L246 were not covered by tests


256-256: Handle None result explicitly with a descriptive error

Returning a generic system error when register_broker fails may obscure the underlying issue. Provide a more detailed error message or specific error code to help diagnose the problem.
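
One way to do this, sketched under the assumption that the registration step returns an Option, is to convert None into an error that names the broker. `RegisterResult`, `register_broker`, and `process` are hypothetical stand-ins, not the project's real signatures.

```rust
/// Hypothetical result type standing in for RegisterBrokerResult.
#[derive(Debug, PartialEq)]
struct RegisterResult {
    master_addr: String,
}

/// Hypothetical registration step that may yield nothing.
fn register_broker(broker_name: &str) -> Option<RegisterResult> {
    if broker_name.is_empty() {
        None
    } else {
        Some(RegisterResult {
            master_addr: String::from("127.0.0.1:10911"),
        })
    }
}

/// Turns a missing result into an error message that identifies the broker.
fn process(broker_name: &str) -> Result<RegisterResult, String> {
    register_broker(broker_name)
        // ok_or_else formats the message only on the failure path.
        .ok_or_else(|| format!("register broker failed: broker_name={broker_name:?}"))
}
```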

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 256-256: rocketmq-namesrv/src/processor/default_request_processor.rs#L256
Added line #L256 was not covered by tests


295-299: Avoid unnecessary cloning of cluster_name and broker_addr

In update_broker_info_update_timestamp, cloning cluster_name and broker_addr may not be necessary if the method can accept references. Passing &request_header.cluster_name and &request_header.broker_addr can help avoid extra allocations.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 295-299: rocketmq-namesrv/src/processor/default_request_processor.rs#L295-L299
Added lines #L295 - L299 were not covered by tests


164-184: Increase unit test coverage for query_broker_topic_config

The query_broker_topic_config function is important for processing broker topic configurations but lacks unit tests. Adding tests will help ensure its correctness and robustness.

Would you like assistance in writing unit tests for this function?

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 164-164: rocketmq-namesrv/src/processor/default_request_processor.rs#L164
Added line #L164 was not covered by tests


[warning] 167-167: rocketmq-namesrv/src/processor/default_request_processor.rs#L167
Added line #L167 was not covered by tests


[warning] 172-178: rocketmq-namesrv/src/processor/default_request_processor.rs#L172-L178
Added lines #L172 - L178 were not covered by tests


Line range hint 211-277: Add unit tests for process_register_broker

The process_register_broker function plays a critical role in handling broker registration but is not currently covered by unit tests. Adding tests for this function will help prevent regressions and ensure reliable behavior.

Would you like assistance in creating unit tests for this function?

rocketmq-namesrv/src/route/route_info_manager.rs (1)

355-356: Avoid Cloning Large Data Structures for Performance

In the get_all_cluster_info method, cloning the entire broker_addr_table and cluster_addr_table may lead to performance issues due to unnecessary data duplication. Consider returning references or using read locks to access the data without cloning.

Suggested Change: Return References Using Read Locks

-pub(crate) fn get_all_cluster_info(&self) -> ClusterInfo {
-    ClusterInfo::new(
-        Some(self.broker_addr_table.as_ref().clone()),
-        Some(self.cluster_addr_table.as_ref().clone()),
-    )
+pub(crate) fn get_all_cluster_info(&self) -> ClusterInfo {
+    let broker_addr_table = self.broker_addr_table.read().unwrap();
+    let cluster_addr_table = self.cluster_addr_table.read().unwrap();
+    ClusterInfo::new(
+        Some(broker_addr_table.clone()),
+        Some(cluster_addr_table.clone()),
+    )
 }
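
If the goal is to avoid the deep copy itself, one alternative is to hand out a shared snapshot instead of cloning the map. This is a sketch using `std::sync::Arc` with hypothetical names (`Tables`, `snapshot_deep`, `snapshot_shared`); it is not how ArcMut or ClusterInfo work in the PR, and it assumes the consumer can accept an `Arc` rather than an owned map.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical holder; String keys and values replace the real table types.
struct Tables {
    broker_addr_table: Arc<HashMap<String, String>>,
}

impl Tables {
    /// Deep copy: allocates a new map and clones every entry.
    fn snapshot_deep(&self) -> HashMap<String, String> {
        self.broker_addr_table.as_ref().clone()
    }

    /// Shared snapshot: only increments the reference count.
    fn snapshot_shared(&self) -> Arc<HashMap<String, String>> {
        Arc::clone(&self.broker_addr_table)
    }
}
```

The shared variant only stays consistent if writers replace the table with a new `Arc` rather than mutating it in place, so it trades copy cost on reads for copy cost on writes.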
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 2c86e0e and e87414c.

📒 Files selected for processing (8)
  • rocketmq-broker/src/out_api/broker_outer_api.rs (1 hunks)
  • rocketmq-namesrv/src/bootstrap.rs (3 hunks)
  • rocketmq-namesrv/src/processor/client_request_processor.rs (2 hunks)
  • rocketmq-namesrv/src/processor/default_request_processor.rs (13 hunks)
  • rocketmq-namesrv/src/route/route_info_manager.rs (6 hunks)
  • rocketmq-remoting/src/protocol/header/namesrv/register_broker_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/namesrv.rs (2 hunks)
  • rocketmq-remoting/src/protocol/remoting_command.rs (1 hunks)
🧰 Additional context used
🪛 GitHub Check: codecov/patch
rocketmq-broker/src/out_api/broker_outer_api.rs

[warning] 279-281: rocketmq-broker/src/out_api/broker_outer_api.rs#L279-L281
Added lines #L279 - L281 were not covered by tests

rocketmq-namesrv/src/bootstrap.rs

[warning] 111-111: rocketmq-namesrv/src/bootstrap.rs#L111
Added line #L111 was not covered by tests


[warning] 115-115: rocketmq-namesrv/src/bootstrap.rs#L115
Added line #L115 was not covered by tests


[warning] 117-117: rocketmq-namesrv/src/bootstrap.rs#L117
Added line #L117 was not covered by tests


[warning] 175-175: rocketmq-namesrv/src/bootstrap.rs#L175
Added line #L175 was not covered by tests


[warning] 178-178: rocketmq-namesrv/src/bootstrap.rs#L178
Added line #L178 was not covered by tests

rocketmq-namesrv/src/processor/client_request_processor.rs

[warning] 51-51: rocketmq-namesrv/src/processor/client_request_processor.rs#L51
Added line #L51 was not covered by tests

rocketmq-namesrv/src/processor/default_request_processor.rs

[warning] 117-117: rocketmq-namesrv/src/processor/default_request_processor.rs#L117
Added line #L117 was not covered by tests


[warning] 137-137: rocketmq-namesrv/src/processor/default_request_processor.rs#L137
Added line #L137 was not covered by tests


[warning] 157-157: rocketmq-namesrv/src/processor/default_request_processor.rs#L157
Added line #L157 was not covered by tests


[warning] 164-164: rocketmq-namesrv/src/processor/default_request_processor.rs#L164
Added line #L164 was not covered by tests


[warning] 167-167: rocketmq-namesrv/src/processor/default_request_processor.rs#L167
Added line #L167 was not covered by tests


[warning] 172-178: rocketmq-namesrv/src/processor/default_request_processor.rs#L172-L178
Added lines #L172 - L178 were not covered by tests


[warning] 184-185: rocketmq-namesrv/src/processor/default_request_processor.rs#L184-L185
Added lines #L184 - L185 were not covered by tests


[warning] 196-196: rocketmq-namesrv/src/processor/default_request_processor.rs#L196
Added line #L196 was not covered by tests


[warning] 211-211: rocketmq-namesrv/src/processor/default_request_processor.rs#L211
Added line #L211 was not covered by tests


[warning] 216-216: rocketmq-namesrv/src/processor/default_request_processor.rs#L216
Added line #L216 was not covered by tests


[warning] 220-220: rocketmq-namesrv/src/processor/default_request_processor.rs#L220
Added line #L220 was not covered by tests


[warning] 238-241: rocketmq-namesrv/src/processor/default_request_processor.rs#L238-L241
Added lines #L238 - L241 were not covered by tests


[warning] 243-246: rocketmq-namesrv/src/processor/default_request_processor.rs#L243-L246
Added lines #L243 - L246 were not covered by tests


[warning] 256-256: rocketmq-namesrv/src/processor/default_request_processor.rs#L256
Added line #L256 was not covered by tests


[warning] 276-277: rocketmq-namesrv/src/processor/default_request_processor.rs#L276-L277
Added lines #L276 - L277 were not covered by tests


[warning] 281-281: rocketmq-namesrv/src/processor/default_request_processor.rs#L281
Added line #L281 was not covered by tests


[warning] 284-284: rocketmq-namesrv/src/processor/default_request_processor.rs#L284
Added line #L284 was not covered by tests


[warning] 292-292: rocketmq-namesrv/src/processor/default_request_processor.rs#L292
Added line #L292 was not covered by tests


[warning] 295-299: rocketmq-namesrv/src/processor/default_request_processor.rs#L295-L299
Added lines #L295 - L299 were not covered by tests


[warning] 303-303: rocketmq-namesrv/src/processor/default_request_processor.rs#L303
Added line #L303 was not covered by tests


[warning] 306-306: rocketmq-namesrv/src/processor/default_request_processor.rs#L306
Added line #L306 was not covered by tests


[warning] 318-318: rocketmq-namesrv/src/processor/default_request_processor.rs#L318
Added line #L318 was not covered by tests


[warning] 323-323: rocketmq-namesrv/src/processor/default_request_processor.rs#L323
Added line #L323 was not covered by tests

🔇 Additional comments (15)
rocketmq-remoting/src/protocol/namesrv.rs (2)

26-26: Verify the impact of type changes across the codebase

The change from String to CheetahString for ha_server_addr and master_addr fields looks good from a performance perspective. However, we should verify:

  1. All consumers of this struct are updated to handle CheetahString
  2. No string conversion overhead is introduced elsewhere

Also applies to: 29-29

✅ Verification successful

Type changes from String to CheetahString are consistently applied across the codebase

All consumers of ha_server_addr and master_addr fields are already using CheetahString. The search results show:

  1. All struct definitions use CheetahString consistently:

    • rocketmq-namesrv/src/route_info/broker_addr_info.rs
    • rocketmq-remoting/src/protocol/namesrv.rs
    • rocketmq-remoting/src/protocol/header/namesrv/register_broker_header.rs
  2. All usages handle CheetahString directly without conversions:

    • No to_string() or String::from() calls on these fields
    • No string conversion overhead introduced
    • All consumers use the CheetahString type natively
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check usage of RegisterBrokerResult fields

# Find all usages of ha_server_addr and master_addr
rg -A 3 "ha_server_addr|master_addr" 

# Find potential String to CheetahString conversions that might indicate inefficient usage
rg "to_string\(\)|String::from|into\(\)" --type rust

Length of output: 104660


17-17: Verify CheetahString compatibility with serde

The addition of CheetahString type needs verification for proper serialization/deserialization behavior since the struct implements Serialize and Deserialize.

✅ Verification successful

CheetahString is compatible with serde serialization

The RegisterBrokerResult struct and numerous other structs in the codebase successfully use CheetahString with serde's Serialize and Deserialize traits. The type is widely used in serializable structures throughout the codebase, particularly in:

  • Protocol messages and headers
  • Configuration structures
  • Data transfer objects
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if CheetahString implements necessary serde traits
# and verify its usage in other parts of the codebase

# Check for serde implementations
ast-grep --pattern 'impl $_ for CheetahString {
  $$$
}'

# Check other usages of CheetahString with serde
rg -l "CheetahString.*Serialize|Serialize.*CheetahString"

Length of output: 118


Script:

#!/bin/bash
# Let's try a different approach to verify CheetahString's serde compatibility

# First, find the CheetahString definition
fd "cheetah_string" --type f

# Look for any derive macros on CheetahString
rg -A 5 "CheetahString" --type rust

# Check for any serde-related imports in files using CheetahString
rg "use.*serde" --type rust $(rg -l "CheetahString")

# Look for other structs using CheetahString with serde
rg "#\[derive.*Serialize.*\]" -A 5 --type rust $(rg -l "CheetahString")

Length of output: 138394


Script:

#!/bin/bash
# Let's check if CheetahString implements serde traits and verify its usage with serde
# First, find the CheetahString definition and its trait implementations
fd -t f "cheetah_string" -x cat {}

# Look for serde derives or implementations specifically
rg -B2 -A2 "impl.*Serialize|impl.*Deserialize|#\[derive.*Serialize|#\[derive.*Deserialize" $(fd -t f "cheetah_string")

# Check RegisterBrokerResult struct specifically
rg -B2 -A5 "RegisterBrokerResult" rocketmq-remoting/src/protocol/namesrv.rs

Length of output: 83565

rocketmq-namesrv/src/processor/client_request_processor.rs (2)

Line range hint 65-81: LGTM! Performance improvement through direct access

The changes to get_route_info_by_topic maintain correct functionality while improving performance by removing lock operations. The error handling and response generation remain robust.


42-42: Verify concurrent access safety after lock removal

The change from Arc<parking_lot::RwLock<RouteInfoManager>> to RouteInfoManager aligns with the PR objectives. However, we should verify that this doesn't introduce thread-safety issues.

✅ Verification successful

Thread safety is maintained despite lock removal

The removal of parking_lot::RwLock is safe because:

  1. RouteInfoManager internally uses thread-safe containers:
    • All internal tables (topic_queue_table, broker_addr_table, etc.) use ArcMut wrappers
    • Concurrent access is handled at a finer granularity within the struct itself
  2. Concurrent operations are properly managed:
    • The manager is started in a separate tokio task with tokio::spawn
    • Connection events are processed asynchronously
    • Internal state modifications are protected by ArcMut
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for concurrent access patterns to RouteInfoManager
rg -A 3 "route_info_manager.*process|process.*route_info_manager" rocketmq-namesrv/src/

# Search for other potential concurrent operations
ast-grep --pattern 'async fn $$$process$$$($$$) { $$$ route_info_manager $$$ }'

Length of output: 171


Script:

#!/bin/bash
# Let's try a broader search for RouteInfoManager usage patterns
rg -A 5 "RouteInfoManager" rocketmq-namesrv/src/

# Search for async functions in the client request processor
rg -A 5 "async fn" rocketmq-namesrv/src/processor/client_request_processor.rs

# Look for concurrent access patterns with spawn or parallel operations
rg -A 3 "tokio::(spawn|join|select)" rocketmq-namesrv/src/

Length of output: 10119

rocketmq-remoting/src/protocol/header/namesrv/register_broker_header.rs (1)

29-29: LGTM: Adding Default trait is safe and improves ergonomics.

The addition of Default trait is appropriate here as all fields have types that implement Default. This change:

  • Improves struct initialization ergonomics
  • Maintains backward compatibility
  • Aligns with Rust best practices
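
For reference, this is the kind of initialization that deriving Default enables. `RegisterRequest` is a hypothetical, reduced struct and not the real RegisterBrokerRequestHeader.

```rust
/// Hypothetical, reduced header; every field type implements Default.
#[derive(Debug, Default, PartialEq)]
struct RegisterRequest {
    broker_name: String,
    broker_id: u64,
    compressed: bool,
}

/// Sets only the field of interest and lets Default fill in the rest.
fn sample_request() -> RegisterRequest {
    RegisterRequest {
        broker_name: String::from("broker-a"),
        // Struct update syntax: remaining fields come from Default::default().
        ..Default::default()
    }
}
```

This is mostly useful in tests, where a header with one or two meaningful fields is enough.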

Let's verify that all fields have proper default values:

rocketmq-remoting/src/protocol/remoting_command.rs (1)

181-183: 🛠️ Refactor suggestion

Add version setting and documentation to the new request method.

The new method provides a simpler way to create requests, but it's missing some important elements:

  1. It doesn't set the command version unlike create_request_command, which could cause compatibility issues.
  2. The method lacks documentation explaining its purpose and usage.

Consider applying these improvements:

+    /// Creates a new request command with the specified code and body.
+    /// 
+    /// # Arguments
+    /// * `code` - The command code
+    /// * `body` - The command body
+    /// 
+    /// # Example
+    /// ```
+    /// let cmd = RemotingCommand::new_request(1, "hello".into());
+    /// ```
     pub fn new_request(code: impl Into<i32>, body: impl Into<Bytes>) -> Self {
-        Self::default().set_code(code).set_body(body)
+        let mut cmd = Self::default().set_code(code).set_body(body);
+        set_cmd_version(&mut cmd);
+        cmd
     }
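
The shape of that suggestion can be tried in isolation with a reduced model. `Command` and `CURRENT_VERSION` below are hypothetical: the real RemotingCommand has more fields, takes `Bytes` for the body, and sets its version through `set_cmd_version`, none of which are reproduced here.

```rust
/// Hypothetical, simplified command; Vec<u8> replaces Bytes.
#[derive(Debug, Default, PartialEq)]
struct Command {
    code: i32,
    version: i32,
    body: Option<Vec<u8>>,
}

/// Assumed protocol version, for illustration only.
const CURRENT_VERSION: i32 = 1;

impl Command {
    fn set_code(mut self, code: impl Into<i32>) -> Self {
        self.code = code.into();
        self
    }

    fn set_body(mut self, body: impl Into<Vec<u8>>) -> Self {
        self.body = Some(body.into());
        self
    }

    /// Builds a request and stamps the version, as the review suggests.
    fn new_request(code: impl Into<i32>, body: impl Into<Vec<u8>>) -> Self {
        let mut cmd = Self::default().set_code(code).set_body(body);
        cmd.version = CURRENT_VERSION;
        cmd
    }
}
```

Without the version assignment, `new_request` would return the default version, which is the compatibility concern raised in the comment.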

Also, let's verify if there are any tests for this new method:

rocketmq-namesrv/src/bootstrap.rs (2)

55-55: Ensure Thread Safety After Removing RwLock

The route_info_manager field has been changed from Arc<parking_lot::RwLock<RouteInfoManager>> to a plain RouteInfoManager. This removal of the locking mechanism means that any concurrent access to route_info_manager must be carefully managed to prevent data races. Please verify that all accesses to route_info_manager are either confined to a single thread or appropriately synchronized.


175-178: Verify Synchronization of route_info_manager Usage

The route_info_manager is initialized without any locking or reference counting:

route_info_manager: RouteInfoManager::new(
    name_server_config.clone(),
    remoting_client.clone(),
),

Please ensure that all components interacting with route_info_manager are aware of this change and that concurrent access does not lead to unsafe behavior.
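
For comparison, one conventional way to keep concurrent access safe without a single outer lock is to guard each table with its own lock and check `Send + Sync` at compile time. The sketch below uses std's `Mutex` and hypothetical names (`Manager`, `heartbeat`, `scan_not_active`, `run_concurrently`); it is not what the PR does, which relies on the project's ArcMut wrapper instead.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical manager: one table, guarded by its own fine-grained lock.
#[derive(Default, Clone)]
struct Manager {
    live: Arc<Mutex<HashMap<String, u64>>>,
}

impl Manager {
    /// Records the last time a broker was seen.
    fn heartbeat(&self, broker: &str, now: u64) {
        self.live.lock().unwrap().insert(broker.to_owned(), now);
    }

    /// Removes brokers not seen within `timeout`; returns how many were dropped.
    fn scan_not_active(&self, now: u64, timeout: u64) -> usize {
        let mut live = self.live.lock().unwrap();
        let before = live.len();
        live.retain(|_, seen| now - *seen <= timeout);
        before - live.len()
    }
}

/// Compiles only if T can be shared and sent across threads.
fn assert_send_sync<T: Send + Sync>() {}

/// Four writer threads record heartbeats, then one scan expires the stale ones.
fn run_concurrently() -> usize {
    assert_send_sync::<Manager>();
    let manager = Manager::default();
    let writers: Vec<_> = (0..4u64)
        .map(|i| {
            let m = manager.clone();
            thread::spawn(move || m.heartbeat(&format!("broker-{i}"), i))
        })
        .collect();
    for writer in writers {
        writer.join().unwrap();
    }
    manager.scan_not_active(10, 8)
}
```

A type that passes `assert_send_sync` is still only as safe as its internals; the check is a guard against accidental regressions, not a proof of correctness.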

rocketmq-namesrv/src/processor/default_request_processor.rs (6)

137-137: [Duplicate Comment]: Replace .expect() with proper error handling to prevent panics

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 137-137: rocketmq-namesrv/src/processor/default_request_processor.rs#L137
Added line #L137 was not covered by tests


157-157: [Duplicate Comment]: Replace .expect() with proper error handling to prevent panics

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 157-157: rocketmq-namesrv/src/processor/default_request_processor.rs#L157
Added line #L157 was not covered by tests


167-167: [Duplicate Comment]: Replace .expect() with proper error handling to prevent panics

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 167-167: rocketmq-namesrv/src/processor/default_request_processor.rs#L167
Added line #L167 was not covered by tests


211-211: [Duplicate Comment]: Replace .expect() with proper error handling to prevent panics

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 211-211: rocketmq-namesrv/src/processor/default_request_processor.rs#L211
Added line #L211 was not covered by tests


459-459: [Duplicate Comment]: Replace .expect() with proper error handling to prevent panics


281-284: [Duplicate Comment]: Add unit tests for process_unregister_broker

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 281-281: rocketmq-namesrv/src/processor/default_request_processor.rs#L281
Added line #L281 was not covered by tests


[warning] 284-284: rocketmq-namesrv/src/processor/default_request_processor.rs#L284
Added line #L284 was not covered by tests

rocketmq-namesrv/src/route/route_info_manager.rs (1)

330-331: Verify Correct Assignment to master_addr Field

Both result.ha_server_addr and result.master_addr are being assigned the value of info.ha_server_addr().clone(). Please confirm if this is intentional. If result.master_addr should reflect the master's address rather than the HA server address, ensure that the correct value is assigned.

@@ -49,7 +48,7 @@

 impl ClientRequestProcessor {
     pub fn new(
-        route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
+        route_info_manager: RouteInfoManager,
Contributor

⚠️ Potential issue

Add test coverage for the updated constructor

The constructor changes look correct, but test coverage is missing for this critical change. Please add unit tests to verify the behavior with the new RouteInfoManager type.

Would you like me to help generate unit tests for the constructor and its error cases?

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 51-51: rocketmq-namesrv/src/processor/client_request_processor.rs#L51
Added line #L51 was not covered by tests

Comment on lines +279 to +281
.unwrap_or(CheetahString::empty());
result.master_addr =
header.master_addr.clone().unwrap_or(CheetahString::empty());
Contributor

💡 Codebase verification

Add test coverage for broker registration response handling

The verification confirms that while there are tests in broker_outer_api.rs, they only cover DNS lookup functionality. The critical broker registration response handling code remains untested. Please add unit tests to cover:

  • Response with both addresses present
  • Response with missing addresses (testing the unwrap_or fallback)
  • Response with empty addresses

Example test location: Add to the existing tests module in broker_outer_api.rs.
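
The three scenarios listed above could be covered along these lines. This is a minimal sketch, not the project's code: `ResponseHeader`, `BrokerResult`, and `build_result` are hypothetical stand-ins, and plain `String` is used where the PR uses `CheetahString`.

```rust
/// Hypothetical stand-in for the decoded register-broker response header.
#[derive(Default)]
pub struct ResponseHeader {
    pub ha_server_addr: Option<String>,
    pub master_addr: Option<String>,
}

/// Hypothetical stand-in for `RegisterBrokerResult`.
#[derive(Debug, Default, PartialEq)]
pub struct BrokerResult {
    pub ha_server_addr: String,
    pub master_addr: String,
}

/// Mirrors the fallback under review: a missing address becomes an empty string.
pub fn build_result(header: &ResponseHeader) -> BrokerResult {
    BrokerResult {
        ha_server_addr: header.ha_server_addr.clone().unwrap_or_default(),
        master_addr: header.master_addr.clone().unwrap_or_default(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn both_addresses_present() {
        let header = ResponseHeader {
            ha_server_addr: Some("10.0.0.1:10912".to_string()),
            master_addr: Some("10.0.0.1:10911".to_string()),
        };
        let result = build_result(&header);
        assert_eq!(result.ha_server_addr, "10.0.0.1:10912");
        assert_eq!(result.master_addr, "10.0.0.1:10911");
    }

    #[test]
    fn missing_addresses_fall_back_to_empty() {
        let result = build_result(&ResponseHeader::default());
        assert_eq!(result, BrokerResult::default());
    }

    #[test]
    fn empty_addresses_stay_empty() {
        let header = ResponseHeader {
            ha_server_addr: Some(String::new()),
            master_addr: Some(String::new()),
        };
        assert_eq!(build_result(&header), BrokerResult::default());
    }
}
```

Note that the missing and empty cases produce the same result, so a caller cannot tell them apart after the fallback is applied.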

🔗 Analysis chain

Add test coverage for broker registration response handling

While the implementation is correct, these critical lines handling broker registration responses lack test coverage. Consider adding unit tests for various scenarios:

  • Response with both addresses present
  • Response with missing addresses
  • Response with empty addresses
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if there are any existing tests for register_broker
rg -l "fn.*test.*register_broker" 

Length of output: 35


Script:

#!/bin/bash
# Let's check for any test files in the broker module
fd "test.*\.rs" rocketmq-broker/src

# Also check for any test modules or test functions in the broker_outer_api.rs file
rg -A 5 "#\[test\]|mod test" rocketmq-broker/src/out_api/broker_outer_api.rs

# Check for any test files containing "broker" in their name
fd ".*broker.*test.*\.rs" 

# Look for any test functions containing "broker" in their name
rg "fn.*test.*broker"

Length of output: 983

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 279-281: rocketmq-broker/src/out_api/broker_outer_api.rs#L279-L281
Added lines #L279 - L281 were not covered by tests

Comment on lines +111 to +117
+let mut route_info_manager_arc = self.route_info_manager.clone();
 self.name_server_runtime
     .as_ref()
     .unwrap()
-    .schedule_at_fixed_rate(
+    .schedule_at_fixed_rate_mut(
         move || {
-            route_info_manager_arc.write().scan_not_active_broker();
+            route_info_manager_arc.scan_not_active_broker();

⚠️ Potential issue

Potential Concurrency Issue in Scheduled Task

In the init_processors method, the closure passed to schedule_at_fixed_rate_mut calls route_info_manager_arc.scan_not_active_broker(). Since route_info_manager no longer uses a RwLock, this could lead to data races if scan_not_active_broker() modifies shared state that is accessed elsewhere concurrently.

Consider reintroducing synchronization mechanisms (e.g., Arc<Mutex<RouteInfoManager>>) or ensuring that route_info_manager is only accessed from a single thread to prevent concurrency issues.
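
As an illustration of the `Arc<Mutex<...>>` option, here is a minimal sketch using only the standard library. `LiveTable` and `spawn_scanner` are hypothetical names, a plain thread stands in for the runtime's `schedule_at_fixed_rate_mut`, and the timeout logic is an assumption about what a scan might do, not the project's implementation.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical, simplified stand-in for `RouteInfoManager`.
#[derive(Default)]
pub struct LiveTable {
    /// Broker address -> time of the last heartbeat.
    last_seen: HashMap<String, Instant>,
}

impl LiveTable {
    pub fn heartbeat(&mut self, addr: &str, at: Instant) {
        self.last_seen.insert(addr.to_string(), at);
    }

    pub fn contains(&self, addr: &str) -> bool {
        self.last_seen.contains_key(addr)
    }

    /// Removes brokers whose last heartbeat is older than `timeout`
    /// and returns how many were removed.
    pub fn scan_not_active_broker(&mut self, now: Instant, timeout: Duration) -> usize {
        let before = self.last_seen.len();
        self.last_seen
            .retain(|_, seen| now.duration_since(*seen) <= timeout);
        before - self.last_seen.len()
    }
}

/// Runs `rounds` scans on a background thread. The mutex is held only for
/// the duration of each scan, so other threads can use the table in between.
pub fn spawn_scanner(
    table: Arc<Mutex<LiveTable>>,
    period: Duration,
    timeout: Duration,
    rounds: usize,
) -> thread::JoinHandle<usize> {
    thread::spawn(move || {
        let mut removed = 0;
        for _ in 0..rounds {
            thread::sleep(period);
            // The guard is a temporary and is dropped at the end of this statement.
            removed += table
                .lock()
                .unwrap()
                .scan_not_active_broker(Instant::now(), timeout);
        }
        removed
    })
}
```

The trade-off is that every request path touching the table must also take the lock, which is the cost the PR set out to remove.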

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 111-111: rocketmq-namesrv/src/bootstrap.rs#L111
Added line #L111 was not covered by tests


[warning] 115-115: rocketmq-namesrv/src/bootstrap.rs#L115
Added line #L115 was not covered by tests


[warning] 117-117: rocketmq-namesrv/src/bootstrap.rs#L117
Added line #L117 was not covered by tests

@@ -115,7 +114,7 @@
 fn put_kv_config(&mut self, request: RemotingCommand) -> RemotingCommand {
     let request_header = request
         .decode_command_custom_header::<PutKVConfigRequestHeader>()
-        .unwrap();
+        .expect("decode PutKVConfigRequestHeader failed");

🛠️ Refactor suggestion

Replace .expect() with proper error handling to prevent panics

Using .expect("decode PutKVConfigRequestHeader failed") will cause the application to panic if decoding fails. Consider handling the error gracefully by returning an appropriate error response to prevent unexpected crashes.

Apply this diff to improve error handling:

-let request_header = request
-    .decode_command_custom_header::<PutKVConfigRequestHeader>()
-    .expect("decode PutKVConfigRequestHeader failed");
+let request_header = match request.decode_command_custom_header::<PutKVConfigRequestHeader>() {
+    Ok(header) => header,
+    Err(e) => {
+        return RemotingCommand::create_response_command_with_code(
+            RemotingSysResponseCode::SystemError,
+        )
+        .set_remark(format!("Failed to decode PutKVConfigRequestHeader: {}", e));
+    }
+};
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
.expect("decode PutKVConfigRequestHeader failed");
let request_header = match request.decode_command_custom_header::<PutKVConfigRequestHeader>() {
Ok(header) => header,
Err(e) => {
return RemotingCommand::create_response_command_with_code(
RemotingSysResponseCode::SystemError,
)
.set_remark(format!("Failed to decode PutKVConfigRequestHeader: {}", e));
}
};
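
The same pattern, reduced to a self-contained sketch that compiles without the project's crates. `Response`, `decode_header`, the `key=value` wire format, and the two code constants are hypothetical stand-ins for `RemotingCommand`, `decode_command_custom_header`, and `RemotingSysResponseCode`.

```rust
/// Hypothetical, simplified response type; the real handler returns a `RemotingCommand`.
#[derive(Debug, PartialEq)]
pub struct Response {
    pub code: i32,
    pub remark: Option<String>,
}

// Illustrative response codes for this sketch only.
pub const SUCCESS: i32 = 0;
pub const SYSTEM_ERROR: i32 = 1;

/// Hypothetical decoder: expects a header of the form "namespace=...;key=...".
fn decode_header(raw: &str) -> Result<(String, String), String> {
    let mut namespace = None;
    let mut key = None;
    for pair in raw.split(';') {
        match pair.split_once('=') {
            Some(("namespace", v)) => namespace = Some(v.to_string()),
            Some(("key", v)) => key = Some(v.to_string()),
            _ => return Err(format!("unexpected field: {pair:?}")),
        }
    }
    match (namespace, key) {
        (Some(n), Some(k)) => Ok((n, k)),
        _ => Err("missing namespace or key".to_string()),
    }
}

/// Returns an error response instead of panicking when decoding fails.
pub fn put_kv_config(raw: &str) -> Response {
    let (_namespace, _key) = match decode_header(raw) {
        Ok(header) => header,
        Err(e) => {
            return Response {
                code: SYSTEM_ERROR,
                remark: Some(format!("Failed to decode header: {e}")),
            };
        }
    };
    // The real handler would store the key/value pair here.
    Response { code: SUCCESS, remark: None }
}
```

A malformed request then costs one error response rather than taking down the task that handles it.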
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 117-117: rocketmq-namesrv/src/processor/default_request_processor.rs#L117
Added line #L117 was not covered by tests

Comment on lines +1121 to +1125
+pub fn start(mut route_info_manager: Self, receiver: broadcast::Receiver<SocketAddr>) {
     let mut receiver = receiver;
     tokio::spawn(async move {
         while let Ok(socket_addr) = receiver.recv().await {
-            route_info_manager
-                .write()
-                .connection_disconnected(socket_addr);
+            route_info_manager.connection_disconnected(socket_addr);

⚠️ Potential issue

Potential Data Race When Moving route_info_manager into Async Task

Moving route_info_manager (which owns mutable data) into a spawned async task may lead to data races if it's accessed concurrently elsewhere. Consider wrapping route_info_manager in an Arc<Mutex<Self>> to safely share it between threads.

Suggested Change: Use Arc<Mutex<RouteInfoManager>> for Thread Safety

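Update the start method signature and lock the mutex inside the loop. The `.lock().await` call assumes an async mutex such as tokio::sync::Mutex; with std::sync::Mutex the call would be `.lock().unwrap()`: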

-pub fn start(mut route_info_manager: Self, receiver: broadcast::Receiver<SocketAddr>) {
+pub fn start(route_info_manager: Arc<Mutex<Self>>, receiver: broadcast::Receiver<SocketAddr>) {
     let mut receiver = receiver;
     tokio::spawn(async move {
         while let Ok(socket_addr) = receiver.recv().await {
-            route_info_manager.connection_disconnected(socket_addr);
+            let mut rim = route_info_manager.lock().await;
+            rim.connection_disconnected(socket_addr);
         }
     });
 }

Ensure that all methods accessed within the async task are compatible with the Mutex locking mechanism.
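
A standard-library variant of the same idea, as a rough sketch: a plain thread and `std::sync::mpsc` replace `tokio::spawn` and the broadcast channel, and `Manager` is a hypothetical, much smaller stand-in for `RouteInfoManager`.

```rust
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical, simplified stand-in for `RouteInfoManager`.
#[derive(Default)]
pub struct Manager {
    live: HashSet<SocketAddr>,
}

impl Manager {
    pub fn register(&mut self, addr: SocketAddr) {
        self.live.insert(addr);
    }

    pub fn connection_disconnected(&mut self, addr: SocketAddr) {
        self.live.remove(&addr);
    }

    pub fn is_live(&self, addr: &SocketAddr) -> bool {
        self.live.contains(addr)
    }
}

/// Drains disconnect events until every sender has been dropped.
pub fn start(manager: Arc<Mutex<Manager>>, receiver: Receiver<SocketAddr>) -> JoinHandle<()> {
    thread::spawn(move || {
        for addr in receiver {
            // Lock per event so request handlers are not blocked while the
            // thread is idle waiting for the next disconnect.
            manager.lock().unwrap().connection_disconnected(addr);
        }
    })
}
```

Callers keep their own `Arc` clone of the manager, so the background task and the request path share one table rather than each owning a copy.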

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn start(mut route_info_manager: Self, receiver: broadcast::Receiver<SocketAddr>) {
let mut receiver = receiver;
tokio::spawn(async move {
while let Ok(socket_addr) = receiver.recv().await {
route_info_manager
.write()
.connection_disconnected(socket_addr);
route_info_manager.connection_disconnected(socket_addr);
pub fn start(route_info_manager: Arc<Mutex<Self>>, receiver: broadcast::Receiver<SocketAddr>) {
let mut receiver = receiver;
tokio::spawn(async move {
while let Ok(socket_addr) = receiver.recv().await {
let mut rim = route_info_manager.lock().await;
rim.connection_disconnected(socket_addr);
}
});

Comment on lines +1102 to 1105
for (bai, bli) in self.broker_live_table.as_ref() {
if bli.remote_addr == socket_addr {
broker_addr_info = Some(bai.clone());
break;

⚠️ Potential issue

Ensure Thread-Safe Access to broker_live_table

When iterating over broker_live_table, ensure that access is synchronized to prevent data races. Wrap the access in a read lock if using RwLock, so multiple reads can occur concurrently without issues.

Suggested Change: Use Read Lock When Accessing broker_live_table

-pub fn connection_disconnected(&mut self, socket_addr: SocketAddr) {
-    let mut broker_addr_info = None;
-    for (bai, bli) in self.broker_live_table.as_ref() {
+pub fn connection_disconnected(&self, socket_addr: SocketAddr) {
+    let mut broker_addr_info = None;
+    let broker_live_table = self.broker_live_table.read().unwrap();
+    for (bai, bli) in broker_live_table.iter() {
         if bli.remote_addr == socket_addr {
             broker_addr_info = Some(bai.clone());
             break;
         }
     }

Committable suggestion skipped: line range outside the PR's diff.
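
For illustration, a minimal sketch of the read-then-write pattern with `std::sync::RwLock`. `LiveTable` and its `String` keys are hypothetical; the real table maps `BrokerAddrInfo` to `BrokerLiveInfo`.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::RwLock;

/// Hypothetical, simplified table: broker name -> remote address.
#[derive(Default)]
pub struct LiveTable {
    brokers: RwLock<HashMap<String, SocketAddr>>,
}

impl LiveTable {
    pub fn insert(&self, name: &str, addr: SocketAddr) {
        self.brokers.write().unwrap().insert(name.to_string(), addr);
    }

    /// Looks up the broker under a read lock, then removes it under a write lock.
    /// Returns the name of the removed broker, if any.
    pub fn connection_disconnected(&self, socket_addr: SocketAddr) -> Option<String> {
        let found = {
            let table = self.brokers.read().unwrap();
            table
                .iter()
                .find(|(_, addr)| **addr == socket_addr)
                .map(|(name, _)| name.clone())
        }; // The read guard is dropped here, before the write lock is taken.

        let name = found?;
        let mut table = self.brokers.write().unwrap();
        // Re-check: another thread may have changed the entry between the two locks.
        let still_matches = table.get(&name) == Some(&socket_addr);
        if still_matches {
            table.remove(&name);
            Some(name)
        } else {
            None
        }
    }
}
```

Dropping the read guard before taking the write lock matters: acquiring the write lock while the same thread still holds a read guard on a `std::sync::RwLock` can deadlock or panic.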

Comment on lines +59 to +72
 type TopicQueueTable = ArcMut<
     HashMap<CheetahString /* topic */, HashMap<CheetahString /* broker name */, QueueData>>,
 >;
 type BrokerAddrTable = ArcMut<HashMap<CheetahString /* brokerName */, BrokerData>>;
 type ClusterAddrTable =
-    HashMap<CheetahString /* clusterName */, HashSet<CheetahString /* brokerName */>>;
-type BrokerLiveTable = HashMap<BrokerAddrInfo /* brokerAddr */, BrokerLiveInfo>;
+    ArcMut<HashMap<CheetahString /* clusterName */, HashSet<CheetahString /* brokerName */>>>;
+type BrokerLiveTable = ArcMut<HashMap<BrokerAddrInfo /* brokerAddr */, BrokerLiveInfo>>;
 type FilterServerTable =
-    HashMap<BrokerAddrInfo /* brokerAddr */, Vec<String> /* Filter Server */>;
-type TopicQueueMappingInfoTable = HashMap<
-    CheetahString, /* topic */
-    HashMap<CheetahString /* brokerName */, TopicQueueMappingInfo>,
+    ArcMut<HashMap<BrokerAddrInfo /* brokerAddr */, Vec<String> /* Filter Server */>>;
+type TopicQueueMappingInfoTable = ArcMut<
+    HashMap<
+        CheetahString, /* topic */
+        HashMap<CheetahString /* brokerName */, TopicQueueMappingInfo>,
+    >,

⚠️ Potential issue

Ensure Proper Synchronization with ArcMut Usage

The types TopicQueueTable, BrokerAddrTable, ClusterAddrTable, BrokerLiveTable, FilterServerTable, and TopicQueueMappingInfoTable have been changed to use ArcMut<HashMap<...>>, allowing mutable shared ownership across threads. Please verify that ArcMut provides the necessary synchronization mechanisms to prevent data races and ensure thread safety. If ArcMut does not internally handle synchronization, consider using standard thread-safe primitives like Arc<RwLock<...>> or Arc<Mutex<...>>.

Suggested Change: Replace ArcMut with Arc<RwLock<...>>

If ArcMut does not handle synchronization, you can modify the type definitions as follows:

-type TopicQueueTable = ArcMut<
+type TopicQueueTable = Arc<RwLock<
     HashMap<CheetahString /* topic */, HashMap<CheetahString /* broker name */, QueueData>>,
->;
+>>;

Similarly, apply this change to the other type definitions:

-type BrokerAddrTable = ArcMut<HashMap<CheetahString /* brokerName */, BrokerData>>;
+type BrokerAddrTable = Arc<RwLock<HashMap<CheetahString /* brokerName */, BrokerData>>>;

-type ClusterAddrTable = ArcMut<HashMap<CheetahString /* clusterName */, HashSet<CheetahString /* brokerName */>>>;
+type ClusterAddrTable = Arc<RwLock<HashMap<CheetahString /* clusterName */, HashSet<CheetahString /* brokerName */>>>>;

-type BrokerLiveTable = ArcMut<HashMap<BrokerAddrInfo /* brokerAddr */, BrokerLiveInfo>>;
+type BrokerLiveTable = Arc<RwLock<HashMap<BrokerAddrInfo /* brokerAddr */, BrokerLiveInfo>>>;

-type FilterServerTable = ArcMut<HashMap<BrokerAddrInfo /* brokerAddr */, Vec<String> /* Filter Server */>>;
+type FilterServerTable = Arc<RwLock<HashMap<BrokerAddrInfo /* brokerAddr */, Vec<String> /* Filter Server */>>>;

-type TopicQueueMappingInfoTable = ArcMut<
+type TopicQueueMappingInfoTable = Arc<RwLock<
     HashMap<
         CheetahString, /* topic */
         HashMap<CheetahString /* brokerName */, TopicQueueMappingInfo>,
     >,
->;
+>>;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
type TopicQueueTable = ArcMut<
HashMap<CheetahString /* topic */, HashMap<CheetahString /* broker name */, QueueData>>,
>;
type BrokerAddrTable = ArcMut<HashMap<CheetahString /* brokerName */, BrokerData>>;
type ClusterAddrTable =
HashMap<CheetahString /* clusterName */, HashSet<CheetahString /* brokerName */>>;
type BrokerLiveTable = HashMap<BrokerAddrInfo /* brokerAddr */, BrokerLiveInfo>;
ArcMut<HashMap<CheetahString /* clusterName */, HashSet<CheetahString /* brokerName */>>>;
type BrokerLiveTable = ArcMut<HashMap<BrokerAddrInfo /* brokerAddr */, BrokerLiveInfo>>;
type FilterServerTable =
HashMap<BrokerAddrInfo /* brokerAddr */, Vec<String> /* Filter Server */>;
type TopicQueueMappingInfoTable = HashMap<
CheetahString, /* topic */
HashMap<CheetahString /* brokerName */, TopicQueueMappingInfo>,
ArcMut<HashMap<BrokerAddrInfo /* brokerAddr */, Vec<String> /* Filter Server */>>;
type TopicQueueMappingInfoTable = ArcMut<
HashMap<
CheetahString, /* topic */
HashMap<CheetahString /* brokerName */, TopicQueueMappingInfo>,
>,
type TopicQueueTable = Arc<RwLock<
HashMap<CheetahString /* topic */, HashMap<CheetahString /* broker name */, QueueData>>,
>>;
type BrokerAddrTable = Arc<RwLock<HashMap<CheetahString /* brokerName */, BrokerData>>>;
type ClusterAddrTable =
Arc<RwLock<HashMap<CheetahString /* clusterName */, HashSet<CheetahString /* brokerName */>>>>;
type BrokerLiveTable = Arc<RwLock<HashMap<BrokerAddrInfo /* brokerAddr */, BrokerLiveInfo>>>;
type FilterServerTable =
Arc<RwLock<HashMap<BrokerAddrInfo /* brokerAddr */, Vec<String> /* Filter Server */>>>;
type TopicQueueMappingInfoTable = Arc<RwLock<
HashMap<
CheetahString, /* topic */
HashMap<CheetahString /* brokerName */, TopicQueueMappingInfo>,
>,
>>;

Comment on lines +94 to +99
topic_queue_table: ArcMut::new(HashMap::new()),
broker_addr_table: ArcMut::new(HashMap::new()),
cluster_addr_table: ArcMut::new(HashMap::new()),
broker_live_table: ArcMut::new(HashMap::new()),
filter_server_table: ArcMut::new(HashMap::new()),
topic_queue_mapping_info_table: ArcMut::new(HashMap::new()),

🛠️ Refactor suggestion

Initialize Shared Data Structures with Appropriate Locks

In the RouteInfoManager::new method, the data structures are initialized using ArcMut::new(HashMap::new()). If you adopt Arc<RwLock<...>>, initialize them as follows to ensure thread safety:

 RouteInfoManager {
-    topic_queue_table: ArcMut::new(HashMap::new()),
-    broker_addr_table: ArcMut::new(HashMap::new()),
-    cluster_addr_table: ArcMut::new(HashMap::new()),
-    broker_live_table: ArcMut::new(HashMap::new()),
-    filter_server_table: ArcMut::new(HashMap::new()),
-    topic_queue_mapping_info_table: ArcMut::new(HashMap::new()),
+    topic_queue_table: Arc::new(RwLock::new(HashMap::new())),
+    broker_addr_table: Arc::new(RwLock::new(HashMap::new())),
+    cluster_addr_table: Arc::new(RwLock::new(HashMap::new())),
+    broker_live_table: Arc::new(RwLock::new(HashMap::new())),
+    filter_server_table: Arc::new(RwLock::new(HashMap::new())),
+    topic_queue_mapping_info_table: Arc::new(RwLock::new(HashMap::new())),
     namesrv_config,
     remoting_client,
 }

Committable suggestion skipped: line range outside the PR's diff.
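
To show how such an initialization behaves once the struct is cloned across threads, here is a reduced sketch with two tables. `Routes` and its methods are hypothetical, and `String` replaces `CheetahString` and `BrokerData` to keep the example standard-library only.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};

type BrokerAddrTable = Arc<RwLock<HashMap<String /* brokerName */, String /* brokerAddr */>>>;
type ClusterAddrTable =
    Arc<RwLock<HashMap<String /* clusterName */, HashSet<String /* brokerName */>>>>;

/// Hypothetical, reduced version of `RouteInfoManager` with two of its tables.
/// Cloning copies the `Arc` handles, so every clone sees the same maps.
#[derive(Clone)]
pub struct Routes {
    broker_addr_table: BrokerAddrTable,
    cluster_addr_table: ClusterAddrTable,
}

impl Routes {
    pub fn new() -> Self {
        Routes {
            broker_addr_table: Arc::new(RwLock::new(HashMap::new())),
            cluster_addr_table: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Takes each write lock separately and releases it at the end of the statement.
    pub fn register(&self, cluster: &str, broker: &str, addr: &str) {
        self.broker_addr_table
            .write()
            .unwrap()
            .insert(broker.to_string(), addr.to_string());
        self.cluster_addr_table
            .write()
            .unwrap()
            .entry(cluster.to_string())
            .or_default()
            .insert(broker.to_string());
    }

    pub fn brokers_in(&self, cluster: &str) -> usize {
        self.cluster_addr_table
            .read()
            .unwrap()
            .get(cluster)
            .map_or(0, |set| set.len())
    }

    pub fn addr_of(&self, broker: &str) -> Option<String> {
        self.broker_addr_table.read().unwrap().get(broker).cloned()
    }
}
```

One caveat of per-table locks: `register` updates the two maps in separate critical sections, so a concurrent reader can briefly observe the broker in one table but not the other. A single lock around the whole struct avoids that at the cost of coarser contention.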

Labels
AI review first Ai review pr first approved PR has approved auto merge enhancement⚡️ New feature or request
Projects
None yet
3 participants