Skip to content

Commit d5b52d3

Browse files
committed
Fix: snapshot with smaller last_log_id than last_applied should not be installed
Otherwise, the state machine will revert to an older state, while the in-memory `last_applied` is unchanged. This finally causes logs that falls in range `(snapshot.last_log_id, last_applied]` can not be applied.
1 parent 1fdc118 commit d5b52d3

File tree

2 files changed

+103
-2
lines changed

2 files changed

+103
-2
lines changed

openraft/snapshot_le_last_applied.rs

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use std::sync::Arc;
2+
use std::time::Duration;
3+
4+
use anyhow::Result;
5+
use maplit::btreeset;
6+
use openraft::raft::InstallSnapshotRequest;
7+
use openraft::Config;
8+
use openraft::LogId;
9+
use openraft::RaftStorage;
10+
use openraft::SnapshotMeta;
11+
12+
use crate::fixtures::RaftRouter;
13+
14+
/// Snapshot with smaller last_log_id than local last_applied should not be installed.
15+
/// Otherwise state machine will revert to a older state.
16+
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
17+
async fn snapshot_le_last_applied() -> Result<()> {
18+
let (_log_guard, ut_span) = init_ut!();
19+
let _ent = ut_span.enter();
20+
21+
let config = Arc::new(Config { ..Default::default() }.validate()?);
22+
let router = Arc::new(RaftRouter::new(config.clone()));
23+
24+
let mut log_index = router.new_nodes_from_single(btreeset! {0}, btreeset! {}).await?;
25+
26+
tracing::info!("--- send logs to increase last_applied");
27+
{
28+
router.client_request_many(0, "0", 10).await;
29+
log_index += 10;
30+
31+
router
32+
.wait_for_log(
33+
&btreeset![0],
34+
Some(log_index),
35+
timeout(),
36+
"send log to trigger snapshot",
37+
)
38+
.await?;
39+
40+
router.assert_stable_cluster(Some(1), Some(log_index)).await;
41+
}
42+
43+
tracing::info!("--- it should fail to install a snapshot with smaller last_log_id");
44+
{
45+
assert!(log_index > 3);
46+
let req = InstallSnapshotRequest {
47+
term: 100,
48+
leader_id: 1,
49+
meta: SnapshotMeta {
50+
snapshot_id: "ss1".into(),
51+
last_log_id: Some(LogId { term: 1, index: 3 }),
52+
},
53+
offset: 0,
54+
data: vec![1, 2, 3],
55+
done: true,
56+
};
57+
58+
let n = router.remove_node(0).await.unwrap();
59+
n.0.install_snapshot(req).await?;
60+
let st = n.1.last_applied_state().await?;
61+
assert_eq!(
62+
Some(LogId {
63+
term: 1,
64+
index: log_index
65+
}),
66+
st.0,
67+
"last_applied is not affected"
68+
);
69+
70+
let wait_rs =
71+
n.0.wait(timeout())
72+
.metrics(
73+
|x| {
74+
x.last_applied
75+
!= Some(LogId {
76+
term: 1,
77+
index: log_index,
78+
})
79+
},
80+
"in-memory last_applied is not affected",
81+
)
82+
.await;
83+
84+
assert!(wait_rs.is_err());
85+
}
86+
87+
Ok(())
88+
}
89+
90+
fn timeout() -> Option<Duration> {
91+
Some(Duration::from_millis(1_000))
92+
}

openraft/src/core/install_snapshot.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
2626
/// Leaders always send chunks in order. It is important to note that, according to the Raft spec,
2727
/// a log may only have one snapshot at any time. As snapshot contents are application specific,
2828
/// the Raft log will only store a pointer to the snapshot file along with the index & term.
29-
#[tracing::instrument(level = "debug", skip(self, req), fields(req=%req.summary()))]
29+
#[tracing::instrument(level = "debug", skip_all)]
3030
pub(super) async fn handle_install_snapshot_request(
3131
&mut self,
3232
req: InstallSnapshotRequest,
3333
) -> RaftResult<InstallSnapshotResponse> {
34+
tracing::info!("handle_install_snapshot_request: req: {}", req.summary());
35+
3436
// If message's term is less than most recent term, then we do not honor the request.
3537
if req.term < self.current_term {
3638
return Ok(InstallSnapshotResponse {
@@ -193,7 +195,14 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
193195
// --------------------------------------------------------------------> time
194196
// ```
195197

196-
// TODO(xp): do not install if self.last_applied >= snapshot.meta.last_applied
198+
if req.meta.last_log_id <= self.last_applied {
199+
tracing::info!(
200+
"Skip install_snapshot because snapshot.last_log_id({:?}) <= last_applied({:?})",
201+
req.meta.last_log_id,
202+
self.last_applied
203+
);
204+
return Ok(());
205+
}
197206

198207
let changes = self
199208
.storage

0 commit comments

Comments
 (0)