From a85fa0d7952605d7dd6309e10ceae84ceadba1bd Mon Sep 17 00:00:00 2001 From: mxsm Date: Thu, 21 Nov 2024 03:02:00 +0000 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#1254]=F0=9F=9A=80RocketMQRuntime=20ad?= =?UTF-8?q?d=20schedule=5Fat=5Ffixed=5Frate=5Fmut=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-runtime/src/lib.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/rocketmq-runtime/src/lib.rs b/rocketmq-runtime/src/lib.rs index e7232d81..63152f24 100644 --- a/rocketmq-runtime/src/lib.rs +++ b/rocketmq-runtime/src/lib.rs @@ -92,4 +92,38 @@ impl RocketMQRuntime { } } } + + pub fn schedule_at_fixed_rate_mut( + &self, + mut task: F, + initial_delay: Option, + period: Duration, + ) where + F: FnMut() + Send + 'static, + { + match self { + RocketMQRuntime::Multi(runtime) => { + runtime.handle().spawn(async move { + // initial delay + if let Some(initial_delay_inner) = initial_delay { + tokio::time::sleep(initial_delay_inner).await; + } + + loop { + // record current execution time + let current_execution_time = tokio::time::Instant::now(); + // execute task + task(); + // Calculate the time of the next execution + let next_execution_time = current_execution_time + period; + + // Wait until the next execution + let delay = next_execution_time + .saturating_duration_since(tokio::time::Instant::now()); + tokio::time::sleep(delay).await; + } + }); + } + } + } }