Skip to content

Commit 4911180

Browse files
committed
Refactor: when installing snapshot, delete logs included in snapshot
A local log that is included in snapshot may be inconsistent with the leader. It has to purge all of them to prevent these log form being replicated, when this node becomes leader. When deleting these logs, it does not need to consider the `max_applied_log_to_keep` config. Thus `purge_applied_logs()` does not need to deal with the condition that `last_log_id` is inconsistent with `last_applied`.
1 parent b827b54 commit 4911180

File tree

3 files changed

+19
-27
lines changed

3 files changed

+19
-27
lines changed

openraft/src/core/install_snapshot.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use anyerror::AnyError;
44
use tokio::io::AsyncSeekExt;
55
use tokio::io::AsyncWriteExt;
66

7-
use crate::core::purge_applied_logs;
87
use crate::core::RaftCore;
98
use crate::core::ServerState;
109
use crate::core::SnapshotState;
@@ -214,29 +213,32 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
214213
// TODO(xp): do not install if self.engine.st.last_applied >= snapshot.meta.last_applied
215214

216215
let changes = self.storage.install_snapshot(&req.meta, snapshot).await?;
217-
218216
tracing::debug!("update after apply or install-snapshot: {:?}", changes);
219217

220-
// After installing snapshot, no inconsistent log is removed.
221-
// This does not affect raft consistency.
222-
// If you have any question about this, let me know: drdr.xp at gmail.com
223-
224218
let last_applied = changes.last_applied;
225219

226-
// Applied logs are not needed.
227-
purge_applied_logs(&mut self.storage, &last_applied, self.config.max_applied_log_to_keep).await?;
228-
229-
// snapshot is installed
230-
self.engine.state.last_applied = Some(last_applied);
220+
let st = &mut self.engine.state;
231221

232-
if self.engine.state.committed < self.engine.state.last_applied {
233-
self.engine.state.committed = self.engine.state.last_applied;
222+
if st.last_log_id < Some(last_applied) {
223+
st.last_log_id = Some(last_applied);
234224
}
235-
if self.engine.state.last_log_id < self.engine.state.last_applied {
236-
self.engine.state.last_log_id = self.engine.state.last_applied;
225+
if st.committed < Some(last_applied) {
226+
st.committed = Some(last_applied);
237227
}
228+
if st.last_applied < Some(last_applied) {
229+
st.last_applied = Some(last_applied);
230+
}
231+
232+
assert!(st.last_purged_log_id <= Some(last_applied));
233+
234+
// A local log that is <= last_applied may be inconsistent with the leader.
235+
// It has to purge all of them to prevent these log form being replicated, when this node becomes leader.
236+
st.last_purged_log_id = Some(last_applied);
237+
self.storage.purge_logs_upto(last_applied).await?;
238238

239-
// There could be unknown membership in the snapshot.
239+
// TODO(xp): just compare the membership config in the snapshot and the current effective membership config and
240+
// decide whether to replace the effective confg.
241+
// No need to bother the storage.
240242
let membership = self.storage.get_membership().await?;
241243
tracing::debug!("storage membership: {:?}", membership);
242244

openraft/src/core/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ mod snapshot_state;
1717
pub(crate) use internal_msg::InternalMessage;
1818
use leader_state::LeaderState;
1919
use raft_core::apply_to_state_machine;
20-
use raft_core::purge_applied_logs;
2120
use raft_core::MetricsProvider;
2221
pub use raft_core::RaftCore;
2322
pub use replication_state::is_matched_upto_date;

openraft/src/core/raft_core.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -582,21 +582,12 @@ where
582582

583583
tracing::debug!(%last_applied, max_keep, delete_lt = end, "delete_applied_logs");
584584

585-
if end == 0 {
586-
return Ok(());
587-
}
588-
589585
let st = sto.get_log_state().await?;
590586

591-
if st.last_log_id < Some(*last_applied) {
592-
sto.purge_logs_upto(*last_applied).await?;
593-
return Ok(());
594-
}
595-
596587
// non applied logs are deleted. it is a bug.
597588
assert!(st.last_purged_log_id <= Some(*last_applied));
598589

599-
if st.last_purged_log_id.index() >= Some(end - 1) {
590+
if st.last_purged_log_id.next_index() >= end {
600591
return Ok(());
601592
}
602593

0 commit comments

Comments
 (0)