Skip to content

Commit f618545

Browse files
committed
[ISSUE #101]♻️Refactor thread pool
1 parent b93ab40 commit f618545

File tree

1 file changed

+82
-2
lines changed

1 file changed

+82
-2
lines changed

rocketmq-common/src/thread_pool.rs

+82-2
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,15 @@ impl TokioExecutorService {
4747

4848
pub fn new_with_config(
4949
thread_num: usize,
50-
thread_prefix: impl Into<String>,
50+
thread_prefix: Option<impl Into<String>>,
5151
keep_alive: Duration,
5252
max_blocking_threads: usize,
5353
) -> TokioExecutorService {
54-
let thread_prefix_inner = thread_prefix.into();
54+
let thread_prefix_inner = if let Some(thread_prefix) = thread_prefix {
55+
thread_prefix.into()
56+
} else {
57+
"rocketmq-thread-".to_string()
58+
};
5559
TokioExecutorService {
5660
inner: tokio::runtime::Builder::new_multi_thread()
5761
.worker_threads(thread_num)
@@ -131,3 +135,79 @@ impl FuturesExecutorServiceBuilder {
131135
Ok(FuturesExecutorService { inner: thread_pool })
132136
}
133137
}
138+
139+
pub struct ScheduledExecutorService {
140+
inner: tokio::runtime::Runtime,
141+
}
142+
143+
impl Default for ScheduledExecutorService {
144+
fn default() -> Self {
145+
Self::new()
146+
}
147+
}
148+
impl ScheduledExecutorService {
149+
pub fn new() -> ScheduledExecutorService {
150+
ScheduledExecutorService {
151+
inner: tokio::runtime::Builder::new_multi_thread()
152+
.worker_threads(num_cpus::get())
153+
.enable_all()
154+
.build()
155+
.unwrap(),
156+
}
157+
}
158+
159+
pub fn new_with_config(
160+
thread_num: usize,
161+
thread_prefix: Option<impl Into<String>>,
162+
keep_alive: Duration,
163+
max_blocking_threads: usize,
164+
) -> ScheduledExecutorService {
165+
let thread_prefix_inner = if let Some(thread_prefix) = thread_prefix {
166+
thread_prefix.into()
167+
} else {
168+
"rocketmq-thread-".to_string()
169+
};
170+
ScheduledExecutorService {
171+
inner: tokio::runtime::Builder::new_multi_thread()
172+
.worker_threads(thread_num)
173+
.thread_keep_alive(keep_alive)
174+
.max_blocking_threads(max_blocking_threads)
175+
.thread_name_fn(move || {
176+
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
177+
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
178+
format!("{}{}", thread_prefix_inner, id)
179+
})
180+
.enable_all()
181+
.build()
182+
.unwrap(),
183+
}
184+
}
185+
186+
pub fn schedule_at_fixed_rate<F>(&self, mut task: F, initial_delay: Duration, period: Duration)
187+
where
188+
F: FnMut() + Send + 'static,
189+
{
190+
self.inner.spawn(async move {
191+
// initial delay
192+
193+
tokio::time::sleep(initial_delay).await;
194+
195+
let mut last_execution_time = tokio::time::Instant::now();
196+
197+
loop {
198+
// record current execution time
199+
let current_execution_time = tokio::time::Instant::now();
200+
// execute task
201+
task();
202+
// Calculate the time of the next execution
203+
let next_execution_time = last_execution_time + period;
204+
last_execution_time = current_execution_time;
205+
206+
// Wait until the next execution
207+
let delay =
208+
next_execution_time.saturating_duration_since(tokio::time::Instant::now());
209+
tokio::time::sleep(delay).await;
210+
}
211+
});
212+
}
213+
}

0 commit comments

Comments
 (0)