-
Notifications
You must be signed in to change notification settings - Fork 159
[ISSUE #3472]🚀Add FlowMonitor for tracking data transfer metrics and flow control management✨ #3473
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…flow control management✨
WalkthroughA new Changes
Sequence Diagram(s)sequenceDiagram
participant App
participant FlowMonitor
participant FlowMonitorInner
participant Config
App->>FlowMonitor: new(Arc<MessageStoreConfig>)
App->>FlowMonitor: start()
FlowMonitor->>FlowMonitorInner: start service task
loop Every second
FlowMonitorInner->>FlowMonitorInner: calculate_speed()
end
App->>FlowMonitor: shutdown()
FlowMonitor->>FlowMonitorInner: stop service task
Assessment against linked issues
Poem
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces a new FlowMonitor
to track data transfer metrics and enforce HA flow control limits.
- Adds a
flow_monitor
module with a background service task that calculates per-second transfer speeds. - Registers the new module in
ha.rs
. - Adds a default configuration for
max_ha_transfer_byte_in_second
and wires it intoMessageStoreConfig
.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
File | Description |
---|---|
rocketmq-store/src/ha/flow_monitor.rs | Implements FlowMonitor and its inner service task |
rocketmq-store/src/ha.rs | Exposes the new flow_monitor submodule |
rocketmq-store/src/config/message_store_config.rs | Defines a const default, updates the serde default, and sets Default impl |
Comments suppressed due to low confidence (2)
rocketmq-store/src/ha/flow_monitor.rs:48
- [nitpick] Public APIs like
FlowMonitor
and its methods lack doc comments. Adding Rustdoc fornew
,start
, andshutdown
would improve clarity for consumers.
struct FlowMonitorInner {
rocketmq-store/src/ha/flow_monitor.rs:121
- [nitpick] These async tests don't assert any post-conditions; consider verifying that the service is actually running or that state changes occurred.
async fn flow_monitor_starts_successfully() {
@@ -710,7 +714,7 @@ pub struct MessageStoreConfig { | |||
#[serde(default)] | |||
pub ha_flow_control_enable: bool, | |||
|
|||
#[serde(default)] | |||
#[serde(default = "defaults::topic_queue_lock_num")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The serde default for max_ha_transfer_byte_in_second
is referencing defaults::topic_queue_lock_num
by mistake. It should use defaults::max_ha_transfer_byte_in_second
to match the new const function.
Copilot uses AI. Check for mistakes.
pub async fn start(&self) { | ||
self.server_manager.start().await.unwrap(); | ||
} | ||
|
||
pub async fn shutdown(&self) { | ||
self.server_manager.shutdown().await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid using unwrap()
in library code: propagate errors or handle them gracefully instead of panicking on service startup failure.
pub async fn start(&self) { | |
self.server_manager.start().await.unwrap(); | |
} | |
pub async fn shutdown(&self) { | |
self.server_manager.shutdown().await.unwrap(); | |
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> { | |
self.server_manager.start().await?; | |
Ok(()) | |
} | |
pub async fn shutdown(&self) -> Result<(), Box<dyn std::error::Error>> { | |
self.server_manager.shutdown().await?; | |
Ok(()) |
Copilot uses AI. Check for mistakes.
pub async fn shutdown(&self) { | ||
self.server_manager.shutdown().await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid using unwrap()
here as well; consider returning a Result
or logging an error instead of panicking.
pub async fn shutdown(&self) { | |
self.server_manager.shutdown().await.unwrap(); | |
pub async fn shutdown(&self) -> Result<(), Box<dyn std::error::Error>> { | |
match self.server_manager.shutdown().await { | |
Ok(_) => Ok(()), | |
Err(e) => { | |
eprintln!("Error during shutdown: {}", e); | |
Err(Box::new(e)) | |
} | |
} |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (4)
rocketmq-store/src/config/message_store_config.rs (1)
267-270
:defaults::max_ha_transfer_byte_in_second
is never usedYou added a dedicated default provider but the struct field below still references another function. Unless this is consumed elsewhere, the compiler will optimise it away.
rocketmq-store/src/ha/flow_monitor.rs (3)
63-68
: Atomic ordering too relaxed for producer/consumer scenario
calculate_speed()
writes withstore(Relaxed)
then resets the counter with another relaxed store.
Whenadd_byte_count_transferred()
is called from other threads the update-vs-read sequence may be reordered, giving inaccurate speeds. UseAcquire
/Release
pair or at leastOrdering::SeqCst
.
86-88
: Potential counter overflow
fetch_add
will wrap oni64::MAX
. Usesaturating_add
pattern or validate the value before adding if extremely long-running brokers are expected.
120-126
: Unit test leaks background task
flow_monitor_starts_successfully
never callsshutdown()
, leaving a running service behind the test runtime.
Add a shutdown to avoid flakiness:- monitor.start().await; + monitor.start().await; + monitor.shutdown().await;
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
rocketmq-store/src/config/message_store_config.rs
(3 hunks)rocketmq-store/src/ha.rs
(1 hunks)rocketmq-store/src/ha/flow_monitor.rs
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
rocketmq-store/src/ha/flow_monitor.rs (1)
rocketmq-store/src/config/message_store_config.rs (1)
default
(806-994)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: test
- GitHub Check: build
- GitHub Check: auto-approve
🔇 Additional comments (2)
rocketmq-store/src/ha.rs (1)
18-20
: Module declaration looks good
flow_monitor
is correctly registered in theha
module tree.
No further action required.rocketmq-store/src/config/message_store_config.rs (1)
962-965
: Keep struct default and Serde default in sync
Default::default()
setsmax_ha_transfer_byte_in_second
to 100 MB, which mismatches the 32 B/s coming from the Serde attribute above. After applying the previous fix the two code paths will again be consistent.
#[serde(default = "defaults::topic_queue_lock_num")] | ||
pub max_ha_transfer_byte_in_second: usize, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong Serde default provider – causes an invalid limit (32 bytes/s!)
#[serde(default = "defaults::topic_queue_lock_num")]
pub max_ha_transfer_byte_in_second: usize,
topic_queue_lock_num()
returns 32, not 100 MB. Deserialised configs will silently get an unusable 32 B/s cap and immediately throttle replication.
Fix:
- #[serde(default = "defaults::topic_queue_lock_num")]
+ #[serde(default = "defaults::max_ha_transfer_byte_in_second")]
pub max_ha_transfer_byte_in_second: usize,
🤖 Prompt for AI Agents
In rocketmq-store/src/config/message_store_config.rs around lines 717 to 719,
the serde default attribute for max_ha_transfer_byte_in_second incorrectly uses
defaults::topic_queue_lock_num, which returns 32, causing an invalid 32 bytes
per second limit. Change the default provider to a function that returns the
correct default value for max_ha_transfer_byte_in_second, such as a new or
existing function returning 100 MB, to ensure proper deserialization and avoid
unintended throttling.
pub fn can_transfer_max_byte_num(&self) -> i32 { | ||
if self.is_flow_control_enable() { | ||
let max_bytes = self.max_transfer_byte_in_second() as i64; | ||
let current_transferred = self.transferred_byte.load(Ordering::Relaxed); | ||
let res = std::cmp::max(max_bytes - current_transferred, 0); | ||
|
||
if res > i32::MAX as i64 { | ||
i32::MAX | ||
} else { | ||
res as i32 | ||
} | ||
} else { | ||
i32::MAX | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Return type should be usize
, not signed i32
The function computes “how many more bytes can be sent”.
usize
:
- avoids the weird
max( … , 0 )
dance, - matches the config type,
- removes the cast / clamp logic.
- pub fn can_transfer_max_byte_num(&self) -> i32 {
+ pub fn can_transfer_max_byte_num(&self) -> usize {
Adjust callers accordingly.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/flow_monitor.rs around lines 70 to 84, the function
can_transfer_max_byte_num returns an i32 but should return usize to better
represent byte counts. Change the return type to usize, remove the max(..., 0)
check since usize is unsigned, and eliminate the cast and clamping logic. Update
all callers of this function to handle usize instead of i32 accordingly.
pub async fn start(&self) { | ||
self.server_manager.start().await.unwrap(); | ||
} | ||
|
||
pub async fn shutdown(&self) { | ||
self.server_manager.shutdown().await.unwrap(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Avoid .unwrap()
– surface errors to the caller
start()
/ shutdown()
panic on failure, taking the whole broker down. Propagate the error instead:
- pub async fn start(&self) {
- self.server_manager.start().await.unwrap();
+ pub async fn start(&self) -> anyhow::Result<()> {
+ self.server_manager.start().await
}
Do the same for shutdown()
.
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/flow_monitor.rs around lines 39 to 45, the start() and
shutdown() methods use unwrap() on async calls, which causes panics on failure.
Change these methods to return a Result type and propagate errors to the caller
instead of unwrapping. Replace unwrap() with proper error handling by returning
the error from the async calls.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3473 +/- ##
==========================================
+ Coverage 26.59% 26.66% +0.07%
==========================================
Files 545 546 +1
Lines 77961 78050 +89
==========================================
+ Hits 20730 20813 +83
- Misses 57231 57237 +6 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Which Issue(s) This PR Fixes(Closes)
Fixes #3472
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Bug Fixes