[ISSUE #2090]💫Implement PopReviveService#mergeAndRevive⚗️ #2093

Merged
merged 1 commit into from
Jan 4, 2025

Conversation

mxsm
Owner

@mxsm mxsm commented Jan 4, 2025

Which Issue(s) This PR Fixes(Closes)

Fixes #2090

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • Improvements
    • Enhanced PopReviveService with improved subscription group management
    • Refactored revival process to support asynchronous operations
    • Improved error handling and logging for revival operations

Contributor

coderabbitai bot commented Jan 4, 2025

Walkthrough

The pull request modifies PopReviveService in the RocketMQ broker's processor service. It adds a subscription_group_manager field to the struct, which merge_and_revive uses to look up subscription group configs. The merge_and_revive method changes from a synchronous function to an async function that returns Result<()>, so it can await non-blocking operations and report errors to its caller.

Changes

File | Change Summary
rocketmq-broker/src/processor/processor_service/pop_revive_service.rs | - Added subscription_group_manager: Arc<SubscriptionGroupManager<MS>> field
  - Converted merge_and_revive method to async with Result<()> return type
  - Improved error handling and logging

Assessment against linked issues

Objective Addressed Explanation
Implement PopReviveService#mergeAndRevive [#2090]

Possibly related PRs

Suggested labels

feature, auto merge, ready to review, waiting-review, AI review first, rocketmq-broker crate

Suggested reviewers

  • TeslaRustor
  • SpaceXCN
  • rocketmq-rust-bot

Poem

🚀 Revive, oh service swift and bright,
Async magic takes its flight!
Subscription groups now dance with grace,
Errors caught in code's embrace.
RocketMQ leaps to new height! 🐰



@rocketmq-rust-bot rocketmq-rust-bot self-requested a review January 4, 2025 15:52
@rocketmq-rust-robot rocketmq-rust-robot added the feature🚀 Suggest an idea for this project. label Jan 4, 2025
@rocketmq-rust-bot
Collaborator

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

@rocketmq-rust-robot rocketmq-rust-robot added this to the v0.4.0 milestone Jan 4, 2025

codecov bot commented Jan 4, 2025

Codecov Report

Attention: Patch coverage is 0% with 108 lines in your changes missing coverage. Please review.

Project coverage is 28.39%. Comparing base (259be44) to head (2027b49).
Report is 2 commits behind head on main.

Files with missing lines | Patch % | Lines
.../processor/processor_service/pop_revive_service.rs | 0.00% | 108 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2093      +/-   ##
==========================================
- Coverage   28.43%   28.39%   -0.05%     
==========================================
  Files         492      492              
  Lines       69976    70081     +105     
==========================================
  Hits        19898    19898              
- Misses      50078    50183     +105     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (5)
rocketmq-broker/src/processor/processor_service/pop_revive_service.rs (5)

347-350: Consider enhanced error handling.
Invoking merge_and_revive is fine, but when errors occur, only logging and continuing may mask persistent failures. Consider retry logic, exponential backoff, or additional monitoring if this error is critical.
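
A minimal sketch of the retry-with-exponential-backoff idea mentioned above, using only the standard library. The names backoff_delay and retry_with_backoff are hypothetical and not part of the PR. The sleep function is passed in as a parameter so the helper can be exercised without waiting; in the async merge_and_revive caller the same shape would presumably await an async sleep instead.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
fn backoff_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    base.saturating_mul(factor).min(max)
}

/// Calls `op` until it succeeds or has been tried `max_attempts` times.
/// Between failed attempts it calls `sleep` with a growing delay.
/// Hypothetical helper for illustration only.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base: Duration,
    max: Duration,
    mut sleep: impl FnMut(Duration),
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: hand the last error back to the caller.
            Err(err) if attempt + 1 >= max_attempts => return Err(err),
            Err(_) => {
                sleep(backoff_delay(base, max, attempt));
                attempt += 1;
            }
        }
    }
}
```

Returning the final error instead of swallowing it lets the caller decide whether to log, alert, or stop the revive loop.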


635-650: Topic existence check.
Skipping the checkpoint if the topic config is missing makes sense. However, consider whether the topic might appear shortly afterward or should be created automatically.


651-655: Subscription group config check.
Returning immediately on missing subscription group config is valid. If dynamic subscription group creation is intended, capture or queue the checkpoint for retry.


715-718: Unimplemented re_put_ck.
This method is required to handle retries or re-adding checkpoints. Provide at least a basic implementation or panic with a clear message if not yet ready.


720-722: Unimplemented revive_msg_from_ck.
Similar to re_put_ck, this is a placeholder. Implement or remove to prevent unexpected runtime panics if called.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5d6a3b2 and 2027b49.

📒 Files selected for processing (1)
  • rocketmq-broker/src/processor/processor_service/pop_revive_service.rs (6 hunks)
🔇 Additional comments (12)
rocketmq-broker/src/processor/processor_service/pop_revive_service.rs (12)

56-56: Dependency import looks good.
No issues found with importing SubscriptionGroupManager, as it is used elsewhere in the code for retrieving subscription group configs.


73-73: Field addition aligns with usage.
The subscription_group_manager field, wrapped in an Arc, is appropriate for concurrent environments and is utilized in merge_and_revive.


86-86: Constructor parameter is coherent.
Passing subscription_group_manager as an Arc<SubscriptionGroupManager<MS>> to the constructor is consistent with the struct’s usage.


104-104: Proper struct initialization.
This line correctly assigns subscription_group_manager during struct construction, ensuring all fields are fully populated.


596-599: Async signature looks consistent.
Transforming merge_and_revive into an async function allows non-blocking operations. Be sure it is only invoked where async context is available.


600-603: Initialization logic is clear.
Storing old and end offsets and sorting the list keep the method organized and maintain relevant state for subsequent operations.


604-607: Logging for debug clarity.
Logging the size of the checkpoint list provides a quick snapshot for debugging. This is helpful for diagnosing issues.


608-619: First and last checkpoint logging.
Calling sort_list.last().unwrap() is safe under the !sort_list.is_empty() check. Logging these values is useful for operational insight.
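
As an optional alternative, the emptiness check and the unwrap can be folded into one step so the two cannot drift apart. This is a sketch only: PopCheckPoint here is a hypothetical one-field stand-in for the PR's type, and first_and_last is not a function in the PR.

```rust
/// Hypothetical stand-in for the PR's checkpoint type.
#[derive(Debug)]
struct PopCheckPoint {
    revive_offset: i64,
}

/// Returns the first and last checkpoints, or None when the slice is empty.
/// For a one-element slice both references point at the same checkpoint.
fn first_and_last(sorted: &[PopCheckPoint]) -> Option<(&PopCheckPoint, &PopCheckPoint)> {
    Some((sorted.first()?, sorted.last()?))
}
```

A caller could then write `if let Some((first, last)) = first_and_last(&sort_list)` and log both values without any unwrap.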


620-627: Condition on should_run_pop_revive.
Breaking out early if should_run_pop_revive flips to false is logical. Ensure the rest of the code handles partial iteration gracefully.


629-633: Time-based break condition might skip needed revivals.
The loop breaks when end_time - pop_check_point.get_revive_time() is too small. This may miss some checkpoints if the clock is slightly behind or if certain checkpoints need immediate attention. Confirm that this behavior is intentional.
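
To make the concern concrete, here is a small sketch comparing a break with a skip. It assumes the list is not ordered by revive time, which this review does not confirm. The function names and the min_age_ms threshold are hypothetical, and checkpoints are reduced to their revive times in milliseconds.

```rust
/// Revive times that get processed when the loop breaks at the first
/// checkpoint that is still too young.
fn processed_with_break(revive_times_ms: &[i64], end_time_ms: i64, min_age_ms: i64) -> Vec<i64> {
    revive_times_ms
        .iter()
        .copied()
        .take_while(|&t| end_time_ms.saturating_sub(t) > min_age_ms)
        .collect()
}

/// Revive times that get processed when too-young checkpoints are skipped
/// and the loop continues with the rest of the list.
fn processed_with_skip(revive_times_ms: &[i64], end_time_ms: i64, min_age_ms: i64) -> Vec<i64> {
    revive_times_ms
        .iter()
        .copied()
        .filter(|&t| end_time_ms.saturating_sub(t) > min_age_ms)
        .collect()
}
```

With revive times [100, 900, 200], an end time of 1000, and a threshold of 500, the break variant stops at 900 and never reaches 200, while the skip variant processes both 100 and 200. If the list is in fact ordered by revive time, the two variants agree and the break is only an optimization.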


686-694: Cleanup and offset update.
Removing stale inflight checkpoints and updating new_offset ensures correct synchronization with the process. This looks fine overall.


695-713: Offset commit approach is sensible.
Committing only when new_offset > old_offset avoids redundant updates. The final assignment to self.revive_offset and consume_revive_obj.new_offset properly synchronizes the next iteration.

Comment on lines +663 to +685

// may be need to optimize
let mut remove = vec![];
let length = self.inflight_revive_request_map.lock().len();
while length - remove.len() > 3 {
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    let mut inflight_map = self.inflight_revive_request_map.lock();
    let entry = inflight_map.first_entry().unwrap();
    let pair = entry.get();
    let old_ck = entry.key();
    if !pair.1 && (get_current_millis() - pair.0 as u64 > 30 * 1000) {
        self.re_put_ck(old_ck, pair);
        remove.push(old_ck.clone());
        info!(
            "stay too long, remove from reviveRequestMap, {}, {:?}, {}, {}",
            pop_check_point.topic,
            pop_check_point.broker_name,
            pop_check_point.queue_id,
            pop_check_point.start_offset
        );
    }
}
let mut inflight_revive_request_map = self.inflight_revive_request_map.lock();
Contributor

🛠️ Refactor suggestion

Concurrent inflight map handling.
Heavy locking around inflight_revive_request_map may block other tasks. The loop containing a sleep could degrade throughput if many entries accumulate. Consider more efficient concurrency patterns (e.g., a queue of pending entries or asynchronous channels) to avoid repeated sleeps under the lock.
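
One possible shape for this, sketched with the standard library only: drain every stale head entry in a single short lock scope, then re-put the drained checkpoints after the lock has been released. Everything here is a hypothetical simplification. The map is keyed by a plain i64 instead of the checkpoint type, std::sync::Mutex stands in for whatever lock the PR uses, and take_stale_entries is not a function in the PR.

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;

/// Hypothetical simplification of the inflight map:
/// key = checkpoint offset, value = (insert time in ms, finished flag).
type InflightMap = BTreeMap<i64, (u64, bool)>;

/// Removes and returns head entries that are unfinished and older than
/// `max_age_ms`, stopping once at most `keep` entries remain or the head
/// entry is not stale. The lock is held only while draining.
fn take_stale_entries(
    map: &Mutex<InflightMap>,
    now_ms: u64,
    max_age_ms: u64,
    keep: usize,
) -> Vec<(i64, (u64, bool))> {
    let mut guard = map.lock().unwrap();
    let mut stale = Vec::new();
    while guard.len() > keep {
        let head_is_stale = match guard.first_key_value() {
            Some((_, &(inserted_ms, done))) => {
                !done && now_ms.saturating_sub(inserted_ms) > max_age_ms
            }
            None => false,
        };
        if !head_is_stale {
            break;
        }
        if let Some(entry) = guard.pop_first() {
            stale.push(entry);
        }
    }
    stale
}
```

Unlike the loop in the PR, this helper never waits. If the head entry is not yet stale it returns what it has, and the caller can sleep with no lock held before trying again. Because each drained entry is removed from the map, the loop also cannot revisit the same head entry.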

Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

LGTM

Labels
  • AI review first: Ai review pr first
  • approved: PR has approved
  • auto merge
  • feature🚀: Suggest an idea for this project.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature🚀] Implement PopReviveService#mergeAndRevive
3 participants