Skip to content

Commit 112252b

Browse files
committed
change: RaftStorage add 2 API: last_id_in_log() and last_applied_state(), remove get_last_log_id()
1 parent 74283fd commit 112252b

13 files changed: +131 −194 lines changed

async-raft/src/storage.rs

+7-17
Original file line numberDiff line numberDiff line change
@@ -161,24 +161,14 @@ where
161161
/// It does not return an error if in defensive mode and the log entry at `log_index` is not found.
162162
async fn try_get_log_entry(&self, log_index: u64) -> Result<Option<Entry<D>>, StorageError>;
163163

164-
/// Returns the last known log id.
165-
/// It could be the id of the last entry in log, or the last applied id that is saved in state machine.
164+
/// Returns the last log id in log.
166165
///
167-
/// When there is no log or state machine, it returns (0,0)
168-
///
169-
/// Caveat: an impl must hold the log-state-machine consistency or must deal with the inconsistency when accessing
170-
/// it:
171-
///
172-
/// I.e.: if `logs.last().log_id.index > last_applied.index`, `logs.last().log_id > last_applied` must hold. E.g.,
173-
/// `logs.last() == {term:1, index:2}` and `last_applied == {term:2, index:1}` is inconsistent:
174-
///
175-
/// Log `{term:1, index:2}` can not be committed and should definitely be removed. The simplest way to achieve
176-
/// consistency is to remove such inconsistent logs after a successful `append_entries` or `install_snapshot`
177-
/// request.
178-
///
179-
/// TODO(xp) test it
180-
/// TODO(xp) defensive test about consistency
181-
async fn get_last_log_id(&self) -> Result<LogId, StorageError>;
166+
/// The impl should not consider the applied log id in state machine.
167+
async fn last_id_in_log(&self) -> Result<LogId, StorageError>;
168+
169+
/// Returns the last applied log id which is recorded in state machine, and the last applied membership log id and
170+
/// membership config.
171+
async fn last_applied_state(&self) -> Result<(LogId, Option<(LogId, MembershipConfig)>), StorageError>;
182172

183173
/// Delete all logs in a `range`.
184174
///

async-raft/tests/client_writes.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ async fn client_writes() -> Result<()> {
111111
members_after_consensus: None,
112112
})),
113113
)
114-
.await;
114+
.await?;
115115

116116
Ok(())
117117
}

async-raft/tests/compaction.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ async fn compaction() -> Result<()> {
7979
members_after_consensus: None,
8080
})),
8181
)
82-
.await;
82+
.await?;
8383

8484
// Add a new node and assert that it received the same snapshot.
8585
let sto1 = router.new_store(1).await;
@@ -120,7 +120,7 @@ async fn compaction() -> Result<()> {
120120
LogId { term: 1, index: n_logs },
121121
expected_snap,
122122
)
123-
.await;
123+
.await?;
124124

125125
Ok(())
126126
}

async-raft/tests/fixtures/mod.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,7 @@ impl RaftRouter {
579579
expect_voted_for: Option<u64>,
580580
expect_sm_last_applied_log: LogId,
581581
expect_snapshot: Option<(ValueTest<u64>, u64, MembershipConfig)>,
582-
) {
582+
) -> anyhow::Result<()> {
583583
let rt = self.routing_table.read().await;
584584
for (id, (_node, storage)) in rt.iter() {
585585
let last_log_id = storage.last_log_id().await;
@@ -645,14 +645,16 @@ impl RaftRouter {
645645
);
646646
}
647647

648-
let sm = storage.get_state_machine().await;
648+
let (last_applied, _) = storage.last_applied_state().await?;
649649

650650
assert_eq!(
651-
&sm.last_applied_log, &expect_sm_last_applied_log,
651+
&last_applied, &expect_sm_last_applied_log,
652652
"expected node {} to have state machine last_applied_log {}, got {}",
653-
id, expect_sm_last_applied_log, sm.last_applied_log
653+
id, expect_sm_last_applied_log, last_applied
654654
);
655655
}
656+
657+
Ok(())
656658
}
657659
}
658660

async-raft/tests/initialization.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use anyhow::Result;
44
use async_raft::raft::EntryPayload;
55
use async_raft::raft::MembershipConfig;
66
use async_raft::Config;
7+
use async_raft::LogId;
78
use async_raft::RaftStorage;
8-
use async_raft::RaftStorageDebug;
99
use async_raft::State;
1010
use fixtures::RaftRouter;
1111
use maplit::btreeset;
@@ -65,12 +65,12 @@ async fn initialization() -> Result<()> {
6565
};
6666
assert_eq!(btreeset![0, 1, 2], mem.members);
6767

68-
let sm_mem = sto.get_state_machine().await.last_membership.clone();
68+
let sm_mem = sto.last_applied_state().await?.1;
6969
assert_eq!(
70-
Some(MembershipConfig {
70+
Some((LogId { term: 1, index: 1 }, MembershipConfig {
7171
members: btreeset![0, 1, 2],
7272
members_after_consensus: None,
73-
}),
73+
})),
7474
sm_mem
7575
);
7676
}

async-raft/tests/singlenode.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ async fn singlenode() -> Result<()> {
4949
// Write some data to the single node cluster.
5050
router.client_request_many(0, "0", 1000).await;
5151
router.assert_stable_cluster(Some(1), Some(1001)).await;
52-
router.assert_storage_state(1, 1001, Some(0), LogId { term: 1, index: 1001 }, None).await;
52+
router.assert_storage_state(1, 1001, Some(0), LogId { term: 1, index: 1001 }, None).await?;
5353

5454
// Read some data from the single node cluster.
5555
router.client_read(0).await?;

async-raft/tests/snapshot_chunk_size.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ async fn snapshot_chunk_size() -> Result<()> {
6666

6767
router.wait_for_log(&btreeset![0], want, None, "send log to trigger snapshot").await?;
6868
router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?;
69-
router.assert_storage_state(1, want, Some(0), LogId { term: 1, index: want }, want_snap).await;
69+
router.assert_storage_state(1, want, Some(0), LogId { term: 1, index: want }, want_snap).await?;
7070
}
7171

7272
tracing::info!("--- add non-voter to receive snapshot and logs");
@@ -89,7 +89,7 @@ async fn snapshot_chunk_size() -> Result<()> {
8989
LogId { term: 1, index: want },
9090
want_snap,
9191
)
92-
.await;
92+
.await?;
9393
}
9494

9595
Ok(())

async-raft/tests/snapshot_ge_half_threshold.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
7878
members_after_consensus: None,
7979
})),
8080
)
81-
.await;
81+
.await?;
8282
}
8383

8484
tracing::info!("--- send logs to make distance between snapshot index and last_log_index");
@@ -106,7 +106,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
106106
LogId { term: 1, index: want },
107107
expected_snap,
108108
)
109-
.await;
109+
.await?;
110110
}
111111

112112
Ok(())

async-raft/tests/snapshot_overrides_membership.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ async fn snapshot_overrides_membership() -> Result<()> {
8383
members_after_consensus: None,
8484
})),
8585
)
86-
.await;
86+
.await?;
8787
}
8888

8989
tracing::info!("--- create non-voter");
@@ -146,7 +146,7 @@ async fn snapshot_overrides_membership() -> Result<()> {
146146
LogId { term: 1, index: want },
147147
expected_snap,
148148
)
149-
.await;
149+
.await?;
150150

151151
let m = sto.get_membership_config().await?;
152152
assert_eq!(

async-raft/tests/state_machien_apply_membership.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use std::sync::Arc;
33
use anyhow::Result;
44
use async_raft::raft::MembershipConfig;
55
use async_raft::Config;
6-
use async_raft::RaftStorageDebug;
6+
use async_raft::LogId;
7+
use async_raft::RaftStorage;
78
use async_raft::State;
89
use fixtures::RaftRouter;
910
use futures::stream::StreamExt;
@@ -48,13 +49,12 @@ async fn state_machine_apply_membership() -> Result<()> {
4849

4950
for i in 0..=0 {
5051
let sto = router.get_storage_handle(&i).await?;
51-
let sm = sto.get_state_machine().await;
5252
assert_eq!(
53-
Some(MembershipConfig {
53+
Some((LogId { term: 1, index: 1 }, MembershipConfig {
5454
members: btreeset![0],
5555
members_after_consensus: None,
56-
}),
57-
sm.last_membership
56+
})),
57+
sto.last_applied_state().await?.1
5858
);
5959
}
6060

@@ -83,13 +83,13 @@ async fn state_machine_apply_membership() -> Result<()> {
8383
tracing::info!("--- check applied membership config");
8484
for i in 0..5 {
8585
let sto = router.get_storage_handle(&i).await?;
86-
let sm = sto.get_state_machine().await;
86+
let (_, last_membership) = sto.last_applied_state().await?;
8787
assert_eq!(
88-
Some(MembershipConfig {
88+
Some((LogId { term: 1, index: 3 }, MembershipConfig {
8989
members: btreeset![0, 1, 2],
9090
members_after_consensus: None,
91-
}),
92-
sm.last_membership
91+
})),
92+
last_membership
9393
);
9494
}
9595

async-raft/tests/stepdown.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ async fn stepdown() -> Result<()> {
131131
router.assert_stable_cluster(Some(metrics.current_term), Some(want)).await;
132132
router
133133
.assert_storage_state(metrics.current_term, want, None, LogId { term: 2, index: 4 }, None)
134-
.await;
134+
.await?;
135135
// ----------------------------------- ^^^ this is `0` instead of `4` because blank payloads from new leaders
136136
// and config change entries are never applied to the state machine.
137137

memstore/src/lib.rs

+29-60
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ pub struct MemStoreSnapshot {
7878
pub struct MemStoreStateMachine {
7979
pub last_applied_log: LogId,
8080

81-
pub last_membership: Option<MembershipConfig>,
81+
pub last_membership: Option<(LogId, MembershipConfig)>,
8282

8383
/// A mapping of client IDs to their state info.
8484
pub client_serial_responses: HashMap<String, (u64, Option<String>)>,
@@ -302,29 +302,6 @@ impl MemStore {
302302
std::cmp::max(log_last_id, sm_last_id)
303303
}
304304

305-
pub async fn defensive_consistent_log_sm(&self) -> Result<(), DefensiveError> {
306-
let log_last_id = {
307-
let log_last = self.log.read().await;
308-
log_last.iter().last().map(|(_k, v)| v.log_id).unwrap_or_default()
309-
};
310-
311-
let sm_last_id = self.sm.read().await.last_applied_log;
312-
313-
if (log_last_id.index == sm_last_id.index && log_last_id != sm_last_id)
314-
|| (log_last_id.index > sm_last_id.index && log_last_id < sm_last_id)
315-
{
316-
return Err(DefensiveError::new(
317-
ErrorSubject::Log(log_last_id),
318-
Violation::DirtyLog {
319-
higher_index_log_id: log_last_id,
320-
lower_index_log_id: sm_last_id,
321-
},
322-
));
323-
}
324-
325-
Ok(())
326-
}
327-
328305
pub async fn defensive_apply_index_is_last_applied_plus_one<D: AppData>(
329306
&self,
330307
entries: &[&Entry<D>],
@@ -512,7 +489,7 @@ impl MemStore {
512489
pub async fn get_membership_from_log(&self, upto_index: Option<u64>) -> Result<MembershipConfig, StorageError> {
513490
self.defensive_no_dirty_log().await?;
514491

515-
let membership = {
492+
let membership_in_log = {
516493
let log = self.log.read().await;
517494

518495
let reversed_logs = log.values().rev();
@@ -527,26 +504,19 @@ impl MemStore {
527504

528505
// Find membership stored in state machine.
529506

530-
let (sm_mem, last_applied) = {
531-
let sm = self.sm.read().await;
532-
(sm.last_membership.clone(), sm.last_applied_log)
533-
};
507+
let (_, membership_in_sm) = self.last_applied_state().await?;
534508

535-
let membership = match membership {
536-
None => sm_mem,
537-
Some((id, log_mem)) => {
538-
if id < last_applied {
539-
sm_mem
540-
} else {
541-
Some(log_mem)
542-
}
543-
}
544-
};
509+
let membership =
510+
if membership_in_log.as_ref().map(|(id, _)| id.index) > membership_in_sm.as_ref().map(|(id, _)| id.index) {
511+
membership_in_log
512+
} else {
513+
membership_in_sm
514+
};
545515

546-
// Otherwise, create a default one.
516+
// Create a default one if both are None.
547517

548518
Ok(match membership {
549-
Some(cfg) => cfg,
519+
Some((_id, cfg)) => cfg,
550520
None => MembershipConfig::new_initial(self.id),
551521
})
552522
}
@@ -573,27 +543,22 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
573543

574544
let membership = self.get_membership_config().await?;
575545
let mut hs = self.hs.write().await;
576-
let log = self.log.read().await;
577-
let sm = self.sm.read().await;
578546
match &mut *hs {
579547
Some(inner) => {
580548
// Search for two place and use the max one,
581549
// because when a state machine is installed there could be logs
582550
// included in the state machine that are not cleaned:
583551
// - the last log id
584552
// - the last_applied log id in state machine.
585-
// TODO(xp): add test for RaftStore to ensure it looks for two places.
586553

587-
let last = log.values().rev().next();
588-
let last = last.map(|x| x.log_id);
589-
let last_in_log = last.unwrap_or_default();
590-
let last_applied_log = sm.last_applied_log;
554+
let last_in_log = self.last_id_in_log().await?;
555+
let (last_applied, _) = self.last_applied_state().await?;
591556

592-
let last_log_id = max(last_in_log, last_applied_log);
557+
let last_log_id = max(last_in_log, last_applied);
593558

594559
Ok(InitialState {
595560
last_log_id,
596-
last_applied: last_applied_log,
561+
last_applied,
597562
hard_state: inner.clone(),
598563
membership,
599564
})
@@ -639,15 +604,15 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
639604
Ok(log.get(&log_index).cloned())
640605
}
641606

642-
#[tracing::instrument(level = "trace", skip(self))]
643-
async fn get_last_log_id(&self) -> Result<LogId, StorageError> {
644-
self.defensive_consistent_log_sm().await?;
645-
646-
// TODO: log id must consistent:
647-
let log_last_id = self.log.read().await.iter().last().map(|(_k, v)| v.log_id).unwrap_or_default();
648-
let last_applied_id = self.sm.read().await.last_applied_log;
607+
async fn last_id_in_log(&self) -> Result<LogId, StorageError> {
608+
let log = self.log.read().await;
609+
let last = log.iter().last().map(|(_, ent)| ent.log_id).unwrap_or_default();
610+
Ok(last)
611+
}
649612

650-
Ok(max(log_last_id, last_applied_id))
613+
async fn last_applied_state(&self) -> Result<(LogId, Option<(LogId, MembershipConfig)>), StorageError> {
614+
let sm = self.sm.read().await;
615+
Ok((sm.last_applied_log, sm.last_membership.clone()))
651616
}
652617

653618
#[tracing::instrument(level = "trace", skip(self, range), fields(range=?range))]
@@ -714,7 +679,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
714679
res.push(ClientResponse(previous));
715680
}
716681
EntryPayload::ConfigChange(ref mem) => {
717-
sm.last_membership = Some(mem.membership.clone());
682+
sm.last_membership = Some((entry.log_id, mem.membership.clone()));
718683
res.push(ClientResponse(None))
719684
}
720685
};
@@ -734,7 +699,11 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
734699
.map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, e.into()))?;
735700

736701
last_applied_log = sm.last_applied_log;
737-
membership_config = sm.last_membership.clone().unwrap_or_else(|| MembershipConfig::new_initial(self.id));
702+
membership_config = sm
703+
.last_membership
704+
.clone()
705+
.map(|(_id, mem)| mem)
706+
.unwrap_or_else(|| MembershipConfig::new_initial(self.id));
738707
}
739708

740709
let snapshot_size = data.len();

0 commit comments

Comments (0)