Skip to content

Commit 58f2491

Browse files
committed
Change: use Vote to define Leader and Candidate
`Vote` is a concept similar to the Paxos proposer-id. It is defined by a tuple of `(term, uncommitted|committed, node_id)`. A Candidate creates an uncommitted `Vote` to identify itself, as a replacement for `(term, candidate_id)`. A Leader has a `Vote` that is committed, i.e., the vote is granted by a quorum. The rule for overriding a `Vote` is defined by a **partially ordered** relation: a node is only allowed to grant a greater vote. The partial order relation covers all behavior of the `vote` in the Raft spec. This way we make testing very easy. With `Vote`, checking the validity of a request is quite simple: just check `self.vote <= rpc.vote`. - Rename: save_hard_state() and read_hard_state() to save_vote() and read_vote(). - Replace the `term, node_id` pair with `Vote` in RaftCore and the RPC structs. - Fix: the request handlers for append-entries and install-snapshot should save the vote they received.
1 parent 9a8b750 commit 58f2491

32 files changed

+576
-439
lines changed

guide/src/getting-started.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ or a wrapper of a remote sql DB.
5050

5151
- Read/write raft state, e.g., term or vote.
5252
```rust
53-
fn save_hard_state(hs:&HardState)
54-
fn read_hard_state() -> Result<Option<HardState>>
53+
fn save_vote(vote:&Vote)
54+
fn read_vote() -> Result<Option<Vote>>
5555
```
5656

5757
- Read/write logs.

memstore/src/lib.rs

+9-10
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use anyerror::AnyError;
1515
use openraft::async_trait::async_trait;
1616
use openraft::raft::Entry;
1717
use openraft::raft::EntryPayload;
18-
use openraft::storage::HardState;
1918
use openraft::storage::LogState;
2019
use openraft::storage::Snapshot;
2120
use openraft::AppData;
@@ -30,6 +29,7 @@ use openraft::SnapshotMeta;
3029
use openraft::StateMachineChanges;
3130
use openraft::StorageError;
3231
use openraft::StorageIOError;
32+
use openraft::Vote;
3333
use serde::Deserialize;
3434
use serde::Serialize;
3535
use tokio::sync::RwLock;
@@ -93,7 +93,7 @@ pub struct MemStore {
9393
sm: RwLock<MemStoreStateMachine>,
9494

9595
/// The current hard state.
96-
hs: RwLock<Option<HardState>>,
96+
vote: RwLock<Option<Vote>>,
9797

9898
snapshot_idx: Arc<Mutex<u64>>,
9999

@@ -106,14 +106,13 @@ impl MemStore {
106106
pub async fn new() -> Self {
107107
let log = RwLock::new(BTreeMap::new());
108108
let sm = RwLock::new(MemStoreStateMachine::default());
109-
let hs = RwLock::new(None);
110109
let current_snapshot = RwLock::new(None);
111110

112111
Self {
113112
last_purged_log_id: RwLock::new(None),
114113
log,
115114
sm,
116-
hs,
115+
vote: RwLock::new(None),
117116
snapshot_idx: Arc::new(Mutex::new(0)),
118117
current_snapshot,
119118
}
@@ -133,16 +132,16 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
133132
type SnapshotData = Cursor<Vec<u8>>;
134133

135134
#[tracing::instrument(level = "trace", skip(self))]
136-
async fn save_hard_state(&self, hs: &HardState) -> Result<(), StorageError> {
137-
tracing::debug!(?hs, "save_hard_state");
138-
let mut h = self.hs.write().await;
135+
async fn save_vote(&self, vote: &Vote) -> Result<(), StorageError> {
136+
tracing::debug!(?vote, "save_vote");
137+
let mut h = self.vote.write().await;
139138

140-
*h = Some(hs.clone());
139+
*h = Some(*vote);
141140
Ok(())
142141
}
143142

144-
async fn read_hard_state(&self) -> Result<Option<HardState>, StorageError> {
145-
Ok(self.hs.read().await.clone())
143+
async fn read_vote(&self) -> Result<Option<Vote>, StorageError> {
144+
Ok(*self.vote.read().await)
146145
}
147146

148147
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(

openraft/src/core/admin.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
3636
) -> Result<(), InitializeError> {
3737
// TODO(xp): simplify this condition
3838

39-
if self.core.last_log_id.is_some() || self.core.current_term != 0 {
39+
if self.core.last_log_id.is_some() || self.core.vote.term != 0 {
4040
tracing::error!(
41-
last_log_id=?self.core.last_log_id, self.core.current_term,
41+
last_log_id=?self.core.last_log_id, %self.core.vote,
4242
"rejecting init_with_config request as last_log_index is not None or current_term is not 0");
4343
return Err(InitializeError::NotAllowed);
4444
}
@@ -249,7 +249,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
249249

250250
// TODO(xp): transfer leadership
251251
self.core.set_target_state(State::Learner);
252-
self.core.current_leader = None;
253252
return;
254253
}
255254

openraft/src/core/append_entries.rs

+25-50
Original file line numberDiff line numberDiff line change
@@ -21,72 +21,47 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
2121
/// An RPC invoked by the leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
2222
///
2323
/// See `receiver implementation: AppendEntries RPC` in raft-essentials.md in this repo.
24-
#[tracing::instrument(level = "debug", skip(self, msg))]
24+
#[tracing::instrument(level = "debug", skip(self, req))]
2525
pub(super) async fn handle_append_entries_request(
2626
&mut self,
27-
msg: AppendEntriesRequest<D>,
27+
req: AppendEntriesRequest<D>,
2828
) -> Result<AppendEntriesResponse, AppendEntriesError> {
29-
tracing::debug!(last_log_id=?self.last_log_id, ?self.last_applied, msg=%msg.summary(), "handle_append_entries_request");
29+
tracing::debug!(last_log_id=?self.last_log_id, ?self.last_applied, msg=%req.summary(), "handle_append_entries_request");
3030

31-
let msg_entries = msg.entries.as_slice();
31+
let msg_entries = req.entries.as_slice();
32+
33+
// Partial order compare: smaller than or incomparable
34+
#[allow(clippy::neg_cmp_op_on_partial_ord)]
35+
if !(req.vote >= self.vote) {
36+
tracing::debug!(%self.vote, %req.vote, "AppendEntries RPC term is less than current term");
3237

33-
// If message's term is less than most recent term, then we do not honor the request.
34-
if msg.term < self.current_term {
35-
tracing::debug!({self.current_term, rpc_term=msg.term}, "AppendEntries RPC term is less than current term");
3638
return Ok(AppendEntriesResponse {
37-
term: self.current_term,
39+
vote: self.vote,
3840
success: false,
3941
conflict: false,
4042
});
4143
}
4244

4345
self.update_next_election_timeout(true);
4446

45-
// Caveat: Because we can not just delete `log[prev_log_id.index..]`, (which results in loss of committed
46-
// entry), the commit index must be update only after append-entries
47-
// and must point to a log entry that is consistent to leader.
48-
// Or there would be chance applying an uncommitted entry:
49-
//
50-
// ```
51-
// R0 1,1 1,2 3,3
52-
// R1 1,1 1,2 2,3
53-
// R2 1,1 1,2 3,3
54-
// ```
55-
//
56-
// - R0 to R1 append_entries: entries=[{1,2}], prev_log_id = {1,1}, commit_index = 3
57-
// - R1 accepted this append_entries request but was not aware of that entry {2,3} is inconsistent to leader.
58-
// Then it will update commit_index to 3 and apply {2,3}
59-
60-
// TODO(xp): cleanup commit index at sender side.
61-
let valid_commit_index = msg_entries.last().map(|x| Some(x.log_id)).unwrap_or_else(|| msg.prev_log_id);
62-
let valid_committed = std::cmp::min(msg.leader_commit, valid_commit_index);
63-
6447
tracing::debug!("start to check and update to latest term/leader");
65-
{
66-
let mut report_metrics = false;
67-
68-
if msg.term > self.current_term {
69-
self.update_current_term(msg.term, Some(msg.leader_id));
70-
self.save_hard_state().await?;
71-
report_metrics = true;
72-
}
48+
if req.vote > self.vote {
49+
self.vote = req.vote;
50+
self.save_vote().await?;
7351

74-
// Update current leader if needed.
75-
if self.current_leader != Some(msg.leader_id) {
76-
report_metrics = true;
52+
// If not follower, become follower.
53+
if !self.target_state.is_follower() && !self.target_state.is_learner() {
54+
self.set_target_state(State::Follower); // State update will emit metrics.
7755
}
7856

79-
self.current_leader = Some(msg.leader_id);
80-
81-
if report_metrics {
82-
self.report_metrics(Update::AsIs);
83-
}
57+
self.report_metrics(Update::AsIs);
8458
}
8559

86-
// Transition to follower state if needed.
87-
if !self.target_state.is_follower() && !self.target_state.is_learner() {
88-
self.set_target_state(State::Follower);
89-
}
60+
// Caveat: [commit-index must not advance the last known consistent log](https://datafuselabs.github.io/openraft/replication.html#caveat-commit-index-must-not-advance-the-last-known-consistent-log)
61+
62+
// TODO(xp): cleanup commit index at sender side.
63+
let valid_commit_index = msg_entries.last().map(|x| Some(x.log_id)).unwrap_or_else(|| req.prev_log_id);
64+
let valid_committed = std::cmp::min(req.leader_commit, valid_commit_index);
9065

9166
tracing::debug!("begin log consistency check");
9267

@@ -95,7 +70,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
9570
// +----------------+------------------------+
9671
// ` 0 ` last_applied ` last_log_id
9772

98-
let res = self.append_apply_log_entries(msg.prev_log_id, msg_entries, valid_committed).await?;
73+
let res = self.append_apply_log_entries(req.prev_log_id, msg_entries, valid_committed).await?;
9974

10075
Ok(res)
10176
}
@@ -220,7 +195,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
220195
}
221196

222197
return Ok(AppendEntriesResponse {
223-
term: self.current_term,
198+
vote: self.vote,
224199
success: false,
225200
conflict: true,
226201
});
@@ -252,7 +227,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
252227
self.report_metrics(Update::AsIs);
253228

254229
Ok(AppendEntriesResponse {
255-
term: self.current_term,
230+
vote: self.vote,
256231
success: true,
257232
conflict: false,
258233
})

openraft/src/core/client.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::raft::Entry;
2323
use crate::raft::EntryPayload;
2424
use crate::raft::RaftRespTx;
2525
use crate::replication::RaftEvent;
26+
use crate::vote::Vote;
2627
use crate::AppData;
2728
use crate::AppDataResponse;
2829
use crate::MessageSummary;
@@ -101,8 +102,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
101102
}
102103

103104
let rpc = AppendEntriesRequest {
104-
term: self.core.current_term,
105-
leader_id: self.core.id,
105+
vote: self.core.vote,
106106
prev_log_id: node.matched,
107107
entries: vec![],
108108
leader_commit: self.core.committed,
@@ -157,8 +157,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
157157
};
158158

159159
// If we receive a response with a greater term, then revert to follower and abort this request.
160-
if data.term != self.core.current_term {
161-
self.core.update_current_term(data.term, None);
160+
if data.vote.term != self.core.vote.term {
161+
self.core.vote = Vote::new_uncommitted(data.vote.term, None);
162+
// TODO(xp): deal with storage error
163+
self.core.save_vote().await.unwrap();
162164
// TODO(xp): if receives error about a higher term, it should stop at once?
163165
self.core.set_target_state(State::Follower);
164166
}

openraft/src/core/install_snapshot.rs

+15-33
Original file line numberDiff line numberDiff line change
@@ -35,37 +35,25 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
3535
&mut self,
3636
req: InstallSnapshotRequest,
3737
) -> Result<InstallSnapshotResponse, InstallSnapshotError> {
38-
// If message's term is less than most recent term, then we do not honor the request.
39-
if req.term < self.current_term {
40-
return Ok(InstallSnapshotResponse {
41-
term: self.current_term,
42-
});
38+
#[allow(clippy::neg_cmp_op_on_partial_ord)]
39+
if !(req.vote >= self.vote) {
40+
tracing::debug!(%self.vote, %req.vote, "InstallSnapshot RPC term is less than current term");
41+
42+
return Ok(InstallSnapshotResponse { vote: self.vote });
4343
}
4444

4545
// Update election timeout.
4646
self.update_next_election_timeout(true);
4747

48-
// Update current term if needed.
49-
let mut report_metrics = false;
50-
if self.current_term != req.term {
51-
self.update_current_term(req.term, None);
52-
self.save_hard_state().await?;
53-
report_metrics = true;
54-
}
55-
56-
// Update current leader if needed.
57-
if self.current_leader != Some(req.leader_id) {
58-
report_metrics = true;
59-
}
48+
if req.vote > self.vote {
49+
self.vote = req.vote;
50+
self.save_vote().await?;
6051

61-
self.current_leader = Some(req.leader_id);
62-
63-
// If not follower, become follower.
64-
if !self.target_state.is_follower() && !self.target_state.is_learner() {
65-
self.set_target_state(State::Follower); // State update will emit metrics.
66-
}
52+
// If not follower, become follower.
53+
if !self.target_state.is_follower() && !self.target_state.is_learner() {
54+
self.set_target_state(State::Follower); // State update will emit metrics.
55+
}
6756

68-
if report_metrics {
6957
self.report_metrics(Update::AsIs);
7058
}
7159

@@ -134,9 +122,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
134122
// If this was a small snapshot, and it is already done, then finish up.
135123
if req.done {
136124
self.finalize_snapshot_installation(req, snapshot).await?;
137-
return Ok(InstallSnapshotResponse {
138-
term: self.current_term,
139-
});
125+
return Ok(InstallSnapshotResponse { vote: self.vote });
140126
}
141127

142128
// Else, retain snapshot components for later segments & respond.
@@ -145,9 +131,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
145131
id,
146132
snapshot,
147133
});
148-
Ok(InstallSnapshotResponse {
149-
term: self.current_term,
150-
})
134+
Ok(InstallSnapshotResponse { vote: self.vote })
151135
}
152136

153137
#[tracing::instrument(level = "debug", skip(self, req, snapshot), fields(req=%req.summary()))]
@@ -188,9 +172,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
188172
} else {
189173
self.snapshot_state = Some(SnapshotState::Streaming { offset, id, snapshot });
190174
}
191-
Ok(InstallSnapshotResponse {
192-
term: self.current_term,
193-
})
175+
Ok(InstallSnapshotResponse { vote: self.vote })
194176
}
195177

196178
/// Finalize the installation of a new snapshot.

0 commit comments

Comments
 (0)