Skip to content

Commit 25e94c3

Browse files
committed
Change: InstallSnapshotResponse: replies the last applied log id; Do not install a smaller snapshot
A snapshot may not be installed by a follower if it already has a higher `last_applied` log id locally. In such a case, it just ignores the snapshot and respond with its local `last_applied` log id. This way the applied state(i.e., `last_applied`) will never revert back.
1 parent 6057abb commit 25e94c3

File tree

4 files changed

+41
-4
lines changed

4 files changed

+41
-4
lines changed

openraft/src/core/install_snapshot.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
4040
if req.term < self.current_term {
4141
return Ok(InstallSnapshotResponse {
4242
term: self.current_term,
43+
last_applied: None,
4344
});
4445
}
4546

@@ -134,6 +135,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
134135
self.finalize_snapshot_installation(req, snapshot).await?;
135136
return Ok(InstallSnapshotResponse {
136137
term: self.current_term,
138+
last_applied: self.last_applied,
137139
});
138140
}
139141

@@ -145,6 +147,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
145147
});
146148
Ok(InstallSnapshotResponse {
147149
term: self.current_term,
150+
last_applied: None,
148151
})
149152
}
150153

@@ -188,6 +191,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
188191
}
189192
Ok(InstallSnapshotResponse {
190193
term: self.current_term,
194+
last_applied: self.last_applied,
191195
})
192196
}
193197

@@ -224,7 +228,14 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
224228
// --------------------------------------------------------------------> time
225229
// ```
226230

227-
// TODO(xp): do not install if self.last_applied >= snapshot.meta.last_applied
231+
if req.meta.last_log_id < self.last_applied {
232+
tracing::info!(
233+
"skip installing snapshot because snapshot_meta.last_log_id({}) <= self.last_applied({})",
234+
req.meta.last_log_id.summary(),
235+
self.last_applied.summary(),
236+
);
237+
return Ok(());
238+
}
228239

229240
let changes = self.storage.install_snapshot(&req.meta, snapshot).await?;
230241

@@ -253,7 +264,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
253264

254265
// There could be unknown membership in the snapshot.
255266
let membership = StorageHelper::new(&self.storage).get_membership().await?;
256-
tracing::info!("refetch membership from store: {:?}", membership);
267+
tracing::info!("re-fetch membership from store: {:?}", membership);
257268

258269
assert!(membership.is_some());
259270

openraft/src/replication/mod.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -887,13 +887,20 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
887887
// If we just sent the final chunk of the snapshot, then transition to lagging state.
888888
if done {
889889
tracing::info!(
890-
"done install snapshot: snapshot last_log_id: {:?}, matched: {:?}",
890+
"done install snapshot: snapshot last_log_id: {:?}, self.matched: {:?}, remote last_applied: {:?}",
891891
snapshot.meta.last_log_id,
892892
self.matched,
893+
res.last_applied,
893894
);
894895

895-
self.update_matched(snapshot.meta.last_log_id);
896+
// In previous version a node that does return `last_applied`.
897+
let matched = if res.last_applied.is_some() {
898+
res.last_applied
899+
} else {
900+
snapshot.meta.last_log_id
901+
};
896902

903+
self.update_matched(matched);
897904
return Ok(());
898905
}
899906

openraft/src/types/v070/log_id.rs

+13
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,23 @@
11
use serde::Deserialize;
22
use serde::Serialize;
33

4+
use crate::MessageSummary;
5+
46
/// The identity of a raft log.
57
/// A term and an index identifies an log globally.
68
#[derive(Debug, Default, Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize)]
79
pub struct LogId {
810
pub term: u64,
911
pub index: u64,
1012
}
13+
14+
impl MessageSummary for Option<LogId> {
15+
fn summary(&self) -> String {
16+
match self {
17+
None => "None".to_string(),
18+
Some(log_id) => {
19+
format!("{}", log_id)
20+
}
21+
}
22+
}
23+
}

openraft/src/types/v070/rpc.rs

+6
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ pub struct InstallSnapshotRequest {
8989
pub struct InstallSnapshotResponse {
9090
/// The receiving node's current term, for leader to update itself.
9191
pub term: u64,
92+
93+
/// The last applied log id after snapshot being installed.
94+
///
95+
/// A node may choose not to install a snapshot if it already has a greater `last_applied`.
96+
/// In this case, it just returns the `last_applied`.
97+
pub last_applied: Option<LogId>,
9298
}
9399

94100
/// The response to a `ClientRequest`.

0 commit comments

Comments
 (0)