Skip to content

Commit 961469c

Browse files
committed
Feature: add PayloadTooLarge error
If a `RaftNetwork` implmentation found an `AppendEntriesRequest` is too large, it could return a `PayloadTooLarge::new_entries_hint(n)` error to tell openraft devide request into smaller chunks containing at most `n` entries. Openraft will limit the number of entries in the next 10 `AppendEntrie` RPC. Exmaple: ```rust impl<C: RaftTypeConfig> RaftNetwork<C> for MyNetwork { fn append_entries(&self, rpc: AppendEntriesRequest<C>, option: RPCOption ) -> Result<_, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> { if rpc.entries.len() > 10 { return Err(PayloadTooLarge::new_entries_hint(10).into()); } // ... } } ```
1 parent 9c04cb0 commit 961469c

File tree

9 files changed

+428
-47
lines changed

9 files changed

+428
-47
lines changed

openraft/src/error.rs

+160
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use std::collections::BTreeSet;
44
use std::error::Error;
5+
use std::fmt;
56
use std::fmt::Debug;
67
use std::time::Duration;
78

@@ -256,6 +257,10 @@ pub enum RPCError<NID: NodeId, N: Node, E: Error> {
256257
#[error(transparent)]
257258
Unreachable(#[from] Unreachable),
258259

260+
/// The RPC payload is too large and should be split into smaller chunks.
261+
#[error(transparent)]
262+
PayloadTooLarge(#[from] PayloadTooLarge),
263+
259264
/// Failed to send the RPC request and should retry immediately.
260265
#[error(transparent)]
261266
Network(#[from] NetworkError),
@@ -276,6 +281,7 @@ where
276281
match self {
277282
RPCError::Timeout(_) => None,
278283
RPCError::Unreachable(_) => None,
284+
RPCError::PayloadTooLarge(_) => None,
279285
RPCError::Network(_) => None,
280286
RPCError::RemoteError(remote_err) => remote_err.source.forward_to_leader(),
281287
}
@@ -359,6 +365,136 @@ impl Unreachable {
359365
}
360366
}
361367

368+
/// Error indicating that an RPC is too large and cannot be sent.
369+
///
370+
/// This is a retryable error:
371+
/// A [`RaftNetwork`] implementation returns this error to inform Openraft to divide an
372+
/// [`AppendEntriesRequest`] into smaller chunks.
373+
/// Openraft will immediately retry sending in smaller chunks.
374+
/// If the request cannot be divided(contains only one entry), Openraft interprets it as
375+
/// [`Unreachable`].
376+
///
377+
/// A hint can be provided to help Openraft in splitting the request.
378+
///
379+
/// The application should also set an appropriate value for [`Config::max_payload_entries`] to
380+
/// avoid returning this error if possible.
381+
///
382+
/// Example:
383+
///
384+
/// ```ignore
385+
/// impl<C: RaftTypeConfig> RaftNetwork<C> for MyNetwork {
386+
/// fn append_entries(&self,
387+
/// rpc: AppendEntriesRequest<C>,
388+
/// option: RPCOption
389+
/// ) -> Result<_, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
390+
/// if rpc.entries.len() > 10 {
391+
/// return Err(PayloadTooLarge::new_entries_hint(10).into());
392+
/// }
393+
/// // ...
394+
/// }
395+
/// }
396+
/// ```
397+
///
398+
/// [`RaftNetwork`]: crate::network::RaftNetwork
399+
/// [`AppendEntriesRequest`]: crate::raft::AppendEntriesRequest
400+
/// [`Config::max_payload_entries`]: crate::config::Config::max_payload_entries
401+
///
402+
/// [`InstallSnapshotRequest`]: crate::raft::InstallSnapshotRequest
403+
/// [`Config::snapshot_max_chunk_size`]: crate::config::Config::snapshot_max_chunk_size
404+
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
405+
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
406+
pub struct PayloadTooLarge {
407+
action: RPCTypes,
408+
409+
/// An optional hint indicating the anticipated number of entries.
410+
/// Used only for append-entries replication.
411+
entries_hint: u64,
412+
413+
/// An optional hint indicating the anticipated size in bytes.
414+
/// Used for snapshot replication.
415+
bytes_hint: u64,
416+
417+
#[source]
418+
source: Option<AnyError>,
419+
}
420+
421+
impl fmt::Display for PayloadTooLarge {
422+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
423+
write!(f, "RPC",)?;
424+
write!(f, "({})", self.action)?;
425+
write!(f, " payload too large:",)?;
426+
427+
write!(f, " hint:(")?;
428+
match self.action {
429+
RPCTypes::Vote => {
430+
unreachable!("vote rpc should not have payload")
431+
}
432+
RPCTypes::AppendEntries => {
433+
write!(f, "entries:{}", self.entries_hint)?;
434+
}
435+
RPCTypes::InstallSnapshot => {
436+
write!(f, "bytes:{}", self.bytes_hint)?;
437+
}
438+
}
439+
write!(f, ")")?;
440+
441+
if let Some(s) = &self.source {
442+
write!(f, ", source: {}", s)?;
443+
}
444+
445+
Ok(())
446+
}
447+
}
448+
449+
impl PayloadTooLarge {
450+
/// Create a new PayloadTooLarge, with entries hint, without the causing error.
451+
pub fn new_entries_hint(entries_hint: u64) -> Self {
452+
debug_assert!(entries_hint > 0, "entries_hint should be greater than 0");
453+
454+
Self {
455+
action: RPCTypes::AppendEntries,
456+
entries_hint,
457+
bytes_hint: u64::MAX,
458+
source: None,
459+
}
460+
}
461+
462+
// No used yet.
463+
/// Create a new PayloadTooLarge, with bytes hint, without the causing error.
464+
#[allow(dead_code)]
465+
pub(crate) fn new_bytes_hint(bytes_hint: u64) -> Self {
466+
debug_assert!(bytes_hint > 0, "bytes_hint should be greater than 0");
467+
468+
Self {
469+
action: RPCTypes::InstallSnapshot,
470+
entries_hint: u64::MAX,
471+
bytes_hint,
472+
source: None,
473+
}
474+
}
475+
476+
/// Set the source error that causes this PayloadTooLarge error.
477+
pub fn with_source_error(mut self, e: &(impl Error + 'static)) -> Self {
478+
self.source = Some(AnyError::new(e));
479+
self
480+
}
481+
482+
pub fn action(&self) -> RPCTypes {
483+
self.action
484+
}
485+
486+
/// Get the hint for entries number.
487+
pub fn entries_hint(&self) -> u64 {
488+
self.entries_hint
489+
}
490+
491+
// No used yet.
492+
#[allow(dead_code)]
493+
pub(crate) fn bytes_hint(&self) -> u64 {
494+
self.bytes_hint
495+
}
496+
}
497+
362498
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
363499
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
364500
#[error("timeout after {timeout:?} when {action} {id}->{target}")]
@@ -532,3 +668,27 @@ impl<NID: NodeId> From<Result<(), RejectAppendEntries<NID>>> for AppendEntriesRe
532668
}
533669
}
534670
}
671+
672+
#[cfg(test)]
673+
mod tests {
674+
use anyerror::AnyError;
675+
676+
use crate::error::PayloadTooLarge;
677+
678+
#[test]
679+
fn test_append_too_large() -> anyhow::Result<()> {
680+
let a = PayloadTooLarge::new_entries_hint(5);
681+
assert_eq!("RPC(AppendEntries) payload too large: hint:(entries:5)", a.to_string());
682+
683+
let a = PayloadTooLarge::new_bytes_hint(5);
684+
assert_eq!("RPC(InstallSnapshot) payload too large: hint:(bytes:5)", a.to_string());
685+
686+
let a = PayloadTooLarge::new_entries_hint(5).with_source_error(&AnyError::error("test"));
687+
assert_eq!(
688+
"RPC(AppendEntries) payload too large: hint:(entries:5), source: test",
689+
a.to_string()
690+
);
691+
692+
Ok(())
693+
}
694+
}

openraft/src/log_id_range.rs

+6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::fmt::Formatter;
55
use crate::less_equal;
66
use crate::validate::Validate;
77
use crate::LogId;
8+
use crate::LogIdOptionExt;
89
use crate::MessageSummary;
910
use crate::NodeId;
1011

@@ -43,6 +44,11 @@ impl<NID: NodeId> LogIdRange<NID> {
4344
last_log_id: last,
4445
}
4546
}
47+
48+
#[allow(dead_code)]
49+
pub(crate) fn len(&self) -> u64 {
50+
self.last_log_id.next_index() - self.prev_log_id.next_index()
51+
}
4652
}
4753

4854
#[cfg(test)]

openraft/src/network/rpc_type.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::fmt;
22

3-
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
3+
#[derive(Debug, Clone, Copy)]
4+
#[derive(PartialEq, Eq)]
5+
#[derive(Hash)]
46
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
57
pub enum RPCTypes {
68
Vote,

openraft/src/replication/hint.rs

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
//! Defines config hint for replication RPC
2+
3+
/// Temporary config hint for replication
4+
#[derive(Clone, Debug, Default)]
5+
pub(crate) struct ReplicationHint {
6+
n: u64,
7+
8+
/// How many times this hint can be used.
9+
ttl: u64,
10+
}
11+
12+
impl ReplicationHint {
13+
/// Create a new `ReplicationHint`
14+
pub(crate) fn new(n: u64, ttl: u64) -> Self {
15+
Self { n, ttl }
16+
}
17+
18+
pub(crate) fn get(&mut self) -> Option<u64> {
19+
if self.ttl > 0 {
20+
self.ttl -= 1;
21+
Some(self.n)
22+
} else {
23+
None
24+
}
25+
}
26+
}

0 commit comments

Comments
 (0)