Skip to content

Commit 425114b

Browse files
authored
[ISSUE #417]✨Implement message persistence to disk during transmission🚀 (#418)
1 parent f3a1058 commit 425114b

17 files changed

+854
-113
lines changed

rocketmq-common/Cargo.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,7 @@ log = "0.4.20"
5151

5252
parking_lot = { workspace = true }
5353
once_cell = { workspace = true }
54-
tempfile = "3.10.1"
54+
tempfile = "3.10.1"
55+
trait-variant.workspace = true
56+
[dev-dependencies]
57+
mockall = "0.12.1"

rocketmq-common/src/common/thread.rs

+25-1
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,28 @@
1515
* limitations under the License.
1616
*/
1717

18-
pub mod thread_service;
18+
use std::any::Any;
19+
20+
use crate::common::thread::thread_service_std::ServiceThreadStd;
21+
22+
pub mod thread_service_std;
23+
pub mod thread_service_tokio;
24+
25+
pub trait ServiceThread {
26+
fn start(&mut self);
27+
fn shutdown(&mut self);
28+
fn make_stop(&mut self);
29+
fn wakeup(&mut self);
30+
fn wait_for_running(&mut self, interval: i64);
31+
fn is_stopped(&self) -> bool;
32+
fn get_service_name(&self) -> String;
33+
}
34+
35+
pub trait Runnable: Send + Sync + 'static {
36+
fn run(&mut self) {}
37+
}
38+
39+
#[trait_variant::make(TokioRunnable: Send)]
40+
pub trait RocketMQTokioRunnable: Sync + 'static {
41+
async fn run(&mut self);
42+
}

rocketmq-common/src/common/thread/thread_service.rs

-95
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::{
18+
sync::{
19+
atomic::{AtomicBool, Ordering},
20+
Arc,
21+
},
22+
thread::JoinHandle,
23+
};
24+
25+
use parking_lot::Mutex;
26+
use tracing::info;
27+
28+
use crate::common::thread::{Runnable, ServiceThread};
29+
30+
pub struct ServiceThreadStd {
31+
name: String,
32+
runnable: Arc<Mutex<dyn Runnable>>,
33+
thread: Option<JoinHandle<()>>,
34+
stopped: Arc<AtomicBool>,
35+
started: Arc<AtomicBool>,
36+
notified: (parking_lot::Mutex<()>, parking_lot::Condvar),
37+
}
38+
39+
impl ServiceThreadStd {
40+
pub fn new<T: Runnable>(name: String, runnable: T) -> Self {
41+
ServiceThreadStd {
42+
name,
43+
runnable: Arc::new(Mutex::new(runnable)),
44+
thread: None,
45+
stopped: Arc::new(AtomicBool::new(false)),
46+
started: Arc::new(AtomicBool::new(false)),
47+
notified: (Default::default(), Default::default()),
48+
}
49+
}
50+
}
51+
52+
impl ServiceThreadStd {
53+
pub fn start(&mut self) {
54+
if let Ok(value) =
55+
self.started
56+
.compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
57+
{
58+
if value {
59+
return;
60+
}
61+
} else {
62+
return;
63+
}
64+
let name = self.name.clone();
65+
let runnable = self.runnable.clone();
66+
let stopped = self.stopped.clone();
67+
let thread = std::thread::Builder::new()
68+
.name(name.clone())
69+
.spawn(move || {
70+
log::info!("Starting service thread: {}", name);
71+
if stopped.load(std::sync::atomic::Ordering::Relaxed) {
72+
log::info!("Service thread stopped: {}", name);
73+
return;
74+
}
75+
runnable.lock().run();
76+
})
77+
.expect("Failed to start service thread");
78+
self.thread = Some(thread);
79+
}
80+
81+
pub fn shutdown(&mut self) {
82+
self.shutdown_interrupt(false);
83+
}
84+
85+
pub fn shutdown_interrupt(&mut self, interrupt: bool) {
86+
if let Ok(value) =
87+
self.started
88+
.compare_exchange(true, false, Ordering::SeqCst, Ordering::Relaxed)
89+
{
90+
if !value {
91+
return;
92+
}
93+
} else {
94+
return;
95+
}
96+
self.stopped.store(true, Ordering::Relaxed);
97+
self.wakeup();
98+
if let Some(thread) = self.thread.take() {
99+
if interrupt {
100+
drop(thread);
101+
} else {
102+
thread.join().expect("Failed to join service thread");
103+
}
104+
}
105+
}
106+
107+
pub fn make_stop(&mut self) {
108+
if !self.started.load(Ordering::Acquire) {
109+
return;
110+
}
111+
self.stopped.store(true, Ordering::Release);
112+
}
113+
114+
pub fn wakeup(&mut self) {
115+
self.notified.1.notify_all();
116+
}
117+
118+
pub fn wait_for_running(&mut self, interval: i64) {
119+
let mut guard = self.notified.0.lock();
120+
self.notified.1.wait_for(
121+
&mut guard,
122+
std::time::Duration::from_millis(interval as u64),
123+
);
124+
}
125+
126+
pub fn is_stopped(&self) -> bool {
127+
self.stopped.load(Ordering::Acquire)
128+
}
129+
130+
pub fn get_service_name(&self) -> String {
131+
self.name.clone()
132+
}
133+
}
134+
135+
#[cfg(test)]
136+
mod tests {
137+
/*use mockall::{automock, predicate::*};
138+
139+
use super::*;
140+
141+
struct MockTestRunnable;
142+
impl MockTestRunnable {
143+
fn new() -> MockTestRunnable {
144+
MockTestRunnable
145+
}
146+
}
147+
impl Runnable for MockTestRunnable {
148+
fn run(&mut self, service_thread: &dyn ServiceThread) {}
149+
}
150+
151+
#[test]
152+
fn test_start_and_shutdown() {
153+
let mock_runnable = MockTestRunnable::new();
154+
155+
let mut service_thread =
156+
ServiceThreadStd::new("TestServiceThread".to_string(), mock_runnable);
157+
158+
service_thread.start();
159+
assert!(service_thread.started.load(Ordering::SeqCst));
160+
assert!(!service_thread.stopped.load(Ordering::SeqCst));
161+
162+
service_thread.shutdown_interrupt(false);
163+
assert!(!service_thread.started.load(Ordering::SeqCst));
164+
assert!(service_thread.stopped.load(Ordering::SeqCst));
165+
}
166+
167+
#[test]
168+
fn test_make_stop() {
169+
let mock_runnable = MockTestRunnable::new();
170+
let mut service_thread =
171+
ServiceThreadStd::new("TestServiceThread".to_string(), mock_runnable);
172+
173+
service_thread.start();
174+
service_thread.make_stop();
175+
assert!(service_thread.is_stopped());
176+
}
177+
178+
#[test]
179+
fn test_wait_for_running() {
180+
let mock_runnable = MockTestRunnable::new();
181+
let mut service_thread =
182+
ServiceThreadStd::new("TestServiceThread".to_string(), mock_runnable);
183+
184+
service_thread.start();
185+
service_thread.wait_for_running(100);
186+
assert!(service_thread.started.load(Ordering::SeqCst));
187+
}
188+
189+
#[test]
190+
fn test_wakeup() {
191+
let mock_runnable = MockTestRunnable::new();
192+
let mut service_thread =
193+
ServiceThreadStd::new("TestServiceThread".to_string(), mock_runnable);
194+
195+
service_thread.start();
196+
service_thread.wakeup();
197+
// We expect that the wakeup method is called successfully.
198+
}*/
199+
}

0 commit comments

Comments
 (0)