Skip to content

Commit 4ea66ac

Browse files
committed
Fix: stop tick task when shutting down Raft
Signed-off-by: Matthias Wahl <[email protected]>
1 parent 4cdd59f commit 4ea66ac

File tree

2 files changed

+23
-23
lines changed

2 files changed

+23
-23
lines changed

openraft/src/core/tick.rs

+21-15
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@ use std::sync::Arc;
66
use std::time::Duration;
77

88
use tokio::sync::mpsc;
9+
use tokio::task::JoinHandle;
910
use tokio::time::sleep_until;
1011
use tokio::time::Instant;
12+
use tracing::Instrument;
13+
use tracing::Level;
14+
use tracing::Span;
1115

1216
use crate::raft::RaftMsg;
1317
use crate::NodeId;
@@ -55,11 +59,12 @@ where
5559
tx: mpsc::UnboundedSender<RaftMsg<C, N, S>>,
5660

5761
/// Emit event or not
58-
running: Arc<AtomicBool>,
62+
enabled: Arc<AtomicBool>,
5963
}
6064

6165
pub(crate) struct TickHandle {
62-
running: Arc<AtomicBool>,
66+
enabled: Arc<AtomicBool>,
67+
join_handle: JoinHandle<()>,
6368
}
6469

6570
impl<C, N, S> Tick<C, N, S>
@@ -68,12 +73,16 @@ where
6873
N: RaftNetworkFactory<C>,
6974
S: RaftStorage<C>,
7075
{
71-
pub(crate) fn new(interval: Duration, tx: mpsc::UnboundedSender<RaftMsg<C, N, S>>, enabled: bool) -> Self {
72-
Tick {
76+
pub(crate) fn spawn(interval: Duration, tx: mpsc::UnboundedSender<RaftMsg<C, N, S>>, enabled: bool) -> TickHandle {
77+
let enabled = Arc::new(AtomicBool::from(enabled));
78+
let this = Self {
7379
interval,
74-
running: Arc::new(AtomicBool::from(enabled)),
80+
enabled: enabled.clone(),
7581
tx,
76-
}
82+
};
83+
let join_handle =
84+
tokio::spawn(this.tick_loop().instrument(tracing::span!(parent: &Span::current(), Level::DEBUG, "tick")));
85+
TickHandle { enabled, join_handle }
7786
}
7887

7988
pub(crate) async fn tick_loop(self) {
@@ -84,7 +93,7 @@ where
8493
let at = Instant::now() + self.interval;
8594
sleep_until(at).await;
8695

87-
if !self.running.load(Ordering::Relaxed) {
96+
if !self.enabled.load(Ordering::Relaxed) {
8897
i -= 1;
8998
continue;
9099
}
@@ -97,17 +106,14 @@ where
97106
}
98107
}
99108
}
100-
101-
/// Return a handle to control the ticker.
102-
pub(crate) fn get_handle(&self) -> TickHandle {
103-
TickHandle {
104-
running: self.running.clone(),
105-
}
106-
}
107109
}
108110

109111
impl TickHandle {
110112
pub(crate) fn enable(&self, enabled: bool) {
111-
self.running.store(enabled, Ordering::Relaxed);
113+
self.enabled.store(enabled, Ordering::Relaxed);
114+
}
115+
116+
pub(crate) async fn shutdown(&self) {
117+
self.join_handle.abort();
112118
}
113119
}

openraft/src/raft.rs

+2-8
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ use tokio::sync::watch;
1313
use tokio::sync::Mutex;
1414
use tokio::task::JoinError;
1515
use tokio::task::JoinHandle;
16-
use tracing::Instrument;
1716
use tracing::Level;
18-
use tracing::Span;
1917

2018
use crate::config::Config;
2119
use crate::config::RuntimeConfig;
@@ -196,16 +194,12 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
196194
let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id));
197195
let (tx_shutdown, rx_shutdown) = oneshot::channel();
198196

199-
let tick = Tick::new(
197+
let tick_handle = Tick::spawn(
200198
Duration::from_millis(config.heartbeat_interval * 3 / 2),
201199
tx_api.clone(),
202200
config.enable_tick,
203201
);
204202

205-
let tick_handle = tick.get_handle();
206-
let _tick_join_handle =
207-
tokio::spawn(tick.tick_loop().instrument(tracing::span!(parent: &Span::current(), Level::DEBUG, "tick")));
208-
209203
let runtime_config = Arc::new(RuntimeConfig::new(&config));
210204

211205
let core_handle = RaftCore::spawn(
@@ -771,8 +765,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
771765
let send_res = tx.send(());
772766
tracing::info!("sending shutdown signal to RaftCore, sending res: {:?}", send_res);
773767
}
774-
775768
self.join_core_task().await;
769+
self.inner.tick_handle.shutdown().await;
776770

777771
// TODO(xp): API change: replace `JoinError` with `Fatal`,
778772
// to let the caller know the return value of RaftCore task.

0 commit comments

Comments
 (0)