Skip to content

Commit 6f20e1f

Browse files
committed
Feature: add trait RaftPayload RaftEntry to access payload and entry without the need to know about user data, i.e., AppData or AppDataResponse.
- With `RaftPayload` and `RaftEntry`, the protocol can be implememnted in a standalone mod, without depending on type of user data. I.e. the protocol mod does not need to know about `AppData`, or `RaftStorage<RaftTypeConfig>`. This will make test easier to write. - Refactor: move `Entry`, `EntryPayload`, `RaftPayload`, `RaftEntry` into file `entry.rs`.
1 parent 060a9d1 commit 6f20e1f

24 files changed

+194
-131
lines changed

example-raft-kv/src/network/api.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use actix_web::Responder;
55
use openraft::error::CheckIsLeaderError;
66
use openraft::error::Infallible;
77
use openraft::raft::ClientWriteRequest;
8-
use openraft::raft::EntryPayload;
8+
use openraft::EntryPayload;
99
use web::Json;
1010

1111
use crate::app::ExampleApp;

example-raft-kv/src/store/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ use std::sync::Arc;
66
use std::sync::Mutex;
77

88
use openraft::async_trait::async_trait;
9-
use openraft::raft::Entry;
10-
use openraft::raft::EntryPayload;
119
use openraft::storage::LogState;
1210
use openraft::storage::Snapshot;
1311
use openraft::AnyError;
1412
use openraft::EffectiveMembership;
13+
use openraft::Entry;
14+
use openraft::EntryPayload;
1515
use openraft::ErrorSubject;
1616
use openraft::ErrorVerb;
1717
use openraft::LogId;

memstore/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ use std::sync::Arc;
1010
use std::sync::Mutex;
1111

1212
use openraft::async_trait::async_trait;
13-
use openraft::raft::Entry;
14-
use openraft::raft::EntryPayload;
1513
use openraft::storage::LogState;
1614
use openraft::storage::RaftLogReader;
1715
use openraft::storage::RaftSnapshotBuilder;
1816
use openraft::storage::Snapshot;
1917
use openraft::AnyError;
2018
use openraft::EffectiveMembership;
19+
use openraft::Entry;
20+
use openraft::EntryPayload;
2121
use openraft::ErrorSubject;
2222
use openraft::ErrorVerb;
2323
use openraft::LogId;

openraft/src/core/admin.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ use crate::metrics::RemoveTarget;
2323
use crate::raft::AddLearnerResponse;
2424
use crate::raft::ChangeMembers;
2525
use crate::raft::ClientWriteResponse;
26-
use crate::raft::EntryPayload;
2726
use crate::raft::RaftRespTx;
2827
use crate::raft_types::LogIdOptionExt;
2928
use crate::versioned::Updatable;
29+
use crate::EntryPayload;
3030
use crate::LogId;
3131
use crate::Membership;
3232
use crate::Node;

openraft/src/core/append_entries.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ use crate::core::State;
44
use crate::error::AppendEntriesError;
55
use crate::raft::AppendEntriesRequest;
66
use crate::raft::AppendEntriesResponse;
7-
use crate::raft::Entry;
8-
use crate::raft::EntryPayload;
97
use crate::raft_types::LogIdOptionExt;
108
use crate::EffectiveMembership;
9+
use crate::Entry;
10+
use crate::EntryPayload;
1111
use crate::LogId;
1212
use crate::MessageSummary;
1313
use crate::RaftNetworkFactory;

openraft/src/core/client.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ use crate::raft::AppendEntriesRequest;
2020
use crate::raft::AppendEntriesResponse;
2121
use crate::raft::ClientWriteRequest;
2222
use crate::raft::ClientWriteResponse;
23-
use crate::raft::Entry;
24-
use crate::raft::EntryPayload;
2523
use crate::raft::RaftRespTx;
2624
use crate::replication::RaftEvent;
25+
use crate::Entry;
26+
use crate::EntryPayload;
2727
use crate::MessageSummary;
2828
use crate::RPCTypes;
2929
use crate::RaftNetwork;

openraft/src/core/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ use crate::error::NotAllowed;
4545
use crate::metrics::RaftMetrics;
4646
use crate::metrics::ReplicationMetrics;
4747
use crate::raft::AddLearnerResponse;
48-
use crate::raft::Entry;
49-
use crate::raft::EntryPayload;
5048
use crate::raft::RaftMsg;
5149
use crate::raft::RaftRespTx;
5250
use crate::raft::VoteResponse;
@@ -56,6 +54,8 @@ use crate::replication::ReplicationStream;
5654
use crate::storage::RaftSnapshotBuilder;
5755
use crate::versioned::Versioned;
5856
use crate::vote::Vote;
57+
use crate::Entry;
58+
use crate::EntryPayload;
5959
use crate::LogId;
6060
use crate::Membership;
6161
use crate::MessageSummary;

openraft/src/defensive.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use std::ops::RangeBounds;
44

55
use async_trait::async_trait;
66

7-
use crate::raft::Entry;
87
use crate::raft_types::LogIdOptionExt;
98
use crate::DefensiveError;
9+
use crate::Entry;
1010
use crate::ErrorSubject;
1111
use crate::LogId;
1212
use crate::RaftStorage;

openraft/src/entry.rs

+152
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
use std::fmt::Debug;
2+
3+
use crate::LogId;
4+
use crate::Membership;
5+
use crate::MessageSummary;
6+
use crate::NodeId;
7+
use crate::RaftTypeConfig;
8+
9+
/// Defines operations on an entry payload.
10+
pub trait RaftPayload<NID: NodeId> {
11+
/// Return `Some(())` if the entry payload is blank.
12+
fn is_blank(&self) -> bool;
13+
14+
/// Return `Some(&Membership)` if the entry payload is a membership payload.
15+
fn get_membership(&self) -> Option<&Membership<NID>>;
16+
}
17+
18+
/// Defines operations on an entry.
19+
pub trait RaftEntry<NID: NodeId>: RaftPayload<NID> {
20+
fn get_log_id(&self) -> &LogId<NID>;
21+
22+
fn set_log_id(&mut self, log_id: &LogId<NID>);
23+
}
24+
25+
/// Log entry payload variants.
26+
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
27+
pub enum EntryPayload<C: RaftTypeConfig> {
28+
/// An empty payload committed by a new cluster leader.
29+
Blank,
30+
31+
Normal(C::D),
32+
33+
/// A change-membership log entry.
34+
Membership(Membership<C::NodeId>),
35+
}
36+
37+
impl<C: RaftTypeConfig> MessageSummary for EntryPayload<C> {
38+
fn summary(&self) -> String {
39+
match self {
40+
EntryPayload::Blank => "blank".to_string(),
41+
EntryPayload::Normal(_n) => "normal".to_string(),
42+
EntryPayload::Membership(c) => {
43+
format!("membership: {}", c.summary())
44+
}
45+
}
46+
}
47+
}
48+
49+
/// A Raft log entry.
50+
#[derive(Clone, serde::Serialize, serde::Deserialize)]
51+
pub struct Entry<C: RaftTypeConfig> {
52+
pub log_id: LogId<C::NodeId>,
53+
54+
/// This entry's payload.
55+
#[serde(bound = "")]
56+
pub payload: EntryPayload<C>,
57+
}
58+
59+
impl<C: RaftTypeConfig> Debug for Entry<C>
60+
where C::D: Debug
61+
{
62+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63+
f.debug_struct("Entry").field("log_id", &self.log_id).field("payload", &self.payload).finish()
64+
}
65+
}
66+
67+
impl<C: RaftTypeConfig> Default for Entry<C> {
68+
fn default() -> Self {
69+
Self {
70+
log_id: LogId::default(),
71+
payload: EntryPayload::Blank,
72+
}
73+
}
74+
}
75+
76+
impl<C: RaftTypeConfig> MessageSummary for Entry<C> {
77+
fn summary(&self) -> String {
78+
format!("{}:{}", self.log_id, self.payload.summary())
79+
}
80+
}
81+
82+
impl<C: RaftTypeConfig> MessageSummary for Option<Entry<C>> {
83+
fn summary(&self) -> String {
84+
match self {
85+
None => "None".to_string(),
86+
Some(x) => format!("Some({})", x.summary()),
87+
}
88+
}
89+
}
90+
91+
impl<C: RaftTypeConfig> MessageSummary for &[Entry<C>] {
92+
fn summary(&self) -> String {
93+
let entry_refs: Vec<_> = self.iter().collect();
94+
entry_refs.as_slice().summary()
95+
}
96+
}
97+
98+
impl<C: RaftTypeConfig> MessageSummary for &[&Entry<C>] {
99+
fn summary(&self) -> String {
100+
if self.is_empty() {
101+
return "{}".to_string();
102+
}
103+
let mut res = Vec::with_capacity(self.len());
104+
if self.len() <= 5 {
105+
for x in self.iter() {
106+
let e = format!("{}:{}", x.log_id, x.payload.summary());
107+
res.push(e);
108+
}
109+
110+
res.join(",")
111+
} else {
112+
let first = *self.first().unwrap();
113+
let last = *self.last().unwrap();
114+
115+
format!("{} ... {}", first.summary(), last.summary())
116+
}
117+
}
118+
}
119+
120+
impl<C: RaftTypeConfig> RaftPayload<C::NodeId> for EntryPayload<C> {
121+
fn is_blank(&self) -> bool {
122+
matches!(self, EntryPayload::Blank)
123+
}
124+
125+
fn get_membership(&self) -> Option<&Membership<C::NodeId>> {
126+
if let EntryPayload::Membership(m) = self {
127+
Some(m)
128+
} else {
129+
None
130+
}
131+
}
132+
}
133+
134+
impl<C: RaftTypeConfig> RaftPayload<C::NodeId> for Entry<C> {
135+
fn is_blank(&self) -> bool {
136+
self.payload.is_blank()
137+
}
138+
139+
fn get_membership(&self) -> Option<&Membership<C::NodeId>> {
140+
self.payload.get_membership()
141+
}
142+
}
143+
144+
impl<C: RaftTypeConfig> RaftEntry<C::NodeId> for Entry<C> {
145+
fn get_log_id(&self) -> &LogId<C::NodeId> {
146+
&self.log_id
147+
}
148+
149+
fn set_log_id(&mut self, log_id: &LogId<C::NodeId>) {
150+
self.log_id = *log_id;
151+
}
152+
}

openraft/src/lib.rs

+4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
mod config;
1111
mod core;
1212
mod defensive;
13+
mod entry;
1314
mod membership;
1415
mod node;
1516
mod raft_types;
@@ -42,6 +43,9 @@ pub use crate::core::EffectiveMembership;
4243
pub use crate::core::State;
4344
pub use crate::defensive::DefensiveCheck;
4445
pub use crate::defensive::DefensiveCheckBase;
46+
pub use crate::entry::Entry;
47+
pub use crate::entry::EntryPayload;
48+
pub use crate::entry::RaftPayload;
4549
pub use crate::membership::Membership;
4650
pub use crate::metrics::RaftMetrics;
4751
pub use crate::network::RPCTypes;

openraft/src/raft.rs

+2-95
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ use crate::metrics::RaftMetrics;
3131
use crate::metrics::Wait;
3232
use crate::AppData;
3333
use crate::AppDataResponse;
34+
use crate::Entry;
35+
use crate::EntryPayload;
3436
use crate::LogId;
3537
use crate::Membership;
3638
use crate::MessageSummary;
@@ -762,101 +764,6 @@ impl<NID: NodeId> MessageSummary for AppendEntriesResponse<NID> {
762764
}
763765
}
764766

765-
/// A Raft log entry.
766-
#[derive(Clone, Serialize, Deserialize)]
767-
pub struct Entry<C: RaftTypeConfig> {
768-
pub log_id: LogId<C::NodeId>,
769-
770-
/// This entry's payload.
771-
#[serde(bound = "")]
772-
pub payload: EntryPayload<C>,
773-
}
774-
775-
impl<C: RaftTypeConfig> Debug for Entry<C>
776-
where C::D: Debug
777-
{
778-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
779-
f.debug_struct("Entry").field("log_id", &self.log_id).field("payload", &self.payload).finish()
780-
}
781-
}
782-
783-
impl<C: RaftTypeConfig> Default for Entry<C> {
784-
fn default() -> Self {
785-
Self {
786-
log_id: LogId::default(),
787-
payload: EntryPayload::Blank,
788-
}
789-
}
790-
}
791-
792-
impl<C: RaftTypeConfig> MessageSummary for Entry<C> {
793-
fn summary(&self) -> String {
794-
format!("{}:{}", self.log_id, self.payload.summary())
795-
}
796-
}
797-
798-
impl<C: RaftTypeConfig> MessageSummary for Option<Entry<C>> {
799-
fn summary(&self) -> String {
800-
match self {
801-
None => "None".to_string(),
802-
Some(x) => format!("Some({})", x.summary()),
803-
}
804-
}
805-
}
806-
807-
impl<C: RaftTypeConfig> MessageSummary for &[Entry<C>] {
808-
fn summary(&self) -> String {
809-
let entry_refs: Vec<_> = self.iter().collect();
810-
entry_refs.as_slice().summary()
811-
}
812-
}
813-
814-
impl<C: RaftTypeConfig> MessageSummary for &[&Entry<C>] {
815-
fn summary(&self) -> String {
816-
if self.is_empty() {
817-
return "{}".to_string();
818-
}
819-
let mut res = Vec::with_capacity(self.len());
820-
if self.len() <= 5 {
821-
for x in self.iter() {
822-
let e = format!("{}:{}", x.log_id, x.payload.summary());
823-
res.push(e);
824-
}
825-
826-
res.join(",")
827-
} else {
828-
let first = *self.first().unwrap();
829-
let last = *self.last().unwrap();
830-
831-
format!("{} ... {}", first.summary(), last.summary())
832-
}
833-
}
834-
}
835-
836-
/// Log entry payload variants.
837-
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
838-
pub enum EntryPayload<C: RaftTypeConfig> {
839-
/// An empty payload committed by a new cluster leader.
840-
Blank,
841-
842-
Normal(C::D),
843-
844-
/// A change-membership log entry.
845-
Membership(Membership<C::NodeId>),
846-
}
847-
848-
impl<C: RaftTypeConfig> MessageSummary for EntryPayload<C> {
849-
fn summary(&self) -> String {
850-
match self {
851-
EntryPayload::Blank => "blank".to_string(),
852-
EntryPayload::Normal(_n) => "normal".to_string(),
853-
EntryPayload::Membership(c) => {
854-
format!("membership: {}", c.summary())
855-
}
856-
}
857-
}
858-
}
859-
860767
/// An RPC sent by candidates to gather votes (§5.2).
861768
#[derive(Debug, Serialize, Deserialize)]
862769
pub struct VoteRequest<C: RaftTypeConfig> {

0 commit comments

Comments
 (0)