[ISSUE #1253]⚡️Optimize RouteInfoManager in NameServerBootstrap, remove RouteInfoManager parking_lot::RwLock #1259
Conversation
…ve RouteInfoManager parking_lot::RwLock
Walkthrough
The pull request introduces several modifications across multiple files, primarily focusing on the …
Changes
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #1259 +/- ##
==========================================
+ Coverage 18.25% 18.41% +0.15%
==========================================
Files 428 428
Lines 53609 53609
==========================================
+ Hits 9789 9874 +85
+ Misses 43820 43735 -85
Actionable comments posted: 8
🧹 Outside diff range and nitpick comments (10)
rocketmq-remoting/src/protocol/namesrv.rs (1)
Line range hint 24-33: Consider documenting CheetahString usage
The struct fields using CheetahString would benefit from documentation comments explaining:
- Why CheetahString is preferred over String
- Any performance characteristics or constraints
Add documentation like this:
 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
 pub struct RegisterBrokerResult {
     #[serde(rename = "haServerAddr")]
+    /// Uses CheetahString for improved performance in broker address handling
     pub ha_server_addr: CheetahString,
     #[serde(rename = "masterAddr")]
+    /// Uses CheetahString for improved performance in master address handling
     pub master_addr: CheetahString,
rocketmq-namesrv/src/processor/client_request_processor.rs (1)
Line range hint 82-98: Clean up commented-out version-specific JSON handling code
The implementation looks good, but there's commented-out code for version-specific JSON handling. Either:
- Remove it if it's no longer needed
- Implement it if it's still required
- Add a TODO comment explaining why it's preserved
rocketmq-broker/src/out_api/broker_outer_api.rs (1)
279-281: Consider reducing duplication in default value handling
The current implementation repeats the empty string creation. Consider extracting it to improve maintainability.
- result.ha_server_addr = header
-     .ha_server_addr
-     .clone()
-     .unwrap_or(CheetahString::empty());
- result.master_addr =
-     header.master_addr.clone().unwrap_or(CheetahString::empty());
+ let default_addr = CheetahString::empty();
+ result.ha_server_addr = header.ha_server_addr.clone().unwrap_or_else(|| default_addr.clone());
+ result.master_addr = header.master_addr.clone().unwrap_or_else(|| default_addr.clone());
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 279-281: rocketmq-broker/src/out_api/broker_outer_api.rs#L279-L281
Added lines #L279 - L281 were not covered by tests
rocketmq-namesrv/src/processor/default_request_processor.rs (6)
216-216: Provide a more specific error code for CRC32 mismatch
When the CRC32 checksum does not match, returning a generic SystemError may not provide enough information for debugging. Consider using a more specific error code or defining a new one to indicate a checksum validation failure.
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 216-216: rocketmq-namesrv/src/processor/default_request_processor.rs#L216
Added line #L216 was not covered by tests
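A minimal sketch of the suggestion above, assuming a dedicated response code can be added. `ResponseCode`, `ChecksumMismatch`, `checksum`, and `verify_body` are hypothetical names for illustration only and are not taken from rocketmq-rust; the checksum here is a simple byte sum standing in for real CRC32.

```rust
/// Hypothetical response codes; the real project defines its own set.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ResponseCode {
    Success,
    SystemError,
    /// Assumed new variant that singles out body checksum failures.
    ChecksumMismatch,
}

/// Stand-in checksum (a wrapping byte sum), NOT real CRC32.
pub fn checksum(body: &[u8]) -> u32 {
    body.iter().fold(0u32, |acc, b| acc.wrapping_add(u32::from(*b)))
}

/// Returns `Success` when the body matches the expected checksum and the
/// dedicated `ChecksumMismatch` code otherwise, instead of `SystemError`.
pub fn verify_body(body: &[u8], expected: u32) -> ResponseCode {
    if checksum(body) == expected {
        ResponseCode::Success
    } else {
        ResponseCode::ChecksumMismatch
    }
}
```

With a distinct code, a client or log reader can tell a corrupted body apart from other server-side failures without parsing the remark string.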
238-246: Avoid cloning if unnecessary in register_broker parameters
Passing owned String values like cluster_name, broker_addr, and broker_name may lead to unnecessary cloning if the register_broker method does not require ownership. Consider passing references (&str) to these parameters if possible to improve performance.
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 238-241: rocketmq-namesrv/src/processor/default_request_processor.rs#L238-L241
Added lines #L238 - L241 were not covered by tests
[warning] 243-246: rocketmq-namesrv/src/processor/default_request_processor.rs#L243-L246
Added lines #L243 - L246 were not covered by tests
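The borrowing suggestion can be illustrated with a small sketch. `Registry` and its simplified `register_broker` signature are hypothetical, and `String` stands in for the project's string type; the point is only that the callee borrows its arguments and allocates when it actually stores something.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified registry (broker name -> broker address).
#[derive(Default)]
pub struct Registry {
    brokers: HashMap<String, String>,
}

impl Registry {
    /// Borrows both arguments; an owned copy is made only when an entry
    /// is inserted or changed. Returns `true` if the table was modified.
    pub fn register_broker(&mut self, broker_name: &str, broker_addr: &str) -> bool {
        // Unchanged registration: no allocation, no clone.
        if self.brokers.get(broker_name).map(String::as_str) == Some(broker_addr) {
            return false;
        }
        self.brokers
            .insert(broker_name.to_owned(), broker_addr.to_owned());
        true
    }
}
```

Whether this helps in the real code depends on whether `register_broker` ends up storing the values anyway; if it always does, taking ownership avoids a second copy.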
256-256: Handle None result explicitly with a descriptive error
Returning a generic system error when register_broker fails may obscure the underlying issue. Provide a more detailed error message or specific error code to help diagnose the problem.
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 256-256: rocketmq-namesrv/src/processor/default_request_processor.rs#L256
Added line #L256 was not covered by tests
295-299: Avoid unnecessary cloning of cluster_name and broker_addr
In update_broker_info_update_timestamp, cloning cluster_name and broker_addr may not be necessary if the method can accept references. Passing &request_header.cluster_name and &request_header.broker_addr can help avoid extra allocations.
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 295-299: rocketmq-namesrv/src/processor/default_request_processor.rs#L295-L299
Added lines #L295 - L299 were not covered by tests
164-184: Increase unit test coverage for query_broker_topic_config
The query_broker_topic_config function is important for processing broker topic configurations but lacks unit tests. Adding tests will help ensure its correctness and robustness.
Would you like assistance in writing unit tests for this function?
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 164-164: rocketmq-namesrv/src/processor/default_request_processor.rs#L164
Added line #L164 was not covered by tests
[warning] 167-167: rocketmq-namesrv/src/processor/default_request_processor.rs#L167
Added line #L167 was not covered by tests
[warning] 172-178: rocketmq-namesrv/src/processor/default_request_processor.rs#L172-L178
Added lines #L172 - L178 were not covered by tests
Line range hint 211-277: Add unit tests for process_register_broker
The process_register_broker function plays a critical role in handling broker registration but is not currently covered by unit tests. Adding tests for this function will help prevent regressions and ensure reliable behavior.
Would you like assistance in creating unit tests for this function?
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 238-241: rocketmq-namesrv/src/processor/default_request_processor.rs#L238-L241
Added lines #L238 - L241 were not covered by tests
[warning] 243-246: rocketmq-namesrv/src/processor/default_request_processor.rs#L243-L246
Added lines #L243 - L246 were not covered by tests
rocketmq-namesrv/src/route/route_info_manager.rs (1)
355-356: Avoid Cloning Large Data Structures for Performance
In the get_all_cluster_info method, cloning the entire broker_addr_table and cluster_addr_table may lead to performance issues due to unnecessary data duplication. Consider returning references or using read locks to access the data without cloning.
Suggested Change: Return References Using Read Locks
-pub(crate) fn get_all_cluster_info(&self) -> ClusterInfo {
-    ClusterInfo::new(
-        Some(self.broker_addr_table.as_ref().clone()),
-        Some(self.cluster_addr_table.as_ref().clone()),
-    )
+pub(crate) fn get_all_cluster_info(&self) -> ClusterInfo {
+    let broker_addr_table = self.broker_addr_table.read().unwrap();
+    let cluster_addr_table = self.cluster_addr_table.read().unwrap();
+    ClusterInfo::new(
+        Some(broker_addr_table.clone()),
+        Some(cluster_addr_table.clone()),
+    )
 }
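For comparison, here is a sketch of the two access styles the comment contrasts, written against `std::sync::RwLock` rather than the `ArcMut` type used in the PR. `Tables`, `snapshot`, and `with_brokers` are hypothetical names. Note that the suggested diff above still clones the tables under the lock; only a closure-style accessor like `with_brokers` avoids the copy.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical stand-in for the route tables, guarded by a std `RwLock`.
#[derive(Default)]
pub struct Tables {
    broker_addr_table: RwLock<HashMap<String, String>>,
}

impl Tables {
    pub fn insert(&self, name: &str, addr: &str) {
        self.broker_addr_table
            .write()
            .unwrap()
            .insert(name.to_owned(), addr.to_owned());
    }

    /// Copies the whole table while holding the read lock. The result is an
    /// independent snapshot, at the cost of cloning every entry.
    pub fn snapshot(&self) -> HashMap<String, String> {
        self.broker_addr_table.read().unwrap().clone()
    }

    /// Runs `f` against the table under the read lock without cloning it.
    pub fn with_brokers<R>(&self, f: impl FnOnce(&HashMap<String, String>) -> R) -> R {
        let guard = self.broker_addr_table.read().unwrap();
        f(&guard)
    }
}
```

A snapshot is the simpler choice when the caller must serialize the data after the lock is released; the closure form fits short read-only queries.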
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (8)
- rocketmq-broker/src/out_api/broker_outer_api.rs (1 hunks)
- rocketmq-namesrv/src/bootstrap.rs (3 hunks)
- rocketmq-namesrv/src/processor/client_request_processor.rs (2 hunks)
- rocketmq-namesrv/src/processor/default_request_processor.rs (13 hunks)
- rocketmq-namesrv/src/route/route_info_manager.rs (6 hunks)
- rocketmq-remoting/src/protocol/header/namesrv/register_broker_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/namesrv.rs (2 hunks)
- rocketmq-remoting/src/protocol/remoting_command.rs (1 hunks)
🧰 Additional context used
🪛 GitHub Check: codecov/patch
rocketmq-broker/src/out_api/broker_outer_api.rs
[warning] 279-281: rocketmq-broker/src/out_api/broker_outer_api.rs#L279-L281
Added lines #L279 - L281 were not covered by tests
rocketmq-namesrv/src/bootstrap.rs
[warning] 111-111: rocketmq-namesrv/src/bootstrap.rs#L111
Added line #L111 was not covered by tests
[warning] 115-115: rocketmq-namesrv/src/bootstrap.rs#L115
Added line #L115 was not covered by tests
[warning] 117-117: rocketmq-namesrv/src/bootstrap.rs#L117
Added line #L117 was not covered by tests
[warning] 175-175: rocketmq-namesrv/src/bootstrap.rs#L175
Added line #L175 was not covered by tests
[warning] 178-178: rocketmq-namesrv/src/bootstrap.rs#L178
Added line #L178 was not covered by tests
rocketmq-namesrv/src/processor/client_request_processor.rs
[warning] 51-51: rocketmq-namesrv/src/processor/client_request_processor.rs#L51
Added line #L51 was not covered by tests
rocketmq-namesrv/src/processor/default_request_processor.rs
[warning] 117-117: rocketmq-namesrv/src/processor/default_request_processor.rs#L117
Added line #L117 was not covered by tests
[warning] 137-137: rocketmq-namesrv/src/processor/default_request_processor.rs#L137
Added line #L137 was not covered by tests
[warning] 157-157: rocketmq-namesrv/src/processor/default_request_processor.rs#L157
Added line #L157 was not covered by tests
[warning] 164-164: rocketmq-namesrv/src/processor/default_request_processor.rs#L164
Added line #L164 was not covered by tests
[warning] 167-167: rocketmq-namesrv/src/processor/default_request_processor.rs#L167
Added line #L167 was not covered by tests
[warning] 172-178: rocketmq-namesrv/src/processor/default_request_processor.rs#L172-L178
Added lines #L172 - L178 were not covered by tests
[warning] 184-185: rocketmq-namesrv/src/processor/default_request_processor.rs#L184-L185
Added lines #L184 - L185 were not covered by tests
[warning] 196-196: rocketmq-namesrv/src/processor/default_request_processor.rs#L196
Added line #L196 was not covered by tests
[warning] 211-211: rocketmq-namesrv/src/processor/default_request_processor.rs#L211
Added line #L211 was not covered by tests
[warning] 216-216: rocketmq-namesrv/src/processor/default_request_processor.rs#L216
Added line #L216 was not covered by tests
[warning] 220-220: rocketmq-namesrv/src/processor/default_request_processor.rs#L220
Added line #L220 was not covered by tests
[warning] 238-241: rocketmq-namesrv/src/processor/default_request_processor.rs#L238-L241
Added lines #L238 - L241 were not covered by tests
[warning] 243-246: rocketmq-namesrv/src/processor/default_request_processor.rs#L243-L246
Added lines #L243 - L246 were not covered by tests
[warning] 256-256: rocketmq-namesrv/src/processor/default_request_processor.rs#L256
Added line #L256 was not covered by tests
[warning] 276-277: rocketmq-namesrv/src/processor/default_request_processor.rs#L276-L277
Added lines #L276 - L277 were not covered by tests
[warning] 281-281: rocketmq-namesrv/src/processor/default_request_processor.rs#L281
Added line #L281 was not covered by tests
[warning] 284-284: rocketmq-namesrv/src/processor/default_request_processor.rs#L284
Added line #L284 was not covered by tests
[warning] 292-292: rocketmq-namesrv/src/processor/default_request_processor.rs#L292
Added line #L292 was not covered by tests
[warning] 295-299: rocketmq-namesrv/src/processor/default_request_processor.rs#L295-L299
Added lines #L295 - L299 were not covered by tests
[warning] 303-303: rocketmq-namesrv/src/processor/default_request_processor.rs#L303
Added line #L303 was not covered by tests
[warning] 306-306: rocketmq-namesrv/src/processor/default_request_processor.rs#L306
Added line #L306 was not covered by tests
[warning] 318-318: rocketmq-namesrv/src/processor/default_request_processor.rs#L318
Added line #L318 was not covered by tests
[warning] 323-323: rocketmq-namesrv/src/processor/default_request_processor.rs#L323
Added line #L323 was not covered by tests
🔇 Additional comments (15)
rocketmq-remoting/src/protocol/namesrv.rs (2)
26-26: Verify the impact of type changes across the codebase
The change from String to CheetahString for the ha_server_addr and master_addr fields looks good from a performance perspective. However, we should verify:
- All consumers of this struct are updated to handle CheetahString
- No string conversion overhead is introduced elsewhere
Also applies to: 29-29
✅ Verification successful
Based on the search results, I can now generate the final response since I have enough information to verify the impact of type changes:
Type changes from String to CheetahString are consistently applied across the codebase
All consumers of the ha_server_addr and master_addr fields are already using CheetahString. The search results show:
- All struct definitions use CheetahString consistently:
  - rocketmq-namesrv/src/route_info/broker_addr_info.rs
  - rocketmq-remoting/src/protocol/namesrv.rs
  - rocketmq-remoting/src/protocol/header/namesrv/register_broker_header.rs
- All usages handle CheetahString directly without conversions:
  - No to_string() or String::from() calls on these fields
  - No string conversion overhead introduced
  - All consumers use the CheetahString type natively
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check usage of RegisterBrokerResult fields
# Find all usages of ha_server_addr and master_addr
rg -A 3 "ha_server_addr|master_addr"
# Find potential String to CheetahString conversions that might indicate inefficient usage
rg "to_string\(\)|String::from|into\(\)" --type rust
Length of output: 104660
17-17: Verify CheetahString compatibility with serde
The addition of the CheetahString type needs verification for proper serialization/deserialization behavior since the struct implements Serialize and Deserialize.
✅ Verification successful
Based on the search results, I can now generate the final response since we have found sufficient evidence that CheetahString is properly integrated with serde serialization/deserialization:
CheetahString is compatible with serde serialization
The RegisterBrokerResult struct and numerous other structs in the codebase successfully use CheetahString with serde's Serialize and Deserialize traits. The type is widely used in serializable structures throughout the codebase, particularly in:
- Protocol messages and headers
- Configuration structures
- Data transfer objects
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check if CheetahString implements necessary serde traits
# and verify its usage in other parts of the codebase
# Check for serde implementations
ast-grep --pattern 'impl $_ for CheetahString {
$$$
}'
# Check other usages of CheetahString with serde
rg -l "CheetahString.*Serialize|Serialize.*CheetahString"
Length of output: 118
Script:
#!/bin/bash
# Let's try a different approach to verify CheetahString's serde compatibility
# First, find the CheetahString definition
fd "cheetah_string" --type f
# Look for any derive macros on CheetahString
rg -A 5 "CheetahString" --type rust
# Check for any serde-related imports in files using CheetahString
rg "use.*serde" --type rust $(rg -l "CheetahString")
# Look for other structs using CheetahString with serde
rg "#\[derive.*Serialize.*\]" -A 5 --type rust $(rg -l "CheetahString")
Length of output: 138394
Script:
#!/bin/bash
# Let's check if CheetahString implements serde traits and verify its usage with serde
# First, find the CheetahString definition and its trait implementations
fd -t f "cheetah_string" -x cat {}
# Look for serde derives or implementations specifically
rg -B2 -A2 "impl.*Serialize|impl.*Deserialize|#\[derive.*Serialize|#\[derive.*Deserialize" $(fd -t f "cheetah_string")
# Check RegisterBrokerResult struct specifically
rg -B2 -A5 "RegisterBrokerResult" rocketmq-remoting/src/protocol/namesrv.rs
Length of output: 83565
rocketmq-namesrv/src/processor/client_request_processor.rs (2)
Line range hint 65-81: LGTM! Performance improvement through direct access
The changes to get_route_info_by_topic maintain correct functionality while improving performance by removing lock operations. The error handling and response generation remain robust.
42-42: Verify concurrent access safety after lock removal
The change from Arc<parking_lot::RwLock<RouteInfoManager>> to RouteInfoManager aligns with the PR objectives. However, we should verify that this doesn't introduce thread-safety issues.
✅ Verification successful
Thread safety is maintained despite lock removal
The removal of parking_lot::RwLock is safe because:
- RouteInfoManager internally uses thread-safe containers:
  - All internal tables (topic_queue_table, broker_addr_table, etc.) use ArcMut wrappers
  - Concurrent access is handled at a finer granularity within the struct itself
- Concurrent operations are properly managed:
  - The manager is started in a separate tokio task with tokio::spawn
  - Connection events are processed asynchronously
  - Internal state modifications are protected by ArcMut
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for concurrent access patterns to RouteInfoManager
rg -A 3 "route_info_manager.*process|process.*route_info_manager" rocketmq-namesrv/src/
# Search for other potential concurrent operations
ast-grep --pattern 'async fn $$$process$$$($$$) { $$$ route_info_manager $$$ }'
Length of output: 171
Script:
#!/bin/bash
# Let's try a broader search for RouteInfoManager usage patterns
rg -A 5 "RouteInfoManager" rocketmq-namesrv/src/
# Search for async functions in the client request processor
rg -A 5 "async fn" rocketmq-namesrv/src/processor/client_request_processor.rs
# Look for concurrent access patterns with spawn or parallel operations
rg -A 3 "tokio::(spawn|join|select)" rocketmq-namesrv/src/
Length of output: 10119
rocketmq-remoting/src/protocol/header/namesrv/register_broker_header.rs (1)
29-29: LGTM: Adding Default trait is safe and improves ergonomics.
The addition of the Default trait is appropriate here as all fields have types that implement Default. This change:
- Improves struct initialization ergonomics
- Maintains backward compatibility
- Aligns with Rust best practices
Let's verify that all fields have proper default values:
rocketmq-remoting/src/protocol/remoting_command.rs (1)
181-183: 🛠️ Refactor suggestion
Add version setting and documentation to the new request method.
The new method provides a simpler way to create requests, but it's missing some important elements:
- It doesn't set the command version unlike create_request_command, which could cause compatibility issues.
- The method lacks documentation explaining its purpose and usage.
Consider applying these improvements:
+ /// Creates a new request command with the specified code and body.
+ ///
+ /// # Arguments
+ /// * `code` - The command code
+ /// * `body` - The command body
+ ///
+ /// # Example
+ /// ```
+ /// let cmd = RemotingCommand::new_request(1, "hello".into());
+ /// ```
pub fn new_request(code: impl Into<i32>, body: impl Into<Bytes>) -> Self {
- Self::default().set_code(code).set_body(body)
+ let mut cmd = Self::default().set_code(code).set_body(body);
+ set_cmd_version(&mut cmd);
+ cmd
}
Also, let's verify if there are any tests for this new method:
rocketmq-namesrv/src/bootstrap.rs (2)
55-55: Ensure Thread Safety After Removing RwLock
The route_info_manager field has been changed from Arc<parking_lot::RwLock<RouteInfoManager>> to a plain RouteInfoManager. This removal of the locking mechanism means that any concurrent access to route_info_manager must be carefully managed to prevent data races. Please verify that all accesses to route_info_manager are either confined to a single thread or appropriately synchronized.
175-178: Verify Synchronization of route_info_manager Usage
The route_info_manager is initialized without any locking or reference counting:
route_info_manager: RouteInfoManager::new(
name_server_config.clone(),
remoting_client.clone(),
),
Please ensure that all components interacting with route_info_manager are aware of this change and that concurrent access does not lead to unsafe behavior.
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 175-175: rocketmq-namesrv/src/bootstrap.rs#L175
Added line #L175 was not covered by tests
[warning] 178-178: rocketmq-namesrv/src/bootstrap.rs#L178
Added line #L178 was not covered by tests
rocketmq-namesrv/src/processor/default_request_processor.rs (6)
137-137: [Duplicate Comment]: Replace .expect() with proper error handling to prevent panics
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 137-137: rocketmq-namesrv/src/processor/default_request_processor.rs#L137
Added line #L137 was not covered by tests
157-157: [Duplicate Comment]: Replace .expect() with proper error handling to prevent panics
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 157-157: rocketmq-namesrv/src/processor/default_request_processor.rs#L157
Added line #L157 was not covered by tests
167-167: [Duplicate Comment]: Replace .expect() with proper error handling to prevent panics
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 167-167: rocketmq-namesrv/src/processor/default_request_processor.rs#L167
Added line #L167 was not covered by tests
211-211: [Duplicate Comment]: Replace .expect() with proper error handling to prevent panics
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 211-211: rocketmq-namesrv/src/processor/default_request_processor.rs#L211
Added line #L211 was not covered by tests
459-459: [Duplicate Comment]: Replace .expect() with proper error handling to prevent panics
281-284: [Duplicate Comment]: Add unit tests for process_unregister_broker
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 281-281: rocketmq-namesrv/src/processor/default_request_processor.rs#L281
Added line #L281 was not covered by tests
[warning] 284-284: rocketmq-namesrv/src/processor/default_request_processor.rs#L284
Added line #L284 was not covered by tests
rocketmq-namesrv/src/route/route_info_manager.rs (1)
330-331: Verify Correct Assignment to master_addr Field
Both result.ha_server_addr and result.master_addr are being assigned the value of info.ha_server_addr().clone(). Please confirm if this is intentional. If result.master_addr should reflect the master's address rather than the HA server address, ensure that the correct value is assigned.
@@ -49,7 +48,7 @@
 impl ClientRequestProcessor {
     pub fn new(
-        route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
+        route_info_manager: RouteInfoManager,
Add test coverage for the updated constructor
The constructor changes look correct, but test coverage is missing for this critical change. Please add unit tests to verify the behavior with the new RouteInfoManager type.
Would you like me to help generate unit tests for the constructor and its error cases?
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 51-51: rocketmq-namesrv/src/processor/client_request_processor.rs#L51
Added line #L51 was not covered by tests
        .unwrap_or(CheetahString::empty());
    result.master_addr =
        header.master_addr.clone().unwrap_or(CheetahString::empty());
💡 Codebase verification
Add test coverage for broker registration response handling
The verification confirms that while there are tests in broker_outer_api.rs, they only cover DNS lookup functionality. The critical broker registration response handling code remains untested. Please add unit tests to cover:
- Response with both addresses present
- Response with missing addresses (testing the unwrap_or fallback)
- Response with empty addresses
Example test location: Add to the existing tests module in broker_outer_api.rs.
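The three scenarios listed above could be exercised along these lines. This is a sketch under stated assumptions: `ResponseHeader`, `RegisterResult`, and `to_result` are hypothetical simplifications of the broker code, and `String` stands in for `CheetahString`.

```rust
/// Hypothetical, simplified response header with optional addresses.
#[derive(Default)]
pub struct ResponseHeader {
    pub ha_server_addr: Option<String>,
    pub master_addr: Option<String>,
}

/// Hypothetical, simplified registration result.
#[derive(Debug, Default, PartialEq)]
pub struct RegisterResult {
    pub ha_server_addr: String,
    pub master_addr: String,
}

/// Mirrors the fallback under review: a missing address becomes an empty string.
pub fn to_result(header: &ResponseHeader) -> RegisterResult {
    RegisterResult {
        ha_server_addr: header.ha_server_addr.clone().unwrap_or_default(),
        master_addr: header.master_addr.clone().unwrap_or_default(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn missing_addresses_fall_back_to_empty() {
        assert_eq!(to_result(&ResponseHeader::default()), RegisterResult::default());
    }

    #[test]
    fn present_addresses_are_copied() {
        let header = ResponseHeader {
            ha_server_addr: Some("10.0.0.1:10912".to_string()),
            master_addr: Some("10.0.0.1:10911".to_string()),
        };
        let result = to_result(&header);
        assert_eq!(result.ha_server_addr, "10.0.0.1:10912");
        assert_eq!(result.master_addr, "10.0.0.1:10911");
    }
}
```

In the real module the conversion happens inline while handling the response, so testing it directly would likely require extracting it into a small function like `to_result` first.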
🔗 Analysis chain
Add test coverage for broker registration response handling
While the implementation is correct, these critical lines handling broker registration responses lack test coverage. Consider adding unit tests for various scenarios:
- Response with both addresses present
- Response with missing addresses
- Response with empty addresses
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check if there are any existing tests for register_broker
rg -l "fn.*test.*register_broker"
Length of output: 35
Script:
#!/bin/bash
# Let's check for any test files in the broker module
fd "test.*\.rs" rocketmq-broker/src
# Also check for any test modules or test functions in the broker_outer_api.rs file
rg -A 5 "#\[test\]|mod test" rocketmq-broker/src/out_api/broker_outer_api.rs
# Check for any test files containing "broker" in their name
fd ".*broker.*test.*\.rs"
# Look for any test functions containing "broker" in their name
rg "fn.*test.*broker"
Length of output: 983
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 279-281: rocketmq-broker/src/out_api/broker_outer_api.rs#L279-L281
Added lines #L279 - L281 were not covered by tests
 let mut route_info_manager_arc = self.route_info_manager.clone();
 self.name_server_runtime
     .as_ref()
     .unwrap()
-    .schedule_at_fixed_rate(
+    .schedule_at_fixed_rate_mut(
         move || {
-            route_info_manager_arc.write().scan_not_active_broker();
+            route_info_manager_arc.scan_not_active_broker();
Potential Concurrency Issue in Scheduled Task
In the init_processors method, the closure passed to schedule_at_fixed_rate_mut calls route_info_manager_arc.scan_not_active_broker(). Since route_info_manager no longer uses a RwLock, this could lead to data races if scan_not_active_broker() modifies shared state that is accessed elsewhere concurrently.
Consider reintroducing synchronization mechanisms (e.g., Arc<Mutex<RouteInfoManager>>) or ensuring that route_info_manager is only accessed from a single thread to prevent concurrency issues.
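A minimal sketch of the `Arc<Mutex<_>>` option mentioned above, using only the standard library. `Manager`, `heartbeat`, `scan_not_active`, and `scan_on_worker` are hypothetical stand-ins; the real scheduler and `RouteInfoManager` differ.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical, simplified manager: broker name -> last heartbeat (ms).
#[derive(Default)]
pub struct Manager {
    last_seen_ms: HashMap<String, u64>,
}

impl Manager {
    pub fn heartbeat(&mut self, broker: &str, now_ms: u64) {
        self.last_seen_ms.insert(broker.to_owned(), now_ms);
    }

    pub fn broker_count(&self) -> usize {
        self.last_seen_ms.len()
    }

    /// Drops brokers whose last heartbeat is older than `timeout_ms`
    /// and returns how many were removed.
    pub fn scan_not_active(&mut self, now_ms: u64, timeout_ms: u64) -> usize {
        let before = self.last_seen_ms.len();
        self.last_seen_ms
            .retain(|_, seen| now_ms.saturating_sub(*seen) <= timeout_ms);
        before - self.last_seen_ms.len()
    }
}

/// Runs one scan on a worker thread. The mutex serializes the scan with any
/// request handler that holds another clone of the same `Arc`.
pub fn scan_on_worker(shared: Arc<Mutex<Manager>>, now_ms: u64, timeout_ms: u64) -> usize {
    thread::spawn(move || {
        let mut guard = shared.lock().unwrap();
        guard.scan_not_active(now_ms, timeout_ms)
    })
    .join()
    .unwrap()
}
```

This reintroduces the lock the PR set out to remove, so it trades the performance goal for a compiler-checked guarantee; finer-grained locking per table would be a middle ground.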
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 111-111: rocketmq-namesrv/src/bootstrap.rs#L111
Added line #L111 was not covered by tests
[warning] 115-115: rocketmq-namesrv/src/bootstrap.rs#L115
Added line #L115 was not covered by tests
[warning] 117-117: rocketmq-namesrv/src/bootstrap.rs#L117
Added line #L117 was not covered by tests
@@ -115,7 +114,7 @@
 fn put_kv_config(&mut self, request: RemotingCommand) -> RemotingCommand {
     let request_header = request
         .decode_command_custom_header::<PutKVConfigRequestHeader>()
-        .unwrap();
+        .expect("decode PutKVConfigRequestHeader failed");
🛠️ Refactor suggestion
Replace .expect() with proper error handling to prevent panics
Using .expect("decode PutKVConfigRequestHeader failed") will cause the application to panic if decoding fails. Consider handling the error gracefully by returning an appropriate error response to prevent unexpected crashes.
Apply this diff to improve error handling:
-let request_header = request
- .decode_command_custom_header::<PutKVConfigRequestHeader>()
- .expect("decode PutKVConfigRequestHeader failed");
+let request_header = match request.decode_command_custom_header::<PutKVConfigRequestHeader>() {
+ Ok(header) => header,
+ Err(e) => {
+ return RemotingCommand::create_response_command_with_code(
+ RemotingSysResponseCode::SystemError,
+ )
+ .set_remark(format!("Failed to decode PutKVConfigRequestHeader: {}", e));
+ }
+};
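The same pattern in a self-contained form, as a sketch: `Response`, `decode_header`, and the numeric codes are hypothetical and only illustrate returning an error response instead of panicking. They are not the `RemotingCommand` API.

```rust
/// Hypothetical response type standing in for a remoting response.
#[derive(Debug, PartialEq)]
pub struct Response {
    pub code: i32,
    pub remark: Option<String>,
}

/// Illustrative codes, not the project's real values.
pub const SUCCESS: i32 = 0;
pub const SYSTEM_ERROR: i32 = 1;

/// Hypothetical decoder: parses a `key=value` header.
pub fn decode_header(raw: &str) -> Result<(String, String), String> {
    match raw.split_once('=') {
        Some((k, v)) if !k.is_empty() => Ok((k.to_owned(), v.to_owned())),
        _ => Err(format!("malformed header: {raw:?}")),
    }
}

/// Handles a request without panicking: a decode failure becomes an error
/// response that carries the reason in its remark.
pub fn put_kv_config(raw: &str) -> Response {
    let (_key, _value) = match decode_header(raw) {
        Ok(header) => header,
        Err(e) => {
            return Response {
                code: SYSTEM_ERROR,
                remark: Some(format!("Failed to decode header: {e}")),
            };
        }
    };
    // Storing the key/value pair is omitted in this sketch.
    Response { code: SUCCESS, remark: None }
}
```

Since the same `.expect()` call appears in several handlers, a shared helper that performs the decode and builds the error response would keep each call site to a single line.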
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 117-117: rocketmq-namesrv/src/processor/default_request_processor.rs#L117
Added line #L117 was not covered by tests
 pub fn start(mut route_info_manager: Self, receiver: broadcast::Receiver<SocketAddr>) {
     let mut receiver = receiver;
     tokio::spawn(async move {
         while let Ok(socket_addr) = receiver.recv().await {
-            route_info_manager
-                .write()
-                .connection_disconnected(socket_addr);
+            route_info_manager.connection_disconnected(socket_addr);
Potential Data Race When Moving route_info_manager into Async Task
Moving route_info_manager (which owns mutable data) into a spawned async task may lead to data races if it's accessed concurrently elsewhere. Consider wrapping route_info_manager in an Arc<Mutex<Self>> to safely share it between threads.
Suggested Change: Use Arc<Mutex<RouteInfoManager>> for Thread Safety
First, update the start method signature and parameters:
-pub fn start(mut route_info_manager: Self, receiver: broadcast::Receiver<SocketAddr>) {
+pub fn start(route_info_manager: Arc<Mutex<Self>>, receiver: broadcast::Receiver<SocketAddr>) {
let mut receiver = receiver;
tokio::spawn(async move {
while let Ok(socket_addr) = receiver.recv().await {
- route_info_manager.connection_disconnected(socket_addr);
+ let mut rim = route_info_manager.lock().await;
+ rim.connection_disconnected(socket_addr);
}
});
}
Ensure that all methods accessed within the async task are compatible with the Mutex locking mechanism.
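The shape of this suggestion can be tried without tokio. The sketch below swaps in a standard-library thread and `std::sync::mpsc` channel for the async task and broadcast receiver, which is an assumption made to keep it dependency-free; `Manager` and its methods are hypothetical.

```rust
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical, simplified manager tracking live broker connections.
#[derive(Default)]
pub struct Manager {
    live: HashSet<SocketAddr>,
}

impl Manager {
    pub fn connect(&mut self, addr: SocketAddr) {
        self.live.insert(addr);
    }

    pub fn connection_disconnected(&mut self, addr: SocketAddr) {
        self.live.remove(&addr);
    }

    pub fn is_live(&self, addr: &SocketAddr) -> bool {
        self.live.contains(addr)
    }
}

/// Consumes disconnect events on a worker thread. The lock is taken per
/// event, so other holders of the `Arc` can make progress between events.
/// The loop ends once every sender has been dropped.
pub fn start(manager: Arc<Mutex<Manager>>, receiver: Receiver<SocketAddr>) -> JoinHandle<()> {
    thread::spawn(move || {
        for addr in receiver {
            manager.lock().unwrap().connection_disconnected(addr);
        }
    })
}
```

In the async version, a `tokio::sync::Mutex` (as in the suggested diff, which awaits the lock) or a short-lived synchronous lock that is never held across an `.await` would play the same role.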
📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub fn start(route_info_manager: Arc<Mutex<Self>>, receiver: broadcast::Receiver<SocketAddr>) {
    let mut receiver = receiver;
    tokio::spawn(async move {
        while let Ok(socket_addr) = receiver.recv().await {
            let mut rim = route_info_manager.lock().await;
            rim.connection_disconnected(socket_addr);
        }
    });
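The sharing pattern suggested above can be sketched with the standard library alone. This is a minimal illustration, not the project's code: `Registry`, `register`, `len`, and `demo_remaining` are hypothetical names, and `std::thread` plus `std::sync::mpsc` stand in for `tokio::spawn` and the broadcast receiver. The suggestion's `.lock().await` implies an async mutex; here a blocking `std::sync::Mutex` is used instead.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for RouteInfoManager: maps a connection address to a broker name.
#[derive(Default)]
pub struct Registry {
    live: HashMap<SocketAddr, String>,
}

impl Registry {
    pub fn register(&mut self, addr: SocketAddr, broker: &str) {
        self.live.insert(addr, broker.to_string());
    }

    /// Drops whatever was registered for a disconnected address.
    pub fn connection_disconnected(&mut self, addr: SocketAddr) {
        self.live.remove(&addr);
    }

    pub fn len(&self) -> usize {
        self.live.len()
    }
}

/// Spawns a worker that drains disconnect events; the lock is taken once per event.
pub fn start(
    registry: Arc<Mutex<Registry>>,
    events: mpsc::Receiver<SocketAddr>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // recv() returns Err once every Sender has been dropped, ending the loop.
        while let Ok(addr) = events.recv() {
            let mut guard = registry.lock().unwrap();
            guard.connection_disconnected(addr);
        }
    })
}

/// Usage: registers two brokers, reports one disconnect, returns how many remain.
pub fn demo_remaining() -> usize {
    let registry = Arc::new(Mutex::new(Registry::default()));
    let a: SocketAddr = "127.0.0.1:10911".parse().unwrap();
    let b: SocketAddr = "127.0.0.1:10912".parse().unwrap();
    {
        let mut guard = registry.lock().unwrap();
        guard.register(a, "broker-a");
        guard.register(b, "broker-b");
    }
    let (tx, rx) = mpsc::channel();
    let worker = start(Arc::clone(&registry), rx);
    tx.send(a).unwrap();
    drop(tx); // closing the channel lets the worker exit
    worker.join().unwrap();
    let remaining = registry.lock().unwrap().len();
    remaining
}
```

The caller keeps its own `Arc` clone, so the manager stays reachable after the worker has been spawned. Whether the extra locking is worth its cost is exactly the trade-off this PR is weighing.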
for (bai, bli) in self.broker_live_table.as_ref() {
    if bli.remote_addr == socket_addr {
        broker_addr_info = Some(bai.clone());
        break;
Ensure Thread-Safe Access to broker_live_table
When iterating over broker_live_table, ensure that access is synchronized to prevent data races. Wrap the access in a read lock if using RwLock, so multiple reads can occur concurrently without issues.
Suggested Change: Use Read Lock When Accessing broker_live_table
-pub fn connection_disconnected(&mut self, socket_addr: SocketAddr) {
-    let mut broker_addr_info = None;
-    for (bai, bli) in self.broker_live_table.as_ref() {
+pub fn connection_disconnected(&self, socket_addr: SocketAddr) {
+    let mut broker_addr_info = None;
+    let broker_live_table = self.broker_live_table.read().unwrap();
+    for (bai, bli) in broker_live_table.iter() {
         if bli.remote_addr == socket_addr {
             broker_addr_info = Some(bai.clone());
             break;
         }
     }
Committable suggestion skipped: line range outside the PR's diff.
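As a rough illustration of the read-lock idea, the sketch below scans a table under a read lock and then removes the match under a separate write lock. It assumes `std::sync::RwLock`, whose `read()` returns a `Result` (hence the `unwrap()`). `LiveTable` and `LiveInfo` are hypothetical, simplified stand-ins for the real table and `BrokerLiveInfo`, with a `String` key in place of `BrokerAddrInfo`.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::RwLock;

/// Hypothetical, simplified stand-in for BrokerLiveInfo.
pub struct LiveInfo {
    pub remote_addr: SocketAddr,
}

/// Hypothetical wrapper: broker address (as a String) -> live info.
pub struct LiveTable {
    inner: RwLock<HashMap<String, LiveInfo>>,
}

impl LiveTable {
    pub fn new() -> Self {
        LiveTable {
            inner: RwLock::new(HashMap::new()),
        }
    }

    pub fn insert(&self, broker_addr: &str, remote_addr: SocketAddr) {
        let mut table = self.inner.write().unwrap();
        table.insert(broker_addr.to_string(), LiveInfo { remote_addr });
    }

    /// Scans under a read lock, so other readers are not blocked during the search.
    pub fn find_by_remote(&self, socket_addr: SocketAddr) -> Option<String> {
        let table = self.inner.read().unwrap();
        let found = table
            .iter()
            .find(|(_, info)| info.remote_addr == socket_addr)
            .map(|(key, _)| key.clone());
        found
    }

    /// Removes the entry for a disconnected address; returns true if one was removed.
    pub fn connection_disconnected(&self, socket_addr: SocketAddr) -> bool {
        // The read guard is released inside find_by_remote before the write lock is taken.
        // Another writer may run in between, so the removal re-checks by key.
        let key = match self.find_by_remote(socket_addr) {
            Some(key) => key,
            None => return false,
        };
        let mut table = self.inner.write().unwrap();
        table.remove(&key).is_some()
    }
}
```

Taking the write lock only after the read guard is gone avoids self-deadlock on a non-reentrant lock. The price is a short window in which the table can change between the scan and the removal.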
 type TopicQueueTable = ArcMut<
     HashMap<CheetahString /* topic */, HashMap<CheetahString /* broker name */, QueueData>>,
 >;
 type BrokerAddrTable = ArcMut<HashMap<CheetahString /* brokerName */, BrokerData>>;
 type ClusterAddrTable =
-    HashMap<CheetahString /* clusterName */, HashSet<CheetahString /* brokerName */>>;
-type BrokerLiveTable = HashMap<BrokerAddrInfo /* brokerAddr */, BrokerLiveInfo>;
+    ArcMut<HashMap<CheetahString /* clusterName */, HashSet<CheetahString /* brokerName */>>>;
+type BrokerLiveTable = ArcMut<HashMap<BrokerAddrInfo /* brokerAddr */, BrokerLiveInfo>>;
 type FilterServerTable =
-    HashMap<BrokerAddrInfo /* brokerAddr */, Vec<String> /* Filter Server */>;
-type TopicQueueMappingInfoTable = HashMap<
-    CheetahString, /* topic */
-    HashMap<CheetahString /* brokerName */, TopicQueueMappingInfo>,
+    ArcMut<HashMap<BrokerAddrInfo /* brokerAddr */, Vec<String> /* Filter Server */>>;
+type TopicQueueMappingInfoTable = ArcMut<
+    HashMap<
+        CheetahString, /* topic */
+        HashMap<CheetahString /* brokerName */, TopicQueueMappingInfo>,
+    >,
Ensure Proper Synchronization with ArcMut Usage
The types TopicQueueTable, BrokerAddrTable, ClusterAddrTable, BrokerLiveTable, FilterServerTable, and TopicQueueMappingInfoTable have been changed to use ArcMut<HashMap<...>>, allowing mutable shared ownership across threads. Please verify that ArcMut provides the necessary synchronization mechanisms to prevent data races and ensure thread safety. If ArcMut does not internally handle synchronization, consider using standard thread-safe primitives like Arc<RwLock<...>> or Arc<Mutex<...>>.
Suggested Change: Replace ArcMut with Arc<RwLock<...>>
If ArcMut does not handle synchronization, you can modify the type definitions as follows:
-type TopicQueueTable = ArcMut<
+type TopicQueueTable = Arc<RwLock<
     HashMap<CheetahString /* topic */, HashMap<CheetahString /* broker name */, QueueData>>,
->;
+>>;
Similarly, apply this change to the other type definitions:
-type BrokerAddrTable = ArcMut<HashMap<CheetahString /* brokerName */, BrokerData>>;
+type BrokerAddrTable = Arc<RwLock<HashMap<CheetahString /* brokerName */, BrokerData>>>;
-type ClusterAddrTable = ArcMut<HashMap<CheetahString /* clusterName */, HashSet<CheetahString /* brokerName */>>>;
+type ClusterAddrTable = Arc<RwLock<HashMap<CheetahString /* clusterName */, HashSet<CheetahString /* brokerName */>>>>;
-type BrokerLiveTable = ArcMut<HashMap<BrokerAddrInfo /* brokerAddr */, BrokerLiveInfo>>;
+type BrokerLiveTable = Arc<RwLock<HashMap<BrokerAddrInfo /* brokerAddr */, BrokerLiveInfo>>>;
-type FilterServerTable = ArcMut<HashMap<BrokerAddrInfo /* brokerAddr */, Vec<String> /* Filter Server */>>;
+type FilterServerTable = Arc<RwLock<HashMap<BrokerAddrInfo /* brokerAddr */, Vec<String> /* Filter Server */>>>;
-type TopicQueueMappingInfoTable = ArcMut<
+type TopicQueueMappingInfoTable = Arc<RwLock<
     HashMap<
         CheetahString, /* topic */
         HashMap<CheetahString /* brokerName */, TopicQueueMappingInfo>,
     >,
->;
+>>;
📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
type TopicQueueTable = Arc<RwLock<
    HashMap<CheetahString /* topic */, HashMap<CheetahString /* broker name */, QueueData>>,
>>;
type BrokerAddrTable = Arc<RwLock<HashMap<CheetahString /* brokerName */, BrokerData>>>;
type ClusterAddrTable =
    Arc<RwLock<HashMap<CheetahString /* clusterName */, HashSet<CheetahString /* brokerName */>>>>;
type BrokerLiveTable = Arc<RwLock<HashMap<BrokerAddrInfo /* brokerAddr */, BrokerLiveInfo>>>;
type FilterServerTable =
    Arc<RwLock<HashMap<BrokerAddrInfo /* brokerAddr */, Vec<String> /* Filter Server */>>>;
type TopicQueueMappingInfoTable = Arc<RwLock<
    HashMap<
        CheetahString, /* topic */
        HashMap<CheetahString /* brokerName */, TopicQueueMappingInfo>,
    >,
>>;
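To make the Arc<RwLock<...>> alias concrete, here is a small sketch of one nested table shared by two writer threads. It is an illustration under stated assumptions: `String` replaces `CheetahString`, a `u32` queue count replaces `QueueData`, and `new_table`, `put_queue`, `total_read_queues`, and `demo_total` are hypothetical helpers, not part of the project.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

/// topic -> (broker name -> number of read queues); u32 stands in for QueueData.
pub type TopicQueueTable = Arc<RwLock<HashMap<String, HashMap<String, u32>>>>;

pub fn new_table() -> TopicQueueTable {
    Arc::new(RwLock::new(HashMap::new()))
}

/// Inserts or overwrites one broker's entry for a topic under the write lock.
pub fn put_queue(table: &TopicQueueTable, topic: &str, broker: &str, read_queues: u32) {
    let mut guard = table.write().unwrap();
    guard
        .entry(topic.to_string())
        .or_default()
        .insert(broker.to_string(), read_queues);
}

/// Sums the read queues of every broker serving a topic under the read lock.
pub fn total_read_queues(table: &TopicQueueTable, topic: &str) -> u32 {
    let guard = table.read().unwrap();
    let total = guard
        .get(topic)
        .map(|brokers| brokers.values().sum::<u32>())
        .unwrap_or(0);
    total
}

/// Usage: two threads each register a broker for the same topic through Arc clones.
pub fn demo_total() -> u32 {
    let table = new_table();
    let handles: Vec<_> = vec!["broker-a", "broker-b"]
        .into_iter()
        .map(|broker| {
            let table = Arc::clone(&table);
            thread::spawn(move || put_queue(&table, "TopicTest", broker, 4))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    total_read_queues(&table, "TopicTest")
}
```

Each thread holds its own `Arc` clone, and the `RwLock` serializes the two writes. The compiler rejects the unsynchronized equivalent because a plain `HashMap` cannot be mutated through a shared `Arc`.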
topic_queue_table: ArcMut::new(HashMap::new()),
broker_addr_table: ArcMut::new(HashMap::new()),
cluster_addr_table: ArcMut::new(HashMap::new()),
broker_live_table: ArcMut::new(HashMap::new()),
filter_server_table: ArcMut::new(HashMap::new()),
topic_queue_mapping_info_table: ArcMut::new(HashMap::new()),
🛠️ Refactor suggestion
Initialize Shared Data Structures with Appropriate Locks
In the RouteInfoManager::new method, the data structures are initialized using ArcMut::new(HashMap::new()). If you adopt Arc<RwLock<...>>, initialize them as follows to ensure thread safety:
-route_info_manager {
-    topic_queue_table: ArcMut::new(HashMap::new()),
-    broker_addr_table: ArcMut::new(HashMap::new()),
-    cluster_addr_table: ArcMut::new(HashMap::new()),
-    broker_live_table: ArcMut::new(HashMap::new()),
-    filter_server_table: ArcMut::new(HashMap::new()),
-    topic_queue_mapping_info_table: ArcMut::new(HashMap::new()),
+RouteInfoManager {
+    topic_queue_table: Arc::new(RwLock::new(HashMap::new())),
+    broker_addr_table: Arc::new(RwLock::new(HashMap::new())),
+    cluster_addr_table: Arc::new(RwLock::new(HashMap::new())),
+    broker_live_table: Arc::new(RwLock::new(HashMap::new())),
+    filter_server_table: Arc::new(RwLock::new(HashMap::new())),
+    topic_queue_mapping_info_table: Arc::new(RwLock::new(HashMap::new())),
     namesrv_config,
     remoting_client,
 }
Committable suggestion skipped: line range outside the PR's diff.
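A constructor along these lines might look like the sketch below. It is a reduced, hypothetical `Tables` struct with only two of the six fields, with `String` in place of `CheetahString` and `BrokerAddrInfo`. The `Shared` alias, `add_broker`, and `broker_count` are assumptions added for illustration.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};

/// Hypothetical shorthand for a lock-protected, reference-counted value.
type Shared<T> = Arc<RwLock<T>>;

/// Reduced stand-in for RouteInfoManager holding two of its tables.
#[derive(Clone)]
pub struct Tables {
    pub cluster_addr_table: Shared<HashMap<String, HashSet<String>>>,
    pub filter_server_table: Shared<HashMap<String, Vec<String>>>,
}

impl Tables {
    /// Every table starts empty behind its own lock.
    pub fn new() -> Self {
        Tables {
            cluster_addr_table: Arc::new(RwLock::new(HashMap::new())),
            filter_server_table: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Adds a broker to a cluster; returns false if it was already present.
    pub fn add_broker(&self, cluster: &str, broker: &str) -> bool {
        let mut guard = self.cluster_addr_table.write().unwrap();
        let inserted = guard
            .entry(cluster.to_string())
            .or_default()
            .insert(broker.to_string());
        inserted
    }

    /// Counts the brokers registered for a cluster (0 if the cluster is unknown).
    pub fn broker_count(&self, cluster: &str) -> usize {
        let guard = self.cluster_addr_table.read().unwrap();
        let count = guard.get(cluster).map_or(0, |brokers| brokers.len());
        count
    }
}
```

Because the struct derives `Clone` and every field is an `Arc`, a clone is a cheap second handle onto the same tables rather than a copy of the data. Giving each table its own lock lets unrelated tables be updated concurrently, but operations that touch several tables then need a consistent lock order.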
Which Issue(s) This PR Fixes(Closes)
Fixes #1253
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
RemotingCommand struct.
Bug Fixes
Documentation
Tests