Skip to content

Commit 4a85ee9

Browse files
committed
Feature: feature flag "single-term-leader": standard raft mode
With this feature on: only one leader can be elected in each term, but reduce LogId size from `LogId:{term, node_id, index}` to `LogId{term, index}`. Add `CommittedLeaderId` as the leader-id type used in `LogId`: The leader-id used in `LogId` can be different(smaller) from leader-id used in `Vote`, depending on `LeaderId` definition. `CommittedLeaderId` is the smallest data that can identify a leader after the leadership is granted by a quorum(committed). Change: Vote stores a LeaderId in it. ```rust // Before pub struct Vote<NID> { pub term: u64, pub node_id: NID, pub committed: bool, } // After pub struct Vote<NID> { #[cfg_attr(feature = "serde", serde(flatten))] pub leader_id: LeaderId<NID>, pub committed: bool, } ``` Upgrade tip: If you manually serialize `Vote`, i.e. without using `serde`, the serialization part should be rewritten. Otherwise, nothing needs to be done. - Fix: #660
1 parent 24c212a commit 4a85ee9

File tree

75 files changed

+1195
-685
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+1195
-685
lines changed

.github/workflows/ci.yaml

+13
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,27 @@ jobs:
7878
- toolchain: "stable"
7979
store_defensive: "off"
8080
send_delay: "0"
81+
features: ""
82+
8183
- toolchain: "nightly"
8284
store_defensive: "on"
8385
send_delay: "30"
86+
features: ""
87+
8488
- toolchain: "nightly"
8589
store_defensive: "on"
8690
send_delay: "0"
91+
features: ""
92+
8793
- toolchain: "nightly"
8894
store_defensive: "off"
8995
send_delay: "0"
96+
features: ""
97+
98+
- toolchain: "nightly"
99+
store_defensive: "on"
100+
send_delay: "0"
101+
features: "single-term-leader"
90102

91103
steps:
92104
- name: Setup | Checkout
@@ -107,6 +119,7 @@ jobs:
107119
uses: actions-rs/cargo@v1
108120
with:
109121
command: test
122+
args: --features "${{ matrix.features }}"
110123
env:
111124
# Parallel tests block each other and result in timeout.
112125
RUST_TEST_THREADS: 2

Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ test:
1010
cargo test
1111
cargo test --features bt
1212
cargo test --features serde
13+
cargo test --features single-term-leader
1314
cargo test --manifest-path examples/raft-kv-memstore/Cargo.toml
1415
cargo test --manifest-path examples/raft-kv-rocksdb/Cargo.toml
1516

README.md

+5-4
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,13 @@ Currently, openraft is the consensus engine of meta-service cluster in [databend
6363

6464
# Roadmap
6565

66-
- [x] [Extended joint membership](https://datafuselabs.github.io/openraft/extended-membership)
66+
- [x] **2022-10-31** [Extended joint membership](https://datafuselabs.github.io/openraft/extended-membership)
6767

68-
- [ ] Reduce the complexity of vote and pre-vote: [get rid of pre-vote RPC](https://github.com/datafuselabs/openraft/discussions/15);
68+
- [x] **2023-02-14** Reduce confliction rate when electing;
69+
See: [Openraft Vote design](https://datafuselabs.github.io/openraft/vote);
70+
Or use standard raft mode with [feature flag `single-term-leader`](https://datafuselabs.github.io/openraft/feature-flags).
6971

70-
- [ ] Reduce confliction rate when electing;
71-
Allow leadership to be taken in one term by a node with greater node-id.
72+
- [ ] Reduce the complexity of vote and pre-vote: [get rid of pre-vote RPC](https://github.com/datafuselabs/openraft/discussions/15);
7273

7374
- [ ] Support flexible quorum, e.g.:[Hierarchical Quorums](https://zookeeper.apache.org/doc/r3.5.9/zookeeperHierarchicalQuorums.html)
7475

guide/src/SUMMARY.md

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
- [Metrics](./metrics.md)
1515

16+
- [Feature flags](./feature-flags.md)
17+
1618
- [Internal](./internal.md)
1719
- [Architecture](./architecture.md)
1820
- [Threads](./threading.md)

guide/src/feature-flags.md

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Feature flags
2+
3+
By default openraft enables no features.
4+
5+
- `bt`: attaches backtrace to generated errors.
6+
7+
- `serde`: derives `serde::Serialize, serde::Deserialize` for type that are used
8+
in storage and network, such as `Vote` or `AppendEntriesRequest`.
9+
10+
- `single-term-leader`: allows only one leader to be elected in each `term`.
11+
This is the standard raft policy, which increases election confliction rate
12+
but reduce `LogId`(`(term, node_id, index)` to `(term, index)`) size.
13+
Read more about how it is implemented in [`vote`](./vote.md)

guide/src/vote.md

+95-2
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,25 @@
22

33
```rust
44
struct Vote<NID: NodeId> {
5-
term: u64,
6-
node_id: NID,
5+
leader_id: LeaderId<NID>
76
committed: bool
87
}
8+
9+
// Advanced mode(default):
10+
#[cfg(not(feature = "single-term-leader"))]
11+
pub struct LeaderId<NID: NodeId>
12+
{
13+
pub term: u64,
14+
pub node_id: NID,
15+
}
16+
17+
// Standard raft mode:
18+
#[cfg(feature = "single-term-leader")]
19+
pub struct LeaderId<NID: NodeId>
20+
{
21+
pub term: u64,
22+
pub voted_for: Option<NID>,
23+
}
924
```
1025

1126
`vote` in openraft defines the pseudo **time**(in other word, defines every `leader`) in a distributed consensus.
@@ -15,6 +30,19 @@ In a standard raft, the corresponding concept is `term`.
1530
Although in standard raft a single `term` is not enough to define a **time
1631
point**.
1732

33+
In openraft, RPC validity checking(such as when handling vote request, or
34+
append-entries request) is very simple: **A node grants a `Vote` which is greater than its last seen `Vote`**:
35+
36+
```rust
37+
fn handle_vote(vote: Vote) {
38+
if !(vote >= self.vote) {
39+
return Err(())
40+
}
41+
save_vote(vote);
42+
Ok(())
43+
}
44+
```
45+
1846
Every server state(leader, candidate, follower or learner) has a unique
1947
corresponding `vote`, thus `vote` can be used to identify different server
2048
states, i.e, if the `vote` changes, the server state must have changed.
@@ -38,3 +66,68 @@ E.g.:
3866

3967
- A vote `(term=1, node_id=1, committed=false|true)` is in another different
4068
follower/learner state for node-3.
69+
70+
71+
## Partial order
72+
73+
`Vote` in openraft is partially ordered value,
74+
i.e., it is legal that `!(vote_a => vote_b) && !(vote_a <= vote_b)`.
75+
Because `Vote.leader_id` may be a partial order value:
76+
77+
78+
## LeaderId: advanced mode and standard mode
79+
80+
Openraft provides two `LeaderId` type, which can be switched with feature
81+
`single-term-leader`:
82+
83+
- `cargo build` without `single-term-leader`, is the advanced mode, the default mode:
84+
It builds openraft with `LeaderId:(term, node_id)`, which is totally ordered.
85+
Which means, in a single `term`, there could be more than one leaders
86+
elected(although only the last is valid and can commit logs).
87+
88+
- Pros: election conflict is minimized,
89+
90+
- Cons: `LogId` becomes larger: every log has to store an additional `NodeId` in `LogId`:
91+
`LogId: {{term, NodeId}, index}`.
92+
If an application uses a big `NodeId` type, e.g., UUID, the penalty may not
93+
be negligible.
94+
95+
- `cargo build --features "single-term-leader"` builds openraft in standard raft mode with:
96+
`LeaderId:(term, voted_for:Option<NodeId>)`, which makes `LeaderId` and `Vote`
97+
**partially-ordered** values. In this mode, only one leader can be elected in
98+
each `term`.
99+
100+
The partial order relation of `LeaderId`:
101+
102+
```
103+
LeaderId(3, None) > LeaderId(2, None): true
104+
LeaderId(3, None) > LeaderId(2, Some(y)): true
105+
LeaderId(3, None) == LeaderId(3, None): true
106+
LeaderId(3, Some(x)) > LeaderId(2, Some(y)): true
107+
LeaderId(3, Some(x)) > LeaderId(3, None): true
108+
LeaderId(3, Some(x)) == LeaderId(3, Some(x)): true
109+
LeaderId(3, Some(x)) > LeaderId(3, Some(y)): false
110+
```
111+
112+
The partial order between `Vote` is defined as:
113+
Given two `Vote` `a` and `b`:
114+
`a > b` iff:
115+
116+
```rust
117+
a.leader_id > b.leader_id || (
118+
!(a.leader_id < b.leader_id) && a.committed > b.committed
119+
)
120+
```
121+
122+
In other words, if `a.leader_id` and `b.leader_id` is not
123+
comparable(`!(a.leader_id>=b.leader_id) && !(a.leader_id<=b.leader_id)`), use
124+
field `committed` to determine the order between `a` and `b`.
125+
126+
Because a leader must be granted by a quorum before committing any log, two
127+
incomparable `leader_id` can not both be granted.
128+
So let a committed `Vote` override a incomparable non-committed is safe.
129+
130+
- Pros: `LogId` just store a `term`.
131+
132+
- Cons: election conflicting rate may increase.
133+

openraft/Cargo.toml

+16
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ anyhow = { workspace = true }
3636
async-entry = { workspace = true }
3737
lazy_static = { workspace = true }
3838
pretty_assertions = { workspace = true }
39+
serde_json = { workspace = true }
3940
tracing-appender = { workspace = true }
4041
tracing-subscriber = { workspace = true }
4142

@@ -58,5 +59,20 @@ bt = ["anyerror/backtrace", "anyhow/backtrace"]
5859
# If you'd like to use `serde` to serialize messages.
5960
serde = ["dep:serde"]
6061

62+
# Turn on this feature it allows at most ONE quorum-granted leader for each term.
63+
# This is the way standard raft does, by making the LeaderId a partial order value.
64+
#
65+
# - With this feature on:
66+
# It is more likely to conflict during election. But every log only needs to store one `term` in it.
67+
#
68+
# - With this feature off:
69+
# Election conflict rate will be reduced, but every log has to store a `LeaderId{ term, node_id}`,
70+
# which may be costly if an application uses a big NodeId type.
71+
#
72+
# This feature is disabled by default.
73+
single-term-leader = []
74+
75+
# default = ["single-term-leader"]
76+
6177
[package.metadata.docs.rs]
6278
features = ["docinclude"] # Activate `docinclude` during docs.rs build.

openraft/src/core/raft_core.rs

+13-3
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,13 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
318318
// If we receive a response with a greater term, then revert to follower and abort this
319319
// request.
320320
if let AppendEntriesResponse::HigherVote(vote) = data {
321+
debug_assert!(
322+
&vote > self.engine.state.get_vote(),
323+
"committed vote({}) has total order relation with other votes({})",
324+
self.engine.state.get_vote(),
325+
vote
326+
);
327+
321328
let res = self.engine.vote_handler().handle_message_vote(&vote);
322329
self.run_engine_commands::<Entry<C>>(&[]).await?;
323330

@@ -617,7 +624,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
617624
id: self.id,
618625

619626
// --- data ---
620-
current_term: self.engine.state.get_vote().term,
627+
current_term: self.engine.state.get_vote().leader_id().get_term(),
621628
last_log_index: self.engine.state.last_log_id().index(),
622629
last_applied: self.engine.state.committed().copied(),
623630
snapshot: self.engine.state.snapshot_meta.last_log_id,
@@ -810,11 +817,14 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
810817
"get current_leader"
811818
);
812819

813-
if !self.engine.state.get_vote().committed {
820+
let vote = self.engine.state.get_vote();
821+
822+
if !vote.is_committed() {
814823
return None;
815824
}
816825

817-
let id = self.engine.state.get_vote().node_id;
826+
// Safe unwrap(): vote that is committed has to already have voted for some node.
827+
let id = vote.leader_id().voted_for().unwrap();
818828

819829
// TODO: `is_voter()` is slow, maybe cache `current_leader`,
820830
// e.g., only update it when membership or vote changes

openraft/src/defensive.rs

+6-10
Original file line numberDiff line numberDiff line change
@@ -99,19 +99,15 @@ where
9999

100100
let curr = h.unwrap_or_default();
101101

102-
if vote.term < curr.term {
103-
return Err(DefensiveError::new(ErrorSubject::Vote, Violation::TermNotAscending {
104-
curr: curr.term,
105-
to: vote.term,
106-
})
107-
.into());
108-
}
109-
110102
if vote >= &curr {
111-
Ok(())
103+
// OK
112104
} else {
113-
Err(DefensiveError::new(ErrorSubject::Vote, Violation::NonIncrementalVote { curr, to: *vote }).into())
105+
return Err(
106+
DefensiveError::new(ErrorSubject::Vote, Violation::VoteNotAscending { curr, to: *vote }).into(),
107+
);
114108
}
109+
110+
Ok(())
115111
}
116112

117113
/// The log entries fed into a store must be consecutive otherwise it is a bug.

openraft/src/engine/elect_test.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ use crate::engine::Engine;
99
use crate::engine::LogIdList;
1010
use crate::raft::VoteRequest;
1111
use crate::raft_state::VoteStateReader;
12+
use crate::CommittedLeaderId;
1213
use crate::EffectiveMembership;
13-
use crate::LeaderId;
1414
use crate::LogId;
1515
use crate::Membership;
1616
use crate::MetricsChangeFlags;
1717
use crate::Vote;
1818

1919
fn log_id(term: u64, index: u64) -> LogId<u64> {
2020
LogId::<u64> {
21-
leader_id: LeaderId { term, node_id: 1 },
21+
leader_id: CommittedLeaderId::new(term, 1),
2222
index,
2323
}
2424
}
@@ -33,7 +33,7 @@ fn m12() -> Membership<u64, ()> {
3333

3434
fn eng() -> Engine<u64, ()> {
3535
let mut eng = Engine::default();
36-
eng.state.log_ids = LogIdList::new([LogId::new(LeaderId::new(0, 0), 0)]);
36+
eng.state.log_ids = LogIdList::new([LogId::new(CommittedLeaderId::new(0, 0), 0)]);
3737
eng.state.enable_validate = false; // Disable validation for incomplete state
3838
eng
3939
}
@@ -76,20 +76,20 @@ fn test_elect() -> anyhow::Result<()> {
7676
Command::RebuildReplicationStreams { targets: vec![] },
7777
Command::AppendBlankLog {
7878
log_id: LogId {
79-
leader_id: LeaderId { term: 1, node_id: 1 },
79+
leader_id: CommittedLeaderId::new(1, 1),
8080
index: 1,
8181
},
8282
},
8383
Command::ReplicateCommitted {
8484
committed: Some(LogId {
85-
leader_id: LeaderId { term: 1, node_id: 1 },
85+
leader_id: CommittedLeaderId::new(1, 1),
8686
index: 1,
8787
},),
8888
},
8989
Command::LeaderCommit {
9090
already_committed: None,
9191
upto: LogId {
92-
leader_id: LeaderId { term: 1, node_id: 1 },
92+
leader_id: CommittedLeaderId::new(1, 1),
9393
index: 1,
9494
},
9595
},
@@ -139,20 +139,20 @@ fn test_elect() -> anyhow::Result<()> {
139139
Command::RebuildReplicationStreams { targets: vec![] },
140140
Command::AppendBlankLog {
141141
log_id: LogId {
142-
leader_id: LeaderId { term: 2, node_id: 1 },
142+
leader_id: CommittedLeaderId::new(2, 1),
143143
index: 1,
144144
},
145145
},
146146
Command::ReplicateCommitted {
147147
committed: Some(LogId {
148-
leader_id: LeaderId { term: 2, node_id: 1 },
148+
leader_id: CommittedLeaderId::new(2, 1),
149149
index: 1,
150150
},),
151151
},
152152
Command::LeaderCommit {
153153
already_committed: None,
154154
upto: LogId {
155-
leader_id: LeaderId { term: 2, node_id: 1 },
155+
leader_id: CommittedLeaderId::new(2, 1),
156156
index: 1,
157157
},
158158
},

openraft/src/engine/engine_impl.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ where
206206
/// Start to elect this node as leader
207207
#[tracing::instrument(level = "debug", skip(self))]
208208
pub(crate) fn elect(&mut self) {
209-
let v = Vote::new(self.state.get_vote().term + 1, self.config.id);
209+
let v = Vote::new(self.state.get_vote().leader_id().term + 1, self.config.id);
210210
// Safe unwrap(): it won't reject itself ˙–˙
211211
self.vote_handler().handle_message_vote(&v).unwrap();
212212

@@ -572,7 +572,10 @@ where
572572
}
573573

574574
fn assign_log_ids<'a, Ent: RaftEntry<NID, N> + 'a>(&mut self, entries: impl Iterator<Item = &'a mut Ent>) {
575-
let mut log_id = LogId::new(self.state.get_vote().leader_id(), self.state.last_log_id().next_index());
575+
let mut log_id = LogId::new(
576+
self.state.get_vote().committed_leader_id().unwrap(),
577+
self.state.last_log_id().next_index(),
578+
);
576579
for entry in entries {
577580
entry.set_log_id(&log_id);
578581
tracing::debug!("assign log id: {}", log_id);

0 commit comments

Comments
 (0)