Skip to content

Commit c6fe29d

Browse files
committed
Change: change-membership does not return error when replication lags
If `blocking` is `true`, `Raft::change_membership(..., blocking)` will block until repliication to new nodes become upto date. But it won't return an error when proposing change-membership log. - Change: remove two errors: `LearnerIsLagging` and `LearnerNotFound`. - Fix: #581
1 parent 2896b98 commit c6fe29d

File tree

5 files changed

+42
-102
lines changed

5 files changed

+42
-102
lines changed

openraft/src/core/admin.rs

-45
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ use crate::error::ClientWriteError;
1111
use crate::error::EmptyMembership;
1212
use crate::error::InProgress;
1313
use crate::error::InitializeError;
14-
use crate::error::LearnerIsLagging;
15-
use crate::error::LearnerNotFound;
1614
use crate::error::RemoveLearnerError;
1715
use crate::raft::AddLearnerResponse;
1816
use crate::raft::ClientWriteResponse;
@@ -144,7 +142,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
144142
pub(super) async fn change_membership(
145143
&mut self,
146144
members: BTreeSet<NodeId>,
147-
blocking: bool,
148145
tx: RaftRespTx<ClientWriteResponse<R>, ClientWriteError>,
149146
) -> Result<(), StorageError> {
150147
tracing::info!("change_membership: members: {:?}", members);
@@ -174,48 +171,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
174171

175172
tracing::info!("change_membership: new_config: {:?}", new_config);
176173

177-
// Check the proposed config for any new nodes. If ALL new nodes already have replication
178-
// streams AND are ready to join, then we can immediately proceed with entering joint
179-
// consensus. Else, new nodes need to first be brought up-to-speed.
180-
//
181-
// Here, all we do is check to see which nodes still need to be synced, which determines
182-
// if we can proceed.
183-
184-
// TODO(xp): test change membership without adding as learner.
185-
186-
// TODO(xp): 111 test adding a node that is not learner.
187-
// TODO(xp): 111 test adding a node that is lagging.
188-
for new_node in members.difference(curr.all_nodes()) {
189-
match self.nodes.get(new_node) {
190-
Some(node) => {
191-
if node.is_line_rate(&self.core.last_log_id, &self.core.config) {
192-
// Node is ready to join.
193-
continue;
194-
}
195-
196-
if !blocking {
197-
// Node has repl stream, but is not yet ready to join.
198-
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
199-
ChangeMembershipError::LearnerIsLagging(LearnerIsLagging {
200-
node_id: *new_node,
201-
matched: node.matched,
202-
distance: self.core.last_log_id.next_index().saturating_sub(node.matched.next_index()),
203-
}),
204-
)));
205-
return Ok(());
206-
}
207-
}
208-
209-
// Node does not yet have a repl stream, spawn one.
210-
None => {
211-
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
212-
ChangeMembershipError::LearnerNotFound(LearnerNotFound { node_id: *new_node }),
213-
)));
214-
return Ok(());
215-
}
216-
}
217-
}
218-
219174
self.append_membership_log(new_config, Some(tx)).await?;
220175
Ok(())
221176
}

openraft/src/core/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -809,8 +809,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
809809
RaftMsg::RemoveLearner { id, tx } => {
810810
self.remove_learner(id, tx);
811811
}
812-
RaftMsg::ChangeMembership { members, blocking, tx } => {
813-
self.change_membership(members, blocking, tx).await?;
812+
RaftMsg::ChangeMembership { members, tx } => {
813+
self.change_membership(members, tx).await?;
814814
}
815815
RaftMsg::ForceSnapshotting { tx } => {
816816
for node in self.nodes.values() {

openraft/src/error.rs

-20
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,6 @@ pub enum ChangeMembershipError {
102102

103103
#[error(transparent)]
104104
EmptyMembership(#[from] EmptyMembership),
105-
106-
#[error(transparent)]
107-
LearnerNotFound(#[from] LearnerNotFound),
108-
109-
#[error(transparent)]
110-
LearnerIsLagging(#[from] LearnerIsLagging),
111105
}
112106

113107
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
@@ -261,20 +255,6 @@ pub struct InProgress {
261255
pub membership_log_id: LogId,
262256
}
263257

264-
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
265-
#[error("to add a member {node_id} first need to add it as learner")]
266-
pub struct LearnerNotFound {
267-
pub node_id: NodeId,
268-
}
269-
270-
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
271-
#[error("replication to learner {node_id} is lagging {distance}, matched: {matched:?}, can not add as member")]
272-
pub struct LearnerIsLagging {
273-
pub node_id: NodeId,
274-
pub matched: Option<LogId>,
275-
pub distance: u64,
276-
}
277-
278258
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
279259
#[error("new membership can not be empty")]
280260
pub struct EmptyMembership {}

openraft/src/raft.rs

+15-17
Original file line numberDiff line numberDiff line change
@@ -284,14 +284,18 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
284284
/// If a node in the proposed config but is not yet a voter or learner, it first calls `add_learner` to setup
285285
/// replication to the new node.
286286
///
287-
/// Internal:
288-
/// - It proposes a **joint** config.
289-
/// - When the **joint** config is committed, it proposes a uniform config.
290-
///
291-
/// If blocking is true, it blocks until every learner becomes up to date.
292-
/// Otherwise it returns error `ChangeMembershipError::LearnerIsLagging` if there is a lagging learner.
293-
///
294-
/// If it lost leadership or crashed before committing the second **uniform** config log, the cluster is left in the
287+
/// Internally it commits two logs to get this task done:
288+
/// - Proposes a **joint** config, e.g. `[{1,2,3}, {4,5,6}]` if trying to change config from `{1,2,3}` to `{4,5,6}`.
289+
/// - When the **joint** config is committed, then proposes a **uniform** config, e.g. `{4,5,6}`.
290+
///
291+
/// If `blocking` is true, it does not propose the membership log until replication to every learner becomes up to
292+
/// date.
293+
/// Otherwise it just proposes membership log at once. In this case, the change-membership log may be
294+
/// committed very quickly: if the **old** cluster constitutes a quorum.
295+
/// E.g., when changing from `[1,2,3,4]` to `[1,2,3,4,5]`, `[1,2,3]` is a quorum in both the old and the new
296+
/// cluster. Then it will return before the replication to node-5 becomes up to date.
297+
///
298+
/// If this node crashes before committing the second **uniform** config log, the cluster may be left in the
295299
/// **joint** config.
296300
#[tracing::instrument(level = "info", skip_all)]
297301
pub async fn change_membership(
@@ -330,7 +334,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
330334
.call_core(
331335
RaftMsg::ChangeMembership {
332336
members: members.clone(),
333-
blocking,
334337
tx,
335338
},
336339
rx,
@@ -350,7 +353,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
350353
tracing::info!("the second step is to change to uniform config: {:?}", members);
351354

352355
let (tx, rx) = oneshot::channel();
353-
let res = self.call_core(RaftMsg::ChangeMembership { members, blocking, tx }, rx).await?;
356+
let res = self.call_core(RaftMsg::ChangeMembership { members, tx }, rx).await?;
354357

355358
tracing::info!("res of second change_membership: {}", res.summary());
356359

@@ -500,11 +503,6 @@ pub(crate) enum RaftMsg<D: AppData, R: AppDataResponse> {
500503
},
501504
ChangeMembership {
502505
members: BTreeSet<NodeId>,
503-
/// with blocking==false, respond to client a ChangeMembershipError::LearnerIsLagging error at once if a
504-
/// non-member is lagging.
505-
///
506-
/// Otherwise, wait for commit of the member change log.
507-
blocking: bool,
508506
tx: RaftRespTx<ClientWriteResponse<R>, ClientWriteError>,
509507
},
510508
/// Force to switch to snapshot replication to every target.
@@ -542,8 +540,8 @@ where
542540
RaftMsg::RemoveLearner { id, .. } => {
543541
format!("RemoveLearner: id: {}", id)
544542
}
545-
RaftMsg::ChangeMembership { members, blocking, .. } => {
546-
format!("ChangeMembership: members: {:?}, blocking: {}", members, blocking)
543+
RaftMsg::ChangeMembership { members, .. } => {
544+
format!("ChangeMembership: members: {:?}", members)
547545
}
548546
RaftMsg::ForceSnapshotting { .. } => "ForceSnapshotting".to_string(),
549547
}

openraft/tests/membership/t20_change_membership.rs

+25-18
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
use std::convert::TryInto;
21
use std::sync::Arc;
32
use std::time::Duration;
43

54
use maplit::btreeset;
6-
use openraft::error::ChangeMembershipError;
75
use openraft::Config;
86
use openraft::StorageHelper;
97

@@ -59,7 +57,7 @@ async fn change_with_new_learner_blocking() -> anyhow::Result<()> {
5957

6058
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
6159
async fn change_with_lagging_learner_non_blocking() -> anyhow::Result<()> {
62-
// Add a learner into membership config, expect error NonVoterIsLagging.
60+
// Add a learner into membership config.
6361

6462
let (_log_guard, ut_span) = init_ut!();
6563
let _ent = ut_span.enter();
@@ -90,27 +88,36 @@ async fn change_with_lagging_learner_non_blocking() -> anyhow::Result<()> {
9088
router.wait(&0, timeout()).await?.log(Some(log_index), "received 500 logs").await?;
9189
}
9290

93-
tracing::info!("--- restore replication and change membership at once, expect NonVoterIsLagging");
91+
tracing::info!(
92+
"--- restore replication and change membership at once, it still blocks until logs are replicated to node-1"
93+
);
9494
{
9595
router.restore_node(1).await;
9696
let res = router.change_membership_with_blocking(0, btreeset! {0,1}, false).await;
97+
log_index += 2;
9798

9899
tracing::info!("--- got res: {:?}", res);
100+
assert!(res.is_ok());
101+
router.wait(&1, timeout()).await?.log(Some(log_index), "received 500+2 logs").await?;
102+
}
99103

100-
let err = res.unwrap_err();
101-
let err: ChangeMembershipError = err.try_into().unwrap();
102-
103-
match err {
104-
ChangeMembershipError::LearnerIsLagging(e) => {
105-
tracing::info!(e.distance, "--- distance");
106-
assert_eq!(1, e.node_id);
107-
assert!(e.distance >= lag_threshold);
108-
assert!(e.distance < 500);
109-
}
110-
_ => {
111-
panic!("expect ChangeMembershipError::NonVoterNotFound");
112-
}
113-
}
104+
tracing::info!("--- add node-3, with blocking=false, won't block, because [0,1] is a quorum");
105+
{
106+
router.new_raft_node(2).await;
107+
108+
router.change_membership_with_blocking(0, btreeset! {0,1,2}, false).await?;
109+
log_index += 2;
110+
let m = router.get_metrics(&2).await?;
111+
assert!(m.last_log_index < Some(log_index));
112+
}
113+
114+
tracing::info!("--- make sure replication to node-3 works as expected");
115+
{
116+
router
117+
.wait(&2, timeout())
118+
.await?
119+
.log(Some(log_index), "received all logs, replication works")
120+
.await?;
114121
}
115122

116123
Ok(())

0 commit comments

Comments
 (0)