Skip to content

Commit 86e2ccd

Browse files
committed
Fix: a single Candidate should be able to vote itself.
A Candidate should check if it is the only member in a cluster before sending vote request. Otherwise a single node cluster does work. - Change: `Wait::log_at_least()` use `Option<u64>` instead of u64 for log index.
1 parent 8b5198c commit 86e2ccd

File tree

8 files changed

+183
-15
lines changed

8 files changed

+183
-15
lines changed

memstore/src/lib.rs

-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ pub struct MemStore {
103103

104104
impl MemStore {
105105
/// Create a new `MemStore` instance.
106-
/// TODO(xp): creating a store should not require an id.
107106
pub async fn new() -> Self {
108107
let log = RwLock::new(BTreeMap::new());
109108
let sm = RwLock::new(MemStoreStateMachine::default());

openraft/src/core/mod.rs

+25-3
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use crate::raft::Entry;
4848
use crate::raft::EntryPayload;
4949
use crate::raft::RaftMsg;
5050
use crate::raft::RaftRespTx;
51+
use crate::raft::VoteResponse;
5152
use crate::raft_types::LogIdOptionExt;
5253
use crate::replication::ReplicaEvent;
5354
use crate::replication::ReplicationStream;
@@ -353,6 +354,14 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
353354
leader_metrics,
354355
};
355356

357+
{
358+
let curr = self.tx_metrics.borrow();
359+
if m == *curr {
360+
tracing::debug!("metrics not changed: {}", m.summary());
361+
return;
362+
}
363+
}
364+
356365
tracing::debug!("report_metrics: {}", m.summary());
357366
let res = self.tx_metrics.send(m);
358367

@@ -879,18 +888,17 @@ struct CandidateState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S:
879888

880889
impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> CandidateState<'a, D, R, N, S> {
881890
pub(self) fn new(core: &'a mut RaftCore<D, R, N, S>) -> Self {
882-
let id = core.id;
883891
Self {
884892
core,
885-
// vote for itself.
886-
granted: btreeset! {id},
893+
granted: btreeset! {},
887894
}
888895
}
889896

890897
/// Run the candidate loop.
891898
#[tracing::instrument(level="debug", skip(self), fields(id=self.core.id, raft_state="candidate"))]
892899
pub(self) async fn run(mut self) -> Result<(), Fatal> {
893900
// Each iteration of the outer loop represents a new term.
901+
894902
loop {
895903
if !self.core.target_state.is_candidate() {
896904
return Ok(());
@@ -904,6 +912,20 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
904912
self.core.save_hard_state().await?;
905913
self.core.report_metrics(Update::Update(None));
906914

915+
// vote for itself.
916+
self.handle_vote_response(
917+
VoteResponse {
918+
term: self.core.current_term,
919+
vote_granted: true,
920+
last_log_id: self.core.last_log_id,
921+
},
922+
self.core.id,
923+
)
924+
.await?;
925+
if !self.core.target_state.is_candidate() {
926+
return Ok(());
927+
}
928+
907929
// Send RPCs to all members in parallel.
908930
let mut pending_votes = self.spawn_parallel_vote_requests();
909931

openraft/src/core/vote.rs

-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
114114
/// Handle response from a vote request sent to a peer.
115115
#[tracing::instrument(level = "debug", skip(self))]
116116
pub(super) async fn handle_vote_response(&mut self, res: VoteResponse, target: NodeId) -> Result<(), StorageError> {
117-
// TODO(xp): change membership from 123 to 4 may hangs I guess. Because this function will not be called.
118117
// If peer's term is greater than current term, revert to follower state.
119118

120119
if res.term > self.core.current_term {

openraft/src/metrics.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -225,16 +225,16 @@ impl Wait {
225225

226226
/// Wait until applied at least `want_log`(inclusive) logs or timeout.
227227
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
228-
pub async fn log_at_least(&self, want_log: u64, msg: impl ToString) -> Result<RaftMetrics, WaitError> {
228+
pub async fn log_at_least(&self, want_log: Option<u64>, msg: impl ToString) -> Result<RaftMetrics, WaitError> {
229229
self.metrics(
230-
|x| x.last_log_index >= Some(want_log),
231-
&format!("{} .last_log_index >= {}", msg.to_string(), want_log),
230+
|x| x.last_log_index >= want_log,
231+
&format!("{} .last_log_index >= {:?}", msg.to_string(), want_log),
232232
)
233233
.await?;
234234

235235
self.metrics(
236-
|x| x.last_applied.index() >= Some(want_log),
237-
&format!("{} .last_applied >= {}", msg.to_string(), want_log),
236+
|x| x.last_applied.index() >= want_log,
237+
&format!("{} .last_applied >= {:?}", msg.to_string(), want_log),
238238
)
239239
.await
240240
}

openraft/src/metrics_wait_test.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ async fn test_wait() -> anyhow::Result<()> {
4545
assert!(rst.is_ok());
4646
});
4747
let got = w.log(Some(3), "log").await?;
48-
let got_least2 = w.log_at_least(2, "log").await?;
49-
let got_least3 = w.log_at_least(3, "log").await?;
50-
let got_least4 = w.log_at_least(4, "log").await;
48+
let got_least2 = w.log_at_least(Some(2), "log").await?;
49+
let got_least3 = w.log_at_least(Some(3), "log").await?;
50+
let got_least4 = w.log_at_least(Some(4), "log").await;
5151
h.await?;
5252

5353
assert_eq!(Some(3), got.last_log_index);

openraft/tests/membership/main.rs

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ mod fixtures;
88
mod t00_learner_restart;
99
mod t10_add_learner;
1010
mod t15_add_remove_follower;
11+
mod t16_change_membership_cases;
12+
// TODO(xp): rename it
1113
mod t20_change_membership;
1214
mod t25_elect_with_new_config;
1315
mod t30_commit_joint_config;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
use std::collections::BTreeSet;
2+
use std::sync::Arc;
3+
use std::time::Duration;
4+
5+
use maplit::btreeset;
6+
use openraft::Config;
7+
use openraft::NodeId;
8+
use openraft::State;
9+
use tracing_futures::Instrument;
10+
11+
use crate::fixtures::RaftRouter;
12+
13+
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
14+
async fn change_membership_cases() -> anyhow::Result<()> {
15+
let (_log_guard, ut_span) = init_ut!();
16+
17+
async {
18+
change_from_to(btreeset! {0}, btreeset! {1}).await?;
19+
change_from_to(btreeset! {0}, btreeset! {1,2}).await?;
20+
change_from_to(btreeset! {0}, btreeset! {1,2, 3}).await?;
21+
change_from_to(btreeset! {0, 1}, btreeset! {1, 2}).await?;
22+
change_from_to(btreeset! {0, 1}, btreeset! {1}).await?;
23+
change_from_to(btreeset! {0, 1}, btreeset! {2}).await?;
24+
change_from_to(btreeset! {0, 1}, btreeset! {3}).await?;
25+
change_from_to(btreeset! {0, 1, 2}, btreeset! {4}).await?;
26+
change_from_to(btreeset! {0, 1, 2}, btreeset! {4,5,6}).await?;
27+
change_from_to(btreeset! {0, 1, 2, 3, 4}, btreeset! {0, 1, 2, 3}).await?;
28+
29+
Ok::<(), anyhow::Error>(())
30+
}
31+
.instrument(ut_span)
32+
.await?;
33+
34+
Ok(())
35+
}
36+
37+
#[tracing::instrument(level = "debug")]
38+
async fn change_from_to(old: BTreeSet<NodeId>, new: BTreeSet<NodeId>) -> anyhow::Result<()> {
39+
let mes = format!("from {:?} to {:?}", old, new);
40+
41+
let only_in_old = old.difference(&new);
42+
let only_in_new = new.difference(&old);
43+
44+
let config = Arc::new(Config::default().validate()?);
45+
let router = Arc::new(RaftRouter::new(config.clone()));
46+
47+
let mut log_index = router.new_nodes_from_single(old.clone(), btreeset! {}).await?;
48+
49+
tracing::info!("--- write 100 logs");
50+
{
51+
router.client_request_many(0, "client", 100).await;
52+
log_index += 100;
53+
54+
router.wait_for_log(&old, Some(log_index), timeout(), &format!("write 100 logs, {}", mes)).await?;
55+
}
56+
57+
// let mtx = router.wait(&0, timeout()).await?.log(Some(0), "get metrics").await?;
58+
// let term_0 = mtx.current_term;
59+
60+
tracing::info!("--- change to {:?}", new);
61+
{
62+
for id in only_in_new {
63+
router.new_raft_node(*id).await;
64+
}
65+
66+
router.change_membership(0, new.clone()).await?;
67+
log_index += 2; // two member-change logs
68+
69+
tracing::info!("--- wait for old leader or new leader");
70+
{
71+
for id in new.iter() {
72+
router
73+
.wait(id, Some(Duration::from_millis(5_000)))
74+
.await?
75+
.metrics(
76+
|x| x.current_leader.is_some() && new.contains(&x.current_leader.unwrap()),
77+
format!("node {} in new cluster has leader in new cluster, {}", id, mes),
78+
)
79+
.await?;
80+
}
81+
}
82+
83+
for id in new.iter() {
84+
// new leader may already elected and committed a blank log.
85+
router
86+
.wait(id, timeout())
87+
.await?
88+
.log_at_least(Some(log_index), format!("new cluster, {}", mes))
89+
.await?;
90+
}
91+
92+
for id in only_in_old.clone() {
93+
router
94+
.wait(id, timeout())
95+
.await?
96+
.state(State::Learner, format!("node {} only in old, {}", id, mes))
97+
.await?;
98+
}
99+
}
100+
101+
tracing::info!("--- write another 100 logs");
102+
{
103+
// get new leader
104+
105+
let m = router
106+
.wait(new.iter().next().unwrap(), timeout())
107+
.await?
108+
.metrics(|x| x.current_leader.is_some(), format!("wait for new leader, {}", mes))
109+
.await?;
110+
111+
let leader = m.current_leader.unwrap();
112+
113+
router.client_request_many(leader, "client", 100).await;
114+
log_index += 100;
115+
}
116+
117+
for id in new.iter() {
118+
router
119+
.wait(id, timeout())
120+
.await?
121+
// new leader may commit a blonk log
122+
.log_at_least(Some(log_index), format!("new cluster recv logs 100~200, {}", mes))
123+
.await?;
124+
}
125+
126+
tracing::info!("--- log will not be sync to removed node");
127+
{
128+
for id in only_in_old {
129+
let res = router
130+
.wait(id, timeout())
131+
.await?
132+
.log(
133+
Some(log_index),
134+
format!("node {} in old cluster wont recv new logs, {}", id, mes),
135+
)
136+
.await;
137+
assert!(res.is_err());
138+
}
139+
}
140+
141+
Ok(())
142+
}
143+
144+
fn timeout() -> Option<Duration> {
145+
Some(Duration::from_millis(500))
146+
}

openraft/tests/membership/t30_step_down.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ async fn step_down() -> Result<()> {
5454
router
5555
.wait(&1, timeout())
5656
.await?
57-
.log_at_least(log_index, "node in old cluster commits at least 1 membership log")
57+
.log_at_least(Some(log_index), "node in old cluster commits at least 1 membership log")
5858
.await?;
5959

6060
tracing::info!("--- new cluster commits 2 membership logs");
@@ -67,7 +67,7 @@ async fn step_down() -> Result<()> {
6767
.wait(&id, timeout())
6868
.await?
6969
.log_at_least(
70-
log_index,
70+
Some(log_index),
7171
"node in new cluster finally commit at least one blank leader-initialize log",
7272
)
7373
.await?;

0 commit comments

Comments
 (0)