Skip to content

Commit f08a3e6

Browse files
committed
Change: RaftNetwork return RPCError instead of anyhow::Error
- When a remote error encountered when replication, the replication will be stopped at once. - Fix: #140
1 parent 01867e8 commit f08a3e6

File tree

6 files changed

+147
-39
lines changed

6 files changed

+147
-39
lines changed

openraft/src/core/client.rs

+27-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::sync::Arc;
22

3-
use anyhow::anyhow;
43
use futures::future::TryFutureExt;
54
use futures::stream::FuturesUnordered;
65
use futures::stream::StreamExt;
@@ -15,6 +14,8 @@ use crate::core::State;
1514
use crate::error::ClientReadError;
1615
use crate::error::ClientWriteError;
1716
use crate::error::QuorumNotEnough;
17+
use crate::error::RPCError;
18+
use crate::error::Timeout;
1819
use crate::raft::AppendEntriesRequest;
1920
use crate::raft::ClientWriteRequest;
2021
use crate::raft::ClientWriteResponse;
@@ -25,6 +26,7 @@ use crate::replication::RaftEvent;
2526
use crate::AppData;
2627
use crate::AppDataResponse;
2728
use crate::MessageSummary;
29+
use crate::RPCTypes;
2830
use crate::RaftNetwork;
2931
use crate::RaftStorage;
3032
use crate::StorageError;
@@ -93,8 +95,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
9395
let mut pending = FuturesUnordered::new();
9496
let membership = &self.core.effective_membership.membership;
9597

96-
for (id, node) in self.nodes.iter() {
97-
if !membership.is_member(id) {
98+
for (target, node) in self.nodes.iter() {
99+
if !membership.is_member(target) {
98100
continue;
99101
}
100102

@@ -106,22 +108,37 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
106108
leader_commit: self.core.committed,
107109
};
108110

109-
let target = *id;
111+
let my_id = self.core.id;
112+
let target = *target;
110113
let network = self.core.network.clone();
111114

112115
let ttl = Duration::from_millis(self.core.config.heartbeat_interval);
113116

114117
let task = tokio::spawn(
115118
async move {
116-
match timeout(ttl, network.send_append_entries(target, rpc)).await {
117-
Ok(Ok(data)) => Ok((target, data)),
118-
Ok(Err(err)) => Err((target, err)),
119-
Err(_timeout) => Err((target, anyhow!("timeout waiting for leadership confirmation"))),
119+
let outer_res = timeout(ttl, network.send_append_entries(target, rpc)).await;
120+
match outer_res {
121+
Ok(append_res) => match append_res {
122+
Ok(x) => Ok((target, x)),
123+
Err(err) => Err((target, err)),
124+
},
125+
Err(_timeout) => {
126+
let timeout_err = Timeout {
127+
action: RPCTypes::AppendEntries,
128+
id: my_id,
129+
target,
130+
timeout: ttl,
131+
};
132+
133+
Err((target, RPCError::Timeout(timeout_err)))
134+
}
120135
}
121136
}
122-
.instrument(tracing::debug_span!("spawn")),
137+
// TODO(xp): add target to span
138+
.instrument(tracing::debug_span!("SPAWN_append_entries")),
123139
)
124-
.map_err(move |err| (*id, err));
140+
.map_err(move |err| (target, err));
141+
125142
pending.push(task);
126143
}
127144

openraft/src/error.rs

+30-4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use serde::Serialize;
1212
use crate::raft_types::SnapshotSegmentId;
1313
use crate::LogId;
1414
use crate::NodeId;
15+
use crate::RPCTypes;
1516
use crate::StorageError;
1617

1718
/// Fatal is unrecoverable and shuts down raft at once.
@@ -85,8 +86,6 @@ pub enum ClientReadError {
8586
/// An error related to a client write request.
8687
#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)]
8788
pub enum ClientWriteError {
88-
// #[error("{0}")]
89-
// RaftError(#[from] RaftError),
9089
#[error(transparent)]
9190
ForwardToLeader(#[from] ForwardToLeader),
9291

@@ -177,7 +176,6 @@ impl From<StorageError> for AddLearnerError {
177176

178177
/// Error variants related to the Replication.
179178
#[derive(Debug, thiserror::Error)]
180-
#[non_exhaustive]
181179
#[allow(clippy::large_enum_variant)]
182180
pub enum ReplicationError {
183181
#[error(transparent)]
@@ -202,6 +200,34 @@ pub enum ReplicationError {
202200

203201
#[error(transparent)]
204202
Network(#[from] NetworkError),
203+
204+
#[error(transparent)]
205+
RemoteError(#[from] RemoteError<AppendEntriesError>),
206+
}
207+
208+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
209+
pub enum RPCError<T: Error> {
210+
#[error(transparent)]
211+
Timeout(#[from] Timeout),
212+
213+
#[error(transparent)]
214+
Network(#[from] NetworkError),
215+
216+
#[error(transparent)]
217+
RemoteError(#[from] RemoteError<T>),
218+
}
219+
220+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
221+
#[error("error occur on remote peer {target}: {source}")]
222+
pub struct RemoteError<T: std::error::Error> {
223+
pub target: NodeId,
224+
pub source: T,
225+
}
226+
227+
impl<T: std::error::Error> RemoteError<T> {
228+
pub fn new(target: NodeId, e: T) -> Self {
229+
Self { target, source: e }
230+
}
205231
}
206232

207233
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
@@ -244,7 +270,7 @@ impl From<anyhow::Error> for NetworkError {
244270
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
245271
#[error("timeout after {timeout:?} when {action} {id}->{target}")]
246272
pub struct Timeout {
247-
pub action: String,
273+
pub action: RPCTypes,
248274
pub id: NodeId,
249275
pub target: NodeId,
250276
pub timeout: Duration,

openraft/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub use crate::core::State;
3434
pub use crate::defensive::DefensiveCheck;
3535
pub use crate::membership::Membership;
3636
pub use crate::metrics::RaftMetrics;
37+
pub use crate::network::RPCTypes;
3738
pub use crate::network::RaftNetwork;
3839
pub use crate::raft::Raft;
3940
pub use crate::raft_types::LogId;

openraft/src/network.rs

+28-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
//! The Raft network interface.
22
3-
use anyhow::Result;
3+
use std::fmt::Formatter;
4+
45
use async_trait::async_trait;
6+
use serde::Deserialize;
7+
use serde::Serialize;
58

9+
use crate::error::AppendEntriesError;
10+
use crate::error::InstallSnapshotError;
11+
use crate::error::RPCError;
12+
use crate::error::VoteError;
613
use crate::raft::AppendEntriesRequest;
714
use crate::raft::AppendEntriesResponse;
815
use crate::raft::InstallSnapshotRequest;
@@ -12,6 +19,19 @@ use crate::raft::VoteResponse;
1219
use crate::AppData;
1320
use crate::NodeId;
1421

22+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
23+
pub enum RPCTypes {
24+
Vote,
25+
AppendEntries,
26+
InstallSnapshot,
27+
}
28+
29+
impl std::fmt::Display for RPCTypes {
30+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
31+
write!(f, "{:?}", self)
32+
}
33+
}
34+
1535
/// A trait defining the interface for a Raft network between cluster members.
1636
///
1737
/// See the [network chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#3-impl-raftnetwork)
@@ -21,15 +41,19 @@ pub trait RaftNetwork<D>: Send + Sync + 'static
2141
where D: AppData
2242
{
2343
/// Send an AppendEntries RPC to the target Raft node (§5).
24-
async fn send_append_entries(&self, target: NodeId, rpc: AppendEntriesRequest<D>) -> Result<AppendEntriesResponse>;
44+
async fn send_append_entries(
45+
&self,
46+
target: NodeId,
47+
rpc: AppendEntriesRequest<D>,
48+
) -> Result<AppendEntriesResponse, RPCError<AppendEntriesError>>;
2549

2650
/// Send an InstallSnapshot RPC to the target Raft node (§7).
2751
async fn send_install_snapshot(
2852
&self,
2953
target: NodeId,
3054
rpc: InstallSnapshotRequest,
31-
) -> Result<InstallSnapshotResponse>;
55+
) -> Result<InstallSnapshotResponse, RPCError<InstallSnapshotError>>;
3256

3357
/// Send a RequestVote RPC to the target Raft node (§5).
34-
async fn send_vote(&self, target: NodeId, rpc: VoteRequest) -> Result<VoteResponse>;
58+
async fn send_vote(&self, target: NodeId, rpc: VoteRequest) -> Result<VoteResponse, RPCError<VoteError>>;
3559
}

openraft/src/replication/mod.rs

+19-3
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ use tracing::Span;
2121

2222
use crate::config::Config;
2323
use crate::config::SnapshotPolicy;
24+
use crate::error::AppendEntriesError;
2425
use crate::error::CommittedAdvanceTooMany;
2526
use crate::error::HigherTerm;
2627
use crate::error::LackEntry;
27-
use crate::error::NetworkError;
28+
use crate::error::RPCError;
2829
use crate::error::ReplicationError;
2930
use crate::error::Timeout;
3031
use crate::raft::AppendEntriesRequest;
@@ -39,6 +40,7 @@ use crate::ErrorVerb;
3940
use crate::LogId;
4041
use crate::MessageSummary;
4142
use crate::NodeId;
43+
use crate::RPCTypes;
4244
use crate::RaftNetwork;
4345
use crate::RaftStorage;
4446
use crate::ToStorageResult;
@@ -253,6 +255,15 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
253255
ReplicationError::Network { .. } => {
254256
// nothing to do
255257
}
258+
ReplicationError::RemoteError(remote_err) => {
259+
tracing::error!(%remote_err, "remote peer error");
260+
match remote_err.source {
261+
AppendEntriesError::Fatal(fatal) => {
262+
tracing::error!(%fatal, target=%remote_err.target, "remote fatal error, close replication");
263+
return;
264+
}
265+
}
266+
}
256267
};
257268
}
258269
}
@@ -359,13 +370,18 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
359370
Ok(res) => res,
360371
Err(err) => {
361372
tracing::warn!(error=%err, "error sending AppendEntries RPC to target");
362-
return Err(ReplicationError::Network(NetworkError::from(err)));
373+
let repl_err = match err {
374+
RPCError::Timeout(e) => ReplicationError::Timeout(e),
375+
RPCError::Network(e) => ReplicationError::Network(e),
376+
RPCError::RemoteError(e) => ReplicationError::RemoteError(e),
377+
};
378+
return Err(repl_err);
363379
}
364380
},
365381
Err(timeout_err) => {
366382
tracing::warn!(error=%timeout_err, "timeout while sending AppendEntries RPC to target");
367383
return Err(ReplicationError::Timeout(Timeout {
368-
action: "send_append_entries".to_string(),
384+
action: RPCTypes::AppendEntries,
369385
id: self.id,
370386
target: self.target,
371387
timeout: the_timeout,

openraft/tests/fixtures/mod.rs

+42-18
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,14 @@ use memstore::ClientResponse as MemClientResponse;
2323
use memstore::MemStore;
2424
use openraft::async_trait::async_trait;
2525
use openraft::error::AddLearnerError;
26+
use openraft::error::AppendEntriesError;
2627
use openraft::error::ClientReadError;
2728
use openraft::error::ClientWriteError;
29+
use openraft::error::InstallSnapshotError;
30+
use openraft::error::NetworkError;
31+
use openraft::error::RPCError;
32+
use openraft::error::RemoteError;
33+
use openraft::error::VoteError;
2834
use openraft::metrics::Wait;
2935
use openraft::raft::AddLearnerResponse;
3036
use openraft::raft::AppendEntriesRequest;
@@ -805,6 +811,18 @@ impl RaftRouter {
805811

806812
Ok(())
807813
}
814+
815+
pub async fn check_reachable(&self, id: NodeId, target: NodeId) -> std::result::Result<(), NetworkError> {
816+
let isolated = self.isolated_nodes.read().await;
817+
818+
if isolated.contains(&target) || isolated.contains(&id) {
819+
let err = anyhow!("target node is isolated");
820+
let network_err = NetworkError::from(err);
821+
return Err(network_err);
822+
}
823+
824+
Ok(())
825+
}
808826
}
809827

810828
#[async_trait]
@@ -814,46 +832,52 @@ impl RaftNetwork<MemClientRequest> for RaftRouter {
814832
&self,
815833
target: u64,
816834
rpc: AppendEntriesRequest<MemClientRequest>,
817-
) -> Result<AppendEntriesResponse> {
835+
) -> std::result::Result<AppendEntriesResponse, RPCError<AppendEntriesError>> {
818836
tracing::debug!("append_entries to id={} {:?}", target, rpc);
819837
self.rand_send_delay().await;
820838

839+
self.check_reachable(rpc.leader_id, target).await?;
840+
821841
let rt = self.routing_table.read().await;
822-
let isolated = self.isolated_nodes.read().await;
823842
let addr = rt.get(&target).expect("target node not found in routing table");
824-
if isolated.contains(&target) || isolated.contains(&rpc.leader_id) {
825-
return Err(anyhow!("target node is isolated"));
826-
}
843+
827844
let resp = addr.0.append_entries(rpc).await;
828845

829846
tracing::debug!("append_entries: recv resp from id={} {:?}", target, resp);
830-
Ok(resp?)
847+
let resp = resp.map_err(|e| RemoteError::new(target, e))?;
848+
Ok(resp)
831849
}
832850

833851
/// Send an InstallSnapshot RPC to the target Raft node (§7).
834-
async fn send_install_snapshot(&self, target: u64, rpc: InstallSnapshotRequest) -> Result<InstallSnapshotResponse> {
852+
async fn send_install_snapshot(
853+
&self,
854+
target: u64,
855+
rpc: InstallSnapshotRequest,
856+
) -> std::result::Result<InstallSnapshotResponse, RPCError<InstallSnapshotError>> {
835857
self.rand_send_delay().await;
836858

859+
self.check_reachable(rpc.leader_id, target).await?;
860+
837861
let rt = self.routing_table.read().await;
838-
let isolated = self.isolated_nodes.read().await;
839862
let addr = rt.get(&target).expect("target node not found in routing table");
840-
if isolated.contains(&target) || isolated.contains(&rpc.leader_id) {
841-
return Err(anyhow!("target node is isolated"));
842-
}
843-
Ok(addr.0.install_snapshot(rpc).await?)
863+
864+
let resp = addr.0.install_snapshot(rpc).await;
865+
let resp = resp.map_err(|e| RemoteError::new(target, e))?;
866+
Ok(resp)
844867
}
845868

846869
/// Send a RequestVote RPC to the target Raft node (§5).
847-
async fn send_vote(&self, target: u64, rpc: VoteRequest) -> Result<VoteResponse> {
870+
async fn send_vote(&self, target: u64, rpc: VoteRequest) -> std::result::Result<VoteResponse, RPCError<VoteError>> {
848871
self.rand_send_delay().await;
849872

873+
self.check_reachable(rpc.candidate_id, target).await?;
874+
850875
let rt = self.routing_table.read().await;
851-
let isolated = self.isolated_nodes.read().await;
852876
let addr = rt.get(&target).expect("target node not found in routing table");
853-
if isolated.contains(&target) || isolated.contains(&rpc.candidate_id) {
854-
return Err(anyhow!("target node is isolated"));
855-
}
856-
Ok(addr.0.vote(rpc).await?)
877+
878+
let resp = addr.0.vote(rpc).await;
879+
let resp = resp.map_err(|e| RemoteError::new(target, e))?;
880+
Ok(resp)
857881
}
858882
}
859883

0 commit comments

Comments
 (0)