Skip to content

Commit e4b705c

Browse files
authored
Change: Turn Node into a trait (#480)
Structs that depend on `Node` now have to implement `trait Node`, or use a predefined basic implementation `BasicNode`. E.g., `struct Membership` now has two type parameters: `impl<NID, N> Membership<NID, N> where N: Node, NID: NodeId`. Signed-off-by: Heinz N. Gies <[email protected]>
1 parent 0d024b3 commit e4b705c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+1014
-555
lines changed

examples/raft-kv-memstore/src/client.rs

+30-12
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use openraft::error::RPCError;
1313
use openraft::error::RemoteError;
1414
use openraft::raft::AddLearnerResponse;
1515
use openraft::raft::ClientWriteResponse;
16+
use openraft::BasicNode;
1617
use openraft::RaftMetrics;
1718
use reqwest::Client;
1819
use serde::de::DeserializeOwned;
@@ -55,14 +56,17 @@ impl ExampleClient {
5556
pub async fn write(
5657
&self,
5758
req: &ExampleRequest,
58-
) -> Result<ClientWriteResponse<ExampleTypeConfig>, RPCError<ExampleNodeId, ClientWriteError<ExampleNodeId>>> {
59+
) -> Result<
60+
ClientWriteResponse<ExampleTypeConfig>,
61+
RPCError<ExampleNodeId, BasicNode, ClientWriteError<ExampleNodeId, BasicNode>>,
62+
> {
5963
self.send_rpc_to_leader("write", Some(req)).await
6064
}
6165

6266
/// Read value by key, in an inconsistent mode.
6367
///
6468
/// This method may return stale value because it does not force to read on a legal leader.
65-
pub async fn read(&self, req: &String) -> Result<String, RPCError<ExampleNodeId, Infallible>> {
69+
pub async fn read(&self, req: &String) -> Result<String, RPCError<ExampleNodeId, BasicNode, Infallible>> {
6670
self.do_send_rpc_to_leader("read", Some(req)).await
6771
}
6872

@@ -72,7 +76,7 @@ impl ExampleClient {
7276
pub async fn consistent_read(
7377
&self,
7478
req: &String,
75-
) -> Result<String, RPCError<ExampleNodeId, CheckIsLeaderError<ExampleNodeId>>> {
79+
) -> Result<String, RPCError<ExampleNodeId, BasicNode, CheckIsLeaderError<ExampleNodeId, BasicNode>>> {
7680
self.do_send_rpc_to_leader("consistent_read", Some(req)).await
7781
}
7882

@@ -84,7 +88,9 @@ impl ExampleClient {
8488
/// With a initialized cluster, new node can be added with [`write`].
8589
/// Then setup replication with [`add_learner`].
8690
/// Then make the new node a member with [`change_membership`].
87-
pub async fn init(&self) -> Result<(), RPCError<ExampleNodeId, InitializeError<ExampleNodeId>>> {
91+
pub async fn init(
92+
&self,
93+
) -> Result<(), RPCError<ExampleNodeId, BasicNode, InitializeError<ExampleNodeId, BasicNode>>> {
8894
self.do_send_rpc_to_leader("init", Some(&Empty {})).await
8995
}
9096

@@ -94,7 +100,10 @@ impl ExampleClient {
94100
pub async fn add_learner(
95101
&self,
96102
req: (ExampleNodeId, String),
97-
) -> Result<AddLearnerResponse<ExampleNodeId>, RPCError<ExampleNodeId, AddLearnerError<ExampleNodeId>>> {
103+
) -> Result<
104+
AddLearnerResponse<ExampleNodeId>,
105+
RPCError<ExampleNodeId, BasicNode, AddLearnerError<ExampleNodeId, BasicNode>>,
106+
> {
98107
self.send_rpc_to_leader("add-learner", Some(&req)).await
99108
}
100109

@@ -105,7 +114,10 @@ impl ExampleClient {
105114
pub async fn change_membership(
106115
&self,
107116
req: &BTreeSet<ExampleNodeId>,
108-
) -> Result<ClientWriteResponse<ExampleTypeConfig>, RPCError<ExampleNodeId, ClientWriteError<ExampleNodeId>>> {
117+
) -> Result<
118+
ClientWriteResponse<ExampleTypeConfig>,
119+
RPCError<ExampleNodeId, BasicNode, ClientWriteError<ExampleNodeId, BasicNode>>,
120+
> {
109121
self.send_rpc_to_leader("change-membership", Some(req)).await
110122
}
111123

@@ -114,7 +126,9 @@ impl ExampleClient {
114126
/// Metrics contains various information about the cluster, such as current leader,
115127
/// membership config, replication status etc.
116128
/// See [`RaftMetrics`].
117-
pub async fn metrics(&self) -> Result<RaftMetrics<ExampleNodeId>, RPCError<ExampleNodeId, Infallible>> {
129+
pub async fn metrics(
130+
&self,
131+
) -> Result<RaftMetrics<ExampleNodeId, BasicNode>, RPCError<ExampleNodeId, BasicNode, Infallible>> {
118132
self.do_send_rpc_to_leader("metrics", None::<&()>).await
119133
}
120134

@@ -129,7 +143,7 @@ impl ExampleClient {
129143
&self,
130144
uri: &str,
131145
req: Option<&Req>,
132-
) -> Result<Resp, RPCError<ExampleNodeId, Err>>
146+
) -> Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>>
133147
where
134148
Req: Serialize + 'static,
135149
Resp: Serialize + DeserializeOwned,
@@ -174,17 +188,21 @@ impl ExampleClient {
174188
&self,
175189
uri: &str,
176190
req: Option<&Req>,
177-
) -> Result<Resp, RPCError<ExampleNodeId, Err>>
191+
) -> Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>>
178192
where
179193
Req: Serialize + 'static,
180194
Resp: Serialize + DeserializeOwned,
181-
Err: std::error::Error + Serialize + DeserializeOwned + TryInto<ForwardToLeader<ExampleNodeId>> + Clone,
195+
Err: std::error::Error
196+
+ Serialize
197+
+ DeserializeOwned
198+
+ TryInto<ForwardToLeader<ExampleNodeId, BasicNode>>
199+
+ Clone,
182200
{
183201
// Retry at most 3 times to find a valid leader.
184202
let mut n_retry = 3;
185203

186204
loop {
187-
let res: Result<Resp, RPCError<ExampleNodeId, Err>> = self.do_send_rpc_to_leader(uri, req).await;
205+
let res: Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>> = self.do_send_rpc_to_leader(uri, req).await;
188206

189207
let rpc_err = match res {
190208
Ok(x) => return Ok(x),
@@ -193,7 +211,7 @@ impl ExampleClient {
193211

194212
if let RPCError::RemoteError(remote_err) = &rpc_err {
195213
let forward_err_res =
196-
<Err as TryInto<ForwardToLeader<ExampleNodeId>>>::try_into(remote_err.source.clone());
214+
<Err as TryInto<ForwardToLeader<ExampleNodeId, BasicNode>>>::try_into(remote_err.source.clone());
197215

198216
if let Ok(ForwardToLeader {
199217
leader_id: Some(leader_id),

examples/raft-kv-memstore/src/lib.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use actix_web::middleware::Logger;
55
use actix_web::web::Data;
66
use actix_web::App;
77
use actix_web::HttpServer;
8+
use openraft::BasicNode;
89
use openraft::Config;
910
use openraft::Raft;
1011

@@ -26,7 +27,7 @@ pub type ExampleNodeId = u64;
2627

2728
openraft::declare_raft_types!(
2829
/// Declare the type configuration for example K/V store.
29-
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId
30+
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = BasicNode
3031
);
3132

3233
pub type ExampleRaft = Raft<ExampleTypeConfig, ExampleNetwork, Arc<ExampleStore>>;

examples/raft-kv-memstore/src/network/api.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use actix_web::web::Data;
44
use actix_web::Responder;
55
use openraft::error::CheckIsLeaderError;
66
use openraft::error::Infallible;
7+
use openraft::BasicNode;
78
use web::Json;
89

910
use crate::app::ExampleApp;
@@ -45,7 +46,7 @@ pub async fn consistent_read(app: Data<ExampleApp>, req: Json<String>) -> actix_
4546
let key = req.0;
4647
let value = state_machine.data.get(&key).cloned();
4748

48-
let res: Result<String, CheckIsLeaderError<ExampleNodeId>> = Ok(value.unwrap_or_default());
49+
let res: Result<String, CheckIsLeaderError<ExampleNodeId, BasicNode>> = Ok(value.unwrap_or_default());
4950
Ok(Json(res))
5051
}
5152
Err(e) => Ok(Json(Err(e))),

examples/raft-kv-memstore/src/network/management.rs

+4-10
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use actix_web::web;
77
use actix_web::web::Data;
88
use actix_web::Responder;
99
use openraft::error::Infallible;
10-
use openraft::Node;
10+
use openraft::BasicNode;
1111
use openraft::RaftMetrics;
1212
use web::Json;
1313

@@ -27,10 +27,7 @@ pub async fn add_learner(
2727
req: Json<(ExampleNodeId, String)>,
2828
) -> actix_web::Result<impl Responder> {
2929
let node_id = req.0 .0;
30-
let node = Node {
31-
addr: req.0 .1.clone(),
32-
..Default::default()
33-
};
30+
let node = BasicNode { addr: req.0 .1.clone() };
3431
let res = app.raft.add_learner(node_id, Some(node), true).await;
3532
Ok(Json(res))
3633
}
@@ -49,10 +46,7 @@ pub async fn change_membership(
4946
#[post("/init")]
5047
pub async fn init(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
5148
let mut nodes = BTreeMap::new();
52-
nodes.insert(app.id, Node {
53-
addr: app.addr.clone(),
54-
data: Default::default(),
55-
});
49+
nodes.insert(app.id, BasicNode { addr: app.addr.clone() });
5650
let res = app.raft.initialize(nodes).await;
5751
Ok(Json(res))
5852
}
@@ -62,6 +56,6 @@ pub async fn init(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
6256
pub async fn metrics(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
6357
let metrics = app.raft.metrics().borrow().clone();
6458

65-
let res: Result<RaftMetrics<ExampleNodeId>, Infallible> = Ok(metrics);
59+
let res: Result<RaftMetrics<ExampleNodeId, BasicNode>, Infallible> = Ok(metrics);
6660
Ok(Json(res))
6761
}

examples/raft-kv-memstore/src/network/raft_network_impl.rs

+14-9
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use openraft::raft::InstallSnapshotRequest;
1111
use openraft::raft::InstallSnapshotResponse;
1212
use openraft::raft::VoteRequest;
1313
use openraft::raft::VoteResponse;
14-
use openraft::Node;
14+
use openraft::BasicNode;
1515
use openraft::RaftNetwork;
1616
use openraft::RaftNetworkFactory;
1717
use serde::de::DeserializeOwned;
@@ -26,10 +26,10 @@ impl ExampleNetwork {
2626
pub async fn send_rpc<Req, Resp, Err>(
2727
&self,
2828
target: ExampleNodeId,
29-
target_node: Option<&Node>,
29+
target_node: Option<&BasicNode>,
3030
uri: &str,
3131
req: Req,
32-
) -> Result<Resp, RPCError<ExampleNodeId, Err>>
32+
) -> Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>>
3333
where
3434
Req: Serialize,
3535
Err: std::error::Error + DeserializeOwned,
@@ -57,7 +57,7 @@ impl RaftNetworkFactory<ExampleTypeConfig> for ExampleNetwork {
5757
async fn connect(
5858
&mut self,
5959
target: ExampleNodeId,
60-
node: Option<&Node>,
60+
node: Option<&BasicNode>,
6161
) -> Result<Self::Network, Self::ConnectionError> {
6262
Ok(ExampleNetworkConnection {
6363
owner: ExampleNetwork {},
@@ -70,30 +70,35 @@ impl RaftNetworkFactory<ExampleTypeConfig> for ExampleNetwork {
7070
pub struct ExampleNetworkConnection {
7171
owner: ExampleNetwork,
7272
target: ExampleNodeId,
73-
target_node: Option<Node>,
73+
target_node: Option<BasicNode>,
7474
}
7575

7676
#[async_trait]
7777
impl RaftNetwork<ExampleTypeConfig> for ExampleNetworkConnection {
7878
async fn send_append_entries(
7979
&mut self,
8080
req: AppendEntriesRequest<ExampleTypeConfig>,
81-
) -> Result<AppendEntriesResponse<ExampleNodeId>, RPCError<ExampleNodeId, AppendEntriesError<ExampleNodeId>>> {
81+
) -> Result<
82+
AppendEntriesResponse<ExampleNodeId>,
83+
RPCError<ExampleNodeId, BasicNode, AppendEntriesError<ExampleNodeId>>,
84+
> {
8285
self.owner.send_rpc(self.target, self.target_node.as_ref(), "raft-append", req).await
8386
}
8487

8588
async fn send_install_snapshot(
8689
&mut self,
8790
req: InstallSnapshotRequest<ExampleTypeConfig>,
88-
) -> Result<InstallSnapshotResponse<ExampleNodeId>, RPCError<ExampleNodeId, InstallSnapshotError<ExampleNodeId>>>
89-
{
91+
) -> Result<
92+
InstallSnapshotResponse<ExampleNodeId>,
93+
RPCError<ExampleNodeId, BasicNode, InstallSnapshotError<ExampleNodeId>>,
94+
> {
9095
self.owner.send_rpc(self.target, self.target_node.as_ref(), "raft-snapshot", req).await
9196
}
9297

9398
async fn send_vote(
9499
&mut self,
95100
req: VoteRequest<ExampleNodeId>,
96-
) -> Result<VoteResponse<ExampleNodeId>, RPCError<ExampleNodeId, VoteError<ExampleNodeId>>> {
101+
) -> Result<VoteResponse<ExampleNodeId>, RPCError<ExampleNodeId, BasicNode, VoteError<ExampleNodeId>>> {
97102
self.owner.send_rpc(self.target, self.target_node.as_ref(), "raft-vote", req).await
98103
}
99104
}

examples/raft-kv-memstore/src/store/mod.rs

+13-6
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use openraft::async_trait::async_trait;
99
use openraft::storage::LogState;
1010
use openraft::storage::Snapshot;
1111
use openraft::AnyError;
12+
use openraft::BasicNode;
1213
use openraft::EffectiveMembership;
1314
use openraft::Entry;
1415
use openraft::EntryPayload;
@@ -56,7 +57,7 @@ pub struct ExampleResponse {
5657

5758
#[derive(Debug)]
5859
pub struct ExampleSnapshot {
59-
pub meta: SnapshotMeta<ExampleNodeId>,
60+
pub meta: SnapshotMeta<ExampleNodeId, BasicNode>,
6061

6162
/// The data of the state machine at the time of this snapshot.
6263
pub data: Vec<u8>,
@@ -73,7 +74,7 @@ pub struct ExampleStateMachine {
7374
pub last_applied_log: Option<LogId<ExampleNodeId>>,
7475

7576
// TODO: it should not be Option.
76-
pub last_membership: EffectiveMembership<ExampleNodeId>,
77+
pub last_membership: EffectiveMembership<ExampleNodeId, BasicNode>,
7778

7879
/// Application data.
7980
pub data: BTreeMap<String, String>,
@@ -131,7 +132,7 @@ impl RaftSnapshotBuilder<ExampleTypeConfig, Cursor<Vec<u8>>> for Arc<ExampleStor
131132
#[tracing::instrument(level = "trace", skip(self))]
132133
async fn build_snapshot(
133134
&mut self,
134-
) -> Result<Snapshot<ExampleNodeId, Cursor<Vec<u8>>>, StorageError<ExampleNodeId>> {
135+
) -> Result<Snapshot<ExampleNodeId, BasicNode, Cursor<Vec<u8>>>, StorageError<ExampleNodeId>> {
135136
let data;
136137
let last_applied_log;
137138
let last_membership;
@@ -256,7 +257,13 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
256257

257258
async fn last_applied_state(
258259
&mut self,
259-
) -> Result<(Option<LogId<ExampleNodeId>>, EffectiveMembership<ExampleNodeId>), StorageError<ExampleNodeId>> {
260+
) -> Result<
261+
(
262+
Option<LogId<ExampleNodeId>>,
263+
EffectiveMembership<ExampleNodeId, BasicNode>,
264+
),
265+
StorageError<ExampleNodeId>,
266+
> {
260267
let state_machine = self.state_machine.read().await;
261268
Ok((state_machine.last_applied_log, state_machine.last_membership.clone()))
262269
}
@@ -302,7 +309,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
302309
#[tracing::instrument(level = "trace", skip(self, snapshot))]
303310
async fn install_snapshot(
304311
&mut self,
305-
meta: &SnapshotMeta<ExampleNodeId>,
312+
meta: &SnapshotMeta<ExampleNodeId, BasicNode>,
306313
snapshot: Box<Self::SnapshotData>,
307314
) -> Result<StateMachineChanges<ExampleTypeConfig>, StorageError<ExampleNodeId>> {
308315
tracing::info!(
@@ -341,7 +348,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
341348
#[tracing::instrument(level = "trace", skip(self))]
342349
async fn get_current_snapshot(
343350
&mut self,
344-
) -> Result<Option<Snapshot<ExampleNodeId, Self::SnapshotData>>, StorageError<ExampleNodeId>> {
351+
) -> Result<Option<Snapshot<ExampleNodeId, BasicNode, Self::SnapshotData>>, StorageError<ExampleNodeId>> {
345352
match &*self.current_snapshot.read().await {
346353
Some(snapshot) => {
347354
let data = snapshot.data.clone();

examples/raft-kv-memstore/tests/cluster/test_cluster.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use maplit::btreemap;
1010
use maplit::btreeset;
1111
use openraft::error::NodeNotFound;
1212
use openraft::AnyError;
13-
use openraft::Node;
13+
use openraft::BasicNode;
1414
use tokio::runtime::Runtime;
1515

1616
/// Setup a cluster of 3 nodes.
@@ -90,9 +90,9 @@ async fn test_cluster() -> anyhow::Result<()> {
9090
x.membership_config.nodes().map(|(nid, node)| (*nid, node.clone())).collect::<BTreeMap<_, _>>();
9191
assert_eq!(
9292
btreemap! {
93-
1 => Some(Node::new("127.0.0.1:21001")),
94-
2 => Some(Node::new("127.0.0.1:21002")),
95-
3 => Some(Node::new("127.0.0.1:21003")),
93+
1 => Some(BasicNode::new("127.0.0.1:21001")),
94+
2 => Some(BasicNode::new("127.0.0.1:21002")),
95+
3 => Some(BasicNode::new("127.0.0.1:21003")),
9696
},
9797
nodes_in_cluster
9898
);
@@ -188,7 +188,7 @@ async fn test_cluster() -> anyhow::Result<()> {
188188
match x {
189189
Err(e) => {
190190
let s = e.to_string();
191-
let expect_err:String = "error occur on remote peer 2: has to forward request to: Some(1), Some(Node { addr: \"127.0.0.1:21001\", data: {} })".to_string();
191+
let expect_err:String = "error occur on remote peer 2: has to forward request to: Some(1), Some(BasicNode { addr: \"127.0.0.1:21001\" })".to_string();
192192

193193
assert_eq!(s, expect_err);
194194
}

0 commit comments

Comments
 (0)