Skip to content

Commit 44381b0

Browse files
committed
Fix: when handling append-entries, if prev_log_id is purged, it should not delete any logs.
When handling append-entries, if the local log at `prev_log_id.index` has been purged, a follower should not treat it as a **conflict** and should not delete all of its logs; doing so would lose committed logs. To fix this issue, compare against `last_applied` instead of `committed`: `last_applied` always refers to a committed log id, whereas `committed` is not persisted and, after a follower restarts, may be smaller than the log id that has actually been applied.
1 parent 0355a60 commit 44381b0

File tree

5 files changed

+113
-3
lines changed

5 files changed

+113
-3
lines changed

openraft/src/core/append_entries.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
108108
// - keep track of last_log_id, first_log_id,
109109
// RaftStorage should only provide the most basic APIs.
110110

111-
self.storage.delete_conflict_logs_since(start).await?;
111+
let res = self.storage.delete_conflict_logs_since(start).await;
112+
tracing::debug!("delete_conflict_logs_since res: {:?}", res);
113+
114+
res?;
112115

113116
self.last_log_id = self.storage.get_log_state().await?.last_log_id;
114117

@@ -278,7 +281,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
278281
for i in 0..l {
279282
let log_id = entries[i].log_id;
280283

281-
if Some(log_id) <= self.committed {
284+
if Some(log_id) <= self.last_applied {
282285
continue;
283286
}
284287

@@ -312,7 +315,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
312315
};
313316

314317
// Committed entries are always safe and are consistent to a valid leader.
315-
if remote_log_id <= self.committed {
318+
if remote_log_id <= self.last_applied {
316319
return Ok(None);
317320
}
318321

openraft/src/defensive.rs

+4
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,10 @@ where
191191
}
192192

193193
async fn defensive_delete_conflict_gt_last_applied(&self, since: LogId) -> Result<(), StorageError> {
194+
if !self.is_defensive() {
195+
return Ok(());
196+
}
197+
194198
let (last_applied, _) = self.inner().last_applied_state().await?;
195199
if Some(since.index) <= last_applied.index() {
196200
return Err(

openraft/tests/append_entries/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod fixtures;
88
mod t10_conflict_with_empty_entries;
99
mod t20_append_conflicts;
1010
mod t30_append_inconsistent_log;
11+
mod t31_append_prev_is_purged;
1112
mod t40_append_updates_membership;
1213
mod t50_append_entries_with_bigger_term;
1314
mod t60_large_heartbeat;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
use std::sync::Arc;
2+
3+
use anyhow::Result;
4+
use maplit::btreeset;
5+
use openraft::raft::Entry;
6+
use openraft::raft::EntryPayload;
7+
use openraft::AppendEntriesRequest;
8+
use openraft::Config;
9+
use openraft::DefensiveCheck;
10+
use openraft::LogId;
11+
use openraft::Membership;
12+
use openraft::Raft;
13+
use openraft::RaftStorage;
14+
15+
use crate::fixtures::blank;
16+
use crate::fixtures::RaftRouter;
17+
18+
/// When handling append-entries, if the local log at `prev_log_id.index` is purged, a follower should not believe it is
19+
/// a **conflict** and should not delete all logs. Otherwise committed logs would be lost.
20+
///
21+
/// Fake a raft node with one log (1,3) and set last-applied to (1,2).
22+
/// Then an append-entries with `prev_log_id=(1,2)` should not be considered as **conflict**.
23+
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
24+
async fn append_prev_is_purged() -> Result<()> {
25+
let (_log_guard, ut_span) = init_ut!();
26+
let _ent = ut_span.enter();
27+
28+
let config = Arc::new(
29+
Config {
30+
max_applied_log_to_keep: 2,
31+
..Default::default()
32+
}
33+
.validate()?,
34+
);
35+
let router = Arc::new(RaftRouter::new(config.clone()));
36+
37+
tracing::info!("--- fake store: logs: (1,3), last_applied == last_purged == (1,2)");
38+
let sto0 = {
39+
let sto0 = router.new_store().await;
40+
41+
// With defensive==true, it will panic.
42+
sto0.set_defensive(false);
43+
44+
let entries = [
45+
&Entry {
46+
log_id: LogId { term: 0, index: 0 },
47+
payload: EntryPayload::Membership(Membership::new_single(btreeset! {0,1})),
48+
},
49+
&blank(1, 1),
50+
&blank(1, 2),
51+
&blank(1, 3),
52+
];
53+
54+
sto0.append_to_log(&entries).await?;
55+
sto0.apply_to_state_machine(&entries[0..3]).await?;
56+
sto0.purge_logs_upto(LogId::new(1, 2)).await?;
57+
58+
let logs = sto0.try_get_log_entries(..).await?;
59+
tracing::debug!("logs left after purge: {:?}", logs);
60+
assert_eq!(LogId::new(1, 3), logs[0].log_id);
61+
62+
sto0
63+
};
64+
65+
tracing::info!("--- new node with faked sto");
66+
let node0 = {
67+
let config0 = Arc::new(
68+
Config {
69+
max_applied_log_to_keep: 1,
70+
..Default::default()
71+
}
72+
.validate()?,
73+
);
74+
let node0 = Raft::new(0, config0.clone(), router.clone(), sto0.clone());
75+
router.add_raft_node(0, node0.clone(), sto0.clone()).await;
76+
node0
77+
};
78+
79+
tracing::info!("--- append-entries with prev_log_id=(1,2), should not erase any logs");
80+
{
81+
node0
82+
.append_entries(AppendEntriesRequest {
83+
term: 1,
84+
leader_id: 1,
85+
prev_log_id: Some(LogId::new(1, 2)),
86+
entries: vec![],
87+
leader_commit: None,
88+
})
89+
.await?;
90+
91+
let logs = sto0.try_get_log_entries(..).await?;
92+
tracing::debug!("logs left after append: {:?}", logs);
93+
assert_eq!(LogId::new(1, 3), logs[0].log_id);
94+
}
95+
96+
Ok(())
97+
}

openraft/tests/fixtures/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,11 @@ impl RaftRouter {
270270
rt.insert(id, (node, sto));
271271
}
272272

273+
pub async fn add_raft_node(self: &Arc<Self>, id: NodeId, node: MemRaft, sto: Arc<StoreWithDefensive>) {
274+
let mut rt = self.routing_table.write().await;
275+
rt.insert(id, (node, sto));
276+
}
277+
273278
/// Remove the target node from the routing table & isolation.
274279
pub async fn remove_node(&self, id: NodeId) -> Option<(MemRaft, Arc<StoreWithDefensive>)> {
275280
let mut rt = self.routing_table.write().await;

0 commit comments

Comments
 (0)