Skip to content

Commit 2d1aff0

Browse files
committed
Change: error InProgress: add field committed
- Refactor: Simplify Engine command executor
1 parent 1399b97 commit 2d1aff0

File tree

6 files changed

+95
-73
lines changed

6 files changed

+95
-73
lines changed

openraft/src/core/append_entries.rs

-48
This file was deleted.

openraft/src/core/mod.rs

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
//! Also it receives and execute `Command` emitted by `Engine` to apply raft state to underlying storage or forward
55
//! messages to other raft nodes.
66
7-
mod append_entries;
87
mod install_snapshot;
98
mod internal_msg;
109
mod raft_core;

openraft/src/core/raft_core.rs

+31-22
Original file line numberDiff line numberDiff line change
@@ -480,10 +480,12 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
480480
///
481481
/// Adding a learner does not affect election, thus it does not need to enter joint consensus.
482482
///
483+
/// TODO: It has to wait for the previous membership to commit.
484+
/// TODO: Otherwise a second proposed membership implies the previous one is committed.
485+
/// TODO: Test it.
486+
/// TODO: This limit can be removed if membership_state is replaced by a list of membership logs.
487+
/// TODO: Because allowing this requires the engine to be able to store more than 2 membership logs.
483488
/// And it does not need to wait for the previous membership log to commit to propose the new membership log.
484-
///
485-
/// If `blocking` is `true`, the result is sent to `tx` as the target node log has caught up. Otherwise, result is
486-
/// sent at once, no matter whether the target node log is lagging or not.
487489
#[tracing::instrument(level = "debug", skip_all)]
488490
pub(super) async fn add_learner(
489491
&mut self,
@@ -553,13 +555,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
553555
Ok(())
554556
}
555557

556-
/// return true if there is pending uncommitted config change
557-
fn has_pending_config(&self) -> bool {
558-
// The last membership config is not committed yet.
559-
// Can not process the next one.
560-
self.engine.state.committed < self.engine.state.membership_state.effective.log_id
561-
}
562-
563558
/// Submit change-membership by writing a Membership log entry, if the `expect` is satisfied.
564559
///
565560
/// If `turn_to_learner` is `true`, removed `voter` will becomes `learner`. Otherwise they will be just removed.
@@ -582,13 +577,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
582577
return Ok(());
583578
}
584579

585-
if self.has_pending_config() {
586-
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
587-
ChangeMembershipError::InProgress(InProgress {
588-
// has_pending_config() implies an existing membership log.
589-
membership_log_id: self.engine.state.membership_state.effective.log_id.unwrap(),
590-
}),
591-
)));
580+
let res = self.check_membership_committed();
581+
if let Err(e) = res {
582+
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(e)));
592583
return Ok(());
593584
}
594585

@@ -631,6 +622,20 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
631622
Ok(())
632623
}
633624

625+
/// Check if the effective membership is committed, so that a new membership is allowed to be proposed.
626+
fn check_membership_committed(&self) -> Result<(), ChangeMembershipError<C::NodeId>> {
627+
let st = &self.engine.state;
628+
629+
if st.is_membership_committed() {
630+
return Ok(());
631+
}
632+
633+
Err(ChangeMembershipError::InProgress(InProgress {
634+
committed: st.committed,
635+
membership_log_id: st.membership_state.effective.log_id,
636+
}))
637+
}
638+
634639
/// return Ok if all the current replication states satisfy the `expectation` for changing membership.
635640
fn check_replication_states<'n>(
636641
&self,
@@ -1005,11 +1010,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
10051010

10061011
tracing::debug!(last_applied = display(last_applied), "update last_applied");
10071012

1008-
if let Some(leader_data) = &mut self.leader_data {
1013+
if let Some(l) = &mut self.leader_data {
10091014
let mut results = apply_results.into_iter();
10101015

10111016
for log_index in since..end {
1012-
let tx = leader_data.client_resp_channels.remove(&log_index);
1017+
let tx = l.client_resp_channels.remove(&log_index);
10131018

10141019
let i = log_index - since;
10151020
let entry = &entries[i as usize];
@@ -1023,6 +1028,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
10231028
Ok(())
10241029
}
10251030

1031+
/// Send result of applying a log entry to its client.
10261032
#[tracing::instrument(level = "debug", skip_all)]
10271033
pub(super) async fn send_response(
10281034
entry: &Entry<C>,
@@ -1460,7 +1466,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
14601466

14611467
match msg {
14621468
RaftMsg::AppendEntries { rpc, tx } => {
1463-
let _ = tx.send(self.handle_append_entries_request(rpc).await.extract_fatal()?);
1469+
let resp =
1470+
self.engine.handle_append_entries_req(&rpc.vote, rpc.prev_log_id, &rpc.entries, rpc.leader_commit);
1471+
self.run_engine_commands(rpc.entries.as_slice()).await?;
1472+
let _ = tx.send(Ok(resp));
14641473
}
14651474
RaftMsg::RequestVote { rpc, tx } => {
14661475
let _ = tx.send(self.handle_vote_request(rpc).await.extract_fatal()?);
@@ -1773,8 +1782,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
17731782
self.leader_commit(i).await?;
17741783
}
17751784
}
1776-
Command::FollowerCommit { upto: _, .. } => {
1777-
self.replicate_to_state_machine_if_needed().await?;
1785+
Command::FollowerCommit { upto, .. } => {
1786+
self.apply_to_state_machine(upto.index).await?;
17781787
}
17791788
Command::ReplicateInputEntries { range } => {
17801789
if let Some(last) = range.clone().last() {

openraft/src/error.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ where E: TryInto<Fatal<NID>> + Clone
5555
}
5656
}
5757

58+
// TODO: not used
5859
#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)]
5960
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
6061
pub enum AppendEntriesError<NID: NodeId> {
@@ -368,9 +369,10 @@ pub struct QuorumNotEnough<NID: NodeId> {
368369

369370
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
370371
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
371-
#[error("the cluster is already undergoing a configuration change at log {membership_log_id}")]
372+
#[error("the cluster is already undergoing a configuration change at log {membership_log_id:?}, committed log id: {committed:?}")]
372373
pub struct InProgress<NID: NodeId> {
373-
pub membership_log_id: LogId<NID>,
374+
pub committed: Option<LogId<NID>>,
375+
pub membership_log_id: Option<LogId<NID>>,
374376
}
375377

376378
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]

openraft/src/raft_state.rs

+5
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ where NID: NodeId
118118
self.leader = Some(Leader::new(em.clone(), em.learner_ids()));
119119
}
120120

121+
/// Return true if the currently effective membership is committed.
122+
pub(crate) fn is_membership_committed(&self) -> bool {
123+
self.committed >= self.membership_state.effective.log_id
124+
}
125+
121126
/// Update field `committed` if the input is greater.
122127
/// If updated, it returns the previous value in a `Some()`.
123128
pub(crate) fn update_committed(&mut self, committed: &Option<LogId<NID>>) -> Option<Option<LogId<NID>>> {

openraft/src/raft_state_test.rs

+55
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
1+
use std::sync::Arc;
2+
3+
use maplit::btreeset;
4+
15
use crate::engine::LogIdList;
6+
use crate::EffectiveMembership;
27
use crate::LeaderId;
38
use crate::LogId;
9+
use crate::Membership;
10+
use crate::MembershipState;
411
use crate::RaftState;
512

613
fn log_id(term: u64, index: u64) -> LogId<u64> {
@@ -10,6 +17,10 @@ fn log_id(term: u64, index: u64) -> LogId<u64> {
1017
}
1118
}
1219

20+
fn m12() -> Membership<u64> {
21+
Membership::new(vec![btreeset! {1,2}], None)
22+
}
23+
1324
#[test]
1425
fn test_raft_state_has_log_id_empty() -> anyhow::Result<()> {
1526
let rs = RaftState::default();
@@ -100,3 +111,47 @@ fn test_raft_state_last_purged_log_id() -> anyhow::Result<()> {
100111

101112
Ok(())
102113
}
114+
115+
#[test]
116+
fn test_raft_state_is_membership_committed() -> anyhow::Result<()> {
117+
//
118+
let rs = RaftState::<u64> {
119+
committed: None,
120+
membership_state: MembershipState {
121+
committed: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())),
122+
effective: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())),
123+
},
124+
..Default::default()
125+
};
126+
127+
assert!(
128+
!rs.is_membership_committed(),
129+
"committed == effective, but not consider this"
130+
);
131+
132+
let rs = RaftState::<u64> {
133+
committed: Some(log_id(2, 2)),
134+
membership_state: MembershipState {
135+
committed: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())),
136+
effective: Arc::new(EffectiveMembership::new(Some(log_id(2, 2)), m12())),
137+
},
138+
..Default::default()
139+
};
140+
141+
assert!(
142+
rs.is_membership_committed(),
143+
"committed != effective, but rs.committed == effective.log_id"
144+
);
145+
146+
let rs = RaftState::<u64> {
147+
committed: Some(log_id(2, 2)),
148+
membership_state: MembershipState {
149+
committed: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())),
150+
effective: Arc::new(EffectiveMembership::new(Some(log_id(3, 3)), m12())),
151+
},
152+
..Default::default()
153+
};
154+
155+
assert!(!rs.is_membership_committed(), "rs.committed < effective.log_id");
156+
Ok(())
157+
}

0 commit comments

Comments
 (0)