[ISSUE #3472]🚀Add FlowMonitor for tracking data transfer metrics and flow control management✨ #3473

Merged
merged 1 commit into from
Jun 16, 2025

Conversation

mxsm
Owner

@mxsm mxsm commented Jun 16, 2025

Which Issue(s) This PR Fixes(Closes)

Fixes #3472

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced a flow monitoring feature to track and control message transfer rates.
    • Added the ability to configure a maximum transfer rate for high availability (HA) message transfers, with a default limit set to 100 MB per second.
  • Bug Fixes

    • Ensured the default transfer rate limit is consistently applied during configuration and deserialization.

@Copilot Copilot AI review requested due to automatic review settings June 16, 2025 15:24
Contributor

coderabbitai bot commented Jun 16, 2025

Walkthrough

A new flow_monitor module has been added to the RocketMQ message store, introducing a FlowMonitor struct for tracking and controlling data transfer rates. The configuration now provides a default maximum transfer rate of 100 MB/s. The module includes lifecycle management, flow control logic, and unit tests for these features.

Changes

File(s) | Change Summary
rocketmq-store/src/config/message_store_config.rs | Added default function for max_ha_transfer_byte_in_second (100 MB); updated config defaults.
rocketmq-store/src/ha.rs | Added pub(crate) mod flow_monitor to module hierarchy.
rocketmq-store/src/ha/flow_monitor.rs | Introduced FlowMonitor and FlowMonitorInner for monitoring and controlling transfer speeds; includes lifecycle methods, flow control logic, and unit tests.

Sequence Diagram(s)

sequenceDiagram
    participant App
    participant FlowMonitor
    participant FlowMonitorInner
    participant Config

    App->>FlowMonitor: new(Arc<MessageStoreConfig>)
    App->>FlowMonitor: start()
    FlowMonitor->>FlowMonitorInner: start service task
    loop Every second
        FlowMonitorInner->>FlowMonitorInner: calculate_speed()
    end
    App->>FlowMonitor: shutdown()
    FlowMonitor->>FlowMonitorInner: stop service task
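The lifecycle in the diagram (new, start, a periodic calculate_speed, shutdown) can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: it uses a plain std thread instead of the async service task, and `FlowMonitorSketch`, `Inner`, `roll_window`, and the `tick` parameter are hypothetical names introduced only for the example.

```rust
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical shared state; the real `FlowMonitorInner` may differ.
struct Inner {
    transferred_byte: AtomicI64,           // bytes counted in the current window
    transferred_byte_in_second: AtomicI64, // bytes counted in the last closed window
    running: AtomicBool,
}

/// Thread-based stand-in for the async monitor (an assumption for this sketch).
pub struct FlowMonitorSketch {
    inner: Arc<Inner>,
    tick: Duration,
    worker: Option<JoinHandle<()>>,
}

impl FlowMonitorSketch {
    pub fn new(tick: Duration) -> Self {
        Self {
            inner: Arc::new(Inner {
                transferred_byte: AtomicI64::new(0),
                transferred_byte_in_second: AtomicI64::new(0),
                running: AtomicBool::new(false),
            }),
            tick,
            worker: None,
        }
    }

    /// Spawns the background loop; calling it again while running is a no-op.
    pub fn start(&mut self) {
        if self.worker.is_some() {
            return;
        }
        self.inner.running.store(true, Ordering::Release);
        let inner = Arc::clone(&self.inner);
        let tick = self.tick;
        self.worker = Some(thread::spawn(move || {
            while inner.running.load(Ordering::Acquire) {
                thread::sleep(tick);
                Self::roll_window(&inner);
            }
        }));
    }

    /// Signals the loop to stop and waits for the worker thread to exit.
    pub fn shutdown(&mut self) {
        self.inner.running.store(false, Ordering::Release);
        if let Some(handle) = self.worker.take() {
            let _ = handle.join();
        }
    }

    pub fn is_running(&self) -> bool {
        self.worker.is_some()
    }

    pub fn add_byte_count_transferred(&self, count: i64) {
        self.inner.transferred_byte.fetch_add(count, Ordering::Relaxed);
    }

    /// Closes the current window by hand (the worker does the same every tick).
    pub fn calculate_speed(&self) {
        Self::roll_window(&self.inner);
    }

    pub fn transferred_byte_in_second(&self) -> i64 {
        self.inner.transferred_byte_in_second.load(Ordering::Acquire)
    }

    /// Reads and resets the running counter in one step, then publishes it.
    fn roll_window(inner: &Inner) {
        let bytes = inner.transferred_byte.swap(0, Ordering::AcqRel);
        inner.transferred_byte_in_second.store(bytes, Ordering::Release);
    }
}
```

With a one-second `tick`, `transferred_byte_in_second()` would report the bytes recorded during the previous second; a caller adds bytes as they are sent and reads that value to decide whether to throttle.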

Assessment against linked issues

Objective Addressed Explanation
Add FlowMonitor for tracking data transfer metrics and flow control management (#3472)

Poem

In the warren, bytes now flow,
Monitored by bunnies, steady and slow.
With FlowMonitor’s gentle, watchful gaze,
Data hops along in measured ways.
From config’s field to module’s heart,
Our RocketMQ store gets a clever new start!
🐇💾✨

@rocketmq-rust-bot
Collaborator

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

@rocketmq-rust-robot rocketmq-rust-robot added the feature🚀 Suggest an idea for this project. label Jun 16, 2025
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR introduces a new FlowMonitor to track data transfer metrics and enforce HA flow control limits.

  • Adds a flow_monitor module with a background service task that calculates per-second transfer speeds.
  • Registers the new module in ha.rs.
  • Adds a default configuration for max_ha_transfer_byte_in_second and wires it into MessageStoreConfig.

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

File | Description
rocketmq-store/src/ha/flow_monitor.rs | Implements FlowMonitor and its inner service task
rocketmq-store/src/ha.rs | Exposes the new flow_monitor submodule
rocketmq-store/src/config/message_store_config.rs | Defines a const default, updates the serde default, and sets Default impl
Comments suppressed due to low confidence (2)

rocketmq-store/src/ha/flow_monitor.rs:48

  • [nitpick] Public APIs like FlowMonitor and its methods lack doc comments. Adding Rustdoc for new, start, and shutdown would improve clarity for consumers.
struct FlowMonitorInner {

rocketmq-store/src/ha/flow_monitor.rs:121

  • [nitpick] These async tests don't assert any post-conditions; consider verifying that the service is actually running or that state changes occurred.
    async fn flow_monitor_starts_successfully() {

@@ -710,7 +714,7 @@ pub struct MessageStoreConfig {
     #[serde(default)]
     pub ha_flow_control_enable: bool,
 
-    #[serde(default)]
+    #[serde(default = "defaults::topic_queue_lock_num")]
Copilot AI Jun 16, 2025

The serde default for max_ha_transfer_byte_in_second is referencing defaults::topic_queue_lock_num by mistake. It should use defaults::max_ha_transfer_byte_in_second to match the new const function.

Comment on lines +39 to +44
pub async fn start(&self) {
    self.server_manager.start().await.unwrap();
}

pub async fn shutdown(&self) {
    self.server_manager.shutdown().await.unwrap();
Copilot AI Jun 16, 2025

Avoid using unwrap() in library code: propagate errors or handle them gracefully instead of panicking on service startup failure.

Suggested change
-    pub async fn start(&self) {
-        self.server_manager.start().await.unwrap();
-    }
-    pub async fn shutdown(&self) {
-        self.server_manager.shutdown().await.unwrap();
+    pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
+        self.server_manager.start().await?;
+        Ok(())
+    }
+    pub async fn shutdown(&self) -> Result<(), Box<dyn std::error::Error>> {
+        self.server_manager.shutdown().await?;
+        Ok(())

Comment on lines +43 to +44
pub async fn shutdown(&self) {
    self.server_manager.shutdown().await.unwrap();
Copilot AI Jun 16, 2025

Avoid using unwrap() here as well; consider returning a Result or logging an error instead of panicking.

Suggested change
-    pub async fn shutdown(&self) {
-        self.server_manager.shutdown().await.unwrap();
+    pub async fn shutdown(&self) -> Result<(), Box<dyn std::error::Error>> {
+        match self.server_manager.shutdown().await {
+            Ok(_) => Ok(()),
+            Err(e) => {
+                eprintln!("Error during shutdown: {}", e);
+                Err(Box::new(e))
+            }
+        }

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (4)
rocketmq-store/src/config/message_store_config.rs (1)

267-270: defaults::max_ha_transfer_byte_in_second is never used

You added a dedicated default provider, but the struct field below still references another function. Unless it is consumed elsewhere, the new function is dead code.

rocketmq-store/src/ha/flow_monitor.rs (3)

63-68: Atomic ordering too relaxed for producer/consumer scenario

calculate_speed() writes with store(Relaxed), then resets the counter with another relaxed store.
When add_byte_count_transferred() is called from other threads, the update-vs-read sequence may be reordered, giving inaccurate speeds. Use an Acquire/Release pair or, more conservatively, Ordering::SeqCst.
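Separately from the choice of ordering, reading the counter and then resetting it in two steps (as the comment describes) can drop bytes that another thread adds between the two operations. A single atomic swap avoids that gap. The sketch below is only an illustration under that assumption; `take_window` and `total_observed` are hypothetical helpers, not functions from this PR.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::thread;

/// Reads the bytes counted so far and resets the counter in one atomic step,
/// so no concurrent `fetch_add` can land between the read and the reset.
pub fn take_window(counter: &AtomicI64) -> i64 {
    counter.swap(0, Ordering::AcqRel)
}

/// Runs `writers` threads that each add `bytes_per_add` to a shared counter
/// `adds_per_writer` times, while the calling thread keeps draining the
/// counter. Returns the sum of everything the drainer observed.
pub fn total_observed(writers: usize, adds_per_writer: usize, bytes_per_add: i64) -> i64 {
    let counter = Arc::new(AtomicI64::new(0));

    let handles: Vec<_> = (0..writers)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..adds_per_writer {
                    counter.fetch_add(bytes_per_add, Ordering::Relaxed);
                }
            })
        })
        .collect();

    let mut total = 0;
    // Drain repeatedly while at least one writer is still running.
    while handles.iter().any(|h| !h.is_finished()) {
        total += take_window(&counter);
    }
    for handle in handles {
        handle.join().expect("writer thread panicked");
    }
    // Pick up whatever was added after the last drain.
    total += take_window(&counter);
    total
}
```

Because every drain is a read-modify-write on the same atomic, the drained windows always add up to exactly what the writers contributed, regardless of how the threads interleave.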


86-88: Potential counter overflow

fetch_add wraps around once the counter passes i64::MAX. Use a saturating_add pattern, or validate the value before adding, if extremely long-running brokers are expected.
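One way to get the saturating behaviour on an atomic is `fetch_update` with `saturating_add` inside the closure. The following is a minimal sketch of that pattern; `add_saturating` is a hypothetical helper name, and the relaxed orderings are an assumption that only the counter value itself matters here.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Adds `value` to `counter`, clamping at the i64 bounds instead of wrapping.
/// Returns the value stored after the update.
pub fn add_saturating(counter: &AtomicI64, value: i64) -> i64 {
    let previous = counter
        // fetch_update retries the closure until the compare-and-swap succeeds.
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            Some(current.saturating_add(value))
        })
        .expect("closure always returns Some, so the update cannot fail");
    previous.saturating_add(value)
}
```

The trade-off is a compare-and-swap loop instead of a single `fetch_add`, which may matter on a very hot path.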


120-126: Unit test leaks background task

flow_monitor_starts_successfully never calls shutdown(), leaving a running service behind the test runtime.
Add a shutdown to avoid flakiness:

-        monitor.start().await;
+        monitor.start().await;
+        monitor.shutdown().await;
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 092c71f and ef0fb0a.

📒 Files selected for processing (3)
  • rocketmq-store/src/config/message_store_config.rs (3 hunks)
  • rocketmq-store/src/ha.rs (1 hunks)
  • rocketmq-store/src/ha/flow_monitor.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
rocketmq-store/src/ha/flow_monitor.rs (1)
rocketmq-store/src/config/message_store_config.rs (1)
  • default (806-994)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: build (windows-latest, stable)
  • GitHub Check: build (macos-latest, stable)
  • GitHub Check: build (ubuntu-latest, nightly)
  • GitHub Check: build (macos-latest, nightly)
  • GitHub Check: build (windows-latest, nightly)
  • GitHub Check: build (ubuntu-latest, stable)
  • GitHub Check: test
  • GitHub Check: build
  • GitHub Check: auto-approve
🔇 Additional comments (2)
rocketmq-store/src/ha.rs (1)

18-20: Module declaration looks good

flow_monitor is correctly registered in the ha module tree.
No further action required.

rocketmq-store/src/config/message_store_config.rs (1)

962-965: Keep struct default and Serde default in sync

Default::default() sets max_ha_transfer_byte_in_second to 100 MB, which mismatches the 32 B/s coming from the Serde attribute above. After applying the previous fix the two code paths will again be consistent.

Comment on lines +717 to 719
#[serde(default = "defaults::topic_queue_lock_num")]
pub max_ha_transfer_byte_in_second: usize,

⚠️ Potential issue

Wrong Serde default provider – causes an invalid limit (32 bytes/s!)

#[serde(default = "defaults::topic_queue_lock_num")]
pub max_ha_transfer_byte_in_second: usize,

topic_queue_lock_num() returns 32, not 100 MB. Deserialised configs will silently get an unusable 32 B/s cap and immediately throttle replication.

Fix:

-    #[serde(default = "defaults::topic_queue_lock_num")]
+    #[serde(default = "defaults::max_ha_transfer_byte_in_second")]
     pub max_ha_transfer_byte_in_second: usize,
🤖 Prompt for AI Agents
In rocketmq-store/src/config/message_store_config.rs around lines 717 to 719,
the serde default attribute for max_ha_transfer_byte_in_second incorrectly uses
defaults::topic_queue_lock_num, which returns 32, causing an invalid 32 bytes
per second limit. Change the default provider to a function that returns the
correct default value for max_ha_transfer_byte_in_second, such as a new or
existing function returning 100 MB, to ensure proper deserialization and avoid
unintended throttling.
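A small regression check can keep the two default paths from drifting apart again. The sketch below uses only the standard library: `ConfigSketch` and `from_optional` are hypothetical stand-ins, with `from_optional` imitating what a serde `default = "path"` attribute does when the field is absent, and the 100 MB value is assumed to be 100 * 1024 * 1024 bytes.

```rust
mod defaults {
    /// Assumed value for "100 MB"; the real constant in the PR may differ.
    pub const fn max_ha_transfer_byte_in_second() -> usize {
        100 * 1024 * 1024
    }
}

/// Hypothetical one-field stand-in for `MessageStoreConfig`.
pub struct ConfigSketch {
    pub max_ha_transfer_byte_in_second: usize,
}

impl Default for ConfigSketch {
    fn default() -> Self {
        Self {
            max_ha_transfer_byte_in_second: defaults::max_ha_transfer_byte_in_second(),
        }
    }
}

/// Imitates deserialization: a missing field falls back to the default
/// provider, which must be the same function the `Default` impl uses.
pub fn from_optional(field: Option<usize>) -> ConfigSketch {
    ConfigSketch {
        max_ha_transfer_byte_in_second: field
            .unwrap_or_else(defaults::max_ha_transfer_byte_in_second),
    }
}
```

In the real crate the equivalent test would deserialize an empty config document and compare the field against `MessageStoreConfig::default()`.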

Comment on lines +70 to +84
pub fn can_transfer_max_byte_num(&self) -> i32 {
    if self.is_flow_control_enable() {
        let max_bytes = self.max_transfer_byte_in_second() as i64;
        let current_transferred = self.transferred_byte.load(Ordering::Relaxed);
        let res = std::cmp::max(max_bytes - current_transferred, 0);

        if res > i32::MAX as i64 {
            i32::MAX
        } else {
            res as i32
        }
    } else {
        i32::MAX
    }
}
🛠️ Refactor suggestion

Return type should be usize, not signed i32

The function computes “how many more bytes can be sent”.
usize:

  1. avoids the weird max( … , 0 ) dance,
  2. matches the config type,
  3. removes the cast / clamp logic.
-    pub fn can_transfer_max_byte_num(&self) -> i32 {
+    pub fn can_transfer_max_byte_num(&self) -> usize {

Adjust callers accordingly.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In rocketmq-store/src/ha/flow_monitor.rs around lines 70 to 84, the function
can_transfer_max_byte_num returns an i32 but should return usize to better
represent byte counts. Change the return type to usize, remove the max(..., 0)
check since usize is unsigned, and eliminate the cast and clamping logic. Update
all callers of this function to handle usize instead of i32 accordingly.
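One caveat if the return type does move to usize: a plain `max - transferred` on unsigned values panics in debug builds and wraps in release builds once the transferred count exceeds the limit, so the clamp at zero is still needed in some form. A minimal sketch using `saturating_sub`, written as a free function with hypothetical parameter names rather than as the method from the PR:

```rust
/// Returns how many more bytes may be sent in the current window.
/// `transferred` may exceed `max_bytes_per_second`; the result then clamps to 0.
pub fn can_transfer_max_byte_num(
    flow_control_enabled: bool,
    max_bytes_per_second: usize,
    transferred: usize,
) -> usize {
    if flow_control_enabled {
        // saturating_sub stops at 0 instead of underflowing.
        max_bytes_per_second.saturating_sub(transferred)
    } else {
        usize::MAX
    }
}
```

This keeps the unsigned return type while preserving the "never negative" behaviour of the original `max(..., 0)`.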

Comment on lines +39 to +45
pub async fn start(&self) {
    self.server_manager.start().await.unwrap();
}

pub async fn shutdown(&self) {
    self.server_manager.shutdown().await.unwrap();
}
🛠️ Refactor suggestion

Avoid .unwrap() – surface errors to the caller

start() / shutdown() panic on failure, taking the whole broker down. Propagate the error instead:

-    pub async fn start(&self) {
-        self.server_manager.start().await.unwrap();
+    pub async fn start(&self) -> anyhow::Result<()> {
+        self.server_manager.start().await
     }

Do the same for shutdown().

🤖 Prompt for AI Agents
In rocketmq-store/src/ha/flow_monitor.rs around lines 39 to 45, the start() and
shutdown() methods use unwrap() on async calls, which causes panics on failure.
Change these methods to return a Result type and propagate errors to the caller
instead of unwrapping. Replace unwrap() with proper error handling by returning
the error from the async calls.
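The propagation pattern itself can be shown without an async runtime. The sketch below is synchronous and uses only the standard library; `ServiceError`, `ServiceManagerSketch`, `MonitorSketch`, and the `fail_on_start` flag are hypothetical names for illustration, not the types used in this PR.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical error type returned by the underlying service.
#[derive(Debug)]
pub struct ServiceError(pub String);

impl fmt::Display for ServiceError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "service error: {}", self.0)
    }
}

impl Error for ServiceError {}

/// Hypothetical stand-in for the service manager; it can be told to fail.
pub struct ServiceManagerSketch {
    fail_on_start: bool,
}

impl ServiceManagerSketch {
    pub fn start(&self) -> Result<(), ServiceError> {
        if self.fail_on_start {
            Err(ServiceError("could not spawn task".to_string()))
        } else {
            Ok(())
        }
    }
}

pub struct MonitorSketch {
    server_manager: ServiceManagerSketch,
}

impl MonitorSketch {
    pub fn new(fail_on_start: bool) -> Self {
        Self {
            server_manager: ServiceManagerSketch { fail_on_start },
        }
    }

    /// Propagates the failure to the caller instead of panicking via unwrap().
    pub fn start(&self) -> Result<(), Box<dyn Error>> {
        self.server_manager.start()?;
        Ok(())
    }
}
```

The caller can then decide whether a failed start is fatal, for example by logging it and continuing without flow monitoring.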


codecov bot commented Jun 16, 2025

Codecov Report

Attention: Patch coverage is 93.33333% with 6 lines in your changes missing coverage. Please review.

Project coverage is 26.66%. Comparing base (092c71f) to head (ef0fb0a).
Report is 1 commit behind head on main.

Files with missing lines | Patch % | Lines
rocketmq-store/src/config/message_store_config.rs | 25.00% | 3 Missing ⚠️
rocketmq-store/src/ha/flow_monitor.rs | 96.51% | 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3473      +/-   ##
==========================================
+ Coverage   26.59%   26.66%   +0.07%     
==========================================
  Files         545      546       +1     
  Lines       77961    78050      +89     
==========================================
+ Hits        20730    20813      +83     
- Misses      57231    57237       +6     

Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

LGTM

@rocketmq-rust-bot rocketmq-rust-bot merged commit e7dd366 into main Jun 16, 2025
24 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Jun 16, 2025
Labels
AI review first Ai review pr first approved PR has approved auto merge feature🚀 Suggest an idea for this project.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature🚀] Add FlowMonitor for tracking data transfer metrics and flow control management
3 participants