Skip to content

Commit c8fccb2

Browse files
committed
Fix: when adding a learner, ensure the last membership is committed
Previously, when adding a learner to a Raft cluster, the last membership was not always marked as committed, which could cause issues when a follower tried to truncate logs by reverting to the last committed membership. To prevent this issue, we have updated the code to ensure the last membership is committed when adding a learner. In addition to this fix, we have also made several refactoring changes, including refining method names for trait `Coherent`, renaming `Membership::next_safe()` to `next_coherent()` for consistency, and updating enum `ChangeMembers` to include more variants for adding and removing learners. We have also removed `RaftCore::add_learner()` in favor of using `change_membership()` for all membership operations, and added a `ChangeHandler` to build new membership configurations for change-membership requests. Finally, we have updated the `Membership` API with a new method `new_with_nodes()` for building a new membership configuration, and moved the validation check out into a separate function, `ensure_valid()`. Validation is now done only when needed.
1 parent 0b145e2 commit c8fccb2

15 files changed

+660
-358
lines changed

openraft/src/change_members.rs

+12-17
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,30 @@
1+
use std::collections::BTreeMap;
12
use std::collections::BTreeSet;
23

4+
use crate::Node;
35
use crate::NodeId;
46

57
#[derive(Debug, Clone)]
68
#[derive(PartialEq, Eq)]
79
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
8-
pub enum ChangeMembers<NID: NodeId> {
9-
Add(BTreeSet<NID>),
10-
Remove(BTreeSet<NID>),
11-
Replace(BTreeSet<NID>),
10+
pub enum ChangeMembers<NID: NodeId, N: Node> {
11+
AddVoter(BTreeSet<NID>),
12+
RemoveVoter(BTreeSet<NID>),
13+
ReplaceAllVoters(BTreeSet<NID>),
14+
AddNodes(BTreeMap<NID, N>),
15+
RemoveNodes(BTreeSet<NID>),
16+
ReplaceAllNodes(BTreeMap<NID, N>),
1217
}
1318

1419
/// Convert a series of ids to a `ReplaceAllVoters` operation.
15-
impl<NID, I> From<I> for ChangeMembers<NID>
20+
impl<NID, N, I> From<I> for ChangeMembers<NID, N>
1621
where
1722
NID: NodeId,
23+
N: Node,
1824
I: IntoIterator<Item = NID>,
1925
{
2026
fn from(r: I) -> Self {
2127
let ids = r.into_iter().collect::<BTreeSet<NID>>();
22-
ChangeMembers::Replace(ids)
23-
}
24-
}
25-
26-
impl<NID: NodeId> ChangeMembers<NID> {
27-
/// Apply the `ChangeMembers` to `old` node set, return new node set
28-
pub fn apply_to(self, old: &BTreeSet<NID>) -> BTreeSet<NID> {
29-
match self {
30-
ChangeMembers::Replace(c) => c,
31-
ChangeMembers::Add(add_members) => old.union(&add_members).cloned().collect::<BTreeSet<_>>(),
32-
ChangeMembers::Remove(remove_members) => old.difference(&remove_members).cloned().collect::<BTreeSet<_>>(),
33-
}
28+
ChangeMembers::ReplaceAllVoters(ids)
3429
}
3530
}

openraft/src/core/raft_core.rs

+21-45
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use futures::future::Either;
1111
use futures::stream::FuturesUnordered;
1212
use futures::StreamExt;
1313
use futures::TryFutureExt;
14+
use maplit::btreemap;
1415
use maplit::btreeset;
1516
use pin_utils::pin_mut;
1617
use tokio::io::AsyncRead;
@@ -350,48 +351,31 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
350351
Ok(())
351352
}
352353

353-
/// Add a new node to the cluster as a learner, bringing it up-to-speed, and then responding
354-
/// on the given channel.
354+
/// Submit change-membership by writing a Membership log entry.
355355
///
356-
/// Adding a learner does not affect election, thus it does not need to enter joint consensus.
356+
/// If `retain` is `true`, removed voters will become learners. Otherwise they will
357+
/// be just removed.
357358
///
358-
/// TODO: It has to wait for the previous membership to commit.
359-
/// TODO: Otherwise a second proposed membership implies the previous one is committed.
360-
/// TODO: Test it.
361-
/// TODO: This limit can be removed if membership_state is replaced by a list of membership
362-
/// logs. TODO: Because allowing this requires the engine to be able to store more than 2
363-
/// membership logs. And it does not need to wait for the previous membership log to commit
364-
/// to propose the new membership log.
365-
#[tracing::instrument(level = "debug", skip_all)]
366-
pub(super) async fn add_learner(
367-
&mut self,
368-
target: C::NodeId,
369-
node: C::Node,
370-
tx: ClientWriteTx<C>,
371-
) -> Result<(), Fatal<C::NodeId>> {
372-
// TODO: move these logic to Engine?
373-
let curr = &self.engine.state.membership_state.effective().membership;
374-
let new_membership = curr.add_learner(target, node);
375-
376-
tracing::debug!(?new_membership, "new_membership with added learner: {}", target);
377-
378-
self.write_entry(EntryPayload::Membership(new_membership), Some(tx)).await?;
379-
380-
Ok(())
381-
}
382-
383-
/// Submit change-membership by writing a Membership log entry, if the `expect` is satisfied.
359+
/// Changing membership includes changing voters config or adding/removing learners:
384360
///
385-
/// If `turn_to_learner` is `true`, removed `voter` will becomes `learner`. Otherwise they will
386-
/// be just removed.
361+
/// - To change voters config, it will build a new **joint** config. If it is already a joint
362+
/// config, it returns the final uniform config.
363+
/// - Adding a learner does not affect election, thus it does not need to enter joint consensus.
364+
/// But it still has to wait for the previous membership to commit. Otherwise a second
365+
/// proposed membership implies the previous one is committed.
366+
// ---
367+
// TODO: This limit can be removed if membership_state is replaced by a list of membership logs.
368+
// Because allowing this requires the engine to be able to store more than 2
369+
// membership logs. And it does not need to wait for the previous membership log to commit
370+
// to propose the new membership log.
387371
#[tracing::instrument(level = "debug", skip(self, tx))]
388372
pub(super) async fn change_membership(
389373
&mut self,
390-
changes: ChangeMembers<C::NodeId>,
391-
turn_to_learner: bool,
374+
changes: ChangeMembers<C::NodeId, C::Node>,
375+
retain: bool,
392376
tx: RaftRespTx<ClientWriteResponse<C>, ClientWriteError<C::NodeId, C::Node>>,
393377
) -> Result<(), Fatal<C::NodeId>> {
394-
let res = self.engine.state.membership_state.create_updated_membership(changes, turn_to_learner);
378+
let res = self.engine.state.membership_state.change_handler().apply(changes, retain);
395379
let new_membership = match res {
396380
Ok(x) => x,
397381
Err(e) => {
@@ -1046,18 +1030,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
10461030
self.handle_initialize(members, tx).await?;
10471031
}
10481032
RaftMsg::AddLearner { id, node, tx } => {
1049-
if self.engine.state.is_leader(&self.engine.config.id) {
1050-
self.add_learner(id, node, tx).await?;
1051-
} else {
1052-
self.reject_with_forward_to_leader(tx);
1053-
}
1033+
self.change_membership(ChangeMembers::AddNodes(btreemap! {id=>node}), true, tx).await?;
10541034
}
1055-
RaftMsg::ChangeMembership {
1056-
changes,
1057-
turn_to_learner,
1058-
tx,
1059-
} => {
1060-
self.change_membership(changes, turn_to_learner, tx).await?;
1035+
RaftMsg::ChangeMembership { changes, retain, tx } => {
1036+
self.change_membership(changes, retain, tx).await?;
10611037
}
10621038
RaftMsg::ExternalRequest { req } => {
10631039
req(&self.engine.state, &mut self.storage, &mut self.network);
+161
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
use crate::error::ChangeMembershipError;
2+
use crate::error::InProgress;
3+
use crate::ChangeMembers;
4+
use crate::Membership;
5+
use crate::MembershipState;
6+
use crate::Node;
7+
use crate::NodeId;
8+
9+
pub(crate) struct ChangeHandler<'m, NID, N>
10+
where
11+
NID: NodeId,
12+
N: Node,
13+
{
14+
pub(crate) state: &'m MembershipState<NID, N>,
15+
}
16+
17+
impl<'m, NID, N> ChangeHandler<'m, NID, N>
18+
where
19+
NID: NodeId,
20+
N: Node,
21+
{
22+
/// Builds a new membership configuration by applying changes to the current configuration.
23+
///
24+
/// * `changes`: The changes to apply to the current membership configuration.
25+
/// * `retain` specifies whether to retain the removed voters as learners, i.e., nodes that
26+
/// continue to receive log replication from the leader.
27+
///
28+
/// Returns a `Result` containing the new membership configuration if the operation succeeds, or a
29+
/// `ChangeMembershipError` if an error occurs.
30+
///
31+
/// This function ensures that the cluster will have at least one voter in the new membership
32+
/// configuration.
33+
pub(crate) fn apply(
34+
&self,
35+
change: ChangeMembers<NID, N>,
36+
retain: bool,
37+
) -> Result<Membership<NID, N>, ChangeMembershipError<NID>> {
38+
self.ensure_committed()?;
39+
40+
let new_membership = self.state.effective().membership.clone().change(change, retain)?;
41+
Ok(new_membership)
42+
}
43+
44+
/// Ensures that the latest membership has been committed.
45+
///
46+
/// Returns Ok if the last membership is committed, or an InProgress error
47+
/// otherwise, to indicate a change-membership request should be rejected.
48+
pub(crate) fn ensure_committed(&self) -> Result<(), InProgress<NID>> {
49+
let effective = self.state.effective();
50+
let committed = self.state.committed();
51+
52+
if effective.log_id == committed.log_id {
53+
// Ok: last membership(effective) is committed
54+
Ok(())
55+
} else {
56+
Err(InProgress {
57+
committed: committed.log_id,
58+
membership_log_id: effective.log_id,
59+
})
60+
}
61+
}
62+
}
63+
64+
#[cfg(test)]
65+
mod tests {
66+
use std::sync::Arc;
67+
68+
use maplit::btreemap;
69+
use maplit::btreeset;
70+
71+
use crate::error::ChangeMembershipError;
72+
use crate::error::EmptyMembership;
73+
use crate::error::InProgress;
74+
use crate::error::LearnerNotFound;
75+
use crate::testing::log_id;
76+
use crate::ChangeMembers;
77+
use crate::EffectiveMembership;
78+
use crate::Membership;
79+
use crate::MembershipState;
80+
81+
/// Create an Arc<EffectiveMembership>
82+
fn effmem(term: u64, index: u64, m: Membership<u64, ()>) -> Arc<EffectiveMembership<u64, ()>> {
83+
let lid = Some(log_id(term, index));
84+
Arc::new(EffectiveMembership::new(lid, m))
85+
}
86+
87+
fn m1() -> Membership<u64, ()> {
88+
Membership::new(vec![btreeset! {1}], None)
89+
}
90+
91+
fn m12() -> Membership<u64, ()> {
92+
Membership::new(vec![btreeset! {1,2}], None)
93+
}
94+
95+
fn m123_345() -> Membership<u64, ()> {
96+
Membership::new(vec![btreeset! {1,2,3}, btreeset! {3,4,5}], None)
97+
}
98+
99+
#[test]
100+
fn test_apply_not_committed() -> anyhow::Result<()> {
101+
let new = || MembershipState::new(effmem(2, 2, m1()), effmem(3, 4, m123_345()));
102+
let res = new().change_handler().apply(ChangeMembers::AddVoter(btreeset! {1}), false);
103+
104+
assert_eq!(
105+
Err(ChangeMembershipError::InProgress(InProgress {
106+
committed: Some(log_id(2, 2)),
107+
membership_log_id: Some(log_id(3, 4))
108+
})),
109+
res
110+
);
111+
112+
Ok(())
113+
}
114+
115+
#[test]
116+
fn test_apply_empty_voters() -> anyhow::Result<()> {
117+
let new = || MembershipState::new(effmem(3, 4, m1()), effmem(3, 4, m1()));
118+
let res = new().change_handler().apply(ChangeMembers::RemoveVoter(btreeset! {1}), false);
119+
120+
assert_eq!(Err(ChangeMembershipError::EmptyMembership(EmptyMembership {})), res);
121+
122+
Ok(())
123+
}
124+
125+
#[test]
126+
fn test_apply_learner_not_found() -> anyhow::Result<()> {
127+
let new = || MembershipState::new(effmem(3, 4, m1()), effmem(3, 4, m1()));
128+
let res = new().change_handler().apply(ChangeMembers::AddVoter(btreeset! {2}), false);
129+
130+
assert_eq!(
131+
Err(ChangeMembershipError::LearnerNotFound(LearnerNotFound { node_id: 2 })),
132+
res
133+
);
134+
135+
Ok(())
136+
}
137+
138+
#[test]
139+
fn test_apply_retain_learner() -> anyhow::Result<()> {
140+
let new = || MembershipState::new(effmem(3, 4, m12()), effmem(3, 4, m123_345()));
141+
142+
// Do not leave removed voters as learners
143+
let res = new().change_handler().apply(ChangeMembers::RemoveVoter(btreeset! {1,2}), false);
144+
assert_eq!(
145+
Ok(Membership::new(vec![btreeset! {3,4,5}], btreemap! {3=>(),4=>(),5=>()})),
146+
res
147+
);
148+
149+
// Leave removed voters as learners
150+
let res = new().change_handler().apply(ChangeMembers::RemoveVoter(btreeset! {1,2}), true);
151+
assert_eq!(
152+
Ok(Membership::new(
153+
vec![btreeset! {3,4,5}],
154+
btreemap! {1=>(),2=>(),3=>(),4=>(),5=>()}
155+
)),
156+
res
157+
);
158+
159+
Ok(())
160+
}
161+
}

0 commit comments

Comments
 (0)