
Commit a010fdd

Change: Stop replication to removed node at once when new membership is seen
Before this commit, when the membership changes, e.g., from a joint config `[(1,2,3), (3,4,5)]` to the uniform config `[3,4,5]` (assuming the leader is `3`), the leader stops replication to `1,2` only when `[3,4,5]` is committed.

This is an unnecessarily complicated solution. It is OK for the leader to stop replication to `1,2` as soon as config `[3,4,5]` is seen, instead of when it is committed:

- If the leader (`3`) finally commits `[3,4,5]`, it will eventually stop replication to `1,2` anyway.
- If the leader (`3`) crashes before committing `[3,4,5]`:
  - and a new leader sees the membership config log `[3,4,5]`, it will continue to commit it and finally stop replication to `1,2`;
  - or a new leader does not see the membership config log `[3,4,5]`, and it will re-establish replication to `1,2`.

In any case, stopping replication at once is OK.

One consideration about this change is that the removed nodes, e.g., `1,2`, do not know they have been removed from the cluster:

- A removed node will enter the candidate state and keep increasing its term, electing itself. This does not affect the working cluster (see the vote-check sketch below):
  - The nodes in the working cluster have greater logs; thus the election will never succeed.
  - The leader does not try to communicate with the removed nodes, so it never sees their higher `term`.
- Removed nodes should eventually be shut down. Regardless of whether the leader replicates the membership without these removed nodes to them, there must always be an external process that shuts them down, because there is no guarantee that a removed node receives the membership log in finite time.

Changes:

- Change: remove the config option `remove_replication`, since replication is now removed at once.
- Refactor: the Engine outputs `Command::UpdateReplicationStream` to inform the Runtime to update replication when the membership changes (see the sketch below).
- Refactor: remove `ReplicationState.failures`; replication no longer needs to count failures in order to be removed.
- Refactor: remove `ReplicationState.matched`; the **matched** log id is already tracked by `Engine.state.leader.progress`.
- Fix: #446
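The `Command::UpdateReplicationStream` item in the Changes list is what makes stop-at-once possible. The sketch below is not openraft's real `Engine`/`Command` API; the variant payload, the `on_membership_seen` helper, and the field names are invented for illustration. The point it shows: the engine emits the command when the new config is *seen* (appended), and the runtime rebuilds the replication streams without waiting for the config to commit.

```rust
use std::collections::BTreeSet;

/// Illustrative stand-in for the engine's output command (not openraft's real type).
#[derive(Debug, PartialEq)]
enum Command {
    /// Ask the runtime to rebuild replication streams to exactly this target set.
    UpdateReplicationStream { targets: BTreeSet<u64> },
}

/// Illustrative stand-in for the engine: pure decision logic that only emits commands.
struct Engine {
    id: u64,
    output: Vec<Command>,
}

impl Engine {
    /// Called as soon as a membership log entry is *seen* (appended), not when committed.
    fn on_membership_seen(&mut self, new_members: &BTreeSet<u64>) {
        // Replicate to every other node of the new config; the runtime drops streams
        // to nodes that are no longer listed, e.g. the removed nodes 1 and 2.
        let targets = new_members.iter().copied().filter(|n| *n != self.id).collect();
        self.output.push(Command::UpdateReplicationStream { targets });
    }
}

fn main() {
    // Leader 3 appends the uniform config [3,4,5] while streams to 1 and 2 still run.
    let mut engine = Engine { id: 3, output: Vec::new() };
    engine.on_membership_seen(&BTreeSet::from([3, 4, 5]));

    // The runtime stops the streams to 1 and 2 and keeps (or starts) 4 and 5.
    assert_eq!(
        engine.output,
        vec![Command::UpdateReplicationStream {
            targets: BTreeSet::from([4, 5])
        }]
    );
}
```

In the old code, the equivalent work only happened in `handle_uniform_consensus_committed` (see the `openraft/src/core/admin.rs` diff below).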
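The claim above, that a removed node's self-elections never disturb the working cluster, rests on Raft's standard up-to-date vote check. A minimal sketch of that check, using plain `(term, index)` tuples rather than openraft's `LogId`/`Vote` types (the helper name is made up for illustration):

```rust
/// A voter grants its vote only if the candidate's last log id is at least as
/// up-to-date as its own; log ids are compared as (term, index), lexicographically.
fn is_log_up_to_date(candidate_last: (u64, u64), voter_last: (u64, u64)) -> bool {
    candidate_last >= voter_last
}

fn main() {
    // Node 1 was removed after log (term 7, index 100); the working cluster keeps
    // appending and its nodes are at (term 8, index 250). However high node 1 pushes
    // its *election* term, its *log* stays behind, so its vote requests are rejected.
    let removed_node_last = (7, 100);
    let cluster_node_last = (8, 250);

    assert!(!is_log_up_to_date(removed_node_last, cluster_node_last));
    // A node of the working cluster can still win an election among its peers.
    assert!(is_log_up_to_date(cluster_node_last, cluster_node_last));
}
```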
1 parent d67199b commit a010fdd

19 files changed: +187 -337 lines

openraft/src/config/config.rs

-53 lines

@@ -53,50 +53,6 @@ fn parse_snapshot_policy(src: &str) -> Result<SnapshotPolicy, ConfigError> {
     Ok(SnapshotPolicy::LogsSinceLast(n_logs))
 }
 
-/// Policy to remove a replication.
-#[derive(Clone, Debug, PartialEq)]
-#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
-pub enum RemoveReplicationPolicy {
-    /// Leader will remove a replication to a node that is removed from membership,
-    /// if the `committed` index advanced too many the index of the **uniform** membership log in which the node is
-    /// removed.
-    CommittedAdvance(u64),
-
-    /// Leader removes a replication if it encountered the specified number of network failures.
-    MaxNetworkFailures(u64),
-}
-
-fn parse_remove_replication_policy(src: &str) -> Result<RemoveReplicationPolicy, ConfigError> {
-    let elts = src.split(':').collect::<Vec<_>>();
-    if elts.len() != 2 {
-        return Err(ConfigError::InvalidRemoveReplicationPolicy {
-            syntax: "committed_advance:<num>|max_network_failures:<num>".to_string(),
-            invalid: src.to_string(),
-        });
-    }
-
-    if elts[0] == "committed_advance" {
-        let n_logs = elts[1].parse::<u64>().map_err(|e| ConfigError::InvalidNumber {
-            invalid: src.to_string(),
-            reason: e.to_string(),
-        })?;
-        return Ok(RemoveReplicationPolicy::CommittedAdvance(n_logs));
-    }
-
-    if elts[0] == "max_network_failures" {
-        let n = elts[1].parse::<u64>().map_err(|e| ConfigError::InvalidNumber {
-            invalid: src.to_string(),
-            reason: e.to_string(),
-        })?;
-        return Ok(RemoveReplicationPolicy::MaxNetworkFailures(n));
-    }
-
-    Err(ConfigError::InvalidRemoveReplicationPolicy {
-        syntax: "committed_advance:<num>|max_network_failures:<num>".to_string(),
-        invalid: src.to_string(),
-    })
-}
-
 /// The runtime configuration for a Raft node.
 ///
 /// The default values used by this type should generally work well for Raft clusters which will
@@ -178,15 +134,6 @@ pub struct Config {
     /// The minimal number of applied logs to purge in a batch.
     #[clap(long, default_value = "1")]
     pub purge_batch_size: u64,
-
-    /// Policy to remove a replication stream for an unreachable removed node.
-    #[clap(
-        long,
-        env = "RAFT_FORCE_REMOVE_REPLICATION",
-        default_value = "max_network_failures:10",
-        parse(try_from_str=parse_remove_replication_policy)
-    )]
-    pub remove_replication: RemoveReplicationPolicy,
 }
 
 impl Default for Config {

openraft/src/config/config_test.rs

-24 lines

@@ -1,4 +1,3 @@
-use crate::config::config::RemoveReplicationPolicy;
 use crate::config::error::ConfigError;
 use crate::Config;
 use crate::SnapshotPolicy;
@@ -59,7 +58,6 @@ fn test_build() -> anyhow::Result<()> {
         "--snapshot-policy=since_last:203",
         "--snapshot-max-chunk-size=204",
         "--max-applied-log-to-keep=205",
-        "--remove-replication=max_network_failures:206",
         "--purge-batch-size=207",
     ])?;
 
@@ -73,29 +71,7 @@ fn test_build() -> anyhow::Result<()> {
     assert_eq!(SnapshotPolicy::LogsSinceLast(203), config.snapshot_policy);
     assert_eq!(204, config.snapshot_max_chunk_size);
     assert_eq!(205, config.max_applied_log_to_keep);
-    assert_eq!(
-        RemoveReplicationPolicy::MaxNetworkFailures(206),
-        config.remove_replication
-    );
     assert_eq!(207, config.purge_batch_size);
 
     Ok(())
 }
-
-#[test]
-fn test_option_remove_replication() -> anyhow::Result<()> {
-    let config = Config::build(&["foo", "--remove-replication=max_network_failures:206"])?;
-
-    assert_eq!(
-        RemoveReplicationPolicy::MaxNetworkFailures(206),
-        config.remove_replication
-    );
-
-    let config = Config::build(&["foo", "--remove-replication=committed_advance:206"])?;
-
-    assert_eq!(
-        RemoveReplicationPolicy::CommittedAdvance(206),
-        config.remove_replication
-    );
-    Ok(())
-}

openraft/src/config/error.rs

-3 lines

@@ -18,7 +18,4 @@ pub enum ConfigError {
 
     #[error("{reason} when parsing {invalid:?}")]
     InvalidNumber { invalid: String, reason: String },
-
-    #[error("remove replication policy string is invalid: '{invalid:?}' expect: '{syntax}'")]
-    InvalidRemoveReplicationPolicy { invalid: String, syntax: String },
 }

openraft/src/config/mod.rs

-1 line

@@ -4,6 +4,5 @@ mod error;
 #[cfg(test)] mod config_test;
 
 pub use config::Config;
-pub use config::RemoveReplicationPolicy;
 pub use config::SnapshotPolicy;
 pub use error::ConfigError;

openraft/src/core/admin.rs

+40 -147 lines

@@ -4,9 +4,7 @@ use std::option::Option::None;
 
 use tracing::Level;
 
-use crate::config::RemoveReplicationPolicy;
 use crate::core::replication_state::replication_lag;
-use crate::core::replication_state::ReplicationState;
 use crate::core::Expectation;
 use crate::core::LeaderState;
 use crate::core::ServerState;
@@ -20,10 +18,10 @@ use crate::error::InProgress;
 use crate::error::LearnerIsLagging;
 use crate::error::LearnerNotFound;
 use crate::metrics::RemoveTarget;
+use crate::progress::Progress;
 use crate::raft::AddLearnerResponse;
 use crate::raft::ClientWriteResponse;
 use crate::raft::RaftRespTx;
-use crate::raft_types::LogIdOptionExt;
 use crate::raft_types::RaftLogId;
 use crate::runtime::RaftRuntime;
 use crate::summary::MessageSummary;
@@ -38,23 +36,6 @@ use crate::RaftTypeConfig;
 use crate::StorageError;
 
 impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderState<'a, C, N, S> {
-    // add node into learner,return true if the node is already a member or learner
-    #[tracing::instrument(level = "debug", skip(self))]
-    async fn write_add_learner_entry(
-        &mut self,
-        target: C::NodeId,
-        node: Option<Node>,
-    ) -> Result<LogId<C::NodeId>, AddLearnerError<C::NodeId>> {
-        let curr = &self.core.engine.state.membership_state.effective.membership;
-        let new_membership = curr.add_learner(target, node)?;
-
-        tracing::debug!(?new_membership, "new_config");
-
-        let log_id = self.write_entry(EntryPayload::Membership(new_membership), None).await?;
-
-        Ok(log_id)
-    }
-
     /// Add a new node to the cluster as a learner, bringing it up-to-speed, and then responding
     /// on the given channel.
     ///
@@ -79,51 +60,41 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
 
         // Ensure the node doesn't already exist in the current
        // config, in the set of new nodes already being synced, or in the nodes being removed.
-        // TODO: remove this
-        if target == self.core.id {
-            tracing::debug!("target node is this node");
+
+        let curr = &self.core.engine.state.membership_state.effective;
+        if curr.contains(&target) {
+            let matched = if let Some(l) = &self.core.engine.state.leader {
+                *l.progress.get(&target)
+            } else {
+                unreachable!("it has to be a leader!!!");
+            };
+
+            tracing::debug!(
+                "target {:?} already member or learner, can't add; matched:{:?}",
+                target,
+                matched
+            );
 
             let _ = tx.send(Ok(AddLearnerResponse {
                 membership_log_id: self.core.engine.state.membership_state.effective.log_id,
-                matched: self.core.engine.state.last_log_id(),
+                matched,
             }));
             return Ok(());
        }
 
-        let curr = &self.core.engine.state.membership_state.effective;
-        if curr.contains(&target) {
-            tracing::debug!("target {:?} already member or learner, can't add", target);
-
-            if let Some(t) = self.nodes.get(&target) {
-                tracing::debug!("target node is already a cluster member or is being synced");
-                let _ = tx.send(Ok(AddLearnerResponse {
-                    membership_log_id: self.core.engine.state.membership_state.effective.log_id,
-                    matched: t.matched,
-                }));
-                return Ok(());
-            } else {
-                unreachable!(
-                    "node {} in membership but there is no replication stream for it",
-                    target
-                )
-            }
-        }
-
-        // TODO(xp): when new membership log is appended, write_entry() should be responsible to setup new replication
-        // stream.
-        let res = self.write_add_learner_entry(target, node).await;
-        let log_id = match res {
+        let curr = &self.core.engine.state.membership_state.effective.membership;
+        let res = curr.add_learner(target, node);
+        let new_membership = match res {
            Ok(x) => x,
            Err(e) => {
-                let _ = tx.send(Err(e));
+                let _ = tx.send(Err(AddLearnerError::MissingNodeInfo(e)));
                return Ok(());
            }
        };
 
-        // TODO(xp): nodes, i.e., replication streams, should also be a property of follower or candidate, for
-        // sending vote requests etc?
-        let state = self.spawn_replication_stream(target).await;
-        self.nodes.insert(target, state);
+        tracing::debug!(?new_membership, "new_membership with added learner: {}", target);
+
+        let log_id = self.write_entry(EntryPayload::Membership(new_membership), None).await?;
 
        tracing::debug!(
            "after add target node {} as learner {:?}",
@@ -238,7 +209,12 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
             Expectation::AtLineRate => {
                 // Expect to be at line rate but not.
 
-                let matched = self.nodes.get(node_id).map(|x| x.matched).unwrap();
+                let matched = if let Some(l) = &self.core.engine.state.leader {
+                    *l.progress.get(node_id)
+                } else {
+                    unreachable!("it has to be a leader!!!");
+                };
+
                 let distance = replication_lag(&matched, &last_log_id);
 
                 if distance <= self.core.config.replication_lag_threshold {
@@ -317,64 +293,28 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
     /// This is ony called by leader.
     #[tracing::instrument(level = "debug", skip(self))]
     pub(super) async fn handle_uniform_consensus_committed(&mut self, log_id: &LogId<C::NodeId>) {
-        let index = log_id.index;
-
         // Step down if needed.
-        if !self.core.engine.state.membership_state.effective.membership.is_voter(&self.core.id) {
+
+        let _ = log_id;
+
+        // TODO: Leader does not need to step down. It can keep working.
+        // This requires to separate Leader(Proposer) and Acceptors.
+        if !self.core.engine.state.membership_state.effective.is_voter(&self.core.id) {
             tracing::debug!("raft node is stepping down");
 
             // TODO(xp): transfer leadership
             self.core.set_target_state(ServerState::Learner);
-            return;
+            self.core.engine.metrics_flags.set_cluster_changed();
        }
-
-        let membership = &self.core.engine.state.membership_state.effective.membership;
-
-        // remove nodes which not included in nodes and learners
-        for (id, state) in self.nodes.iter_mut() {
-            if membership.contains(id) {
-                continue;
-            }
-
-            tracing::info!(
-                "set remove_after_commit for {} = {}, membership: {:?}",
-                id,
-                index,
-                self.core.engine.state.membership_state.effective
-            );
-
-            state.remove_since = Some(index)
-        }
-
-        let targets = self.nodes.keys().cloned().collect::<Vec<_>>();
-        for target in targets {
-            self.try_remove_replication(target).await;
-        }
-
-        self.core.engine.metrics_flags.set_replication_changed();
    }
 
     /// Remove a replication if the membership that does not include it has committed.
     ///
     /// Return true if removed.
     #[tracing::instrument(level = "trace", skip(self))]
-    pub async fn try_remove_replication(&mut self, target: C::NodeId) -> bool {
-        tracing::debug!(target = display(target), "try_remove_replication");
+    pub async fn remove_replication(&mut self, target: C::NodeId) -> bool {
+        tracing::info!("removed_replication to: {}", target);
 
-        {
-            let n = self.nodes.get(&target);
-
-            if let Some(n) = n {
-                if !self.need_to_remove_replication(n) {
-                    return false;
-                }
-            } else {
-                tracing::warn!("trying to remove absent replication to {}", target);
-                return false;
-            }
-        }
-
-        tracing::info!("removed replication to: {}", target);
         let repl_state = self.nodes.remove(&target);
         if let Some(s) = repl_state {
             let handle = s.repl_stream.handle;
@@ -385,6 +325,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
             tracing::debug!("joining removed replication: {}", target);
             let _x = handle.await;
             tracing::info!("Done joining removed replication : {}", target);
+        } else {
+            unreachable!("try to nonexistent replication to {}", target);
        }
 
         self.replication_metrics.update(RemoveTarget { target });
@@ -394,53 +336,4 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
 
         true
     }
-
-    fn need_to_remove_replication(&self, node: &ReplicationState<C::NodeId>) -> bool {
-        tracing::debug!(node=?node, "check if to remove a replication");
-
-        let cfg = &self.core.config;
-        let policy = &cfg.remove_replication;
-
-        let st = &self.core.engine.state;
-        let committed = st.committed;
-
-        // `remove_since` is set only when the uniform membership log is committed.
-        // Do not remove replication if it is not committed.
-        let since = if let Some(since) = node.remove_since {
-            since
-        } else {
-            return false;
-        };
-
-        if node.matched.index() >= Some(since) {
-            tracing::debug!(
-                node = debug(node),
-                committed = debug(committed),
-                "remove replication: uniform membership log committed and replicated to target"
-            );
-            return true;
-        }
-
-        match policy {
-            RemoveReplicationPolicy::CommittedAdvance(n) => {
-                // TODO(xp): test this. but not for now. It is meaningless without blank-log heartbeat.
-                if committed.next_index() - since > *n {
-                    tracing::debug!(
-                        node = debug(node),
-                        committed = debug(committed),
-                        "remove replication: committed index is head of remove_since too much"
-                    );
-                    return true;
-                }
-            }
-            RemoveReplicationPolicy::MaxNetworkFailures(n) => {
-                if node.failures >= *n {
-                    tracing::debug!(node = debug(node), "remove replication: too many replication failure");
-                    return true;
-                }
-            }
-        }
-
-        false
-    }
 }
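A pattern that recurs throughout this diff is replacing the per-stream `ReplicationState.matched` field with a lookup in `engine.state.leader.progress`: the matched log id lives in a single leader-side progress map. A minimal sketch with simplified types (plain `u64` indexes instead of openraft's `LogId`, and a `Progress` stand-in that only mirrors the get/update idea):

```rust
use std::collections::BTreeMap;

/// Illustrative stand-in for the leader's progress tracker.
#[derive(Default)]
struct Progress {
    /// node id -> highest log index known to be replicated to that node
    matched: BTreeMap<u64, u64>,
}

impl Progress {
    /// What the leader consults instead of a per-stream `ReplicationState.matched`.
    fn get(&self, id: &u64) -> Option<u64> {
        self.matched.get(id).copied()
    }

    /// Updated from replication responses; only ever moves forward.
    fn update(&mut self, id: u64, log_index: u64) {
        let e = self.matched.entry(id).or_insert(0);
        if log_index > *e {
            *e = log_index;
        }
    }
}

fn main() {
    let mut progress = Progress::default();
    progress.update(4, 10);
    progress.update(5, 12);
    progress.update(5, 11); // stale response, ignored

    assert_eq!(progress.get(&4), Some(10));
    assert_eq!(progress.get(&5), Some(12));
    assert_eq!(progress.get(&2), None); // no replication to a removed/unknown node
}
```

With the matched log id tracked here, a replication stream no longer needs its own `matched` or `failures` state, which is what allows `ReplicationState` to be slimmed down in this commit.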
