
Commit 82a3f2f

change: use LogId to track last applied, instead of just an index.

Tracking the last applied log with a LogId carries more information. E.g. when creating a snapshot, with only an index one has to walk through the logs to find the term of the last applied entry, just as the memstore impl used to do. Using LogId{term, index} is the more natural choice in every respect.

changes: RaftCore: change the type of `last_applied` from u64 to LogId.
1 parent 0b719d6 commit 82a3f2f

15 files changed, +112 −57 lines changed
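
For context, the type this commit threads through the codebase looks like the sketch below (field names match the `LogId { term, index }` literals in the diffs; the `snapshot_meta` helper is a hypothetical illustration of the motivation, not code from this commit):

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct LogId {
        pub term: u64,
        pub index: u64,
    }

    // Before this commit, `last_applied: u64` carried only the index, so building
    // snapshot metadata required walking the log to recover the term of the last
    // applied entry. With `last_applied: LogId`, the term is already at hand:
    fn snapshot_meta(last_applied: LogId) -> (u64, u64) {
        (last_applied.term, last_applied.index)
    }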

async-raft/src/core/append_entries.rs

+11 −10

@@ -9,6 +9,7 @@ use crate::raft::Entry;
 use crate::raft::EntryPayload;
 use crate::AppData;
 use crate::AppDataResponse;
+use crate::LogId;
 use crate::RaftNetwork;
 use crate::RaftStorage;
 use crate::Update;
@@ -209,7 +210,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
             return;
         }
         // If we don't have any new entries to replicate, then do nothing.
-        if self.commit_index <= self.last_applied {
+        if self.commit_index <= self.last_applied.index {
             return;
         }
         // If we have no cached entries, then do nothing.
@@ -219,13 +220,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
         };

         // Drain entries from the beginning of the cache up to commit index.
-        let mut last_entry_seen: Option<u64> = None;
+        let mut last_entry_seen: Option<LogId> = None;
         let entries: Vec<_> = (first_idx..=self.commit_index)
             .filter_map(|idx| {
                 if let Some(entry) = self.entries_cache.remove(&idx) {
-                    last_entry_seen = Some(entry.log_id.index);
+                    last_entry_seen = Some(entry.log_id);
                     match entry.payload {
-                        EntryPayload::Normal(inner) => Some((entry.log_id.index, inner.data)),
+                        EntryPayload::Normal(inner) => Some((entry.log_id, inner.data)),
                         _ => None,
                     }
                 } else {
@@ -236,8 +237,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

         // If we have no data entries to apply, then do nothing.
         if entries.is_empty() {
-            if let Some(index) = last_entry_seen {
-                self.last_applied = index;
+            if let Some(log_id) = last_entry_seen {
+                self.last_applied = log_id;
                 self.report_metrics(Update::Ignore);
             }
             return;
@@ -264,7 +265,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
     #[tracing::instrument(level = "trace", skip(self))]
     async fn initial_replicate_to_state_machine(&mut self) {
         let stop = std::cmp::min(self.commit_index, self.last_log_id.index) + 1;
-        let start = self.last_applied + 1;
+        let start = self.last_applied.index + 1;
         let storage = self.storage.clone();

         // If we already have an active replication task, then do nothing.
@@ -274,15 +275,15 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

         // Fetch the series of entries which must be applied to the state machine, then apply them.
         let handle = tokio::spawn(async move {
-            let mut new_last_applied: Option<u64> = None;
+            let mut new_last_applied: Option<LogId> = None;
             let entries = storage.get_log_entries(start, stop).await?;
             if let Some(entry) = entries.last() {
-                new_last_applied = Some(entry.log_id.index);
+                new_last_applied = Some(entry.log_id);
             }
             let data_entries: Vec<_> = entries
                 .iter()
                 .filter_map(|entry| match &entry.payload {
-                    EntryPayload::Normal(inner) => Some((&entry.log_id.index, &inner.data)),
+                    EntryPayload::Normal(inner) => Some((&entry.log_id, &inner.data)),
                     _ => None,
                 })
                 .collect();

async-raft/src/core/client.rs

+6 −6

@@ -334,7 +334,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
                 }
             }
             ClientOrInternalResponseTx::Internal(tx) => {
-                self.core.last_applied = req.entry.log_id.index;
+                self.core.last_applied = req.entry.log_id;
                 self.leader_report_metrics();
                 let _ = tx.send(Ok(req.entry.log_id.index));
             }
@@ -355,7 +355,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

         let index = log_id.index;

-        let expected_next_index = self.core.last_applied + 1;
+        let expected_next_index = self.core.last_applied.index + 1;
         if index != expected_next_index {
             let entries = self
                 .core
@@ -365,13 +365,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
                 .map_err(|err| self.core.map_fatal_storage_error(err))?;

             if let Some(entry) = entries.last() {
-                self.core.last_applied = entry.log_id.index;
+                self.core.last_applied = entry.log_id;
             }

             let data_entries: Vec<_> = entries
                 .iter()
                 .filter_map(|entry| match &entry.payload {
-                    EntryPayload::Normal(inner) => Some((&entry.log_id.index, &inner.data)),
+                    EntryPayload::Normal(inner) => Some((&entry.log_id, &inner.data)),
                     _ => None,
                 })
                 .collect();
@@ -393,7 +393,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
             }
         }
         // Apply this entry to the state machine and return its data response.
-        let res = self.core.storage.apply_entry_to_state_machine(&index, entry).await.map_err(|err| {
+        let res = self.core.storage.apply_entry_to_state_machine(&log_id, entry).await.map_err(|err| {
             if err.downcast_ref::<S::ShutdownError>().is_some() {
                 // If this is an instance of the storage impl's shutdown error, then trigger shutdown.
                 self.core.map_fatal_storage_error(err)
@@ -402,7 +402,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
                 RaftError::RaftStorage(err)
             }
         });
-        self.core.last_applied = index;
+        self.core.last_applied = *log_id;
         self.leader_report_metrics();
         res
     }

async-raft/src/core/install_snapshot.rs

+1 −1

@@ -174,7 +174,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
         let membership = self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?;
         self.update_membership(membership)?;
         self.last_log_id = req.meta.last_log_id;
-        self.last_applied = req.meta.last_log_id.index;
+        self.last_applied = req.meta.last_log_id;
         self.snapshot_last_log_id = req.meta.last_log_id;
         self.report_metrics(Update::Ignore);
         Ok(())

async-raft/src/core/mod.rs

+11 −9

@@ -84,13 +84,15 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt
     /// Is initialized to 0, and increases monotonically. This is always based on the leader's
     /// commit index which is communicated to other members via the AppendEntries protocol.
     commit_index: u64,
-    /// The index of the highest log entry which has been applied to the local state machine.
+
+    /// The log id of the highest log entry which has been applied to the local state machine.
     ///
-    /// Is initialized to 0 for a pristine node; else, for nodes with existing state it is
+    /// Is initialized to 0,0 for a pristine node; else, for nodes with existing state it is
     /// is initialized to the value returned from the `RaftStorage::get_initial_state` on startup.
     /// This value increases following the `commit_index` as logs are applied to the state
     /// machine (via the storage interface).
-    last_applied: u64,
+    last_applied: LogId,
+
     /// The current term.
     ///
     /// Is initialized to 0 on first boot, and increases monotonically. This is normally based on
@@ -130,7 +132,7 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt
     /// This abstraction is needed to ensure that replicating to the state machine does not block
     /// the AppendEntries RPC flow, and to ensure that we have a smooth transition to becoming
     /// leader without concern over duplicate application of entries to the state machine.
-    replicate_to_sm_handle: FuturesOrdered<JoinHandle<anyhow::Result<Option<u64>>>>,
+    replicate_to_sm_handle: FuturesOrdered<JoinHandle<anyhow::Result<Option<LogId>>>>,
     /// A bool indicating if this system has performed its initial replication of
     /// outstanding entries to the state machine.
     has_completed_initial_replication_to_sm: bool,
@@ -168,7 +170,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
             storage,
             target_state: State::Follower,
             commit_index: 0,
-            last_applied: 0,
+            last_applied: LogId { term: 0, index: 0 },
             current_term: 0,
             current_leader: None,
             voted_for: None,
@@ -281,7 +283,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
             state: self.target_state,
             current_term: self.current_term,
             last_log_index: self.last_log_id.index,
-            last_applied: self.last_applied,
+            last_applied: self.last_applied.index,
             current_leader: self.current_leader,
             membership_config: self.membership.clone(),
             snapshot: self.snapshot_last_log_id,
@@ -425,13 +427,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
         }
         let SnapshotPolicy::LogsSinceLast(threshold) = &self.config.snapshot_policy;
         // Check to ensure we have actual entries for compaction.
-        if self.last_applied == 0 || self.last_applied < self.snapshot_last_log_id.index {
+        if self.last_applied.index == 0 || self.last_applied.index < self.snapshot_last_log_id.index {
             return;
         }

         if !force {
             // If we are below the threshold, then there is nothing to do.
-            if self.last_applied < self.snapshot_last_log_id.index + *threshold {
+            if self.last_applied.index < self.snapshot_last_log_id.index + *threshold {
                 return;
             }
         }
@@ -471,7 +473,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

     /// Handle the output of an async task replicating entries to the state machine.
     #[tracing::instrument(level = "trace", skip(self, res))]
-    pub(self) fn handle_replicate_to_sm_result(&mut self, res: anyhow::Result<Option<u64>>) -> RaftResult<()> {
+    pub(self) fn handle_replicate_to_sm_result(&mut self, res: anyhow::Result<Option<LogId>>) -> RaftResult<()> {
         let last_applied_opt = res.map_err(|err| self.map_fatal_storage_error(err))?;
         if let Some(last_applied) = last_applied_opt {
             self.last_applied = last_applied;
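
As a rough illustration of the compaction trigger in the hunk above (a standalone sketch; `should_compact` and the example numbers are hypothetical, not part of this commit):

    // Sketch of the snapshot-trigger check from RaftCore, extracted into a free
    // function for illustration; only `last_applied.index` participates here.
    fn should_compact(last_applied: LogId, snapshot_last_index: u64, threshold: u64, force: bool) -> bool {
        // Nothing applied yet, or the snapshot already covers everything applied.
        if last_applied.index == 0 || last_applied.index < snapshot_last_index {
            return false;
        }
        // Otherwise compact when forced, or once `threshold` logs have
        // accumulated past the last snapshot.
        force || last_applied.index >= snapshot_last_index + threshold
    }

    // E.g. with snapshot_last_index = 100 and threshold = 50, compaction
    // triggers once last_applied.index reaches 150.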

async-raft/src/storage.rs

+5 −5

@@ -58,8 +58,8 @@ pub struct InitialState {
     /// The last entry.
     pub last_log_id: LogId,

-    /// The index of the last log applied to the state machine.
-    pub last_applied_log: u64,
+    /// The LogId of the last log applied to the state machine.
+    pub last_applied_log: LogId,
     /// The saved hard state of the node.
     pub hard_state: HardState,
     /// The latest cluster membership configuration found in the log, else a new initial
@@ -75,7 +75,7 @@ impl InitialState {
     pub fn new_initial(id: NodeId) -> Self {
         Self {
             last_log_id: LogId { term: 0, index: 0 },
-            last_applied_log: 0,
+            last_applied_log: LogId { term: 0, index: 0 },
             hard_state: HardState {
                 current_term: 0,
                 voted_for: None,
@@ -186,15 +186,15 @@ where
     ///
     /// It is important to note that even in cases where an application specific error is returned,
     /// implementations should still record that the entry has been applied to the state machine.
-    async fn apply_entry_to_state_machine(&self, index: &u64, data: &D) -> Result<R>;
+    async fn apply_entry_to_state_machine(&self, index: &LogId, data: &D) -> Result<R>;

     /// Apply the given payload of entries to the state machine, as part of replication.
     ///
     /// The Raft protocol guarantees that only logs which have been _committed_, that is, logs which
     /// have been replicated to a majority of the cluster, will be applied to the state machine.
     ///
     /// Errors returned from this method will cause Raft to go into shutdown.
-    async fn replicate_to_state_machine(&self, entries: &[(&u64, &D)]) -> Result<()>;
+    async fn replicate_to_state_machine(&self, entries: &[(&LogId, &D)]) -> Result<()>;

     /// Perform log compaction, returning a handle to the generated snapshot.
     ///
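
Downstream `RaftStorage` implementations have to follow the two new signatures above. A minimal sketch of the resulting state-machine bookkeeping (the `StateMachine` type and its field are hypothetical, in the spirit of the memstore impl mentioned in the commit message):

    use async_raft::LogId;

    // Hypothetical state machine for a storage impl; only the bookkeeping
    // relevant to this commit is shown.
    struct StateMachine {
        last_applied_log: LogId, // was `u64` before this change
    }

    impl StateMachine {
        // Mirrors `replicate_to_state_machine(&self, entries: &[(&LogId, &D)])`:
        // every entry now arrives with its full LogId, so the term of the last
        // applied entry is recorded as a side effect -- no log walk is needed
        // later when snapshot metadata is built.
        fn apply_batch<D>(&mut self, entries: &[(&LogId, &D)]) {
            for (log_id, _data) in entries {
                // ... apply `_data` to the application state here ...
                self.last_applied_log = **log_id;
            }
        }
    }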

async-raft/tests/client_writes.rs

+2 −1

@@ -3,6 +3,7 @@ use std::sync::Arc;
 use anyhow::Result;
 use async_raft::raft::MembershipConfig;
 use async_raft::Config;
+use async_raft::LogId;
 use async_raft::State;
 use fixtures::RaftRouter;
 use futures::prelude::*;
@@ -67,7 +68,7 @@ async fn client_writes() -> Result<()> {
             1,
             want,
             Some(0),
-            want,
+            LogId { term: 1, index: want },
             Some(((5000..5100).into(), 1, MembershipConfig {
                 members: hashset![0, 1, 2],
                 members_after_consensus: None,

async-raft/tests/compaction.rs

+10 −2

@@ -65,7 +65,7 @@ async fn compaction() -> Result<()> {
             1,
             want,
             Some(0),
-            want,
+            LogId { term: 1, index: want },
             Some((want.into(), 1, MembershipConfig {
                 members: hashset![0],
                 members_after_consensus: None,
@@ -87,7 +87,15 @@ async fn compaction() -> Result<()> {
         members: hashset![0u64],
         members_after_consensus: None,
     }));
-    router.assert_storage_state(1, want, None /* non-voter does not vote */, want, expected_snap).await;
+    router
+        .assert_storage_state(
+            1,
+            want,
+            None, /* non-voter does not vote */
+            LogId { term: 1, index: want },
+            expected_snap,
+        )
+        .await;

     Ok(())
 }

async-raft/tests/fixtures/mod.rs

+1 −1

@@ -460,7 +460,7 @@ impl RaftRouter {
         expect_term: u64,
         expect_last_log: u64,
         expect_voted_for: Option<u64>,
-        expect_sm_last_applied_log: u64,
+        expect_sm_last_applied_log: LogId,
         expect_snapshot: Option<(ValueTest<u64>, u64, MembershipConfig)>,
     ) {
         let rt = self.routing_table.read().await;

async-raft/tests/singlenode.rs

+2 −1

@@ -4,6 +4,7 @@ use std::sync::Arc;

 use anyhow::Result;
 use async_raft::Config;
+use async_raft::LogId;
 use async_raft::State;
 use fixtures::RaftRouter;
 use maplit::hashset;
@@ -46,7 +47,7 @@ async fn singlenode() -> Result<()> {
     // Write some data to the single node cluster.
     router.client_request_many(0, "0", 1000).await;
     router.assert_stable_cluster(Some(1), Some(1001)).await;
-    router.assert_storage_state(1, 1001, Some(0), 1001, None).await;
+    router.assert_storage_state(1, 1001, Some(0), LogId { term: 1, index: 1001 }, None).await;

     // Read some data from the single node cluster.
     router.client_read(0).await?;

async-raft/tests/snapshot_chunk_size.rs

+10 −2

@@ -63,7 +63,7 @@ async fn snapshot_chunk_size() -> Result<()> {

         router.wait_for_log(&hashset![0], want, None, "send log to trigger snapshot").await?;
         router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?;
-        router.assert_storage_state(1, want, Some(0), want, want_snap).await;
+        router.assert_storage_state(1, want, Some(0), LogId { term: 1, index: want }, want_snap).await;
     }

     tracing::info!("--- add non-voter to receive snapshot and logs");
@@ -78,7 +78,15 @@ async fn snapshot_chunk_size() -> Result<()> {

         router.wait_for_log(&hashset![0, 1], want, None, "add non-voter").await?;
         router.wait_for_snapshot(&hashset![1], LogId { term: 1, index: want }, None, "").await?;
-        router.assert_storage_state(1, want, None /* non-voter does not vote */, want, want_snap).await;
+        router
+            .assert_storage_state(
+                1,
+                want,
+                None, /* non-voter does not vote */
+                LogId { term: 1, index: want },
+                want_snap,
+            )
+            .await;
     }

     Ok(())

async-raft/tests/snapshot_ge_half_threshold.rs

+10 −2

@@ -68,7 +68,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
             1,
             want,
             Some(0),
-            want,
+            LogId { term: 1, index: want },
             Some((want.into(), 1, MembershipConfig {
                 members: hashset![0],
                 members_after_consensus: None,
@@ -94,7 +94,15 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
             members_after_consensus: None,
         }));
         router.wait_for_snapshot(&hashset![1], LogId { term: 1, index: want }, None, "").await?;
-        router.assert_storage_state(1, want, None /* non-voter does not vote */, want, expected_snap).await;
+        router
+            .assert_storage_state(
+                1,
+                want,
+                None, /* non-voter does not vote */
+                LogId { term: 1, index: want },
+                expected_snap,
+            )
+            .await;
     }

     Ok(())

async-raft/tests/snapshot_overrides_membership.rs

+10 −2

@@ -70,7 +70,7 @@ async fn snapshot_overrides_membership() -> Result<()> {
             1,
             want,
             Some(0),
-            want,
+            LogId { term: 1, index: want },
             Some((want.into(), 1, MembershipConfig {
                 members: hashset![0],
                 members_after_consensus: None,
@@ -127,7 +127,15 @@ async fn snapshot_overrides_membership() -> Result<()> {
             members_after_consensus: None,
         }));
         router.wait_for_snapshot(&hashset![1], LogId { term: 1, index: want }, None, "").await?;
-        router.assert_storage_state(1, want, None /* non-voter does not vote */, want, expected_snap).await;
+        router
+            .assert_storage_state(
+                1,
+                want,
+                None, /* non-voter does not vote */
+                LogId { term: 1, index: want },
+                expected_snap,
+            )
+            .await;

         let m = sto.get_membership_config().await?;
         assert_eq!(
