Skip to content

Commit c1cf8a8

Browse files
committed
Feature: add Raft::get_snapshot() to get the last snapshot from state machine
1 parent 343b27e commit c1cf8a8

File tree

11 files changed

+122
-25
lines changed

11 files changed

+122
-25
lines changed

openraft/src/core/raft_core.rs

+7
Original file line numberDiff line numberDiff line change
@@ -1183,6 +1183,13 @@ where
11831183
self.send_heartbeat("ExternalCommand");
11841184
}
11851185
ExternalCommand::Snapshot => self.trigger_snapshot(),
1186+
ExternalCommand::GetSnapshot { tx } => {
1187+
let cmd = sm::Command::get_snapshot(tx);
1188+
let res = self.sm_handle.send(cmd);
1189+
if let Err(e) = res {
1190+
tracing::error!(error = display(e), "error sending GetSnapshot to sm worker");
1191+
}
1192+
}
11861193
ExternalCommand::PurgeLog { upto } => {
11871194
self.engine.trigger_purge_log(upto);
11881195
}

openraft/src/core/raft_msg/external_command.rs

+25-6
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@
22
33
use std::fmt;
44

5+
use crate::core::raft_msg::ResultSender;
6+
use crate::RaftTypeConfig;
7+
use crate::Snapshot;
8+
59
/// Application-triggered Raft actions for testing and administration.
610
///
711
/// Typically, openraft handles actions automatically.
812
///
913
/// An application can also disable these policy-based triggering and use these commands manually,
1014
/// for testing or administrative purpose.
11-
#[derive(Debug, Clone)]
12-
pub(crate) enum ExternalCommand {
15+
pub(crate) enum ExternalCommand<C: RaftTypeConfig> {
1316
/// Initiate an election at once.
1417
Elect,
1518

@@ -19,6 +22,9 @@ pub(crate) enum ExternalCommand {
1922
/// Initiate to build a snapshot on this node.
2023
Snapshot,
2124

25+
/// Get a snapshot from the state machine, send back via a oneshot::Sender.
26+
GetSnapshot { tx: ResultSender<Option<Snapshot<C>>> },
27+
2228
/// Purge logs covered by a snapshot up to a specified index.
2329
///
2430
/// Openraft respects the [`max_in_snapshot_log_to_keep`] config when purging.
@@ -27,17 +33,30 @@ pub(crate) enum ExternalCommand {
2733
PurgeLog { upto: u64 },
2834
}
2935

30-
impl fmt::Display for ExternalCommand {
36+
impl<C> fmt::Debug for ExternalCommand<C>
37+
where C: RaftTypeConfig
38+
{
39+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40+
fmt::Display::fmt(self, f)
41+
}
42+
}
43+
44+
impl<C> fmt::Display for ExternalCommand<C>
45+
where C: RaftTypeConfig
46+
{
3147
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3248
match self {
3349
ExternalCommand::Elect => {
34-
write!(f, "{:?}", self)
50+
write!(f, "Elect")
3551
}
3652
ExternalCommand::Heartbeat => {
37-
write!(f, "{:?}", self)
53+
write!(f, "Heartbeat")
3854
}
3955
ExternalCommand::Snapshot => {
40-
write!(f, "{:?}", self)
56+
write!(f, "Snapshot")
57+
}
58+
ExternalCommand::GetSnapshot { .. } => {
59+
write!(f, "GetSnapshot")
4160
}
4261
ExternalCommand::PurgeLog { upto } => {
4362
write!(f, "PurgeLog[..={}]", upto)

openraft/src/core/raft_msg/mod.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,18 @@ use crate::RaftTypeConfig;
2626
pub(crate) mod external_command;
2727

2828
/// A oneshot TX to send result from `RaftCore` to external caller, e.g. `Raft::append_entries`.
29-
pub(crate) type ResultSender<T, E> = oneshot::Sender<Result<T, E>>;
29+
pub(crate) type ResultSender<T, E = Infallible> = oneshot::Sender<Result<T, E>>;
30+
31+
pub(crate) type ResultReceiver<T, E = Infallible> = oneshot::Receiver<Result<T, E>>;
3032

3133
/// TX for Install Snapshot Response
3234
pub(crate) type InstallSnapshotTx<NID> = ResultSender<InstallSnapshotResponse<NID>, InstallSnapshotError>;
3335

3436
/// TX for Vote Response
35-
pub(crate) type VoteTx<NID> = ResultSender<VoteResponse<NID>, Infallible>;
37+
pub(crate) type VoteTx<NID> = ResultSender<VoteResponse<NID>>;
3638

3739
/// TX for Append Entries Response
38-
pub(crate) type AppendEntriesTx<NID> = ResultSender<AppendEntriesResponse<NID>, Infallible>;
40+
pub(crate) type AppendEntriesTx<NID> = ResultSender<AppendEntriesResponse<NID>>;
3941

4042
/// TX for Client Write Response
4143
pub(crate) type ClientWriteTx<C> = ResultSender<ClientWriteResponse<C>, ClientWriteError<NodeIdOf<C>, NodeOf<C>>>;
@@ -94,7 +96,7 @@ where C: RaftTypeConfig
9496
},
9597

9698
ExternalCommand {
97-
cmd: ExternalCommand,
99+
cmd: ExternalCommand<C>,
98100
},
99101
}
100102

openraft/src/core/sm/command.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use std::fmt::Debug;
22
use std::fmt::Formatter;
33

4-
use tokio::sync::oneshot;
5-
4+
use crate::core::raft_msg::ResultSender;
65
use crate::display_ext::DisplaySlice;
76
use crate::log_id::RaftLogId;
87
use crate::raft::InstallSnapshotRequest;
@@ -56,7 +55,7 @@ where C: RaftTypeConfig
5655
Command::new(payload)
5756
}
5857

59-
pub(crate) fn get_snapshot(tx: oneshot::Sender<Option<Snapshot<C>>>) -> Self {
58+
pub(crate) fn get_snapshot(tx: ResultSender<Option<Snapshot<C>>>) -> Self {
6059
let payload = CommandPayload::GetSnapshot { tx };
6160
Command::new(payload)
6261
}
@@ -104,7 +103,7 @@ where C: RaftTypeConfig
104103
BuildSnapshot,
105104

106105
/// Get the latest built snapshot.
107-
GetSnapshot { tx: oneshot::Sender<Option<Snapshot<C>>> },
106+
GetSnapshot { tx: ResultSender<Option<Snapshot<C>>> },
108107

109108
/// Receive a chunk of snapshot.
110109
///

openraft/src/core/sm/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
77
use tokio::io::AsyncWriteExt;
88
use tokio::sync::mpsc;
9-
use tokio::sync::oneshot;
109

1110
use crate::core::snapshot_state::SnapshotRequestId;
1211
use crate::core::streaming_state::Streaming;
@@ -35,6 +34,7 @@ pub(crate) use response::CommandResult;
3534
pub(crate) use response::Response;
3635

3736
use crate::core::notify::Notify;
37+
use crate::core::raft_msg::ResultSender;
3838

3939
/// State machine worker handle for sending command to it.
4040
pub(crate) struct Handle<C>
@@ -242,7 +242,7 @@ where
242242
}
243243

244244
#[tracing::instrument(level = "info", skip_all)]
245-
async fn get_snapshot(&mut self, tx: oneshot::Sender<Option<Snapshot<C>>>) -> Result<(), StorageError<C::NodeId>> {
245+
async fn get_snapshot(&mut self, tx: ResultSender<Option<Snapshot<C>>>) -> Result<(), StorageError<C::NodeId>> {
246246
tracing::info!("{}", func_name!());
247247

248248
let snapshot = self.state_machine.get_current_snapshot().await?;
@@ -251,7 +251,7 @@ where
251251
"sending back snapshot: meta: {:?}",
252252
snapshot.as_ref().map(|s| s.meta.summary())
253253
);
254-
let _ = tx.send(snapshot);
254+
let _ = tx.send(Ok(snapshot));
255255
Ok(())
256256
}
257257

openraft/src/raft/mod.rs

+15
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use tracing::Level;
3737
use crate::config::Config;
3838
use crate::config::RuntimeConfig;
3939
use crate::core::command_state::CommandState;
40+
use crate::core::raft_msg::external_command::ExternalCommand;
4041
use crate::core::raft_msg::RaftMsg;
4142
use crate::core::replication_lag;
4243
use crate::core::sm;
@@ -69,6 +70,7 @@ use crate::LogIdOptionExt;
6970
use crate::MessageSummary;
7071
use crate::RaftState;
7172
pub use crate::RaftTypeConfig;
73+
use crate::Snapshot;
7274
use crate::StorageHelper;
7375

7476
/// Define types for a Raft type configuration.
@@ -346,6 +348,19 @@ where C: RaftTypeConfig
346348
self.call_core(RaftMsg::RequestVote { rpc, tx }, rx).await
347349
}
348350

351+
/// Get the latest snapshot from the state machine.
352+
///
353+
/// It returns error only when `RaftCore` fails to serve the request, e.g., Encountering a
354+
/// storage error or shutting down.
355+
#[tracing::instrument(level = "debug", skip_all)]
356+
pub async fn get_snapshot(&self) -> Result<Option<Snapshot<C>>, RaftError<C::NodeId>> {
357+
tracing::debug!("Raft::get_snapshot()");
358+
359+
let (tx, rx) = oneshot::channel();
360+
let cmd = ExternalCommand::GetSnapshot { tx };
361+
self.call_core(RaftMsg::ExternalCommand { cmd }, rx).await
362+
}
363+
349364
/// Submit an InstallSnapshot RPC to this Raft node.
350365
///
351366
/// These RPCs are sent by the cluster leader in order to bring a new node or a slow node

openraft/src/raft/raft_inner.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ where C: RaftTypeConfig
4747
/// It returns at once.
4848
pub(in crate::raft) async fn send_external_command(
4949
&self,
50-
cmd: ExternalCommand,
50+
cmd: ExternalCommand<C>,
5151
cmd_desc: impl fmt::Display + Default,
5252
) -> Result<(), Fatal<C::NodeId>> {
5353
let send_res = self.tx_api.send(RaftMsg::ExternalCommand { cmd });

openraft/src/replication/mod.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ use tokio::io::AsyncReadExt;
2121
use tokio::io::AsyncSeekExt;
2222
use tokio::select;
2323
use tokio::sync::mpsc;
24-
use tokio::sync::oneshot;
2524
use tracing_futures::Instrument;
2625

2726
use crate::config::Config;
2827
use crate::core::notify::Notify;
28+
use crate::core::raft_msg::ResultReceiver;
2929
use crate::display_ext::DisplayOption;
3030
use crate::display_ext::DisplayOptionExt;
3131
use crate::error::HigherVote;
@@ -684,7 +684,7 @@ where
684684
#[tracing::instrument(level = "info", skip_all)]
685685
async fn stream_snapshot(
686686
&mut self,
687-
snapshot_rx: DataWithId<oneshot::Receiver<Option<Snapshot<C>>>>,
687+
snapshot_rx: DataWithId<ResultReceiver<Option<Snapshot<C>>>>,
688688
) -> Result<Option<Data<C>>, ReplicationError<C::NodeId, C::Node>> {
689689
let request_id = snapshot_rx.request_id();
690690
let rx = snapshot_rx.into_data();
@@ -696,6 +696,9 @@ where
696696
StorageError::IO { source: io_err }
697697
})?;
698698

699+
// Safe unwrap(): the error is Infallible, so it is safe to unwrap.
700+
let snapshot = snapshot.unwrap();
701+
699702
tracing::info!(
700703
"received snapshot: request_id={}; meta:{}",
701704
request_id.display(),

openraft/src/replication/request.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ where C: RaftTypeConfig
2121
Self::Data(Data::new_logs(id, log_id_range))
2222
}
2323

24-
pub(crate) fn snapshot(id: Option<u64>, snapshot_rx: oneshot::Receiver<Option<Snapshot<C>>>) -> Self {
24+
pub(crate) fn snapshot(id: Option<u64>, snapshot_rx: ResultReceiver<Option<Snapshot<C>>>) -> Self {
2525
Self::Data(Data::new_snapshot(id, snapshot_rx))
2626
}
2727
}
@@ -42,8 +42,7 @@ where C: RaftTypeConfig
4242
}
4343
}
4444

45-
use tokio::sync::oneshot;
46-
45+
use crate::core::raft_msg::ResultReceiver;
4746
use crate::display_ext::DisplayOptionExt;
4847
use crate::log_id_range::LogIdRange;
4948
use crate::LogId;
@@ -60,7 +59,7 @@ pub(crate) enum Data<C>
6059
where C: RaftTypeConfig
6160
{
6261
Logs(DataWithId<LogIdRange<C::NodeId>>),
63-
Snapshot(DataWithId<oneshot::Receiver<Option<Snapshot<C>>>>),
62+
Snapshot(DataWithId<ResultReceiver<Option<Snapshot<C>>>>),
6463
}
6564

6665
impl<C> fmt::Debug for Data<C>
@@ -111,7 +110,7 @@ where C: RaftTypeConfig
111110
Self::Logs(DataWithId::new(request_id, log_id_range))
112111
}
113112

114-
pub(crate) fn new_snapshot(request_id: Option<u64>, snapshot_rx: oneshot::Receiver<Option<Snapshot<C>>>) -> Self {
113+
pub(crate) fn new_snapshot(request_id: Option<u64>, snapshot_rx: ResultReceiver<Option<Snapshot<C>>>) -> Self {
115114
Self::Snapshot(DataWithId::new(request_id, snapshot_rx))
116115
}
117116

tests/tests/client_api/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ mod fixtures;
1010
mod t10_client_writes;
1111
mod t11_client_reads;
1212
mod t12_trigger_purge_log;
13+
mod t13_get_snapshot;
1314
mod t13_trigger_snapshot;
1415
mod t16_with_raft_state;
1516
mod t50_lagging_network_write;
+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use std::sync::Arc;
2+
use std::time::Duration;
3+
4+
use maplit::btreeset;
5+
use openraft::testing::log_id;
6+
use openraft::Config;
7+
8+
use crate::fixtures::init_default_ut_tracing;
9+
use crate::fixtures::RaftRouter;
10+
11+
/// Get snapshot with `Raft::get_snapshot()`
12+
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
13+
async fn get_snapshot() -> anyhow::Result<()> {
14+
let config = Arc::new(
15+
Config {
16+
enable_heartbeat: false,
17+
..Default::default()
18+
}
19+
.validate()?,
20+
);
21+
22+
let mut router = RaftRouter::new(config.clone());
23+
24+
tracing::info!("--- initializing cluster");
25+
let log_index = router.new_cluster(btreeset! {0,1}, btreeset! {}).await?;
26+
27+
tracing::info!(log_index, "--- get None snapshot for node-1");
28+
{
29+
let n1 = router.get_raft_handle(&1)?;
30+
31+
let curr_snap = n1.get_snapshot().await?;
32+
assert!(curr_snap.is_none());
33+
}
34+
35+
tracing::info!(log_index, "--- trigger and get snapshot for node-1");
36+
{
37+
let n1 = router.get_raft_handle(&1)?;
38+
n1.trigger().snapshot().await?;
39+
40+
router.wait(&1, timeout()).snapshot(log_id(1, 0, log_index), "node-1 snapshot").await?;
41+
42+
let curr_snap = n1.get_snapshot().await?;
43+
let snap = curr_snap.unwrap();
44+
assert_eq!(snap.meta.last_log_id, Some(log_id(1, 0, log_index)));
45+
}
46+
47+
Ok(())
48+
}
49+
50+
fn timeout() -> Option<Duration> {
51+
Some(Duration::from_millis(1_000))
52+
}

0 commit comments

Comments
 (0)