Skip to content

Commit 01a16d0

Browse files
committed
Change: remove tx from spawn_replication_stream()
Replication should not be responsible for invoking the callback when replication becomes up to date; doing so muddies the logic. That job can be done by watching for metrics changes instead. - Change: API: `AddLearnerResponse` has a new field `membership_log_id`, which is the log id of the membership log entry that contains the newly added learner.
1 parent 54ebed1 commit 01a16d0

File tree

6 files changed

+145
-80
lines changed

6 files changed

+145
-80
lines changed

openraft/src/core/admin.rs

+35-27
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::raft::AddLearnerResponse;
2323
use crate::raft::ClientWriteResponse;
2424
use crate::raft::RaftRespTx;
2525
use crate::raft_types::LogIdOptionExt;
26+
use crate::raft_types::RaftLogId;
2627
use crate::runtime::RaftRuntime;
2728
use crate::versioned::Updatable;
2829
use crate::ChangeMembers;
@@ -40,15 +41,15 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
4041
&mut self,
4142
target: C::NodeId,
4243
node: Option<Node>,
43-
) -> Result<(), AddLearnerError<C::NodeId>> {
44+
) -> Result<LogId<C::NodeId>, AddLearnerError<C::NodeId>> {
4445
let curr = &self.core.engine.state.membership_state.effective.membership;
4546
let new_membership = curr.add_learner(target, node)?;
4647

4748
tracing::debug!(?new_membership, "new_config");
4849

49-
self.write_entry(EntryPayload::Membership(new_membership), None).await?;
50+
let log_id = self.write_entry(EntryPayload::Membership(new_membership), None).await?;
5051

51-
Ok(())
52+
Ok(log_id)
5253
}
5354

5455
/// Add a new node to the cluster as a learner, bringing it up-to-speed, and then responding
@@ -60,24 +61,26 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
6061
///
6162
/// If `blocking` is `true`, the result is sent to `tx` as the target node log has caught up. Otherwise, result is
6263
/// sent at once, no matter whether the target node log is lagging or not.
63-
#[tracing::instrument(level = "debug", skip(self, tx))]
64+
#[tracing::instrument(level = "debug", skip(self))]
6465
pub(super) async fn add_learner(
6566
&mut self,
6667
target: C::NodeId,
6768
node: Option<Node>,
6869
tx: RaftRespTx<AddLearnerResponse<C::NodeId>, AddLearnerError<C::NodeId>>,
69-
blocking: bool,
70-
) {
70+
) -> Result<(), Fatal<C::NodeId>> {
7171
tracing::debug!("add target node {} as learner {:?}", target, self.nodes.keys());
7272

7373
// Ensure the node doesn't already exist in the current
7474
// config, in the set of new nodes already being synced, or in the nodes being removed.
75+
// TODO: remove this
7576
if target == self.core.id {
7677
tracing::debug!("target node is this node");
78+
7779
let _ = tx.send(Ok(AddLearnerResponse {
80+
membership_log_id: self.core.engine.state.membership_state.effective.log_id,
7881
matched: self.core.engine.state.last_log_id(),
7982
}));
80-
return;
83+
return Ok(());
8184
}
8285

8386
let curr = &self.core.engine.state.membership_state.effective;
@@ -86,8 +89,11 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
8689

8790
if let Some(t) = self.nodes.get(&target) {
8891
tracing::debug!("target node is already a cluster member or is being synced");
89-
let _ = tx.send(Ok(AddLearnerResponse { matched: t.matched }));
90-
return;
92+
let _ = tx.send(Ok(AddLearnerResponse {
93+
membership_log_id: self.core.engine.state.membership_state.effective.log_id,
94+
matched: t.matched,
95+
}));
96+
return Ok(());
9197
} else {
9298
unreachable!(
9399
"node {} in membership but there is no replication stream for it",
@@ -99,29 +105,31 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
99105
// TODO(xp): when new membership log is appended, write_entry() should be responsible to setup new replication
100106
// stream.
101107
let res = self.write_add_learner_entry(target, node).await;
102-
if let Err(e) = res {
103-
let _ = tx.send(Err(e));
104-
return;
105-
}
106-
107-
if blocking {
108-
let state = self.spawn_replication_stream(target, Some(tx)).await;
109-
// TODO(xp): nodes, i.e., replication streams, should also be a property of follower or candidate, for
110-
// sending vote requests etc?
111-
self.nodes.insert(target, state);
112-
} else {
113-
let state = self.spawn_replication_stream(target, None).await;
114-
self.nodes.insert(target, state);
108+
let log_id = match res {
109+
Ok(x) => x,
110+
Err(e) => {
111+
let _ = tx.send(Err(e));
112+
return Ok(());
113+
}
114+
};
115115

116-
// non-blocking mode, do not know about the replication stat.
117-
let _ = tx.send(Ok(AddLearnerResponse { matched: None }));
118-
}
116+
// TODO(xp): nodes, i.e., replication streams, should also be a property of follower or candidate, for
117+
// sending vote requests etc?
118+
let state = self.spawn_replication_stream(target).await;
119+
self.nodes.insert(target, state);
119120

120121
tracing::debug!(
121122
"after add target node {} as learner {:?}",
122123
target,
123124
self.core.engine.state.last_log_id()
124125
);
126+
127+
let _ = tx.send(Ok(AddLearnerResponse {
128+
membership_log_id: Some(log_id),
129+
matched: None,
130+
}));
131+
132+
Ok(())
125133
}
126134

127135
/// return true if there is pending uncommitted config change
@@ -250,7 +258,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
250258
&mut self,
251259
payload: EntryPayload<C>,
252260
resp_tx: Option<RaftRespTx<ClientWriteResponse<C>, ClientWriteError<C::NodeId>>>,
253-
) -> Result<(), Fatal<C::NodeId>> {
261+
) -> Result<LogId<C::NodeId>, Fatal<C::NodeId>> {
254262
let mut entry_refs = [EntryRef::new(&payload)];
255263
// TODO: it should returns membership config error etc. currently this is done by the caller.
256264
self.core.engine.leader_append_entries(&mut entry_refs);
@@ -262,7 +270,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
262270

263271
self.run_engine_commands(&entry_refs).await?;
264272

265-
Ok(())
273+
Ok(*entry_refs[0].get_log_id())
266274
}
267275

268276
#[tracing::instrument(level = "debug", skip_all)]

openraft/src/core/leader_state.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
8181
};
8282

8383
for target in targets {
84-
let state = self.spawn_replication_stream(target, None).await;
84+
let state = self.spawn_replication_stream(target).await;
8585
self.nodes.insert(target, state);
8686
}
8787

@@ -148,8 +148,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
148148
RaftMsg::ClientWriteRequest { rpc, tx } => {
149149
self.write_entry(rpc.payload, Some(tx)).await?;
150150
}
151-
RaftMsg::AddLearner { id, node, tx, blocking } => {
152-
self.add_learner(id, node, tx, blocking).await;
151+
RaftMsg::AddLearner { id, node, tx } => {
152+
self.add_learner(id, node, tx).await?;
153153
}
154154
RaftMsg::ChangeMembership {
155155
changes,

openraft/src/core/replication.rs

+2-22
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@ use crate::core::LeaderState;
88
use crate::core::ReplicationState;
99
use crate::core::ServerState;
1010
use crate::core::SnapshotState;
11-
use crate::error::AddLearnerError;
1211
use crate::metrics::UpdateMatchedLogId;
13-
use crate::raft::AddLearnerResponse;
14-
use crate::raft::RaftRespTx;
1512
use crate::replication::ReplicaEvent;
1613
use crate::replication::ReplicationStream;
1714
use crate::replication::UpdateReplication;
@@ -28,13 +25,9 @@ use crate::StorageError;
2825

2926
impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderState<'a, C, N, S> {
3027
/// Spawn a new replication stream returning its replication state handle.
31-
#[tracing::instrument(level = "debug", skip(self, caller_tx))]
28+
#[tracing::instrument(level = "debug", skip(self))]
3229
#[allow(clippy::type_complexity)]
33-
pub(super) async fn spawn_replication_stream(
34-
&mut self,
35-
target: C::NodeId,
36-
caller_tx: Option<RaftRespTx<AddLearnerResponse<C::NodeId>, AddLearnerError<C::NodeId>>>,
37-
) -> ReplicationState<C::NodeId> {
30+
pub(super) async fn spawn_replication_stream(&mut self, target: C::NodeId) -> ReplicationState<C::NodeId> {
3831
let target_node = self.core.engine.state.membership_state.effective.get_node(&target);
3932

4033
let repl_stream = ReplicationStream::new::<C, N, S>(
@@ -53,7 +46,6 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
5346
matched: None,
5447
repl_stream,
5548
remove_since: None,
56-
tx: caller_tx,
5749
failures: 0,
5850
}
5951
}
@@ -135,18 +127,6 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
135127

136128
state.matched = Some(matched);
137129

138-
// Issue a response on the learners response channel if needed.
139-
if state.is_line_rate(&self.core.engine.state.last_log_id(), &self.core.config) {
140-
// This replication became line rate.
141-
142-
// When adding a learner, it blocks until the replication becomes line-rate.
143-
if let Some(tx) = state.tx.take() {
144-
// TODO(xp): define a specific response type for learner matched event.
145-
let x = AddLearnerResponse { matched: state.matched };
146-
let _ = tx.send(Ok(x));
147-
}
148-
}
149-
150130
// Drop replication stream if needed.
151131
if self.try_remove_replication(target).await {
152132
// nothing to do

openraft/src/core/replication_state.rs

-7
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@ use std::fmt::Debug;
22
use std::fmt::Formatter;
33

44
use crate::config::Config;
5-
use crate::error::AddLearnerError;
6-
use crate::raft::AddLearnerResponse;
7-
use crate::raft::RaftRespTx;
85
use crate::raft_types::LogIdOptionExt;
96
use crate::replication::ReplicationStream;
107
use crate::LogId;
@@ -23,10 +20,6 @@ pub(crate) struct ReplicationState<NID: NodeId> {
2320
///
2421
/// It will be reset once a successful replication is done.
2522
pub failures: u64,
26-
27-
/// The response channel to use for when this node has successfully synced with the cluster.
28-
#[allow(clippy::type_complexity)]
29-
pub tx: Option<RaftRespTx<AddLearnerResponse<NID>, AddLearnerError<NID>>>,
3023
}
3124

3225
impl<NID: NodeId> MessageSummary<ReplicationState<NID>> for ReplicationState<NID> {

openraft/src/raft.rs

+102-7
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use tokio::task::JoinHandle;
1515
use tracing::Span;
1616

1717
use crate::config::Config;
18+
use crate::core::is_matched_upto_date;
1819
use crate::core::Expectation;
1920
use crate::core::RaftCore;
2021
use crate::error::AddLearnerError;
@@ -118,6 +119,8 @@ enum CoreState<NID: NodeId> {
118119
}
119120

120121
struct RaftInner<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> {
122+
id: C::NodeId,
123+
config: Arc<Config>,
121124
tx_api: mpsc::UnboundedSender<(RaftMsg<C, N, S>, Span)>,
122125
rx_metrics: watch::Receiver<RaftMetrics<C::NodeId>>,
123126
// TODO(xp): it does not need to be a async mutex.
@@ -180,7 +183,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
180183

181184
let raft_handle = RaftCore::spawn(
182185
id,
183-
config,
186+
config.clone(),
184187
network,
185188
storage,
186189
tx_api.clone(),
@@ -190,6 +193,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
190193
);
191194

192195
let inner = RaftInner {
196+
id,
197+
config,
193198
tx_api,
194199
rx_metrics,
195200
tx_shutdown: Mutex::new(Some(tx_shutdown)),
@@ -345,7 +350,96 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
345350
blocking: bool,
346351
) -> Result<AddLearnerResponse<C::NodeId>, AddLearnerError<C::NodeId>> {
347352
let (tx, rx) = oneshot::channel();
348-
self.call_core(RaftMsg::AddLearner { id, node, blocking, tx }, rx).await
353+
let resp = self.call_core(RaftMsg::AddLearner { id, node, tx }, rx).await?;
354+
355+
if !blocking {
356+
return Ok(resp);
357+
}
358+
359+
if self.inner.id == id {
360+
return Ok(resp);
361+
}
362+
363+
// Otherwise, blocks until the replication to the new learner becomes up to date.
364+
365+
// The log id of the membership that contains the added learner.
366+
let membership_log_id = resp.membership_log_id;
367+
368+
let res0 = Arc::new(std::sync::Mutex::new(resp));
369+
let res = res0.clone();
370+
371+
let wait_res = self
372+
.wait(None)
373+
.metrics(
374+
|metrics| match self.check_replication_upto_date(metrics, id, membership_log_id) {
375+
Ok(resp) => {
376+
res.lock().unwrap().membership_log_id = resp;
377+
true
378+
}
379+
// keep waiting
380+
Err(_) => false,
381+
},
382+
"wait new learner to become line-rate",
383+
)
384+
.await;
385+
386+
tracing::info!(wait_res = debug(&wait_res), "waiting for replication to new learner");
387+
388+
let r = {
389+
let x = res0.lock().unwrap();
390+
x.clone()
391+
};
392+
Ok(r)
393+
}
394+
395+
/// Returns Ok() with the latest known matched log id if it should quit waiting: leader change, node removed, or
396+
/// replication becomes upto date.
397+
///
398+
/// Returns Err() if it should keep waiting.
399+
fn check_replication_upto_date(
400+
&self,
401+
metrics: &RaftMetrics<C::NodeId>,
402+
node_id: C::NodeId,
403+
membership_log_id: Option<LogId<C::NodeId>>,
404+
) -> Result<Option<LogId<C::NodeId>>, ()> {
405+
if metrics.membership_config.log_id < membership_log_id {
406+
// Waiting for the latest metrics to report.
407+
return Err(());
408+
}
409+
410+
if !metrics.membership_config.membership.contains(&node_id) {
411+
// This learner has been removed.
412+
return Ok(None);
413+
}
414+
415+
let repl = match &metrics.replication {
416+
None => {
417+
// This node is no longer a leader.
418+
return Ok(None);
419+
}
420+
Some(x) => x,
421+
};
422+
423+
let replication_metrics = &repl.data().replication;
424+
let target_metrics = match replication_metrics.get(&node_id) {
425+
None => {
426+
// Maybe replication is not reported yet. Keep waiting.
427+
return Err(());
428+
}
429+
Some(x) => x,
430+
};
431+
432+
let matched = target_metrics.matched();
433+
434+
let last_log_id = LogId::new(matched.leader_id, metrics.last_log_index.unwrap_or_default());
435+
436+
if is_matched_upto_date(&Some(matched), &Some(last_log_id), &self.inner.config) {
437+
// replication became up to date.
438+
return Ok(Some(matched));
439+
}
440+
441+
// Not up to date, keep waiting.
442+
Err(())
349443
}
350444

351445
/// Propose a cluster configuration change.
@@ -604,6 +698,10 @@ pub(crate) type RaftRespRx<T, E> = oneshot::Receiver<Result<T, E>>;
604698
#[derive(Debug, Clone, PartialEq, Eq)]
605699
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
606700
pub struct AddLearnerResponse<NID: NodeId> {
701+
/// The log id of the membership that contains the added learner.
702+
pub membership_log_id: Option<LogId<NID>>,
703+
704+
/// The last log id that matches leader log.
607705
pub matched: Option<LogId<NID>>,
608706
}
609707

@@ -640,9 +738,6 @@ pub(crate) enum RaftMsg<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStor
640738

641739
node: Option<Node>,
642740

643-
/// If block until the newly added learner becomes line-rate.
644-
blocking: bool,
645-
646741
/// Send the log id when the replication becomes line-rate.
647742
tx: RaftRespTx<AddLearnerResponse<C::NodeId>, AddLearnerError<C::NodeId>>,
648743
},
@@ -696,8 +791,8 @@ where
696791
RaftMsg::Initialize { members, .. } => {
697792
format!("Initialize: {:?}", members)
698793
}
699-
RaftMsg::AddLearner { id, blocking, .. } => {
700-
format!("AddLearner: id: {}, blocking: {}", id, blocking)
794+
RaftMsg::AddLearner { id, node, .. } => {
795+
format!("AddLearner: id: {}, node: {:?}", id, node)
701796
}
702797
RaftMsg::ChangeMembership {
703798
changes: members,

0 commit comments

Comments
 (0)