-
Notifications
You must be signed in to change notification settings - Fork 144
[ISSUE #1254]🚀RocketMQRuntime add schedule_at_fixed_rate_mut method #1255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe changes introduce a new method, Changes
Assessment against linked issues
Suggested labels
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (2)
rocketmq-runtime/src/lib.rs (2)
96-128
: Consider adding timing drift compensation.For better timing accuracy over long periods, consider compensating for task execution time:
loop { // record current execution time let current_execution_time = tokio::time::Instant::now(); // execute task task(); // Calculate the time of the next execution - let next_execution_time = current_execution_time + period; + let execution_duration = current_execution_time.elapsed(); + let next_execution_time = current_execution_time + period; // Wait until the next execution let delay = next_execution_time - .saturating_duration_since(tokio::time::Instant::now()); + .saturating_duration_since(tokio::time::Instant::now()) + .saturating_sub(execution_duration); tokio::time::sleep(delay).await; }
96-128
: Consider enhancing the scheduler architecture.The current implementation could benefit from some architectural improvements:
- Task metrics collection (execution time, success/failure counts)
- Rate limiting or backoff strategies for failing tasks
- Task coordination and graceful shutdown
Consider introducing a ScheduledTask trait or struct to encapsulate these concerns:
pub struct ScheduledTask<F> { task: F, period: Duration, metrics: TaskMetrics, last_execution: Option<Instant>, } impl<F> ScheduledTask<F> { pub fn new(task: F, period: Duration) -> Self { // Implementation details } pub fn with_backoff(mut self, strategy: BackoffStrategy) -> Self { // Implementation details } pub fn metrics(&self) -> &TaskMetrics { // Implementation details } }This would provide a more maintainable and feature-rich foundation for scheduled tasks.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
rocketmq-runtime/src/lib.rs
(1 hunks)
🔇 Additional comments (1)
rocketmq-runtime/src/lib.rs (1)
96-128
: Implementation looks correct but could be enhanced.
The new schedule_at_fixed_rate_mut
method correctly implements mutable task scheduling with fixed rate execution.
pub fn schedule_at_fixed_rate_mut<F>( | ||
&self, | ||
mut task: F, | ||
initial_delay: Option<Duration>, | ||
period: Duration, | ||
) where | ||
F: FnMut() + Send + 'static, | ||
{ | ||
match self { | ||
RocketMQRuntime::Multi(runtime) => { | ||
runtime.handle().spawn(async move { | ||
// initial delay | ||
if let Some(initial_delay_inner) = initial_delay { | ||
tokio::time::sleep(initial_delay_inner).await; | ||
} | ||
|
||
loop { | ||
// record current execution time | ||
let current_execution_time = tokio::time::Instant::now(); | ||
// execute task | ||
task(); | ||
// Calculate the time of the next execution | ||
let next_execution_time = current_execution_time + period; | ||
|
||
// Wait until the next execution | ||
let delay = next_execution_time | ||
.saturating_duration_since(tokio::time::Instant::now()); | ||
tokio::time::sleep(delay).await; | ||
} | ||
}); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider adding error handling and task cancellation.
The current implementation could be more robust:
- Task panics aren't handled and could crash the runtime
- No mechanism to stop/cancel the scheduled task
- Potential for timing drift over long periods
Consider this enhanced implementation:
pub fn schedule_at_fixed_rate_mut<F>(
&self,
mut task: F,
initial_delay: Option<Duration>,
period: Duration,
) where
- F: FnMut() + Send + 'static,
+ F: FnMut() -> Result<(), Box<dyn std::error::Error>> + Send + 'static,
{
match self {
RocketMQRuntime::Multi(runtime) => {
- runtime.handle().spawn(async move {
+ let handle = runtime.handle().spawn(async move {
// initial delay
if let Some(initial_delay_inner) = initial_delay {
tokio::time::sleep(initial_delay_inner).await;
}
loop {
// record current execution time
let current_execution_time = tokio::time::Instant::now();
// execute task
- task();
+ if let Err(e) = task() {
+ log::error!("Scheduled task failed: {}", e);
+ break;
+ }
// Calculate the time of the next execution
let next_execution_time = current_execution_time + period;
// Wait until the next execution
let delay = next_execution_time
.saturating_duration_since(tokio::time::Instant::now());
tokio::time::sleep(delay).await;
}
});
+ Ok(handle)
}
}
}
Committable suggestion skipped: line range outside the PR's diff.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1255 +/- ##
==========================================
- Coverage 18.26% 18.25% -0.01%
==========================================
Files 428 428
Lines 53581 53609 +28
==========================================
Hits 9789 9789
- Misses 43792 43820 +28 ☔ View full report in Codecov by Sentry. 🚨 Try these New Features:
|
Which Issue(s) This PR Fixes(Closes)
Fixes #1254
Brief Description
How Did You Test This Change?
Summary by CodeRabbit