Skip to content

Commit f40c205

Browse files
committed
Change: Use a RaftConfig trait to configure common types
Currently only the request and response type are configured, but other types can be now easily configured as well, such as `NodeId` (to be done in the next change) or `Runtime`. Storage and network factory types are still provided as a separate type, since otherwise it would be hard to provide storage wrappers for testing. This addresses #204.
1 parent 46644c8 commit f40c205

28 files changed

+398
-337
lines changed

example-raft-kv/src/bin/main.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@ use clap::Parser;
22
use env_logger::Env;
33
use example_raft_key_value::network::raft_network_impl::ExampleNetwork;
44
use example_raft_key_value::start_example_raft_node;
5-
use example_raft_key_value::store::ExampleRequest;
6-
use example_raft_key_value::store::ExampleResponse;
75
use example_raft_key_value::store::ExampleStore;
6+
use example_raft_key_value::ExampleConfig;
87
use openraft::Raft;
98

10-
pub type ExampleRaft = Raft<ExampleRequest, ExampleResponse, ExampleNetwork, ExampleStore>;
9+
pub type ExampleRaft = Raft<ExampleConfig, ExampleNetwork, ExampleStore>;
1110

1211
#[derive(Parser, Clone, Debug)]
1312
#[clap(author, version, about, long_about = None)]

example-raft-kv/src/client.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use serde::de::DeserializeOwned;
1919
use serde::Deserialize;
2020
use serde::Serialize;
2121

22+
use crate::ExampleConfig;
2223
use crate::ExampleRequest;
23-
use crate::ExampleResponse;
2424

2525
#[derive(Debug, Clone, Serialize, Deserialize)]
2626
pub struct Empty {}
@@ -54,7 +54,7 @@ impl ExampleClient {
5454
pub async fn write(
5555
&self,
5656
req: &ExampleRequest,
57-
) -> Result<ClientWriteResponse<ExampleResponse>, RPCError<ClientWriteError>> {
57+
) -> Result<ClientWriteResponse<ExampleConfig>, RPCError<ClientWriteError>> {
5858
self.send_rpc_to_leader("write", Some(req)).await
5959
}
6060

@@ -91,7 +91,7 @@ impl ExampleClient {
9191
pub async fn change_membership(
9292
&self,
9393
req: &BTreeSet<NodeId>,
94-
) -> Result<ClientWriteResponse<ExampleResponse>, RPCError<ClientWriteError>> {
94+
) -> Result<ClientWriteResponse<ExampleConfig>, RPCError<ClientWriteError>> {
9595
self.send_rpc_to_leader("change-membership", Some(req)).await
9696
}
9797

example-raft-kv/src/lib.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use actix_web::HttpServer;
88
use openraft::Config;
99
use openraft::NodeId;
1010
use openraft::Raft;
11+
use openraft::RaftConfig;
1112

1213
use crate::app::ExampleApp;
1314
use crate::network::api;
@@ -23,7 +24,14 @@ pub mod client;
2324
pub mod network;
2425
pub mod store;
2526

26-
pub type ExampleRaft = Raft<ExampleRequest, ExampleResponse, ExampleNetwork, Arc<ExampleStore>>;
27+
pub struct ExampleConfig {}
28+
29+
impl RaftConfig for ExampleConfig {
30+
type D = ExampleRequest;
31+
type R = ExampleResponse;
32+
}
33+
34+
pub type ExampleRaft = Raft<ExampleConfig, ExampleNetwork, Arc<ExampleStore>>;
2735

2836
pub async fn start_example_raft_node(node_id: NodeId, http_addr: String) -> std::io::Result<()> {
2937
// Create a configuration for the raft instance.

example-raft-kv/src/network/raft.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use openraft::raft::VoteRequest;
88
use web::Json;
99

1010
use crate::app::ExampleApp;
11-
use crate::store::ExampleRequest;
11+
use crate::ExampleConfig;
1212

1313
// --- Raft communication
1414

@@ -21,7 +21,7 @@ pub async fn vote(app: Data<ExampleApp>, req: Json<VoteRequest>) -> actix_web::R
2121
#[post("/raft-append")]
2222
pub async fn append(
2323
app: Data<ExampleApp>,
24-
req: Json<AppendEntriesRequest<ExampleRequest>>,
24+
req: Json<AppendEntriesRequest<ExampleConfig>>,
2525
) -> actix_web::Result<impl Responder> {
2626
let res = app.raft.append_entries(req.0).await;
2727
Ok(Json(res))

example-raft-kv/src/network/raft_network_impl.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use openraft::RaftNetworkFactory;
1818
use serde::de::DeserializeOwned;
1919
use serde::Serialize;
2020

21-
use crate::store::ExampleRequest;
21+
use crate::ExampleConfig;
2222

2323
pub struct ExampleNetwork {}
2424

@@ -50,7 +50,7 @@ impl ExampleNetwork {
5050

5151
// NOTE: This could be implemented also on `Arc<ExampleNetwork>`, but since it's empty, implemented directly.
5252
#[async_trait]
53-
impl RaftNetworkFactory<ExampleRequest> for ExampleNetwork {
53+
impl RaftNetworkFactory<ExampleConfig> for ExampleNetwork {
5454
type Network = ExampleNetworkConnection;
5555

5656
async fn connect(&mut self, target: NodeId, node: Option<&Node>) -> Self::Network {
@@ -69,10 +69,10 @@ pub struct ExampleNetworkConnection {
6969
}
7070

7171
#[async_trait]
72-
impl RaftNetwork<ExampleRequest> for ExampleNetworkConnection {
72+
impl RaftNetwork<ExampleConfig> for ExampleNetworkConnection {
7373
async fn send_append_entries(
7474
&mut self,
75-
req: AppendEntriesRequest<ExampleRequest>,
75+
req: AppendEntriesRequest<ExampleConfig>,
7676
) -> Result<AppendEntriesResponse, RPCError<AppendEntriesError>> {
7777
self.owner.send_rpc(self.target, self.target_node.as_ref(), "raft-append", req).await
7878
}

example-raft-kv/src/store/mod.rs

+9-7
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use serde::Deserialize;
2929
use serde::Serialize;
3030
use tokio::sync::RwLock;
3131

32+
use crate::ExampleConfig;
33+
3234
/**
3335
* Here you will set the types of request that will interact with the raft nodes.
3436
* For example the `Set` will be used to write data (key and value) to the raft database.
@@ -87,7 +89,7 @@ pub struct ExampleStore {
8789
last_purged_log_id: RwLock<Option<LogId>>,
8890

8991
/// The Raft log.
90-
log: RwLock<BTreeMap<u64, Entry<ExampleRequest>>>,
92+
log: RwLock<BTreeMap<u64, Entry<ExampleConfig>>>,
9193

9294
/// The Raft state machine.
9395
pub state_machine: RwLock<ExampleStateMachine>,
@@ -101,7 +103,7 @@ pub struct ExampleStore {
101103
}
102104

103105
#[async_trait]
104-
impl RaftLogReader<ExampleRequest, ExampleResponse> for Arc<ExampleStore> {
106+
impl RaftLogReader<ExampleConfig> for Arc<ExampleStore> {
105107
async fn get_log_state(&mut self) -> Result<LogState, StorageError> {
106108
let log = self.log.read().await;
107109
let last = log.iter().rev().next().map(|(_, ent)| ent.log_id);
@@ -122,15 +124,15 @@ impl RaftLogReader<ExampleRequest, ExampleResponse> for Arc<ExampleStore> {
122124
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
123125
&mut self,
124126
range: RB,
125-
) -> Result<Vec<Entry<ExampleRequest>>, StorageError> {
127+
) -> Result<Vec<Entry<ExampleConfig>>, StorageError> {
126128
let log = self.log.read().await;
127129
let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::<Vec<_>>();
128130
Ok(response)
129131
}
130132
}
131133

132134
#[async_trait]
133-
impl RaftSnapshotBuilder<ExampleRequest, ExampleResponse, Cursor<Vec<u8>>> for Arc<ExampleStore> {
135+
impl RaftSnapshotBuilder<ExampleConfig, Cursor<Vec<u8>>> for Arc<ExampleStore> {
134136
#[tracing::instrument(level = "trace", skip(self))]
135137
async fn build_snapshot(&mut self) -> Result<Snapshot<Cursor<Vec<u8>>>, StorageError> {
136138
let (data, last_applied_log);
@@ -185,7 +187,7 @@ impl RaftSnapshotBuilder<ExampleRequest, ExampleResponse, Cursor<Vec<u8>>> for A
185187
}
186188

187189
#[async_trait]
188-
impl RaftStorage<ExampleRequest, ExampleResponse> for Arc<ExampleStore> {
190+
impl RaftStorage<ExampleConfig> for Arc<ExampleStore> {
189191
type SnapshotData = Cursor<Vec<u8>>;
190192
type LogReader = Self;
191193
type SnapshotBuilder = Self;
@@ -203,7 +205,7 @@ impl RaftStorage<ExampleRequest, ExampleResponse> for Arc<ExampleStore> {
203205
}
204206

205207
#[tracing::instrument(level = "trace", skip(self, entries))]
206-
async fn append_to_log(&mut self, entries: &[&Entry<ExampleRequest>]) -> Result<(), StorageError> {
208+
async fn append_to_log(&mut self, entries: &[&Entry<ExampleConfig>]) -> Result<(), StorageError> {
207209
let mut log = self.log.write().await;
208210
for entry in entries {
209211
log.insert(entry.log_id.index, (*entry).clone());
@@ -254,7 +256,7 @@ impl RaftStorage<ExampleRequest, ExampleResponse> for Arc<ExampleStore> {
254256
#[tracing::instrument(level = "trace", skip(self, entries))]
255257
async fn apply_to_state_machine(
256258
&mut self,
257-
entries: &[&Entry<ExampleRequest>],
259+
entries: &[&Entry<ExampleConfig>],
258260
) -> Result<Vec<ExampleResponse>, StorageError> {
259261
let mut res = Vec::with_capacity(entries.len());
260262

memstore/src/lib.rs

+15-7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use openraft::EffectiveMembership;
2525
use openraft::ErrorSubject;
2626
use openraft::ErrorVerb;
2727
use openraft::LogId;
28+
use openraft::RaftConfig;
2829
use openraft::RaftStorage;
2930
use openraft::RaftStorageDebug;
3031
use openraft::SnapshotMeta;
@@ -62,6 +63,13 @@ pub struct ClientResponse(Option<String>);
6263

6364
impl AppDataResponse for ClientResponse {}
6465

66+
pub struct Config {}
67+
68+
impl RaftConfig for Config {
69+
type D = ClientRequest;
70+
type R = ClientResponse;
71+
}
72+
6573
/// The application snapshot type which the `MemStore` works with.
6674
#[derive(Debug)]
6775
pub struct MemStoreSnapshot {
@@ -89,7 +97,7 @@ pub struct MemStore {
8997
last_purged_log_id: RwLock<Option<LogId>>,
9098

9199
/// The Raft log.
92-
log: RwLock<BTreeMap<u64, Entry<ClientRequest>>>,
100+
log: RwLock<BTreeMap<u64, Entry<Config>>>,
93101

94102
/// The Raft state machine.
95103
sm: RwLock<MemStoreStateMachine>,
@@ -134,11 +142,11 @@ impl RaftStorageDebug<MemStoreStateMachine> for Arc<MemStore> {
134142
}
135143

136144
#[async_trait]
137-
impl RaftLogReader<ClientRequest, ClientResponse> for Arc<MemStore> {
145+
impl RaftLogReader<Config> for Arc<MemStore> {
138146
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
139147
&mut self,
140148
range: RB,
141-
) -> Result<Vec<Entry<ClientRequest>>, StorageError> {
149+
) -> Result<Vec<Entry<Config>>, StorageError> {
142150
let res = {
143151
let log = self.log.read().await;
144152
log.range(range.clone()).map(|(_, val)| val.clone()).collect::<Vec<_>>()
@@ -166,7 +174,7 @@ impl RaftLogReader<ClientRequest, ClientResponse> for Arc<MemStore> {
166174
}
167175

168176
#[async_trait]
169-
impl RaftSnapshotBuilder<ClientRequest, ClientResponse, Cursor<Vec<u8>>> for Arc<MemStore> {
177+
impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<MemStore> {
170178
#[tracing::instrument(level = "trace", skip(self))]
171179
async fn build_snapshot(&mut self) -> Result<Snapshot<Cursor<Vec<u8>>>, StorageError> {
172180
let (data, last_applied_log);
@@ -225,7 +233,7 @@ impl RaftSnapshotBuilder<ClientRequest, ClientResponse, Cursor<Vec<u8>>> for Arc
225233
}
226234

227235
#[async_trait]
228-
impl RaftStorage<ClientRequest, ClientResponse> for Arc<MemStore> {
236+
impl RaftStorage<Config> for Arc<MemStore> {
229237
type SnapshotData = Cursor<Vec<u8>>;
230238

231239
#[tracing::instrument(level = "trace", skip(self))]
@@ -285,7 +293,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for Arc<MemStore> {
285293
}
286294

287295
#[tracing::instrument(level = "trace", skip(self, entries))]
288-
async fn append_to_log(&mut self, entries: &[&Entry<ClientRequest>]) -> Result<(), StorageError> {
296+
async fn append_to_log(&mut self, entries: &[&Entry<Config>]) -> Result<(), StorageError> {
289297
let mut log = self.log.write().await;
290298
for entry in entries {
291299
log.insert(entry.log_id.index, (*entry).clone());
@@ -296,7 +304,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for Arc<MemStore> {
296304
#[tracing::instrument(level = "trace", skip(self, entries))]
297305
async fn apply_to_state_machine(
298306
&mut self,
299-
entries: &[&Entry<ClientRequest>],
307+
entries: &[&Entry<Config>],
300308
) -> Result<Vec<ClientResponse>, StorageError> {
301309
let mut res = Vec::with_capacity(entries.len());
302310

openraft/src/core/admin.rs

+5-6
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,16 @@ use crate::raft::ClientWriteResponse;
2323
use crate::raft::EntryPayload;
2424
use crate::raft::RaftRespTx;
2525
use crate::raft_types::LogIdOptionExt;
26-
use crate::AppData;
27-
use crate::AppDataResponse;
2826
use crate::LogId;
2927
use crate::Membership;
3028
use crate::Node;
3129
use crate::NodeId;
30+
use crate::RaftConfig;
3231
use crate::RaftNetworkFactory;
3332
use crate::RaftStorage;
3433
use crate::StorageError;
3534

36-
impl<'a, D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorage<D, R>> LearnerState<'a, D, R, N, S> {
35+
impl<'a, C: RaftConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LearnerState<'a, C, N, S> {
3736
/// Handle the admin `init_with_config` command.
3837
#[tracing::instrument(level = "debug", skip(self))]
3938
pub(super) async fn handle_init_with_config(&mut self, members: EitherNodesOrIds) -> Result<(), InitializeError> {
@@ -67,7 +66,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorag
6766
}
6867
}
6968

70-
impl<'a, D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S> {
69+
impl<'a, C: RaftConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderState<'a, C, N, S> {
7170
// add node into learner,return true if the node is already a member or learner
7271
#[tracing::instrument(level = "debug", skip(self))]
7372
async fn add_learner_into_membership(
@@ -162,7 +161,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorag
162161
members: BTreeSet<NodeId>,
163162
blocking: bool,
164163
turn_to_learner: bool,
165-
tx: RaftRespTx<ClientWriteResponse<R>, ClientWriteError>,
164+
tx: RaftRespTx<ClientWriteResponse<C>, ClientWriteError>,
166165
) -> Result<(), StorageError> {
167166
// Ensure cluster will have at least one node.
168167
if members.is_empty() {
@@ -251,7 +250,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorag
251250
pub async fn append_membership_log(
252251
&mut self,
253252
mem: Membership,
254-
resp_tx: Option<RaftRespTx<ClientWriteResponse<R>, ClientWriteError>>,
253+
resp_tx: Option<RaftRespTx<ClientWriteResponse<C>, ClientWriteError>>,
255254
) -> Result<(), StorageError> {
256255
let payload = EntryPayload::Membership(mem.clone());
257256
let entry = self.core.append_payload_to_log(payload).await?;

openraft/src/core/append_entries.rs

+9-10
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,23 @@ use crate::raft::AppendEntriesResponse;
77
use crate::raft::Entry;
88
use crate::raft::EntryPayload;
99
use crate::raft_types::LogIdOptionExt;
10-
use crate::AppData;
11-
use crate::AppDataResponse;
1210
use crate::EffectiveMembership;
1311
use crate::LogId;
1412
use crate::MessageSummary;
13+
use crate::RaftConfig;
1514
use crate::RaftNetworkFactory;
1615
use crate::RaftStorage;
1716
use crate::StorageError;
1817
use crate::Update;
1918

20-
impl<D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S> {
19+
impl<C: RaftConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C, N, S> {
2120
/// An RPC invoked by the leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
2221
///
2322
/// See `receiver implementation: AppendEntries RPC` in raft-essentials.md in this repo.
2423
#[tracing::instrument(level = "debug", skip(self, req))]
2524
pub(super) async fn handle_append_entries_request(
2625
&mut self,
27-
req: AppendEntriesRequest<D>,
26+
req: AppendEntriesRequest<C>,
2827
) -> Result<AppendEntriesResponse, AppendEntriesError> {
2928
tracing::debug!(last_log_id=?self.last_log_id, ?self.last_applied, msg=%req.summary(), "handle_append_entries_request");
3029

@@ -134,10 +133,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorage<D,
134133
/// R5 2 4 4
135134
/// ```
136135
///
137-
/// If log 5 is committed by R1, and log 3 is not removed, R5 in future could become a new leader and overrides log
136+
/// If log 5 is committed by R1, and log 3 is not removeC5 in future could become a new leader and overrides log
138137
/// 5 on R3.
139138
#[tracing::instrument(level="trace", skip(self, msg_entries), fields(msg_entries=%msg_entries.summary()))]
140-
async fn find_and_delete_conflict_logs(&mut self, msg_entries: &[Entry<D>]) -> Result<(), StorageError> {
139+
async fn find_and_delete_conflict_logs(&mut self, msg_entries: &[Entry<C>]) -> Result<(), StorageError> {
141140
// all msg_entries are inconsistent logs
142141

143142
tracing::debug!(msg_entries=%msg_entries.summary(), "try to delete_inconsistent_log");
@@ -172,7 +171,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorage<D,
172171
async fn append_apply_log_entries(
173172
&mut self,
174173
prev_log_id: Option<LogId>,
175-
entries: &[Entry<D>],
174+
entries: &[Entry<C>],
176175
committed: Option<LogId>,
177176
) -> Result<AppendEntriesResponse, StorageError> {
178177
let mismatched = self.does_log_id_match(prev_log_id).await?;
@@ -239,8 +238,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorage<D,
239238
/// Filter them out.
240239
pub async fn skip_matching_entries<'s, 'e>(
241240
&'s mut self,
242-
entries: &'e [Entry<D>],
243-
) -> Result<(usize, &'e [Entry<D>]), StorageError> {
241+
entries: &'e [Entry<C>],
242+
) -> Result<(usize, &'e [Entry<C>]), StorageError> {
244243
let l = entries.len();
245244

246245
for i in 0..l {
@@ -307,7 +306,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorage<D,
307306
/// Configuration changes are also detected and applied here. See `configuration changes`
308307
/// in the raft-essentials.md in this repo.
309308
#[tracing::instrument(level = "trace", skip(self, entries), fields(entries=%entries.summary()))]
310-
async fn append_log_entries(&mut self, entries: &[Entry<D>]) -> Result<(), StorageError> {
309+
async fn append_log_entries(&mut self, entries: &[Entry<C>]) -> Result<(), StorageError> {
311310
if entries.is_empty() {
312311
return Ok(());
313312
}

0 commit comments

Comments
 (0)