Skip to content

Commit 5631ecf

Browse files
committed
Feature: Add Raft::begin_receiving_snapshot()
`Raft::begin_receiving_snapshot()` request the state machine to return a `SnapshotData` for receiving snapshot from the leader. Internally it calls `RaftStateMachine::begin_receiving_snapshot()` Handling snapshot receiving is moved out of state-machine worker task. Now it is in implemented outside the `RaftCore`. Receiving snapshot could be totally application specific and should not be part of Openraft. The in sm-worker snapshot receiving is removed. - Part of #606
1 parent c805b29 commit 5631ecf

File tree

18 files changed

+370
-614
lines changed

18 files changed

+370
-614
lines changed

openraft/src/core/raft_core.rs

+2-28
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use crate::core::raft_msg::external_command::ExternalCommand;
3030
use crate::core::raft_msg::AppendEntriesTx;
3131
use crate::core::raft_msg::ClientReadTx;
3232
use crate::core::raft_msg::ClientWriteTx;
33-
use crate::core::raft_msg::InstallSnapshotTx;
3433
use crate::core::raft_msg::RaftMsg;
3534
use crate::core::raft_msg::ResultSender;
3635
use crate::core::raft_msg::VoteTx;
@@ -69,7 +68,6 @@ use crate::quorum::QuorumSet;
6968
use crate::raft::AppendEntriesRequest;
7069
use crate::raft::AppendEntriesResponse;
7170
use crate::raft::ClientWriteResponse;
72-
use crate::raft::InstallSnapshotRequest;
7371
use crate::raft::VoteRequest;
7472
use crate::raft_state::LogStateReader;
7573
use crate::replication;
@@ -607,21 +605,6 @@ where
607605
});
608606
}
609607

610-
/// Invoked by leader to send chunks of a snapshot to a follower.
611-
///
612-
/// Leaders always send chunks in order. It is important to note that, according to the Raft
613-
/// spec, a node may only have one snapshot at any time. As snapshot contents are application
614-
/// specific.
615-
#[tracing::instrument(level = "debug", skip_all)]
616-
pub(crate) fn handle_install_snapshot_request(
617-
&mut self,
618-
req: InstallSnapshotRequest<C>,
619-
tx: InstallSnapshotTx<C::NodeId>,
620-
) {
621-
tracing::info!(req = display(req.summary()), "{}", func_name!());
622-
self.engine.handle_install_snapshot(req, tx);
623-
}
624-
625608
/// Trigger a snapshot building(log compaction) job if there is no pending building job.
626609
#[tracing::instrument(level = "debug", skip(self))]
627610
pub(crate) fn trigger_snapshot(&mut self) {
@@ -1125,14 +1108,8 @@ where
11251108

11261109
self.handle_vote_request(rpc, tx);
11271110
}
1128-
RaftMsg::InstallSnapshot { rpc, tx } => {
1129-
tracing::info!(
1130-
req = display(rpc.summary()),
1131-
"received RaftMst::InstallSnapshot: {}",
1132-
func_name!()
1133-
);
1134-
1135-
self.handle_install_snapshot_request(rpc, tx);
1111+
RaftMsg::BeginReceivingSnapshot { vote, tx } => {
1112+
self.engine.handle_begin_receiving_snapshot(vote, tx);
11361113
}
11371114
RaftMsg::InstallCompleteSnapshot { vote, snapshot, tx } => {
11381115
self.engine.handle_install_complete_snapshot(vote, snapshot, tx);
@@ -1375,9 +1352,6 @@ where
13751352
let st = self.engine.state.io_state_mut();
13761353
st.update_snapshot(last_log_id);
13771354
}
1378-
sm::Response::ReceiveSnapshotChunk(_) => {
1379-
tracing::info!("sm::StateMachine command done: ReceiveSnapshotChunk: {}", func_name!());
1380-
}
13811355
sm::Response::InstallSnapshot(meta) => {
13821356
tracing::info!(
13831357
"sm::StateMachine command done: InstallSnapshot: {}: {}",

openraft/src/core/raft_msg/mod.rs

+14-12
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,20 @@ use tokio::sync::oneshot;
55
use crate::core::raft_msg::external_command::ExternalCommand;
66
use crate::error::CheckIsLeaderError;
77
use crate::error::ClientWriteError;
8+
use crate::error::HigherVote;
89
use crate::error::Infallible;
910
use crate::error::InitializeError;
10-
use crate::error::InstallSnapshotError;
1111
use crate::raft::AppendEntriesRequest;
1212
use crate::raft::AppendEntriesResponse;
1313
use crate::raft::BoxCoreFn;
1414
use crate::raft::ClientWriteResponse;
15-
use crate::raft::InstallSnapshotRequest;
1615
use crate::raft::InstallSnapshotResponse;
1716
use crate::raft::VoteRequest;
1817
use crate::raft::VoteResponse;
1918
use crate::type_config::alias::LogIdOf;
2019
use crate::type_config::alias::NodeIdOf;
2120
use crate::type_config::alias::NodeOf;
21+
use crate::type_config::alias::SnapshotDataOf;
2222
use crate::ChangeMembers;
2323
use crate::MessageSummary;
2424
use crate::RaftTypeConfig;
@@ -32,9 +32,6 @@ pub(crate) type ResultSender<T, E = Infallible> = oneshot::Sender<Result<T, E>>;
3232

3333
pub(crate) type ResultReceiver<T, E = Infallible> = oneshot::Receiver<Result<T, E>>;
3434

35-
/// TX for Install Snapshot Response
36-
pub(crate) type InstallSnapshotTx<NID> = ResultSender<InstallSnapshotResponse<NID>, InstallSnapshotError>;
37-
3835
/// TX for Vote Response
3936
pub(crate) type VoteTx<NID> = ResultSender<VoteResponse<NID>>;
4037

@@ -64,17 +61,22 @@ where C: RaftTypeConfig
6461
tx: VoteTx<C::NodeId>,
6562
},
6663

67-
InstallSnapshot {
68-
rpc: InstallSnapshotRequest<C>,
69-
tx: InstallSnapshotTx<C::NodeId>,
70-
},
71-
7264
InstallCompleteSnapshot {
7365
vote: Vote<C::NodeId>,
7466
snapshot: Snapshot<C>,
7567
tx: ResultSender<InstallSnapshotResponse<C::NodeId>>,
7668
},
7769

70+
/// Begin receiving a snapshot from the leader.
71+
///
72+
/// Returns a handle to a snapshot data ready for receiving if successful.
73+
/// Otherwise, it is an error because of the `vote` is not GE the local `vote`, the local `vote`
74+
/// will be returned in a Err
75+
BeginReceivingSnapshot {
76+
vote: Vote<C::NodeId>,
77+
tx: ResultSender<Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
78+
},
79+
7880
ClientWriteRequest {
7981
app_data: C::D,
8082
tx: ClientWriteTx<C>,
@@ -119,8 +121,8 @@ where C: RaftTypeConfig
119121
RaftMsg::RequestVote { rpc, .. } => {
120122
format!("RequestVote: {}", rpc.summary())
121123
}
122-
RaftMsg::InstallSnapshot { rpc, .. } => {
123-
format!("InstallSnapshot: {}", rpc.summary())
124+
RaftMsg::BeginReceivingSnapshot { vote, .. } => {
125+
format!("BeginReceivingSnapshot: vote: {}", vote)
124126
}
125127
RaftMsg::InstallCompleteSnapshot { vote, snapshot, .. } => {
126128
format!("InstallCompleteSnapshot: vote: {}, snapshot: {}", vote, snapshot)

openraft/src/core/sm/command.rs

+13-56
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@ use std::fmt::Formatter;
33

44
use crate::core::raft_msg::ResultSender;
55
use crate::display_ext::DisplaySlice;
6+
use crate::error::HigherVote;
67
use crate::log_id::RaftLogId;
7-
use crate::raft::InstallSnapshotRequest;
8-
use crate::MessageSummary;
8+
use crate::type_config::alias::SnapshotDataOf;
99
use crate::RaftTypeConfig;
1010
use crate::Snapshot;
11-
use crate::SnapshotMeta;
1211

1312
#[derive(PartialEq)]
1413
pub(crate) struct Command<C>
@@ -60,17 +59,8 @@ where C: RaftTypeConfig
6059
Command::new(payload)
6160
}
6261

63-
pub(crate) fn receive(req: InstallSnapshotRequest<C>) -> Self {
64-
let payload = CommandPayload::ReceiveSnapshotChunk { req };
65-
Command::new(payload)
66-
}
67-
68-
// TODO: all sm command should have a command seq.
69-
pub(crate) fn install_snapshot(snapshot_meta: SnapshotMeta<C::NodeId, C::Node>) -> Self {
70-
let payload = CommandPayload::FinalizeSnapshot {
71-
install: true,
72-
snapshot_meta,
73-
};
62+
pub(crate) fn begin_receiving_snapshot(tx: ResultSender<Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>) -> Self {
63+
let payload = CommandPayload::BeginReceivingSnapshot { tx };
7464
Command::new(payload)
7565
}
7666

@@ -79,14 +69,6 @@ where C: RaftTypeConfig
7969
Command::new(payload)
8070
}
8171

82-
pub(crate) fn cancel_snapshot(snapshot_meta: SnapshotMeta<C::NodeId, C::Node>) -> Self {
83-
let payload = CommandPayload::FinalizeSnapshot {
84-
install: false,
85-
snapshot_meta,
86-
};
87-
Command::new(payload)
88-
}
89-
9072
pub(crate) fn apply(entries: Vec<C::Entry>) -> Self {
9173
let payload = CommandPayload::Apply { entries };
9274
Command::new(payload)
@@ -112,21 +94,8 @@ where C: RaftTypeConfig
11294
tx: ResultSender<Option<Snapshot<C>>>,
11395
},
11496

115-
/// Receive a chunk of snapshot.
116-
///
117-
/// If it is the final chunk, the snapshot stream will be closed and saved.
118-
///
119-
/// Installing a snapshot includes two steps: ReceiveSnapshotChunk and FinalizeSnapshot.
120-
ReceiveSnapshotChunk {
121-
req: InstallSnapshotRequest<C>,
122-
},
123-
124-
/// After receiving all chunks, finalize the snapshot by installing it or discarding it,
125-
/// if the snapshot is stale(the snapshot last log id is smaller than the local committed).
126-
FinalizeSnapshot {
127-
/// To install it, or just discard it.
128-
install: bool,
129-
snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
97+
BeginReceivingSnapshot {
98+
tx: ResultSender<Box<SnapshotDataOf<C>>, HigherVote<C::NodeId>>,
13099
},
131100

132101
InstallCompleteSnapshot {
@@ -146,15 +115,12 @@ where C: RaftTypeConfig
146115
match self {
147116
CommandPayload::BuildSnapshot => write!(f, "BuildSnapshot"),
148117
CommandPayload::GetSnapshot { .. } => write!(f, "GetSnapshot"),
149-
CommandPayload::ReceiveSnapshotChunk { req, .. } => {
150-
write!(f, "ReceiveSnapshotChunk: {}", req.summary())
151-
}
152-
CommandPayload::FinalizeSnapshot { install, snapshot_meta } => {
153-
write!(f, "FinalizeSnapshot: install:{} {:?}", install, snapshot_meta)
154-
}
155118
CommandPayload::InstallCompleteSnapshot { snapshot } => {
156119
write!(f, "InstallCompleteSnapshot: meta: {:?}", snapshot.meta)
157120
}
121+
CommandPayload::BeginReceivingSnapshot { .. } => {
122+
write!(f, "BeginReceivingSnapshot")
123+
}
158124
CommandPayload::Apply { entries } => write!(f, "Apply: {}", DisplaySlice::<_>(entries)),
159125
}
160126
}
@@ -168,20 +134,11 @@ where C: RaftTypeConfig
168134
match (self, other) {
169135
(CommandPayload::BuildSnapshot, CommandPayload::BuildSnapshot) => true,
170136
(CommandPayload::GetSnapshot { .. }, CommandPayload::GetSnapshot { .. }) => true,
137+
(CommandPayload::BeginReceivingSnapshot { .. }, CommandPayload::BeginReceivingSnapshot { .. }) => true,
171138
(
172-
CommandPayload::ReceiveSnapshotChunk { req: req1, .. },
173-
CommandPayload::ReceiveSnapshotChunk { req: req2, .. },
174-
) => req1 == req2,
175-
(
176-
CommandPayload::FinalizeSnapshot {
177-
install: install1,
178-
snapshot_meta: meta1,
179-
},
180-
CommandPayload::FinalizeSnapshot {
181-
install: install2,
182-
snapshot_meta: meta2,
183-
},
184-
) => install1 == install2 && meta1 == meta2,
139+
CommandPayload::InstallCompleteSnapshot { snapshot: s1 },
140+
CommandPayload::InstallCompleteSnapshot { snapshot: s2 },
141+
) => s1.meta == s2.meta,
185142
(CommandPayload::Apply { entries: entries1 }, CommandPayload::Apply { entries: entries2 }) => {
186143
// Entry may not be `Eq`, we just compare log id.
187144
// This would be enough for testing.

0 commit comments

Comments
 (0)