Skip to content

Commit a68a9a9

Browse files
committed
Change: use term, node_id, index to identify a log entry
Simplify election: in a `term`, it allows to elect more than one leader and the one with greatest `Vote`, i.e., `term, node_id` wins. Only the last established leader is valid. A node is able to grant more than one vote within one term. A greater `Vote` will always be granted. This modification turns the `Vote` from partial order to a total order value. Thus the vote granting is simple as comparing two value. - Change: Vote now is a tuple of `term, node_id` and is compared in dictionary order. Greater Vote wins. There is another field `committed` in `Vote`, which is not necessary to be persisted by storage layer. Saving it won't affect correctness. - Add: LeaderId into LogId, LogId is identified by `term, node_id, index`.
1 parent 1fdfa46 commit a68a9a9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+565
-771
lines changed

memstore/src/lib.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,10 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
292292
*l
293293
};
294294

295-
let snapshot_id = format!("{}-{}-{}", last_applied_log.term, last_applied_log.index, snapshot_idx);
295+
let snapshot_id = format!(
296+
"{}-{}-{}",
297+
last_applied_log.leader_id, last_applied_log.index, snapshot_idx
298+
);
296299

297300
let meta = SnapshotMeta {
298301
last_log_id: last_applied_log,

openraft/src/core/admin.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
3838

3939
if self.core.last_log_id.is_some() || self.core.vote.term != 0 {
4040
tracing::error!(
41-
last_log_id=?self.core.last_log_id, %self.core.vote,
41+
last_log_id=?self.core.last_log_id, ?self.core.vote,
4242
"rejecting init_with_config request as last_log_index is not None or current_term is not 0");
4343
return Err(InitializeError::NotAllowed);
4444
}

openraft/src/core/append_entries.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
3131
let msg_entries = req.entries.as_slice();
3232

3333
// Partial order compare: smaller than or incomparable
34-
#[allow(clippy::neg_cmp_op_on_partial_ord)]
35-
if !(req.vote >= self.vote) {
36-
tracing::debug!(%self.vote, %req.vote, "AppendEntries RPC term is less than current term");
34+
if req.vote < self.vote {
35+
tracing::debug!(?self.vote, %req.vote, "AppendEntries RPC term is less than current term");
3736

3837
return Ok(AppendEntriesResponse {
3938
vote: self.vote,

openraft/src/core/client.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use crate::raft::Entry;
2323
use crate::raft::EntryPayload;
2424
use crate::raft::RaftRespTx;
2525
use crate::replication::RaftEvent;
26-
use crate::vote::Vote;
2726
use crate::AppData;
2827
use crate::AppDataResponse;
2928
use crate::MessageSummary;
@@ -157,8 +156,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
157156
};
158157

159158
// If we receive a response with a greater term, then revert to follower and abort this request.
160-
if data.vote.term != self.core.vote.term {
161-
self.core.vote = Vote::new_uncommitted(data.vote.term, None);
159+
if data.vote > self.core.vote {
160+
self.core.vote = data.vote;
162161
// TODO(xp): deal with storage error
163162
self.core.save_vote().await.unwrap();
164163
// TODO(xp): if receives error about a higher term, it should stop at once?

openraft/src/core/install_snapshot.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
3535
&mut self,
3636
req: InstallSnapshotRequest,
3737
) -> Result<InstallSnapshotResponse, InstallSnapshotError> {
38-
#[allow(clippy::neg_cmp_op_on_partial_ord)]
39-
if !(req.vote >= self.vote) {
40-
tracing::debug!(%self.vote, %req.vote, "InstallSnapshot RPC term is less than current term");
38+
if req.vote < self.vote {
39+
tracing::debug!(?self.vote, %req.vote, "InstallSnapshot RPC term is less than current term");
4140

4241
return Ok(InstallSnapshotResponse { vote: self.vote });
4342
}

openraft/src/core/mod.rs

+22-20
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ use crate::replication::ReplicationStream;
5555
use crate::vote::Vote;
5656
use crate::AppData;
5757
use crate::AppDataResponse;
58+
use crate::LeaderId;
5859
use crate::LogId;
5960
use crate::Membership;
6061
use crate::MessageSummary;
@@ -82,7 +83,8 @@ pub struct EffectiveMembership {
8283
impl EffectiveMembership {
8384
pub fn new_initial(node_id: u64) -> Self {
8485
EffectiveMembership {
85-
log_id: LogId::new(0, 0),
86+
// TODO(xp): avoid using Vote::default()
87+
log_id: LogId::new(LeaderId::default(), 0),
8688
membership: Membership::new_initial(node_id),
8789
}
8890
}
@@ -182,7 +184,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
182184
target_state: State::Follower,
183185
committed: None,
184186
last_applied: None,
185-
vote: Vote::new_uncommitted(0, None),
187+
vote: Vote::default(),
186188
last_log_id: None,
187189
snapshot_state: None,
188190
snapshot_last_log_id: None,
@@ -519,7 +521,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
519521

520522
#[tracing::instrument(level = "debug", skip(self, payload))]
521523
pub(super) async fn append_payload_to_log(&mut self, payload: EntryPayload<D>) -> Result<Entry<D>, StorageError> {
522-
let log_id = LogId::new(self.vote.term, self.last_log_id.next_index());
524+
let log_id = LogId::new(self.vote.leader_id(), self.last_log_id.next_index());
523525

524526
let entry = Entry { log_id, payload };
525527
self.storage.append_to_log(&[&entry]).await?;
@@ -539,20 +541,20 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
539541

540542
#[tracing::instrument(level = "debug", skip(self))]
541543
pub fn current_leader(&self) -> Option<NodeId> {
542-
let l = self.vote.leader();
543-
544-
if let Some(id) = l {
545-
if id == self.id {
546-
if self.target_state == State::Leader {
547-
Some(id)
548-
} else {
549-
None
550-
}
551-
} else {
544+
if !self.vote.committed {
545+
return None;
546+
}
547+
548+
let id = self.vote.node_id;
549+
550+
if id == self.id {
551+
if self.target_state == State::Leader {
552552
Some(id)
553+
} else {
554+
None
553555
}
554556
} else {
555-
None
557+
Some(id)
556558
}
557559
}
558560
}
@@ -724,6 +726,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
724726
/// Transition to the Raft leader state.
725727
#[tracing::instrument(level="debug", skip(self), fields(id=self.core.id, raft_state="leader"))]
726728
pub(self) async fn run(mut self) -> Result<(), Fatal> {
729+
// Setup state as leader.
730+
self.core.last_heartbeat = None;
731+
self.core.next_election_timeout = None;
732+
self.core.vote.commit();
733+
727734
// Spawn replication streams.
728735
let targets = self
729736
.core
@@ -746,11 +753,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
746753
self.nodes.insert(*node_id, state);
747754
}
748755

749-
// Setup state as leader.
750-
self.core.last_heartbeat = None;
751-
self.core.next_election_timeout = None;
752-
self.core.vote = Vote::new_committed(self.core.vote.term, self.core.id);
753-
754756
self.leader_report_metrics();
755757

756758
self.commit_initial_leader_entry().await?;
@@ -904,7 +906,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
904906
// Setup new term.
905907
self.core.update_next_election_timeout(false); // Generates a new rand value within range.
906908

907-
self.core.vote = Vote::new_uncommitted(self.core.vote.term + 1, Some(self.core.id));
909+
self.core.vote = Vote::new(self.core.vote.term + 1, self.core.id);
908910

909911
self.core.save_vote().await?;
910912
self.core.report_metrics(Update::Update(None));

openraft/src/core/replication.rs

+8-9
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
3535
caller_tx: Option<RaftRespTx<AddLearnerResponse, AddLearnerError>>,
3636
) -> ReplicationState {
3737
let repl_stream = ReplicationStream::new(
38-
self.core.id,
3938
target,
40-
self.core.vote.term,
39+
self.core.vote,
4140
self.core.config.clone(),
4241
self.core.last_log_id,
4342
self.core.committed,
@@ -60,8 +59,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
6059
event: ReplicaEvent<S::SnapshotData>,
6160
) -> Result<(), StorageError> {
6261
match event {
63-
ReplicaEvent::RevertToFollower { target, term } => {
64-
self.handle_revert_to_follower(target, term).await?;
62+
ReplicaEvent::RevertToFollower { target, vote } => {
63+
self.handle_revert_to_follower(target, vote).await?;
6564
}
6665
ReplicaEvent::UpdateMatched { target, matched } => {
6766
self.handle_update_matched(target, matched).await?;
@@ -82,10 +81,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
8281
}
8382

8483
/// Handle events from replication streams for when this node needs to revert to follower state.
85-
#[tracing::instrument(level = "trace", skip(self, term))]
86-
async fn handle_revert_to_follower(&mut self, _: NodeId, term: u64) -> Result<(), StorageError> {
87-
if term > self.core.vote.term {
88-
self.core.vote = Vote::new_uncommitted(term, None);
84+
#[tracing::instrument(level = "trace", skip(self))]
85+
async fn handle_revert_to_follower(&mut self, _: NodeId, vote: Vote) -> Result<(), StorageError> {
86+
if vote > self.core.vote {
87+
self.core.vote = vote;
8988
self.core.save_vote().await?;
9089
self.core.set_target_state(State::Follower);
9190
}
@@ -208,7 +207,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
208207
// and that new leader may overrides any lower term logs.
209208
// Thus it is not considered as committed.
210209
if let Some(log_id) = matched {
211-
if log_id.term == self.core.vote.term {
210+
if log_id.leader_id == self.core.vote.leader_id() {
212211
res.insert(*id, log_id);
213212
}
214213
}

openraft/src/core/replication_state_test.rs

+23-8
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use crate::core::is_matched_upto_date;
22
use crate::Config;
3+
use crate::LeaderId;
34
use crate::LogId;
45

56
#[test]
67
fn test_is_line_rate() -> anyhow::Result<()> {
7-
let m = Some(LogId { term: 1, index: 10 });
8+
let m = Some(LogId::new(LeaderId::new(1, 0), 10));
89

910
let cfg = |n| Config {
1011
replication_lag_threshold: n,
@@ -13,33 +14,47 @@ fn test_is_line_rate() -> anyhow::Result<()> {
1314

1415
assert!(is_matched_upto_date(&None, &None, &cfg(0)), "matched, threshold=0");
1516
assert!(
16-
is_matched_upto_date(&None, &Some(LogId { term: 2, index: 0 }), &cfg(1)),
17+
is_matched_upto_date(
18+
&None,
19+
&Some(LogId {
20+
leader_id: LeaderId::new(2, 0),
21+
index: 0
22+
}),
23+
&cfg(1)
24+
),
1725
"matched, threshold=1"
1826
);
1927
assert!(
20-
!is_matched_upto_date(&None, &Some(LogId { term: 2, index: 0 }), &cfg(0)),
28+
!is_matched_upto_date(
29+
&None,
30+
&Some(LogId {
31+
leader_id: LeaderId::new(2, 0),
32+
index: 0
33+
}),
34+
&cfg(0)
35+
),
2136
"not matched, threshold=1"
2237
);
2338

2439
assert!(
25-
is_matched_upto_date(&Some(LogId::new(0, 0)), &None, &cfg(0)),
40+
is_matched_upto_date(&Some(LogId::new(LeaderId::new(0, 0), 0)), &None, &cfg(0)),
2641
"matched, threshold=0"
2742
);
2843

2944
assert!(
30-
is_matched_upto_date(&m, &Some(LogId { term: 2, index: 10 }), &cfg(0)),
45+
is_matched_upto_date(&m, &Some(LogId::new(LeaderId::new(2, 0), 10)), &cfg(0)),
3146
"matched, threshold=0"
3247
);
3348
assert!(
34-
is_matched_upto_date(&m, &Some(LogId { term: 2, index: 9 }), &cfg(0)),
49+
is_matched_upto_date(&m, &Some(LogId::new(LeaderId::new(2, 0), 9)), &cfg(0)),
3550
"overflow, threshold=0"
3651
);
3752
assert!(
38-
!is_matched_upto_date(&m, &Some(LogId { term: 2, index: 11 }), &cfg(0)),
53+
!is_matched_upto_date(&m, &Some(LogId::new(LeaderId::new(2, 0), 11)), &cfg(0)),
3954
"not caught up, threshold=0"
4055
);
4156
assert!(
42-
is_matched_upto_date(&m, &Some(LogId { term: 2, index: 11 }), &cfg(1)),
57+
is_matched_upto_date(&m, &Some(LogId::new(LeaderId::new(2, 0), 11)), &cfg(1)),
4358
"caught up, threshold=1"
4459
);
4560
Ok(())

openraft/src/core/vote.rs

+6-17
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use crate::error::VoteError;
99
use crate::raft::VoteRequest;
1010
use crate::raft::VoteResponse;
1111
use crate::summary::MessageSummary;
12-
use crate::vote::Vote;
1312
use crate::AppData;
1413
use crate::AppDataResponse;
1514
use crate::NodeId;
@@ -25,16 +24,15 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
2524
pub(super) async fn handle_vote_request(&mut self, req: VoteRequest) -> Result<VoteResponse, VoteError> {
2625
tracing::debug!(
2726
%req.vote,
28-
%self.vote,
27+
?self.vote,
2928
"start handle_vote_request"
3029
);
3130
let last_log_id = self.last_log_id;
3231

33-
#[allow(clippy::neg_cmp_op_on_partial_ord)]
34-
if !(req.vote >= self.vote) {
32+
if req.vote < self.vote {
3533
tracing::debug!(
3634
%req.vote,
37-
%self.vote,
35+
?self.vote,
3836
"RequestVote RPC term is less than current term"
3937
);
4038
return Ok(VoteResponse {
@@ -62,15 +60,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
6260
}
6361
}
6462

65-
// Always save a higher term
66-
if req.vote.term > self.vote.term {
67-
self.update_next_election_timeout(false);
68-
self.vote = Vote::new_uncommitted(req.vote.term, None);
69-
self.save_vote().await?;
70-
71-
self.set_target_state(State::Follower);
72-
}
73-
7463
// Check if candidate's log is at least as up-to-date as this node's.
7564
// If candidate's log is not at least as up-to-date as this node, then reject.
7665
if req.last_log_id < last_log_id {
@@ -107,8 +96,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
10796
pub(super) async fn handle_vote_response(&mut self, res: VoteResponse, target: NodeId) -> Result<(), StorageError> {
10897
// If peer's term is greater than current term, revert to follower state.
10998

110-
if res.vote.term > self.core.vote.term {
111-
self.core.vote = Vote::new_uncommitted(res.vote.term, None);
99+
if res.vote > self.core.vote {
100+
self.core.vote = res.vote;
112101
self.core.save_vote().await?;
113102

114103
// If a quorum of nodes have higher `last_log_id`, I have no chance to become a leader.
@@ -121,7 +110,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
121110
} else {
122111
tracing::debug!(
123112
id = %self.core.id,
124-
%self.core.vote,
113+
?self.core.vote,
125114
%res.vote,
126115
self_last_log_id=?self.core.last_log_id,
127116
res_last_log_id=?res.last_log_id,

openraft/src/error.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::LogId;
1414
use crate::NodeId;
1515
use crate::RPCTypes;
1616
use crate::StorageError;
17+
use crate::Vote;
1718

1819
/// Fatal is unrecoverable and shuts down raft at once.
1920
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize)]
@@ -179,7 +180,7 @@ impl From<StorageError> for AddLearnerError {
179180
#[allow(clippy::large_enum_variant)]
180181
pub enum ReplicationError {
181182
#[error(transparent)]
182-
HigherTerm(#[from] HigherTerm),
183+
HigherVote(#[from] HigherVote),
183184

184185
#[error("Replication is closed")]
185186
Closed,
@@ -231,10 +232,10 @@ impl<T: std::error::Error> RemoteError<T> {
231232
}
232233

233234
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
234-
#[error("seen a higher term: {higher} GT mine: {mine}")]
235-
pub struct HigherTerm {
236-
pub higher: u64,
237-
pub mine: u64,
235+
#[error("seen a higher vote: {higher} GT mine: {mine}")]
236+
pub struct HigherVote {
237+
pub higher: Vote,
238+
pub mine: Vote,
238239
}
239240

240241
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]

openraft/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub use crate::storage_error::Violation;
5858
pub use crate::store_ext::StoreExt;
5959
pub use crate::store_wrapper::Wrapper;
6060
pub use crate::summary::MessageSummary;
61-
pub use crate::vote::CommittedState;
61+
pub use crate::vote::LeaderId;
6262
pub use crate::vote::Vote;
6363

6464
/// A Raft node's ID.

0 commit comments

Comments
 (0)