Skip to content

[ISSUE #1254]🚀RocketMQRuntime add schedule_at_fixed_rate_mut method #1255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions rocketmq-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,38 @@
}
}
}

pub fn schedule_at_fixed_rate_mut<F>(
&self,
mut task: F,
initial_delay: Option<Duration>,
period: Duration,
) where
F: FnMut() + Send + 'static,
{
match self {
RocketMQRuntime::Multi(runtime) => {
runtime.handle().spawn(async move {

Check warning on line 106 in rocketmq-runtime/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-runtime/src/lib.rs#L96-L106

Added lines #L96 - L106 were not covered by tests
// initial delay
if let Some(initial_delay_inner) = initial_delay {
tokio::time::sleep(initial_delay_inner).await;
}

Check warning on line 110 in rocketmq-runtime/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-runtime/src/lib.rs#L108-L110

Added lines #L108 - L110 were not covered by tests

loop {
// record current execution time
let current_execution_time = tokio::time::Instant::now();
// execute task
task();
// Calculate the time of the next execution
let next_execution_time = current_execution_time + period;

// Wait until the next execution
let delay = next_execution_time
.saturating_duration_since(tokio::time::Instant::now());
tokio::time::sleep(delay).await;

Check warning on line 123 in rocketmq-runtime/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-runtime/src/lib.rs#L114-L123

Added lines #L114 - L123 were not covered by tests
}
});
}
}
}

Check warning on line 128 in rocketmq-runtime/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-runtime/src/lib.rs#L125-L128

Added lines #L125 - L128 were not covered by tests
Comment on lines +96 to +128
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding error handling and task cancellation.

The current implementation could be more robust:

  1. Task panics aren't handled and could crash the runtime
  2. No mechanism to stop/cancel the scheduled task
  3. Potential for timing drift over long periods

Consider this enhanced implementation:

     pub fn schedule_at_fixed_rate_mut<F>(
         &self,
         mut task: F,
         initial_delay: Option<Duration>,
         period: Duration,
     ) where
-        F: FnMut() + Send + 'static,
+        F: FnMut() -> Result<(), Box<dyn std::error::Error>> + Send + 'static,
     {
         match self {
             RocketMQRuntime::Multi(runtime) => {
-                runtime.handle().spawn(async move {
+                let handle = runtime.handle().spawn(async move {
                     // initial delay
                     if let Some(initial_delay_inner) = initial_delay {
                         tokio::time::sleep(initial_delay_inner).await;
                     }
 
                     loop {
                         // record current execution time
                         let current_execution_time = tokio::time::Instant::now();
                         // execute task
-                        task();
+                        if let Err(e) = task() {
+                            log::error!("Scheduled task failed: {}", e);
+                            break;
+                        }
                         // Calculate the time of the next execution
                         let next_execution_time = current_execution_time + period;
 
                         // Wait until the next execution
                         let delay = next_execution_time
                             .saturating_duration_since(tokio::time::Instant::now());
                         tokio::time::sleep(delay).await;
                     }
                 });
+                Ok(handle)
             }
         }
     }

Committable suggestion skipped: line range outside the PR's diff.

}
Loading