Skip to content

Commit 7f34793

Browse files
committed
change: simplify membership change
- Change: if leadership is lost, the cluster is left with the **joint** config. One does not receive response of the change-membership request should always re-send to ensure membership config is applied. - Change: remove joint-uniform logic from RaftCore, which brings a lot complexity to raft impl. This logic is now done in Raft(which is a shell to control RaftCore). - Change: RaftCore.membership is changed to `ActiveMembership`, which includes a log id and a membership config. Making this change to let raft be able to check if a membership is committed by comparing the log index and its committed index. - Change: when adding a existent non-voter, it returns an `Ok` value instead of an `Err`. - Change: add arg `blocking` to `add_non_voter` and `change_membership`. A blocking `change_membership` still wait for the two config change log to commit. `blocking` only indicates if to wait for replication to non-voter to be up to date. - Change: remove `non_voters`. Merge it into `nodes`. Now both voters and non-voters share the same replication handle. - Change: remove field `ReplicationState.is_ready_to_join`, it can be just calculated when needed. - Change: remove `is_stepping_down`, `membership.contains()` is quite enough. - Change: remove `consensus_state`.
1 parent a5793df commit 7f34793

27 files changed

+1022
-723
lines changed

async-raft/src/core/admin.rs

+107-174
Large diffs are not rendered by default.

async-raft/src/core/append_entries.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::raft::AppendEntriesResponse;
88
use crate::raft::ConflictOpt;
99
use crate::raft::Entry;
1010
use crate::raft::EntryPayload;
11+
use crate::ActiveMembership;
1112
use crate::AppData;
1213
use crate::AppDataResponse;
1314
use crate::LogId;
@@ -378,13 +379,17 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
378379
let last_conf_change = entries
379380
.iter()
380381
.filter_map(|ent| match &ent.payload {
381-
EntryPayload::ConfigChange(conf) => Some(conf),
382+
EntryPayload::ConfigChange(conf) => Some(ActiveMembership {
383+
log_id: ent.log_id,
384+
membership: conf.membership.clone(),
385+
}),
382386
_ => None,
383387
})
384388
.last();
389+
385390
if let Some(conf) = last_conf_change {
386391
tracing::debug!({membership=?conf}, "applying new membership config received from leader");
387-
self.update_membership(conf.membership.clone())?;
392+
self.update_membership(conf)?;
388393
};
389394

390395
// Replicate entries to log (same as append, but in follower mode).

async-raft/src/core/client.rs

+102-124
Large diffs are not rendered by default.

async-raft/src/core/mod.rs

+50-81
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ mod append_entries;
55
mod client;
66
mod install_snapshot;
77
pub(crate) mod replication;
8+
#[cfg(test)]
9+
mod replication_state_test;
810
mod vote;
911

1012
use std::collections::BTreeMap;
11-
use std::collections::BTreeSet;
12-
use std::collections::HashSet;
1313
use std::sync::Arc;
1414

1515
use futures::future::AbortHandle;
@@ -60,14 +60,29 @@ use crate::RaftStorage;
6060
use crate::StorageError;
6161
use crate::Update;
6262

63+
/// The currently active membership config.
64+
///
65+
/// It includes:
66+
/// - the id of the log that sets this membership config,
67+
/// - and the config.
68+
///
69+
/// An active config is just the last seen config in raft spec.
70+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
71+
pub struct ActiveMembership {
72+
/// The id of the log that applies this membership config
73+
pub log_id: LogId,
74+
75+
pub membership: MembershipConfig,
76+
}
77+
6378
/// The core type implementing the Raft protocol.
6479
pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
6580
/// This node's ID.
6681
id: NodeId,
6782
/// This node's runtime config.
6883
config: Arc<Config>,
6984
/// The cluster's current membership configuration.
70-
membership: MembershipConfig,
85+
membership: ActiveMembership,
7186
/// The `RaftNetwork` implementation.
7287
network: Arc<N>,
7388
/// The `RaftStorage` implementation.
@@ -152,7 +167,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
152167
let this = Self {
153168
id,
154169
config,
155-
membership,
170+
membership: ActiveMembership {
171+
log_id: LogId::default(),
172+
membership,
173+
},
156174
network,
157175
storage,
158176
target_state: State::Follower,
@@ -185,7 +203,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
185203
self.last_log_id = state.last_log_id;
186204
self.current_term = state.hard_state.current_term;
187205
self.voted_for = state.hard_state.voted_for;
188-
self.membership = state.membership;
206+
self.membership = state.last_membership.clone();
189207
self.last_applied = state.last_applied;
190208
// NOTE: this is repeated here for clarity. It is unsafe to initialize the node's commit
191209
// index to any other value. The commit index must be determined by a leader after
@@ -199,8 +217,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
199217
}
200218

201219
let has_log = self.last_log_id.index != u64::MIN;
202-
let single = self.membership.members.len() == 1;
203-
let is_voter = self.membership.contains(&self.id);
220+
let single = self.membership.membership.members.len() == 1;
221+
let is_voter = self.membership.membership.contains(&self.id);
204222

205223
self.target_state = match (has_log, single, is_voter) {
206224
// A restarted raft that already received some logs but was not yet added to a cluster.
@@ -292,7 +310,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
292310
/// Update core's target state, ensuring all invariants are upheld.
293311
#[tracing::instrument(level = "trace", skip(self))]
294312
fn set_target_state(&mut self, target_state: State) {
295-
if target_state == State::Follower && !self.membership.contains(&self.id) {
313+
if target_state == State::Follower && !self.membership.membership.contains(&self.id) {
296314
self.target_state = State::NonVoter;
297315
} else {
298316
self.target_state = target_state;
@@ -375,7 +393,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
375393

376394
/// Update the node's current membership config & save hard state.
377395
#[tracing::instrument(level = "trace", skip(self))]
378-
fn update_membership(&mut self, cfg: MembershipConfig) -> RaftResult<()> {
396+
fn update_membership(&mut self, cfg: ActiveMembership) -> RaftResult<()> {
379397
// If the given config does not contain this node's ID, it means one of the following:
380398
//
381399
// - the node is currently a non-voter and is replicating an old config to which it has
@@ -384,9 +402,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
384402
// transition to the non-voter state as a signal for when it is safe to shutdown a node
385403
// being removed.
386404
self.membership = cfg;
387-
if !self.membership.contains(&self.id) {
405+
if !self.membership.membership.contains(&self.id) {
388406
self.set_target_state(State::NonVoter);
389-
} else if self.target_state == State::NonVoter && self.membership.members.contains(&self.id) {
407+
} else if self.target_state == State::NonVoter && self.membership.membership.members.contains(&self.id) {
390408
// The node is a NonVoter and the new config has it configured as a normal member.
391409
// Transition to follower.
392410
self.set_target_state(State::Follower);
@@ -613,12 +631,9 @@ impl State {
613631
/// Volatile state specific to the Raft leader.
614632
struct LeaderState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
615633
pub(super) core: &'a mut RaftCore<D, R, N, S>,
634+
616635
/// A mapping of node IDs the replication state of the target node.
617636
pub(super) nodes: BTreeMap<NodeId, ReplicationState<D>>,
618-
/// A mapping of new nodes (non-voters) which are being synced in order to join the cluster.
619-
pub(super) non_voters: BTreeMap<NodeId, NonVoterReplicationState<D>>,
620-
/// A bool indicating if this node will be stepping down after committing the current config change.
621-
pub(super) is_stepping_down: bool,
622637

623638
/// The metrics about a leader
624639
pub leader_metrics: LeaderMetrics,
@@ -631,29 +646,18 @@ struct LeaderState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: Raf
631646

632647
/// A buffer of client requests which have been appended locally and are awaiting to be committed to the cluster.
633648
pub(super) awaiting_committed: Vec<ClientRequestEntry<D, R>>,
634-
635-
/// A field tracking the cluster's current consensus state, which is used for dynamic membership.
636-
pub(super) consensus_state: ConsensusState,
637649
}
638650

639651
impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S> {
640652
/// Create a new instance.
641653
pub(self) fn new(core: &'a mut RaftCore<D, R, N, S>) -> Self {
642-
let consensus_state = if core.membership.is_in_joint_consensus() {
643-
ConsensusState::Joint { is_committed: false }
644-
} else {
645-
ConsensusState::Uniform
646-
};
647654
let (replication_tx, replication_rx) = mpsc::unbounded_channel();
648655
Self {
649656
core,
650657
nodes: BTreeMap::new(),
651-
non_voters: BTreeMap::new(),
652-
is_stepping_down: false,
653658
leader_metrics: LeaderMetrics::default(),
654659
replication_tx,
655660
replication_rx,
656-
consensus_state,
657661
awaiting_committed: Vec::new(),
658662
}
659663
}
@@ -665,13 +669,14 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
665669
let targets = self
666670
.core
667671
.membership
672+
.membership
668673
.all_nodes()
669674
.into_iter()
670675
.filter(|elem| elem != &self.core.id)
671676
.collect::<Vec<_>>();
672677

673678
for target in targets {
674-
let state = self.spawn_replication_stream(target);
679+
let state = self.spawn_replication_stream(target, None);
675680
self.nodes.insert(target, state);
676681
}
677682

@@ -691,9 +696,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
691696
for node in self.nodes.values() {
692697
let _ = node.replstream.repl_tx.send((RaftEvent::Terminate, tracing::debug_span!("CH")));
693698
}
694-
for node in self.non_voters.values() {
695-
let _ = node.state.replstream.repl_tx.send((RaftEvent::Terminate, tracing::debug_span!("CH")));
696-
}
697699
return Ok(());
698700
}
699701

@@ -728,13 +730,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
728730
tracing::info!("leader recv from rx_api: Initialize");
729731
self.core.reject_init_with_config(tx);
730732
}
731-
RaftMsg::AddNonVoter{id, tx} => {
733+
RaftMsg::AddNonVoter{id, tx, blocking} => {
732734
tracing::info!("leader recv from rx_api: AddNonVoter, {}", id);
733-
self.add_member(id, tx);
735+
self.add_member(id, tx, blocking);
734736
}
735-
RaftMsg::ChangeMembership{members, tx} => {
737+
RaftMsg::ChangeMembership{members, blocking, tx} => {
736738
tracing::info!("leader recv from rx_api: ChangeMembership, {:?}", members);
737-
self.change_membership(members, tx).await;
739+
self.change_membership(members, blocking, tx).await;
738740
}
739741
}
740742
},
@@ -767,60 +769,27 @@ struct ReplicationState<D: AppData> {
767769
pub matched: LogId,
768770
pub remove_after_commit: Option<u64>,
769771
pub replstream: ReplicationStream<D>,
770-
}
771-
772-
/// The same as `ReplicationState`, except for non-voters.
773-
struct NonVoterReplicationState<D: AppData> {
774-
/// The replication stream state.
775-
pub state: ReplicationState<D>,
776-
/// A bool indicating if this non-voters is ready to join the cluster.
777-
pub is_ready_to_join: bool,
778772

779773
/// The response channel to use for when this node has successfully synced with the cluster.
780774
pub tx: Option<ResponseTx>,
781775
}
782776

783-
/// A state enum used by Raft leaders to navigate the joint consensus protocol.
784-
pub enum ConsensusState {
785-
/// The cluster is preparring to go into joint consensus, but the leader is still syncing
786-
/// some non-voters to prepare them for cluster membership.
787-
NonVoterSync {
788-
/// The set of non-voters nodes which are still being synced.
789-
awaiting: HashSet<NodeId>,
790-
/// The full membership change which has been proposed.
791-
members: BTreeSet<NodeId>,
792-
793-
/// The response channel to use once the consensus state is back into uniform state.
794-
tx: ResponseTx,
795-
},
796-
/// The cluster is in a joint consensus state and is syncing new nodes.
797-
Joint {
798-
/// A bool indicating if the associated config which started this joint consensus has yet been comitted.
799-
///
800-
/// NOTE: when a new leader is elected, it will initialize this value to false, and then
801-
/// update this value to true once the new leader's blank payload has been committed.
802-
is_committed: bool,
803-
},
804-
/// The cluster consensus is uniform; not in a joint consensus state.
805-
Uniform,
806-
}
777+
impl<D> ReplicationState<D>
778+
where D: AppData
779+
{
780+
// TODO(xp): make this a method of Config?
807781

808-
impl ConsensusState {
809-
/// Check the current state to determine if it is in joint consensus, and if it is safe to finalize the joint
810-
/// consensus.
811-
///
812-
/// The return value will be true if:
813-
/// 1. this object currently represents a joint consensus state.
814-
/// 2. the corresponding config for this consensus state has been committed to the cluster.
815-
pub fn is_joint_consensus_safe_to_finalize(&self) -> bool {
816-
match self {
817-
ConsensusState::Joint { is_committed } => *is_committed,
818-
_ => false,
819-
}
782+
/// Return true if the distance behind last_log_id is smaller than the threshold to join.
783+
pub fn is_line_rate(&self, last_log_id: &LogId, config: &Config) -> bool {
784+
is_matched_upto_date(&self.matched, last_log_id, config)
820785
}
821786
}
822787

823-
///////////////////////////////////////////////////////////////////////////////////////////////////
788+
pub fn is_matched_upto_date(matched: &LogId, last_log_id: &LogId, config: &Config) -> bool {
789+
let my_index = matched.index;
790+
let distance = last_log_id.index.saturating_sub(my_index);
791+
distance <= config.replication_lag_threshold
792+
}
824793

825794
/// Volatile state specific to a Raft node in candidate state.
826795
struct CandidateState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
@@ -857,8 +826,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
857826

858827
// Setup initial state per term.
859828
self.votes_granted_old = 1; // We must vote for ourselves per the Raft spec.
860-
self.votes_needed_old = ((self.core.membership.members.len() / 2) + 1) as u64; // Just need a majority.
861-
if let Some(nodes) = &self.core.membership.members_after_consensus {
829+
self.votes_needed_old = ((self.core.membership.membership.members.len() / 2) + 1) as u64; // Just need a majority.
830+
if let Some(nodes) = &self.core.membership.membership.members_after_consensus {
862831
self.votes_granted_new = 1; // We must vote for ourselves per the Raft spec.
863832
self.votes_needed_new = ((nodes.len() / 2) + 1) as u64; // Just need a majority.
864833
}

0 commit comments

Comments
 (0)