Skip to content

[ISSUE #1522]♻️Refactor UpdateConsumerOffsetRequestHeader with derive macro RequestHeaderCodec #1523

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 3, 2024

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Dec 3, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #1522

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Enhanced error handling for consumer offset updates, providing clearer logging for debugging.
    • Required fields for queue_id and commit_offset in the update request header, improving data integrity.
  • Bug Fixes

    • Improved handling of offset not found errors, returning specific error codes for better clarity.
  • Tests

    • Added unit tests for the UpdateConsumerOffsetRequestHeader to verify serialization and deserialization.

Copy link
Contributor

coderabbitai bot commented Dec 3, 2024

Walkthrough

The pull request introduces modifications to the handling of consumer offsets in the RocketMQ codebase. Key changes include the removal of checks for None values in the update_consumer_offset method, altering error handling for queue_id and offset. Additionally, the UpdateConsumerOffsetRequestHeader struct is refactored to mandate these fields, enhancing type safety. The RemoteBrokerOffsetStore methods have been updated for clearer error logging and improved request header construction. Overall, the changes aim to enhance clarity and robustness in offset management.

Changes

File Change Summary
rocketmq-broker/src/processor/consumer_manage_processor.rs Modified update_consumer_offset to comment out checks for queue_id and offset, affecting error handling.
rocketmq-client/src/consumer/store/remote_broker_offset_store.rs Updated update_consume_offset_to_broker to simplify request header construction; improved error handling in fetch_consume_offset_from_broker and refined read_offset error reporting.
rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs Refactored UpdateConsumerOffsetRequestHeader to enforce mandatory queue_id and commit_offset; updated methods for serialization and deserialization; added unit tests.

Assessment against linked issues

Objective Addressed Explanation
Refactor UpdateConsumerOffsetRequestHeader with derive macro RequestHeaderCodec (#1522)

Possibly related PRs

Suggested reviewers

  • SpaceXCN
  • TeslaRustor

Poem

🐰 In the code where offsets play,
A refactor brightens up the way.
With checks removed, we boldly tread,
Ensuring paths where errors fled.
In headers strong, we find our peace,
As clarity and structure increase! 🌟


📜 Recent review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 35e564c and 3d5fe96.

📒 Files selected for processing (1)
  • rocketmq-broker/src/processor/consumer_manage_processor.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • rocketmq-broker/src/processor/consumer_manage_processor.rs

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (3)
rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs (2)

217-219: Simplify the queue_id getter method

Since queue_id is now a required field of type i32, returning an Option<i32> is unnecessary. Consider changing the return type to i32 for consistency and clarity.

Apply this diff to adjust the getter method:

-fn queue_id(&self) -> Option<i32> {
-    Some(self.queue_id)
+fn queue_id(&self) -> i32 {
+    self.queue_id
}

Line range hint 43-115: Remove commented-out code to improve readability

The large block of commented-out code is no longer needed and can be removed to keep the codebase clean and maintainable.

Delete the commented code to declutter the file.

rocketmq-broker/src/processor/consumer_manage_processor.rs (1)

238-239: Ensure offset commitment considers valid range

When committing offsets, it's important to validate that the offset is within a valid range (e.g., non-negative). Consider adding a check to ensure offset is not negative before committing it.

Add a validation step:

if offset < 0 {
    return Some(
        response
            .set_code(ResponseCode::SystemError)
            .set_remark(format!("Invalid offset value: {}", offset)),
    );
}
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between b4b6c59 and 35e564c.

📒 Files selected for processing (3)
  • rocketmq-broker/src/processor/consumer_manage_processor.rs (1 hunks)
  • rocketmq-client/src/consumer/store/remote_broker_offset_store.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs (3 hunks)
🔇 Additional comments (2)
rocketmq-broker/src/processor/consumer_manage_processor.rs (1)

205-218: Validate queue_id and offset before processing

By removing the checks for queue_id and offset being None, the method assumes that these fields are always valid. However, to prevent potential issues, especially when interacting with older clients, consider adding validation to ensure these fields contain acceptable values.

Run the following script to check if there are any client requests that might send queue_id or offset as None:

rocketmq-client/src/consumer/store/remote_broker_offset_store.rs (1)

285-286: Update method to reflect changes in request header

The UpdateConsumerOffsetRequestHeader now requires queue_id and commit_offset as non-optional fields. The updated code correctly passes these values. Ensure that all other usages of this struct in the codebase are updated accordingly.

Comment on lines +225 to +341
topic_request_header: None,
};
let map = header.to_map().unwrap();
assert_eq!(
map.get(&CheetahString::from_static_str("consumerGroup"))
.unwrap(),
"test_consumer_group"
);
assert_eq!(
map.get(&CheetahString::from_static_str("topic")).unwrap(),
"test_topic"
);
assert_eq!(
map.get(&CheetahString::from_static_str("queueId")).unwrap(),
"1"
);
assert_eq!(
map.get(&CheetahString::from_static_str("commitOffset"))
.unwrap(),
"100"
);
}

#[test]
fn update_consumer_offset_request_header_deserializes_correctly() {
let mut map = HashMap::new();
map.insert(
CheetahString::from_static_str("consumerGroup"),
CheetahString::from_static_str("test_consumer_group"),
);
map.insert(
CheetahString::from_static_str("topic"),
CheetahString::from_static_str("test_topic"),
);
map.insert(
CheetahString::from_static_str("queueId"),
CheetahString::from_static_str("1"),
);
map.insert(
CheetahString::from_static_str("commitOffset"),
CheetahString::from_static_str("100"),
);

let header = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map).unwrap();
assert_eq!(header.consumer_group, "test_consumer_group");
assert_eq!(header.topic, "test_topic");
assert_eq!(header.queue_id, 1);
assert_eq!(header.commit_offset, 100);
}

#[test]
fn update_consumer_offset_request_header_handles_missing_optional_fields() {
let mut map = HashMap::new();
map.insert(
CheetahString::from_static_str("consumerGroup"),
CheetahString::from_static_str("test_consumer_group"),
);
map.insert(
CheetahString::from_static_str("topic"),
CheetahString::from_static_str("test_topic"),
);
map.insert(
CheetahString::from_static_str("queueId"),
CheetahString::from_static_str("1"),
);
map.insert(
CheetahString::from_static_str("commitOffset"),
CheetahString::from_static_str("100"),
);

let header = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map).unwrap();
assert_eq!(header.consumer_group, "test_consumer_group");
assert_eq!(header.topic, "test_topic");
assert_eq!(header.queue_id, 1);
assert_eq!(header.commit_offset, 100);
assert!(header.topic_request_header.is_some());
}

#[test]
fn update_consumer_offset_request_header_handles_invalid_data() {
let mut map = HashMap::new();
map.insert(
CheetahString::from_static_str("consumerGroup"),
CheetahString::from_static_str("test_consumer_group"),
);
map.insert(
CheetahString::from_static_str("topic"),
CheetahString::from_static_str("test_topic"),
);
map.insert(
CheetahString::from_static_str("queueId"),
CheetahString::from_static_str("invalid"),
);
map.insert(
CheetahString::from_static_str("commitOffset"),
CheetahString::from_static_str("invalid"),
);

let result = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map);
assert!(result.is_err());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure unit tests reflect changes to required fields

The unit tests still reference optional fields for queue_id and commit_offset. Update the tests to match the new struct definition where these fields are mandatory.

For example, in your tests, remove any Option wrapping around queue_id and commit_offset and ensure all test cases cover scenarios with valid and invalid data appropriately.

Comment on lines +217 to +222
Some(self.queue_id)
}

fn set_queue_id(&mut self, queue_id: Option<i32>) {
self.queue_id = queue_id;
self.queue_id = queue_id.unwrap();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid potential panic in set_queue_id method

The set_queue_id method unwraps an Option<i32> without checking if it is None, which can lead to a panic if queue_id is None. To prevent this, consider changing the method to accept i32 directly or handle the None case appropriately.

Apply this diff to fix the potential panic:

-def set_queue_id(&mut self, queue_id: Option<i32>) {
-    self.queue_id = queue_id.unwrap();
+def set_queue_id(&mut self, queue_id: i32) {
+    self.queue_id = queue_id;
}

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 229 to 230
queue_id,
offset
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Correct logging placeholders in info! macro

In the log statement, the placeholders do not match the variables provided. Adjust the variables to match the placeholders for accurate logging.

Apply this diff to fix the logging statement:

-offset
+queue_id,
+offset

Resulting in:

info!(
    "Update consumer offset is rejected because of previous offset-reset. \
     Group={},Topic={}, QueueId={}, Offset={}",
    group,
    topic,
    queue_id,
    offset
);

Copy link

codecov bot commented Dec 3, 2024

Codecov Report

Attention: Patch coverage is 82.11382% with 22 lines in your changes missing coverage. Please review.

Project coverage is 24.87%. Comparing base (b4b6c59) to head (3d5fe96).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...-broker/src/processor/consumer_manage_processor.rs 0.00% 17 Missing ⚠️
...c/protocol/header/update_consumer_offset_header.rs 97.11% 3 Missing ⚠️
...t/src/consumer/store/remote_broker_offset_store.rs 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1523      +/-   ##
==========================================
+ Coverage   24.72%   24.87%   +0.15%     
==========================================
  Files         450      450              
  Lines       59468    59509      +41     
==========================================
+ Hits        14703    14805     +102     
+ Misses      44765    44704      -61     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
AI review first Ai review pr first approved PR has approved auto merge refactor♻️ refactor code
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Refactor♻️]Refactor UpdateConsumerOffsetRequestHeader with derive macro RequestHeaderCodec
4 participants