Commit dba2403
fix: after 2 log compactions, membership should still be extractable from the prev compaction log
1 parent 447dc11 commit dba2403
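
In short, judging from the two hunks below: when deriving the effective membership config, the memstore scan only matched EntryPayload::ConfigChange entries. The first compaction replaces the config-change entry with a SnapshotPointer entry; after a second compaction no ConfigChange entry remains anywhere in the log, so the scan fell through to MembershipConfig::new_initial(..) and the cluster membership was silently lost. The fix also reads the membership carried by SnapshotPointer entries, and the new test drives exactly this two-compaction sequence.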

2 files changed: +126 −1
async-raft/tests/snapshot_uses_prev_snap_membership.rs (new file, +124)
@@ -0,0 +1,124 @@
mod fixtures;

use std::sync::Arc;

use anyhow::Result;
use async_raft::raft::MembershipConfig;
use async_raft::Config;
use async_raft::LogId;
use async_raft::RaftStorage;
use async_raft::SnapshotPolicy;
use async_raft::State;
use fixtures::RaftRouter;
use maplit::hashset;

/// Test that a second compaction does not lose the membership config.
///
/// What does this test do?
///
/// - build a cluster of 2 nodes.
/// - send enough requests to trigger a snapshot.
/// - send just enough requests to trigger another snapshot.
/// - ensure membership is still valid.
///
/// export RUST_LOG=async_raft,memstore,snapshot_uses_prev_snap_membership=trace
/// cargo test -p async-raft --test snapshot_uses_prev_snap_membership
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn snapshot_uses_prev_snap_membership() -> Result<()> {
    fixtures::init_tracing();

    let snapshot_threshold: u64 = 10;

    let config = Arc::new(
        Config::build("test".into())
            .snapshot_policy(SnapshotPolicy::LogsSinceLast(snapshot_threshold))
            .validate()
            .expect("failed to build Raft config"),
    );
    let router = Arc::new(RaftRouter::new(config.clone()));

    let mut want = 0;

    tracing::info!("--- initializing cluster of 2");
    {
        router.new_raft_node(0).await;

        router.wait_for_log(&hashset![0], want, None, "empty").await?;
        router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?;
        router.initialize_from_single_node(0).await?;
        want += 1;

        router.wait_for_log(&hashset![0], want, None, "init").await?;
        router.wait_for_state(&hashset![0], State::Leader, None, "empty").await?;

        router.new_raft_node(1).await;
        router.add_non_voter(0, 1).await?;

        // A change-membership request appends two log entries:
        // one for joint consensus and one for the final uniform config.
        router.change_membership(0, hashset![0, 1]).await?;
        want += 2;

        router.wait_for_log(&hashset![0, 1], want, None, "cluster of 2").await?;
    }

    let sto = router.get_storage_handle(&0).await?;

    tracing::info!("--- send just enough logs to trigger snapshot");
    {
        router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await;
        want = snapshot_threshold;

        router.wait_for_log(&hashset![0, 1], want, None, "send log to trigger snapshot").await?;
        router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?;

        let m = sto.get_membership_config().await?;
        assert_eq!(
            MembershipConfig {
                members: hashset![0, 1],
                members_after_consensus: None
            },
            m,
            "membership "
        );

        // TODO(xp): this assertion fails because during change-membership an append-entries request
        // does not update voted_for and does not call save_hard_state, so the storage layer does not
        // know that leader == Some(0).
        // Updating voted_for whenever a new leader is seen would solve this issue.
        // router
        //     .assert_storage_state(
        //         1,
        //         want,
        //         Some(0),
        //         want,
        //         Some((want.into(), 1, MembershipConfig {
        //             members: hashset![0, 1],
        //             members_after_consensus: None,
        //         })),
        //     )
        //     .await;
    }

    tracing::info!("--- send just enough logs to trigger the 2nd snapshot");
    {
        router.client_request_many(0, "0", (snapshot_threshold * 2 - want) as usize).await;
        want = snapshot_threshold * 2;

        router.wait_for_log(&hashset![0, 1], want, None, "send log to trigger snapshot").await?;
        router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?;
    }

    tracing::info!("--- check membership");
    {
        let m = sto.get_membership_config().await?;
        assert_eq!(
            MembershipConfig {
                members: hashset![0, 1],
                members_after_consensus: None
            },
            m,
            "membership "
        );
    }

    Ok(())
}
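
For orientation, the timeline this test sets up (inferred from the want accounting above): the membership entries for {0, 1} land at indexes 2–3; the first snapshot at index 10 compacts them away, leaving a snapshot pointer in their place; the second snapshot at index 20 compacts the log again. The final get_membership_config() can therefore only succeed if the snapshot pointer itself carries the membership, which is what the memstore change below provides.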

memstore/src/lib.rs (+2 −1)
@@ -202,7 +202,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
         }
     }

-    #[tracing::instrument(level = "trace", skip(self, hs))]
+    #[tracing::instrument(level = "trace", skip(self))]
     async fn save_hard_state(&self, hs: &HardState) -> Result<()> {
         *self.hs.write().await = Some(hs.clone());
         Ok(())

@@ -305,6 +305,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
             .skip_while(|entry| entry.log_id.index > last_applied_log)
             .find_map(|entry| match &entry.payload {
                 EntryPayload::ConfigChange(cfg) => Some(cfg.membership.clone()),
+                EntryPayload::SnapshotPointer(cfg) => Some(cfg.membership.clone()),
                 _ => None,
             })
             .unwrap_or_else(|| MembershipConfig::new_initial(self.id));
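
To make the effect of the added match arm concrete, here is a self-contained toy model of the reverse log scan. The Payload enum, Membership struct, and effective_membership helper are simplified stand-ins invented for this sketch, not memstore's real types:

use std::collections::BTreeMap;

// Simplified stand-ins for memstore's entry types (assumptions, not the real definitions).
#[derive(Clone, Debug, PartialEq)]
struct Membership(Vec<u64>);

enum Payload {
    Normal,
    ConfigChange(Membership),
    SnapshotPointer(Membership),
}

// Scan the log from the newest entry backwards, like the chain above: skip entries
// beyond last_applied, then take the first entry that carries a membership config.
fn effective_membership(log: &BTreeMap<u64, Payload>, last_applied: u64, node_id: u64) -> Membership {
    log.iter()
        .rev()
        .skip_while(|(index, _)| **index > last_applied)
        .find_map(|(_, payload)| match payload {
            Payload::ConfigChange(m) => Some(m.clone()),
            // The fix: a snapshot pointer left behind by compaction also carries membership.
            Payload::SnapshotPointer(m) => Some(m.clone()),
            Payload::Normal => None,
        })
        // Nothing found: fall back to the initial single-node config, losing the real membership.
        .unwrap_or_else(|| Membership(vec![node_id]))
}

fn main() {
    // After the second compaction only a snapshot pointer (index 20) and newer
    // normal entries remain; no ConfigChange entry exists anywhere in the log.
    let mut log = BTreeMap::new();
    log.insert(20_u64, Payload::SnapshotPointer(Membership(vec![0, 1])));
    log.insert(21_u64, Payload::Normal);

    // Without the SnapshotPointer arm this would fall back to Membership([0]).
    assert_eq!(effective_membership(&log, 21, 0), Membership(vec![0, 1]));
}

Removing the SnapshotPointer arm makes the assertion fail with the single-node fallback, which is exactly the regression the test above guards against.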
