Skip to content

Commit 43dd8b6

Browse files
committed
Fix: when leader reverts to follower, send error to waiting clients
When a leader reverts to follower, e.g., if a higher vote is seen, it should inform waiting clients that leadership is lost. - Let Engine deal with the vote change event. - Add a test for the revert-to-follower behavior.
1 parent 088cc1f commit 43dd8b6

File tree

6 files changed

+122
-42
lines changed

6 files changed

+122
-42
lines changed

openraft/src/core/raft_core.rs

+22-25
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
266266
const IS_VOTER: bool = true;
267267
const IS_LEARNER: bool = false;
268268

269+
// TODO: this part can be removed; the loop does not depend on server state.
269270
self.engine.state.server_state = match (has_log, single, is_voter) {
270271
// A restarted raft that already received some logs but was not yet added to a cluster.
271272
// It should remain in Learner state, not Follower.
@@ -288,12 +289,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
288289
(NO_LOG, MULTI, IS_VOTER) => ServerState::Follower, // impossible: no logs but there are other members.
289290
};
290291

291-
if self.engine.state.server_state == ServerState::Follower
292-
|| self.engine.state.server_state == ServerState::Candidate
293-
{
294-
// To ensure that restarted nodes don't disrupt a stable cluster.
295-
self.set_next_election_time(false);
296-
}
292+
// To ensure that restarted nodes don't disrupt a stable cluster.
293+
self.set_next_election_time(false);
297294

298295
tracing::debug!("id={} target_state: {:?}", self.id, self.engine.state.server_state);
299296

@@ -1380,9 +1377,15 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
13801377
}
13811378
}
13821379

1383-
RaftMsg::RevertToFollower { target, new_vote, vote } => {
1384-
if self.does_vote_match(vote, "RevertToFollower") {
1385-
self.handle_revert_to_follower(target, new_vote).await?;
1380+
RaftMsg::HigherVote {
1381+
target: _,
1382+
higher,
1383+
vote,
1384+
} => {
1385+
if self.does_vote_match(vote, "HigherVote") {
1386+
// Rejected vote change is ok.
1387+
let _ = self.engine.handle_vote_change(&higher);
1388+
self.run_engine_commands::<Entry<C>>(&[]).await?;
13861389
}
13871390
}
13881391

@@ -1418,22 +1421,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
14181421
Ok(())
14191422
}
14201423

1421-
/// Handle events from replication streams for when this node needs to revert to follower state.
1422-
#[tracing::instrument(level = "trace", skip(self))]
1423-
async fn handle_revert_to_follower(
1424-
&mut self,
1425-
_target: C::NodeId,
1426-
vote: Vote<C::NodeId>,
1427-
) -> Result<(), StorageError<C::NodeId>> {
1428-
if vote > self.engine.state.vote {
1429-
self.engine.state.vote = vote;
1430-
self.save_vote().await?;
1431-
// TODO: when switching to Follower, the next election time has to be set.
1432-
self.set_target_state(ServerState::Follower);
1433-
}
1434-
Ok(())
1435-
}
1436-
14371424
#[tracing::instrument(level = "debug", skip_all)]
14381425
async fn handle_update_matched(
14391426
&mut self,
@@ -1591,6 +1578,16 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
15911578
debug_assert!(self.leader_data.is_none(), "can not become leader twice");
15921579
self.leader_data = Some(LeaderData::new());
15931580
} else {
1581+
if let Some(l) = &mut self.leader_data {
1582+
// Leadership lost, inform waiting clients
1583+
let chans = std::mem::take(&mut l.client_resp_channels);
1584+
for (_, tx) in chans.into_iter() {
1585+
let _ = tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
1586+
leader_id: None,
1587+
leader_node: None,
1588+
})));
1589+
}
1590+
}
15941591
self.leader_data = None;
15951592
}
15961593
}

openraft/src/engine/engine_impl.rs

+9-7
Original file line numberDiff line numberDiff line change
@@ -752,16 +752,10 @@ where
752752

753753
// --- Draft API ---
754754

755-
// // --- app API ---
756-
//
757-
// /// Write a new log entry.
758-
// pub(crate) fn write(&mut self) -> Result<Vec<AlgoCmd<NID>>, ForwardToLeader<NID>> {}
759-
//
760755
// // --- raft protocol API ---
761756
//
762757
// pub(crate) fn handle_install_snapshot() {}
763758
//
764-
// pub(crate) fn handle_append_entries_resp() {}
765759
// pub(crate) fn handle_install_snapshot_resp() {}
766760
}
767761

@@ -788,6 +782,10 @@ where
788782
/// Leader state has two phases: election phase and replication phase, similar to Paxos phase-1 and phase-2
789783
pub(crate) fn enter_leading(&mut self) {
790784
debug_assert_eq!(self.state.vote.node_id, self.id);
785+
// debug_assert!(
786+
// self.state.internal_server_state.is_following(),
787+
// "can not enter leading twice"
788+
// );
791789

792790
self.state.new_leader();
793791
}
@@ -803,6 +801,11 @@ where
803801
// This way it holds that 'vote.node_id != self.id <=> following state`.
804802
// debug_assert_ne!(self.state.vote.node_id, self.id);
805803

804+
// debug_assert!(
805+
// self.state.internal_server_state.is_leading(),
806+
// "can not enter following twice"
807+
// );
808+
806809
let vote = &self.state.vote;
807810

808811
if vote.committed {
@@ -1108,7 +1111,6 @@ where
11081111

11091112
/// The node is candidate or leader
11101113
fn is_becoming_leader(&self) -> bool {
1111-
// self.state.vote.node_id == self.id
11121114
self.state.internal_server_state.is_leading()
11131115
}
11141116

openraft/src/raft.rs

+10-8
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
591591
/// Invoke RaftCore by sending a RaftMsg and blocks waiting for response.
592592
#[tracing::instrument(level = "debug", skip(self, mes, rx))]
593593
pub(crate) async fn call_core<T, E>(&self, mes: RaftMsg<C, N, S>, rx: RaftRespRx<T, E>) -> Result<T, E>
594-
where E: From<Fatal<C::NodeId>> {
594+
where E: From<Fatal<C::NodeId>> + Debug {
595595
let sum = if tracing::enabled!(Level::DEBUG) {
596596
None
597597
} else {
@@ -606,11 +606,13 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
606606
}
607607

608608
let recv_res = rx.await;
609+
tracing::debug!("call_core receives result is error: {:?}", recv_res.is_err());
609610

610611
match recv_res {
611612
Ok(x) => x,
612613
Err(_) => {
613614
let fatal = self.get_core_stopped_error("receiving rx from RaftCore", sum).await;
615+
tracing::error!(error = debug(&fatal), "core_call fatal error");
614616
Err(fatal.into())
615617
}
616618
}
@@ -645,6 +647,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
645647
}
646648

647649
/// Wait for RaftCore task to finish and record the returned value from the task.
650+
#[tracing::instrument(level = "debug", skip_all)]
648651
async fn join_core_task(&self) {
649652
let mut state = self.inner.core_state.lock().await;
650653
match &mut *state {
@@ -861,15 +864,14 @@ pub(crate) enum RaftMsg<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStor
861864
membership_log_id: Option<LogId<C::NodeId>>,
862865
},
863866

864-
/// An event indicating that the Raft node needs to revert to follower state.
867+
/// ReplicationCore has seen a higher `vote`.
865868
/// Sent by a replication task `ReplicationCore`.
866-
// TODO: rename it
867-
RevertToFollower {
869+
HigherVote {
868870
/// The ID of the target node from which the new term was observed.
869871
target: C::NodeId,
870872

871-
/// The new vote observed.
872-
new_vote: Vote<C::NodeId>,
873+
/// The higher vote observed.
874+
higher: Vote<C::NodeId>,
873875

874876
/// Which ServerState sent this message
875877
vote: Vote<C::NodeId>,
@@ -960,9 +962,9 @@ where
960962
membership_log_id.summary()
961963
)
962964
}
963-
RaftMsg::RevertToFollower {
965+
RaftMsg::HigherVote {
964966
ref target,
965-
ref new_vote,
967+
higher: ref new_vote,
966968
ref vote,
967969
} => {
968970
format!(

openraft/src/replication/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
199199
return;
200200
}
201201
ReplicationError::HigherVote(h) => {
202-
let _ = self.raft_core_tx.send(RaftMsg::RevertToFollower {
202+
let _ = self.raft_core_tx.send(RaftMsg::HigherVote {
203203
target: self.target,
204-
new_vote: h.higher,
204+
higher: h.higher,
205205
vote: self.vote,
206206
});
207207
return;

openraft/tests/append_entries/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod fixtures;
88
// The later tests may depend on the earlier ones.
99

1010
mod t10_conflict_with_empty_entries;
11+
mod t10_see_higher_vote;
1112
mod t20_append_conflicts;
1213
mod t30_append_inconsistent_log;
1314
mod t40_append_updates_membership;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use std::sync::Arc;
2+
use std::time::Duration;
3+
4+
use anyhow::Result;
5+
use maplit::btreeset;
6+
use memstore::ClientRequest;
7+
use openraft::raft::VoteRequest;
8+
use openraft::BasicNode;
9+
use openraft::Config;
10+
use openraft::LeaderId;
11+
use openraft::LogId;
12+
use openraft::RaftNetwork;
13+
use openraft::RaftNetworkFactory;
14+
use openraft::ServerState;
15+
use openraft::Vote;
16+
17+
use crate::fixtures::init_default_ut_tracing;
18+
use crate::fixtures::RaftRouter;
19+
20+
/// A leader reverts to follower if a higher vote is seen during append-entries.
21+
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
22+
async fn append_sees_higher_vote() -> Result<()> {
23+
let config = Arc::new(
24+
Config {
25+
enable_heartbeat: false,
26+
enable_elect: false,
27+
..Default::default()
28+
}
29+
.validate()?,
30+
);
31+
32+
let mut router = RaftRouter::new(config.clone());
33+
34+
let _log_index = router.new_nodes_from_single(btreeset! {0,1}, btreeset! {}).await?;
35+
36+
tracing::info!("--- upgrade vote on node-1");
37+
{
38+
router
39+
.connect(1, &BasicNode::default())
40+
.await?
41+
.send_vote(VoteRequest {
42+
vote: Vote::new(10, 1),
43+
last_log_id: Some(LogId::new(LeaderId::new(10, 1), 5)),
44+
})
45+
.await?;
46+
}
47+
48+
tracing::info!("--- a write operation will see a higher vote, then the leader revert to follower");
49+
{
50+
router.wait(&0, timeout()).state(ServerState::Leader, "node-0 is leader").await?;
51+
52+
let n0 = router.get_raft_handle(&0)?;
53+
let res = n0
54+
.client_write(ClientRequest {
55+
client: "0".to_string(),
56+
serial: 1,
57+
status: "2".to_string(),
58+
})
59+
.await;
60+
61+
tracing::debug!("--- client_write res: {:?}", res);
62+
63+
router
64+
.wait(&0, timeout())
65+
.state(ServerState::Follower, "node-0 becomes follower due to a higher vote")
66+
.await?;
67+
68+
router.external_request(0, |st, _, _| {
69+
assert_eq!(Vote::new(10, 1), st.vote, "higher vote is stored");
70+
});
71+
}
72+
73+
Ok(())
74+
}
75+
76+
fn timeout() -> Option<Duration> {
77+
Some(Duration::from_millis(1_000))
78+
}

0 commit comments

Comments
 (0)