Skip to content

Commit 8e3fbb6

Browse files
authored
Merge pull request #635 from drmingdrmer/30-internal-state
Refactor: move leading state out of RaftState
2 parents bdff23b + ba81b37 commit 8e3fbb6

11 files changed

+71
-73
lines changed

openraft/src/core/raft_core.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
251251
// Spawn parallel requests, all with the standard timeout for heartbeats.
252252
let mut pending = FuturesUnordered::new();
253253

254-
let voter_progresses = if let Some(l) = &self.engine.state.internal_server_state.leading() {
254+
let voter_progresses = if let Some(l) = &self.engine.internal_server_state.leading() {
255255
l.progress
256256
.iter()
257257
.filter(|(id, _v)| l.progress.is_voter(id) == Some(true))
@@ -399,7 +399,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
399399

400400
let curr = &self.engine.state.membership_state.effective;
401401
if curr.contains(&target) {
402-
let matched = if let Some(l) = &self.engine.state.internal_server_state.leading() {
402+
let matched = if let Some(l) = &self.engine.internal_server_state.leading() {
403403
*l.progress.get(&target)
404404
} else {
405405
unreachable!("it has to be a leader!!!");
@@ -538,7 +538,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
538538
Expectation::AtLineRate => {
539539
// Expect to be at line rate but not.
540540

541-
let matched = if let Some(l) = &self.engine.state.internal_server_state.leading() {
541+
let matched = if let Some(l) = &self.engine.internal_server_state.leading() {
542542
*l.progress.get(node_id)
543543
} else {
544544
unreachable!("it has to be a leader!!!");

openraft/src/engine/elect_test.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ fn test_elect() -> anyhow::Result<()> {
4747
assert_eq!(Vote::new_committed(1, 1), eng.state.vote);
4848
assert_eq!(
4949
Some(btreeset! {1},),
50-
eng.state.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
50+
eng.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
5151
);
5252

5353
assert_eq!(ServerState::Leader, eng.state.server_state);
@@ -106,15 +106,15 @@ fn test_elect() -> anyhow::Result<()> {
106106

107107
// Build in-progress election state
108108
eng.state.vote = Vote::new_committed(1, 2);
109-
eng.state.new_leader();
110-
eng.state.internal_server_state.leading_mut().map(|l| l.vote_granted_by.insert(1));
109+
eng.new_leader();
110+
eng.internal_server_state.leading_mut().map(|l| l.vote_granted_by.insert(1));
111111

112112
eng.elect();
113113

114114
assert_eq!(Vote::new_committed(2, 1), eng.state.vote);
115115
assert_eq!(
116116
Some(btreeset! {1},),
117-
eng.state.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
117+
eng.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
118118
);
119119

120120
assert_eq!(ServerState::Leader, eng.state.server_state);
@@ -177,7 +177,7 @@ fn test_elect() -> anyhow::Result<()> {
177177
assert_eq!(Vote::new(1, 1), eng.state.vote);
178178
assert_eq!(
179179
Some(btreeset! {1},),
180-
eng.state.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
180+
eng.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
181181
);
182182

183183
assert_eq!(ServerState::Candidate, eng.state.server_state);

openraft/src/engine/engine_impl.rs

+27-11
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::error::NotAllowed;
99
use crate::error::NotInMembers;
1010
use crate::error::RejectVoteRequest;
1111
use crate::internal_server_state::InternalServerState;
12+
use crate::leader::Leader;
1213
use crate::membership::EffectiveMembership;
1314
use crate::membership::NodeRole;
1415
use crate::node::Node;
@@ -79,6 +80,9 @@ where
7980
/// The state of this raft node.
8081
pub(crate) state: RaftState<NID, N>,
8182

83+
/// The internal server state used by Engine.
84+
pub(crate) internal_server_state: InternalServerState<NID>,
85+
8286
/// Tracks what kind of metrics changed
8387
pub(crate) metrics_flags: MetricsChangeFlags,
8488

@@ -96,6 +100,7 @@ where
96100
id,
97101
config,
98102
state: init_state.clone(),
103+
internal_server_state: InternalServerState::default(),
99104
metrics_flags: MetricsChangeFlags::default(),
100105
commands: vec![],
101106
}
@@ -162,7 +167,7 @@ where
162167
self.handle_vote_change(&Vote::new(self.state.vote.term + 1, self.id)).unwrap();
163168

164169
// Safe unwrap()
165-
let leader = self.state.internal_server_state.leading_mut().unwrap();
170+
let leader = self.internal_server_state.leading_mut().unwrap();
166171
leader.grant_vote_by(self.id);
167172
let quorum_granted = leader.is_vote_granted();
168173

@@ -233,7 +238,7 @@ where
233238
);
234239

235240
// If this node is no longer a leader(i.e., electing), just ignore the delayed vote_resp.
236-
let leader = match &mut self.state.internal_server_state {
241+
let leader = match &mut self.internal_server_state {
237242
InternalServerState::Leading(l) => l,
238243
InternalServerState::Following => return,
239244
};
@@ -674,7 +679,7 @@ where
674679
let end = self.state.last_log_id().next_index();
675680

676681
// If membership changes, the progress should be upgraded.
677-
if let Some(leader) = &mut self.state.internal_server_state.leading_mut() {
682+
if let Some(leader) = &mut self.internal_server_state.leading_mut() {
678683
let old_progress = leader.progress.clone();
679684
let learner_ids = em.learner_ids().collect::<Vec<_>>();
680685

@@ -683,7 +688,7 @@ where
683688
}
684689

685690
// A leader that is removed will be shut down when this membership log is committed.
686-
if self.state.internal_server_state.is_leading() {
691+
if self.internal_server_state.is_leading() {
687692
self.update_replications()
688693
}
689694

@@ -699,7 +704,7 @@ where
699704
tracing::debug!("update_progress: node_id:{} log_id:{:?}", node_id, log_id);
700705

701706
let committed = {
702-
let leader = match self.state.internal_server_state.leading_mut() {
707+
let leader = match self.internal_server_state.leading_mut() {
703708
None => {
704709
// TODO: is it a bug if trying to update progress when it is not in leading state?
705710
return;
@@ -943,11 +948,11 @@ where
943948
pub(crate) fn enter_leading(&mut self) {
944949
debug_assert_eq!(self.state.vote.node_id, self.id);
945950
// debug_assert!(
946-
// self.state.internal_server_state.is_following(),
951+
// self.internal_server_state.is_following(),
947952
// "can not enter leading twice"
948953
// );
949954

950-
self.state.new_leader();
955+
self.new_leader();
951956
}
952957

953958
/// Leave leading state and enter following state(vote.node_id != self.id).
@@ -962,7 +967,7 @@ where
962967
// debug_assert_ne!(self.state.vote.node_id, self.id);
963968

964969
// debug_assert!(
965-
// self.state.internal_server_state.is_leading(),
970+
// self.internal_server_state.is_leading(),
966971
// "can not enter following twice"
967972
// );
968973

@@ -980,11 +985,11 @@ where
980985
self.push_command(Command::InstallElectionTimer { can_be_leader: true });
981986
}
982987

983-
if self.state.internal_server_state.is_following() {
988+
if self.internal_server_state.is_following() {
984989
return;
985990
}
986991

987-
self.state.internal_server_state = InternalServerState::Following;
992+
self.internal_server_state = InternalServerState::Following;
988993

989994
self.update_server_state_if_changed();
990995
}
@@ -1005,6 +1010,17 @@ where
10051010
self.append_blank_log();
10061011
}
10071012

1013+
/// Create a new Leader, when raft enters candidate state.
1014+
/// In openraft, Leader and Candidate shares the same state.
1015+
pub(crate) fn new_leader(&mut self) {
1016+
let em = &self.state.membership_state.effective;
1017+
self.internal_server_state = InternalServerState::Leading(Leader::new(
1018+
em.membership.to_quorum_set(),
1019+
em.learner_ids(),
1020+
self.state.last_log_id().index(),
1021+
));
1022+
}
1023+
10081024
fn append_blank_log(&mut self) {
10091025
let log_id = LogId {
10101026
leader_id: self.state.vote.leader_id(),
@@ -1018,7 +1034,7 @@ where
10181034

10191035
/// update replication streams to reflect replication progress change.
10201036
fn update_replications(&mut self) {
1021-
if let Some(leader) = self.state.internal_server_state.leading() {
1037+
if let Some(leader) = self.internal_server_state.leading() {
10221038
let mut targets = vec![];
10231039
for (node_id, matched) in leader.progress.iter() {
10241040
if node_id != &self.id {

openraft/src/engine/handle_vote_req_test.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ fn eng() -> Engine<u64, ()> {
3131
eng.state.vote = Vote::new(2, 1);
3232
eng.state.server_state = ServerState::Candidate;
3333
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01()));
34-
eng.state.new_leader();
34+
eng.new_leader();
3535
eng
3636
}
3737

@@ -54,7 +54,7 @@ fn test_handle_vote_req_reject_smaller_vote() -> anyhow::Result<()> {
5454
);
5555

5656
assert_eq!(Vote::new(2, 1), eng.state.vote);
57-
assert!(eng.state.internal_server_state.is_leading());
57+
assert!(eng.internal_server_state.is_leading());
5858

5959
assert_eq!(ServerState::Candidate, eng.state.server_state);
6060
assert_eq!(
@@ -91,7 +91,7 @@ fn test_handle_vote_req_reject_smaller_last_log_id() -> anyhow::Result<()> {
9191
);
9292

9393
assert_eq!(Vote::new(2, 1), eng.state.vote);
94-
assert!(eng.state.internal_server_state.is_leading());
94+
assert!(eng.internal_server_state.is_leading());
9595

9696
assert_eq!(ServerState::Candidate, eng.state.server_state);
9797
assert_eq!(
@@ -129,7 +129,7 @@ fn test_handle_vote_req_granted_equal_vote_and_last_log_id() -> anyhow::Result<(
129129
);
130130

131131
assert_eq!(Vote::new(2, 1), eng.state.vote);
132-
assert!(eng.state.internal_server_state.is_following());
132+
assert!(eng.internal_server_state.is_following());
133133

134134
assert_eq!(ServerState::Follower, eng.state.server_state);
135135
assert_eq!(
@@ -174,7 +174,7 @@ fn test_handle_vote_req_granted_greater_vote() -> anyhow::Result<()> {
174174
);
175175

176176
assert_eq!(Vote::new(3, 1), eng.state.vote);
177-
assert!(eng.state.internal_server_state.is_following());
177+
assert!(eng.internal_server_state.is_following());
178178

179179
assert_eq!(ServerState::Follower, eng.state.server_state);
180180
assert_eq!(

openraft/src/engine/handle_vote_resp_test.rs

+16-16
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
5050
});
5151

5252
assert_eq!(Vote::new(2, 1), eng.state.vote);
53-
assert!(eng.state.internal_server_state.is_following());
53+
assert!(eng.internal_server_state.is_following());
5454

5555
assert_eq!(ServerState::Follower, eng.state.server_state);
5656
assert_eq!(
@@ -71,8 +71,8 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
7171
eng.id = 1;
7272
eng.state.vote = Vote::new(2, 1);
7373
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12()));
74-
eng.state.new_leader();
75-
eng.state.internal_server_state.leading_mut().map(|l| l.vote_granted_by.insert(1));
74+
eng.new_leader();
75+
eng.internal_server_state.leading_mut().map(|l| l.vote_granted_by.insert(1));
7676
eng.state.server_state = ServerState::Candidate;
7777

7878
eng.handle_vote_resp(2, VoteResponse {
@@ -84,7 +84,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
8484
assert_eq!(Vote::new(2, 1), eng.state.vote);
8585
assert_eq!(
8686
Some(btreeset! {1},),
87-
eng.state.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
87+
eng.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
8888
);
8989

9090
assert_eq!(ServerState::Candidate, eng.state.server_state);
@@ -110,8 +110,8 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
110110
eng.state.vote = Vote::new(2, 1);
111111
eng.state.log_ids = LogIdList::new(vec![log_id(3, 3)]);
112112
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12()));
113-
eng.state.new_leader();
114-
eng.state.internal_server_state.leading_mut().map(|l| l.vote_granted_by.insert(1));
113+
eng.new_leader();
114+
eng.internal_server_state.leading_mut().map(|l| l.vote_granted_by.insert(1));
115115
eng.state.server_state = ServerState::Candidate;
116116

117117
eng.handle_vote_resp(2, VoteResponse {
@@ -121,7 +121,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
121121
});
122122

123123
assert_eq!(Vote::new(2, 2), eng.state.vote);
124-
assert!(eng.state.internal_server_state.is_leading());
124+
assert!(eng.internal_server_state.is_leading());
125125

126126
assert_eq!(ServerState::Candidate, eng.state.server_state);
127127
assert_eq!(
@@ -148,8 +148,8 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
148148
eng.id = 1;
149149
eng.state.vote = Vote::new(2, 1);
150150
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12()));
151-
eng.state.new_leader();
152-
eng.state.internal_server_state.leading_mut().map(|l| l.vote_granted_by.insert(1));
151+
eng.new_leader();
152+
eng.internal_server_state.leading_mut().map(|l| l.vote_granted_by.insert(1));
153153
eng.state.server_state = ServerState::Candidate;
154154

155155
eng.handle_vote_resp(2, VoteResponse {
@@ -161,7 +161,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
161161
assert_eq!(Vote::new(2, 1), eng.state.vote);
162162
assert_eq!(
163163
Some(btreeset! {1},),
164-
eng.state.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
164+
eng.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
165165
);
166166

167167
assert_eq!(ServerState::Candidate, eng.state.server_state);
@@ -186,8 +186,8 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
186186
eng.id = 1;
187187
eng.state.vote = Vote::new(2, 1);
188188
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m1234()));
189-
eng.state.new_leader();
190-
eng.state.internal_server_state.leading_mut().map(|l| l.vote_granted_by.insert(1));
189+
eng.new_leader();
190+
eng.internal_server_state.leading_mut().map(|l| l.vote_granted_by.insert(1));
191191
eng.state.server_state = ServerState::Candidate;
192192

193193
eng.handle_vote_resp(2, VoteResponse {
@@ -199,7 +199,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
199199
assert_eq!(Vote::new(2, 1), eng.state.vote);
200200
assert_eq!(
201201
Some(btreeset! {1,2},),
202-
eng.state.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
202+
eng.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
203203
);
204204

205205
assert_eq!(ServerState::Candidate, eng.state.server_state);
@@ -221,8 +221,8 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
221221
eng.id = 1;
222222
eng.state.vote = Vote::new(2, 1);
223223
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12()));
224-
eng.state.new_leader();
225-
eng.state.internal_server_state.leading_mut().map(|l| l.vote_granted_by.insert(1));
224+
eng.new_leader();
225+
eng.internal_server_state.leading_mut().map(|l| l.vote_granted_by.insert(1));
226226
eng.state.server_state = ServerState::Candidate;
227227

228228
eng.handle_vote_resp(2, VoteResponse {
@@ -234,7 +234,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
234234
assert_eq!(Vote::new_committed(2, 1), eng.state.vote);
235235
assert_eq!(
236236
Some(btreeset! {1,2},),
237-
eng.state.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
237+
eng.internal_server_state.leading().map(|x| x.vote_granted_by.clone())
238238
);
239239

240240
assert_eq!(ServerState::Leader, eng.state.server_state);

openraft/src/engine/internal_handle_vote_req_test.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ fn eng() -> Engine<u64, ()> {
3030
eng.state.vote = Vote::new(2, 1);
3131
eng.state.server_state = ServerState::Candidate;
3232
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01()));
33-
eng.state.new_leader();
33+
eng.new_leader();
3434
eng
3535
}
3636

@@ -43,7 +43,7 @@ fn test_handle_vote_change_reject_smaller_vote() -> anyhow::Result<()> {
4343
assert_eq!(Err(RejectVoteRequest::ByVote(Vote::new(2, 1))), resp);
4444

4545
assert_eq!(Vote::new(2, 1), eng.state.vote);
46-
assert!(eng.state.internal_server_state.is_leading());
46+
assert!(eng.internal_server_state.is_leading());
4747

4848
assert_eq!(ServerState::Candidate, eng.state.server_state);
4949
assert_eq!(
@@ -70,7 +70,7 @@ fn test_handle_vote_change_committed_vote() -> anyhow::Result<()> {
7070
assert_eq!(Ok(()), resp);
7171

7272
assert_eq!(Vote::new_committed(3, 2), eng.state.vote);
73-
assert!(eng.state.internal_server_state.is_following());
73+
assert!(eng.internal_server_state.is_following());
7474

7575
assert_eq!(ServerState::Follower, eng.state.server_state);
7676
assert_eq!(
@@ -108,7 +108,7 @@ fn test_handle_vote_change_granted_equal_vote() -> anyhow::Result<()> {
108108
assert_eq!(Ok(()), resp);
109109

110110
assert_eq!(Vote::new(2, 1), eng.state.vote);
111-
assert!(eng.state.internal_server_state.is_following());
111+
assert!(eng.internal_server_state.is_following());
112112

113113
assert_eq!(ServerState::Follower, eng.state.server_state);
114114
assert_eq!(
@@ -142,7 +142,7 @@ fn test_handle_vote_change_granted_greater_vote() -> anyhow::Result<()> {
142142
assert_eq!(Ok(()), resp);
143143

144144
assert_eq!(Vote::new(3, 1), eng.state.vote);
145-
assert!(eng.state.internal_server_state.is_following());
145+
assert!(eng.internal_server_state.is_following());
146146

147147
assert_eq!(ServerState::Follower, eng.state.server_state);
148148
assert_eq!(

0 commit comments

Comments
 (0)