-
Notifications
You must be signed in to change notification settings - Fork 143
[ISSUE #1522]♻️Refactor UpdateConsumerOffsetRequestHeader with derive macro RequestHeaderCodec #1523
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… macro RequestHeaderCodec
WalkthroughThe pull request introduces modifications to the handling of consumer offsets in the RocketMQ codebase. Key changes include the removal of checks for Changes
Assessment against linked issues
Possibly related PRs
Suggested reviewers
Poem
📜 Recent review detailsConfiguration used: .coderabbit.yaml 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (3)
rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs (2)
217-219
: Simplify thequeue_id
getter methodSince
queue_id
is now a required field of typei32
, returning anOption<i32>
is unnecessary. Consider changing the return type toi32
for consistency and clarity.Apply this diff to adjust the getter method:
-fn queue_id(&self) -> Option<i32> { - Some(self.queue_id) +fn queue_id(&self) -> i32 { + self.queue_id }
Line range hint
43-115
: Remove commented-out code to improve readabilityThe large block of commented-out code is no longer needed and can be removed to keep the codebase clean and maintainable.
Delete the commented code to declutter the file.
rocketmq-broker/src/processor/consumer_manage_processor.rs (1)
238-239
: Ensure offset commitment considers valid rangeWhen committing offsets, it's important to validate that the offset is within a valid range (e.g., non-negative). Consider adding a check to ensure
offset
is not negative before committing it.Add a validation step:
if offset < 0 { return Some( response .set_code(ResponseCode::SystemError) .set_remark(format!("Invalid offset value: {}", offset)), ); }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (3)
rocketmq-broker/src/processor/consumer_manage_processor.rs
(1 hunks)rocketmq-client/src/consumer/store/remote_broker_offset_store.rs
(1 hunks)rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs
(3 hunks)
🔇 Additional comments (2)
rocketmq-broker/src/processor/consumer_manage_processor.rs (1)
205-218
: Validate queue_id
and offset
before processing
By removing the checks for queue_id
and offset
being None
, the method assumes that these fields are always valid. However, to prevent potential issues, especially when interacting with older clients, consider adding validation to ensure these fields contain acceptable values.
Run the following script to check if there are any client requests that might send queue_id
or offset
as None
:
rocketmq-client/src/consumer/store/remote_broker_offset_store.rs (1)
285-286
: Update method to reflect changes in request header
The UpdateConsumerOffsetRequestHeader
now requires queue_id
and commit_offset
as non-optional fields. The updated code correctly passes these values. Ensure that all other usages of this struct in the codebase are updated accordingly.
topic_request_header: None, | ||
}; | ||
let map = header.to_map().unwrap(); | ||
assert_eq!( | ||
map.get(&CheetahString::from_static_str("consumerGroup")) | ||
.unwrap(), | ||
"test_consumer_group" | ||
); | ||
assert_eq!( | ||
map.get(&CheetahString::from_static_str("topic")).unwrap(), | ||
"test_topic" | ||
); | ||
assert_eq!( | ||
map.get(&CheetahString::from_static_str("queueId")).unwrap(), | ||
"1" | ||
); | ||
assert_eq!( | ||
map.get(&CheetahString::from_static_str("commitOffset")) | ||
.unwrap(), | ||
"100" | ||
); | ||
} | ||
|
||
#[test] | ||
fn update_consumer_offset_request_header_deserializes_correctly() { | ||
let mut map = HashMap::new(); | ||
map.insert( | ||
CheetahString::from_static_str("consumerGroup"), | ||
CheetahString::from_static_str("test_consumer_group"), | ||
); | ||
map.insert( | ||
CheetahString::from_static_str("topic"), | ||
CheetahString::from_static_str("test_topic"), | ||
); | ||
map.insert( | ||
CheetahString::from_static_str("queueId"), | ||
CheetahString::from_static_str("1"), | ||
); | ||
map.insert( | ||
CheetahString::from_static_str("commitOffset"), | ||
CheetahString::from_static_str("100"), | ||
); | ||
|
||
let header = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map).unwrap(); | ||
assert_eq!(header.consumer_group, "test_consumer_group"); | ||
assert_eq!(header.topic, "test_topic"); | ||
assert_eq!(header.queue_id, 1); | ||
assert_eq!(header.commit_offset, 100); | ||
} | ||
|
||
#[test] | ||
fn update_consumer_offset_request_header_handles_missing_optional_fields() { | ||
let mut map = HashMap::new(); | ||
map.insert( | ||
CheetahString::from_static_str("consumerGroup"), | ||
CheetahString::from_static_str("test_consumer_group"), | ||
); | ||
map.insert( | ||
CheetahString::from_static_str("topic"), | ||
CheetahString::from_static_str("test_topic"), | ||
); | ||
map.insert( | ||
CheetahString::from_static_str("queueId"), | ||
CheetahString::from_static_str("1"), | ||
); | ||
map.insert( | ||
CheetahString::from_static_str("commitOffset"), | ||
CheetahString::from_static_str("100"), | ||
); | ||
|
||
let header = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map).unwrap(); | ||
assert_eq!(header.consumer_group, "test_consumer_group"); | ||
assert_eq!(header.topic, "test_topic"); | ||
assert_eq!(header.queue_id, 1); | ||
assert_eq!(header.commit_offset, 100); | ||
assert!(header.topic_request_header.is_some()); | ||
} | ||
|
||
#[test] | ||
fn update_consumer_offset_request_header_handles_invalid_data() { | ||
let mut map = HashMap::new(); | ||
map.insert( | ||
CheetahString::from_static_str("consumerGroup"), | ||
CheetahString::from_static_str("test_consumer_group"), | ||
); | ||
map.insert( | ||
CheetahString::from_static_str("topic"), | ||
CheetahString::from_static_str("test_topic"), | ||
); | ||
map.insert( | ||
CheetahString::from_static_str("queueId"), | ||
CheetahString::from_static_str("invalid"), | ||
); | ||
map.insert( | ||
CheetahString::from_static_str("commitOffset"), | ||
CheetahString::from_static_str("invalid"), | ||
); | ||
|
||
let result = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map); | ||
assert!(result.is_err()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Ensure unit tests reflect changes to required fields
The unit tests still reference optional fields for queue_id
and commit_offset
. Update the tests to match the new struct definition where these fields are mandatory.
For example, in your tests, remove any Option
wrapping around queue_id
and commit_offset
and ensure all test cases cover scenarios with valid and invalid data appropriately.
Some(self.queue_id) | ||
} | ||
|
||
fn set_queue_id(&mut self, queue_id: Option<i32>) { | ||
self.queue_id = queue_id; | ||
self.queue_id = queue_id.unwrap(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid potential panic in set_queue_id
method
The set_queue_id
method unwraps an Option<i32>
without checking if it is None
, which can lead to a panic if queue_id
is None
. To prevent this, consider changing the method to accept i32
directly or handle the None
case appropriately.
Apply this diff to fix the potential panic:
-def set_queue_id(&mut self, queue_id: Option<i32>) {
- self.queue_id = queue_id.unwrap();
+def set_queue_id(&mut self, queue_id: i32) {
+ self.queue_id = queue_id;
}
Committable suggestion skipped: line range outside the PR's diff.
queue_id, | ||
offset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct logging placeholders in info!
macro
In the log statement, the placeholders do not match the variables provided. Adjust the variables to match the placeholders for accurate logging.
Apply this diff to fix the logging statement:
-offset
+queue_id,
+offset
Resulting in:
info!(
"Update consumer offset is rejected because of previous offset-reset. \
Group={},Topic={}, QueueId={}, Offset={}",
group,
topic,
queue_id,
offset
);
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1523 +/- ##
==========================================
+ Coverage 24.72% 24.87% +0.15%
==========================================
Files 450 450
Lines 59468 59509 +41
==========================================
+ Hits 14703 14805 +102
+ Misses 44765 44704 -61 ☔ View full report in Codecov by Sentry. |
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
Which Issue(s) This PR Fixes(Closes)
Fixes #1522
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
queue_id
andcommit_offset
in the update request header, improving data integrity.Bug Fixes
Tests
UpdateConsumerOffsetRequestHeader
to verify serialization and deserialization.