Skip to content

Commit 2dd8101

Browse files
committed
Change: make Raft::new() async and let it return error during startup
- Change: move startup process from `RaftCore::do_main()` to `Raft::new()`, so that an error during startup can be returned earlier. Upgrade guide: application has to consume the returned future with `Raft::new().await`, and the error returned by the future. - Refactor: move id from `Engine.id` to `Engine.config.id`, so that accessing constant attribute does not depend on a reference to `Engine`.
1 parent 8e3fbb6 commit 2dd8101

File tree

52 files changed

+167
-179
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+167
-179
lines changed

examples/raft-kv-memstore/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub async fn start_example_raft_node(node_id: ExampleNodeId, http_addr: String)
5353
let network = ExampleNetwork {};
5454

5555
// Create a local raft instance.
56-
let raft = Raft::new(node_id, config.clone(), network, store.clone());
56+
let raft = Raft::new(node_id, config.clone(), network, store.clone()).await.unwrap();
5757

5858
// Create an application that will store all the instances created above, this will
5959
// be later used on the actix-web services.

examples/raft-kv-rocksdb/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ where
7373
let network = ExampleNetwork {};
7474

7575
// Create a local raft instance.
76-
let raft = Raft::new(node_id, config.clone(), network, store.clone());
76+
let raft = Raft::new(node_id, config.clone(), network, store.clone()).await.unwrap();
7777

7878
let app = Arc::new(ExampleApp {
7979
id: node_id,

openraft/src/core/raft_core.rs

-16
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use crate::core::SnapshotState;
3535
use crate::core::VoteWiseTime;
3636
use crate::engine::Command;
3737
use crate::engine::Engine;
38-
use crate::engine::EngineConfig;
3938
use crate::entry::EntryRef;
4039
use crate::error::AddLearnerError;
4140
use crate::error::ChangeMembershipError;
@@ -80,7 +79,6 @@ use crate::replication::ReplicationHandle;
8079
use crate::replication::ReplicationSessionId;
8180
use crate::runtime::RaftRuntime;
8281
use crate::storage::RaftSnapshotBuilder;
83-
use crate::storage::StorageHelper;
8482
use crate::versioned::Updatable;
8583
use crate::versioned::Versioned;
8684
use crate::ChangeMembers;
@@ -195,20 +193,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
195193
async fn do_main(&mut self, rx_shutdown: oneshot::Receiver<()>) -> Result<(), Fatal<C::NodeId>> {
196194
tracing::debug!("raft node is initializing");
197195

198-
let state = {
199-
let mut helper = StorageHelper::new(&mut self.storage);
200-
helper.get_initial_state().await?
201-
};
202-
203-
// TODO(xp): this is not necessary.
204-
self.storage.save_vote(&state.vote).await?;
205-
206-
self.engine = Engine::new(self.id, &state, EngineConfig {
207-
max_in_snapshot_log_to_keep: self.config.max_in_snapshot_log_to_keep,
208-
purge_batch_size: self.config.purge_batch_size,
209-
max_payload_entries: self.config.max_payload_entries,
210-
});
211-
212196
self.engine.startup();
213197
// No output commands
214198

openraft/src/engine/elect_test.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ fn test_elect() -> anyhow::Result<()> {
3939
tracing::info!("--- single node: become leader at once");
4040
{
4141
let mut eng = eng();
42-
eng.id = 1;
42+
eng.config.id = 1;
4343
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(0, 1)), m1()));
4444

4545
eng.elect();
@@ -101,7 +101,7 @@ fn test_elect() -> anyhow::Result<()> {
101101
tracing::info!("--- single node: electing again will override previous state");
102102
{
103103
let mut eng = eng();
104-
eng.id = 1;
104+
eng.config.id = 1;
105105
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(0, 1)), m1()));
106106

107107
// Build in-progress election state
@@ -168,7 +168,7 @@ fn test_elect() -> anyhow::Result<()> {
168168
tracing::info!("--- multi nodes: enter candidate state");
169169
{
170170
let mut eng = eng();
171-
eng.id = 1;
171+
eng.config.id = 1;
172172
eng.state.membership_state.effective = Arc::new(EffectiveMembership::new(Some(log_id(0, 1)), m12()));
173173
eng.state.log_ids = LogIdList::new(vec![log_id(1, 1)]);
174174

openraft/src/engine/engine_impl.rs

+33-30
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ use crate::Vote;
3434
/// Config for Engine
3535
#[derive(Clone, Debug)]
3636
#[derive(PartialEq, Eq)]
37-
pub(crate) struct EngineConfig {
37+
pub(crate) struct EngineConfig<NID: NodeId> {
38+
/// The id of this node.
39+
pub(crate) id: NID,
40+
3841
/// The maximum number of applied logs to keep before purging.
3942
pub(crate) max_in_snapshot_log_to_keep: u64,
4043

@@ -45,9 +48,10 @@ pub(crate) struct EngineConfig {
4548
pub(crate) max_payload_entries: u64,
4649
}
4750

48-
impl Default for EngineConfig {
51+
impl<NID: NodeId> Default for EngineConfig<NID> {
4952
fn default() -> Self {
5053
Self {
54+
id: NID::default(),
5155
max_in_snapshot_log_to_keep: 1000,
5256
purge_batch_size: 256,
5357
max_payload_entries: 300,
@@ -71,11 +75,7 @@ where
7175
N: Node,
7276
NID: NodeId,
7377
{
74-
/// TODO:
75-
#[allow(dead_code)]
76-
pub(crate) id: NID,
77-
78-
pub(crate) config: EngineConfig,
78+
pub(crate) config: EngineConfig<NID>,
7979

8080
/// The state of this raft node.
8181
pub(crate) state: RaftState<NID, N>,
@@ -95,9 +95,8 @@ where
9595
N: Node,
9696
NID: NodeId,
9797
{
98-
pub(crate) fn new(id: NID, init_state: &RaftState<NID, N>, config: EngineConfig) -> Self {
98+
pub(crate) fn new(init_state: &RaftState<NID, N>, config: EngineConfig<NID>) -> Self {
9999
Self {
100-
id,
101100
config,
102101
state: init_state.clone(),
103102
internal_server_state: InternalServerState::default(),
@@ -112,13 +111,17 @@ where
112111
// On startup, do not assume a leader. Becoming a leader require initialization on several fields.
113112
// TODO: allows starting up as a leader. After `server_state` is removed from Engine.
114113

115-
let server_state = if self.state.membership_state.effective.is_voter(&self.id) {
114+
let server_state = if self.state.membership_state.effective.is_voter(&self.config.id) {
116115
ServerState::Follower
117116
} else {
118117
ServerState::Learner
119118
};
120119

121-
tracing::debug!("startup: id={} target_state: {:?}", self.id, self.state.server_state);
120+
tracing::debug!(
121+
"startup: id={} target_state: {:?}",
122+
self.config.id,
123+
self.state.server_state
124+
);
122125

123126
self.state.server_state = server_state;
124127
}
@@ -164,11 +167,11 @@ where
164167
/// Start to elect this node as leader
165168
#[tracing::instrument(level = "debug", skip(self))]
166169
pub(crate) fn elect(&mut self) {
167-
self.handle_vote_change(&Vote::new(self.state.vote.term + 1, self.id)).unwrap();
170+
self.handle_vote_change(&Vote::new(self.state.vote.term + 1, self.config.id)).unwrap();
168171

169172
// Safe unwrap()
170173
let leader = self.internal_server_state.leading_mut().unwrap();
171-
leader.grant_vote_by(self.id);
174+
leader.grant_vote_by(self.config.id);
172175
let quorum_granted = leader.is_vote_granted();
173176

174177
// Fast-path: if there is only one node in the cluster.
@@ -262,7 +265,7 @@ where
262265

263266
debug_assert_eq!(
264267
Some(NodeRole::Voter),
265-
self.state.membership_state.effective.get_node_role(&self.id)
268+
self.state.membership_state.effective.get_node_role(&self.config.id)
266269
);
267270

268271
// If peer's vote is greater than current vote, revert to follower state.
@@ -340,7 +343,7 @@ where
340343

341344
if log_index > 0 {
342345
if let Some(prev_log_id) = self.state.get_log_id(log_index - 1) {
343-
self.update_progress(self.id, Some(prev_log_id));
346+
self.update_progress(self.config.id, Some(prev_log_id));
344347
}
345348
}
346349

@@ -349,7 +352,7 @@ where
349352
}
350353
}
351354
if let Some(last) = entries.last() {
352-
self.update_progress(self.id, Some(*last.get_log_id()));
355+
self.update_progress(self.config.id, Some(*last.get_log_id()));
353356
}
354357

355358
// Still need to replicate to learners, even when it is fast-committed.
@@ -749,7 +752,7 @@ where
749752

750753
debug_assert!(log_id.is_some(), "a valid update can never set matching to None");
751754

752-
if node_id != self.id {
755+
if node_id != self.config.id {
753756
self.push_command(Command::UpdateReplicationMetrics {
754757
target: node_id,
755758
matching: log_id.unwrap(),
@@ -779,7 +782,7 @@ where
779782
/// This is only called by leader.
780783
#[tracing::instrument(level = "debug", skip_all)]
781784
pub(crate) fn leader_step_down(&mut self) {
782-
tracing::debug!("leader_step_down: node_id:{}", self.id);
785+
tracing::debug!("leader_step_down: node_id:{}", self.config.id);
783786

784787
// Step down:
785788
// Keep acting as leader until a membership without this node is committed.
@@ -794,8 +797,8 @@ where
794797

795798
#[allow(clippy::collapsible_if)]
796799
if em.log_id <= self.state.committed {
797-
if !em.is_voter(&self.id) && self.is_leading() {
798-
tracing::debug!("leader {} is stepping down", self.id);
800+
if !em.is_voter(&self.config.id) && self.is_leading() {
801+
tracing::debug!("leader {} is stepping down", self.config.id);
799802
self.enter_following();
800803
}
801804
}
@@ -935,7 +938,7 @@ where
935938
/// `vote.node_id == self.id`: Leading state;
936939
/// `vote.node_id != self.id`: Following state;
937940
pub(crate) fn switch_internal_server_state(&mut self) {
938-
if self.state.vote.node_id == self.id {
941+
if self.state.vote.node_id == self.config.id {
939942
self.enter_leading();
940943
} else {
941944
self.enter_following();
@@ -946,7 +949,7 @@ where
946949
///
947950
/// Leader state has two phase: election phase and replication phase, similar to paxos phase-1 and phase-2
948951
pub(crate) fn enter_leading(&mut self) {
949-
debug_assert_eq!(self.state.vote.node_id, self.id);
952+
debug_assert_eq!(self.state.vote.node_id, self.config.id);
950953
// debug_assert!(
951954
// self.internal_server_state.is_following(),
952955
// "can not enter leading twice"
@@ -1028,7 +1031,7 @@ where
10281031
};
10291032
self.state.log_ids.append(log_id);
10301033
self.push_command(Command::AppendBlankLog { log_id });
1031-
self.update_progress(self.id, Some(log_id));
1034+
self.update_progress(self.config.id, Some(log_id));
10321035
self.push_command(Command::ReplicateEntries { upto: Some(log_id) });
10331036
}
10341037

@@ -1037,7 +1040,7 @@ where
10371040
if let Some(leader) = self.internal_server_state.leading() {
10381041
let mut targets = vec![];
10391042
for (node_id, matched) in leader.progress.iter() {
1040-
if node_id != &self.id {
1043+
if node_id != &self.config.id {
10411044
targets.push((*node_id, *matched));
10421045
}
10431046
}
@@ -1133,7 +1136,7 @@ where
11331136
let server_state = self.calc_server_state();
11341137

11351138
tracing::debug!(
1136-
id = display(self.id),
1139+
id = display(self.config.id),
11371140
prev_server_state = debug(self.state.server_state),
11381141
server_state = debug(server_state),
11391142
"update_server_state_if_changed"
@@ -1176,9 +1179,9 @@ where
11761179

11771180
/// When initialize, the node that accept initialize request has to be a member of the initial config.
11781181
fn check_members_contain_me(&self, m: &Membership<NID, N>) -> Result<(), NotInMembers<NID, N>> {
1179-
if !m.is_voter(&self.id) {
1182+
if !m.is_voter(&self.config.id) {
11801183
let e = NotInMembers {
1181-
node_id: self.id,
1184+
node_id: self.config.id,
11821185
membership: m.clone(),
11831186
};
11841187
Err(e)
@@ -1270,16 +1273,16 @@ where
12701273
}
12711274

12721275
fn is_voter(&self) -> bool {
1273-
self.state.membership_state.is_voter(&self.id)
1276+
self.state.membership_state.is_voter(&self.config.id)
12741277
}
12751278

12761279
/// The node is candidate or leader
12771280
fn is_leading(&self) -> bool {
1278-
self.state.vote.node_id == self.id
1281+
self.state.vote.node_id == self.config.id
12791282
}
12801283

12811284
pub(crate) fn is_leader(&self) -> bool {
1282-
self.state.vote.node_id == self.id && self.state.vote.committed
1285+
self.state.vote.node_id == self.config.id && self.state.vote.committed
12831286
}
12841287

12851288
fn push_command(&mut self, cmd: Command<NID, N>) {

openraft/src/engine/follower_do_append_entries_test.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,8 @@ fn m45() -> Membership<u64, ()> {
5050
}
5151

5252
fn eng() -> Engine<u64, ()> {
53-
let mut eng = Engine {
54-
id: 2, // make it a member
55-
..Default::default()
56-
};
53+
let mut eng = Engine::default();
54+
eng.config.id = 2;
5755
eng.state.log_ids.append(log_id(1, 1));
5856
eng.state.log_ids.append(log_id(2, 3));
5957
eng.state.membership_state.committed = Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01()));
@@ -156,7 +154,7 @@ fn test_follower_do_append_entries_one_membership_entry() -> anyhow::Result<()>
156154
// - The membership entry in the input becomes effective membership. The previous effective becomes committed.
157155
// - Follower become Learner, since it is not in the new effective membership.
158156
let mut eng = eng();
159-
eng.id = 2; // make it a member, the become learner
157+
eng.config.id = 2; // make it a member, the become learner
160158

161159
eng.follower_do_append_entries(
162160
&[
@@ -225,7 +223,7 @@ fn test_follower_do_append_entries_three_membership_entries() -> anyhow::Result<
225223
// - A learner become follower.
226224

227225
let mut eng = eng();
228-
eng.id = 5; // make it a learner, then become follower
226+
eng.config.id = 5; // make it a learner, then become follower
229227
eng.state.server_state = eng.calc_server_state();
230228

231229
eng.follower_do_append_entries(

openraft/src/engine/handle_append_entries_req_test.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,8 @@ fn m34() -> Membership<u64, ()> {
4949
}
5050

5151
fn eng() -> Engine<u64, ()> {
52-
let mut eng = Engine::<u64, ()> {
53-
id: 2, // make it a member
54-
..Default::default()
55-
};
52+
let mut eng = Engine::default();
53+
eng.config.id = 2;
5654
eng.state.vote = Vote::new(2, 1);
5755
eng.state.log_ids.append(log_id(1, 1));
5856
eng.state.log_ids.append(log_id(2, 3));

openraft/src/engine/handle_vote_req_test.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ fn test_handle_vote_req_granted_follower_learner_does_not_emit_update_server_sta
205205
let st = ServerState::Learner;
206206

207207
let mut eng = eng();
208-
eng.id = 100; // make it a non-voter
208+
eng.config.id = 100; // make it a non-voter
209209
eng.enter_following();
210210
eng.state.server_state = st;
211211
eng.commands = vec![];
@@ -230,7 +230,7 @@ fn test_handle_vote_req_granted_follower_learner_does_not_emit_update_server_sta
230230
let st = ServerState::Follower;
231231

232232
let mut eng = eng();
233-
eng.id = 0; // make it a voter
233+
eng.config.id = 0; // make it a voter
234234
eng.enter_following();
235235
eng.state.server_state = st;
236236
eng.commands = vec![];

0 commit comments

Comments
 (0)