Skip to content

Commit e39da9f

Browse files
committed
Feature: define custom Entry type for raft log
This commit introduces a new feature that allows applications to define a custom type for Raft log entries in `RaftTypeConfig`. By setting `Entry = MyEntry`, where `MyEntry` implements the `RaftEntry` trait, an application can now define its own log entry type that reflects its architecture. However, the default implementation, the `Entry` type is still available. This change provides more flexibility for applications to tailor the Raft log entry to their specific needs. - Fix: #705 - Changes: `RaftStorage::append_to_log()` and `RaftStorage::apply_to_state_machine()` accepts slice of entries instead of slice of entry references.
1 parent 04a157d commit e39da9f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+436
-417
lines changed

examples/raft-kv-memstore/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub type ExampleNodeId = u64;
3030

3131
openraft::declare_raft_types!(
3232
/// Declare the type configuration for example K/V store.
33-
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = BasicNode
33+
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = BasicNode, Entry = openraft::Entry<ExampleTypeConfig>
3434
);
3535

3636
pub type ExampleRaft = Raft<ExampleTypeConfig, ExampleNetwork, Arc<ExampleStore>>;

examples/raft-kv-memstore/src/store/mod.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -198,10 +198,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
198198
}
199199

200200
#[tracing::instrument(level = "trace", skip(self, entries))]
201-
async fn append_to_log(
202-
&mut self,
203-
entries: &[&Entry<ExampleTypeConfig>],
204-
) -> Result<(), StorageError<ExampleNodeId>> {
201+
async fn append_to_log(&mut self, entries: &[Entry<ExampleTypeConfig>]) -> Result<(), StorageError<ExampleNodeId>> {
205202
let mut log = self.log.write().await;
206203
for entry in entries {
207204
log.insert(entry.log_id.index, (*entry).clone());
@@ -258,7 +255,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
258255
#[tracing::instrument(level = "trace", skip(self, entries))]
259256
async fn apply_to_state_machine(
260257
&mut self,
261-
entries: &[&Entry<ExampleTypeConfig>],
258+
entries: &[Entry<ExampleTypeConfig>],
262259
) -> Result<Vec<ExampleResponse>, StorageError<ExampleNodeId>> {
263260
let mut res = Vec::with_capacity(entries.len());
264261

examples/raft-kv-rocksdb/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ impl Display for ExampleNode {
4343

4444
openraft::declare_raft_types!(
4545
/// Declare the type configuration for example K/V store.
46-
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = ExampleNode
46+
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = ExampleNode, Entry = openraft::Entry<ExampleTypeConfig>
4747
);
4848

4949
pub type ExampleRaft = Raft<ExampleTypeConfig, ExampleNetwork, Arc<ExampleStore>>;

examples/raft-kv-rocksdb/src/store.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
436436
}
437437

438438
#[tracing::instrument(level = "trace", skip(self, entries))]
439-
async fn append_to_log(&mut self, entries: &[&Entry<ExampleTypeConfig>]) -> StorageResult<()> {
439+
async fn append_to_log(&mut self, entries: &[Entry<ExampleTypeConfig>]) -> StorageResult<()> {
440440
for entry in entries {
441441
let id = id_to_bin(entry.log_id.index);
442442
assert_eq!(bin_to_id(&id), entry.log_id.index);
@@ -495,7 +495,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
495495
#[tracing::instrument(level = "trace", skip(self, entries))]
496496
async fn apply_to_state_machine(
497497
&mut self,
498-
entries: &[&Entry<ExampleTypeConfig>],
498+
entries: &[Entry<ExampleTypeConfig>],
499499
) -> Result<Vec<ExampleResponse>, StorageError<ExampleNodeId>> {
500500
let mut res = Vec::with_capacity(entries.len());
501501

memstore/src/lib.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ pub type MemNodeId = u64;
7474

7575
openraft::declare_raft_types!(
7676
/// Declare the type configuration for `MemStore`.
77-
pub Config: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = ()
77+
pub Config: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = (), Entry = openraft::Entry<Config>
7878
);
7979

8080
/// The application snapshot type which the `MemStore` works with.
@@ -306,7 +306,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
306306
}
307307

308308
#[tracing::instrument(level = "trace", skip(self, entries))]
309-
async fn append_to_log(&mut self, entries: &[&Entry<Config>]) -> Result<(), StorageError<MemNodeId>> {
309+
async fn append_to_log(&mut self, entries: &[Entry<Config>]) -> Result<(), StorageError<MemNodeId>> {
310310
let mut log = self.log.write().await;
311311
for entry in entries {
312312
log.insert(entry.log_id.index, (*entry).clone());
@@ -317,7 +317,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
317317
#[tracing::instrument(level = "trace", skip(self, entries))]
318318
async fn apply_to_state_machine(
319319
&mut self,
320-
entries: &[&Entry<Config>],
320+
entries: &[Entry<Config>],
321321
) -> Result<Vec<ClientResponse>, StorageError<MemNodeId>> {
322322
let mut res = Vec::with_capacity(entries.len());
323323

openraft/src/compat/compat07.rs

+17-14
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,8 @@ pub mod testing {
260260

261261
use crate::compat;
262262
use crate::compat::compat07;
263+
use crate::entry::RaftPayload;
264+
use crate::log_id::RaftLogId;
263265

264266
/// Build a v0.7 `RaftStorage` implementation for compatibility test.
265267
#[async_trait::async_trait]
@@ -401,14 +403,14 @@ pub mod testing {
401403
];
402404

403405
assert_eq!(3, got.len());
404-
assert_eq!(want[0].log_id, got[0].log_id);
405-
assert_eq!(want[1].log_id, got[1].log_id);
406-
assert_eq!(want[2].log_id, got[2].log_id);
406+
assert_eq!(want[0].log_id, *got[0].get_log_id());
407+
assert_eq!(want[1].log_id, *got[1].get_log_id());
408+
assert_eq!(want[2].log_id, *got[2].get_log_id());
407409

408-
assert!(matches!(got[0].payload, crate::EntryPayload::Blank));
409-
if let crate::EntryPayload::Membership(m) = &got[1].payload {
410+
assert!(got[0].is_blank());
411+
if let Some(m) = got[1].get_membership() {
410412
assert_eq!(
411-
&crate::Membership::new(
413+
&crate::Membership::<u64, crate::EmptyNode>::new(
412414
vec![btreeset! {1,2}],
413415
btreemap! {1=> crate::EmptyNode{}, 2=>crate::EmptyNode{}},
414416
),
@@ -418,10 +420,11 @@ pub mod testing {
418420
unreachable!("expect Membership");
419421
}
420422

421-
let s = serde_json::to_string(&got[2].payload)?;
422-
let want = serde_json::to_string(&crate::EntryPayload::<BLatest::C>::Normal(
423-
self.builder_latest.sample_app_data(),
424-
))?;
423+
let s = serde_json::to_string(&got[2])?;
424+
let want = serde_json::to_string(&crate::Entry::<BLatest::C> {
425+
log_id: crate::LogId::new(crate::CommittedLeaderId::new(1, 0), 7),
426+
payload: crate::EntryPayload::Normal(self.builder_latest.sample_app_data()),
427+
})?;
425428
assert_eq!(want, s);
426429
}
427430

@@ -464,11 +467,11 @@ pub mod testing {
464467
}];
465468

466469
assert_eq!(1, got.len());
467-
assert_eq!(want[0].log_id, got[0].log_id);
470+
assert_eq!(want[0].log_id, *got[0].get_log_id());
468471

469-
if let crate::EntryPayload::Membership(m) = &got[0].payload {
472+
if let Some(m) = got[0].get_membership() {
470473
assert_eq!(
471-
&crate::Membership::new(
474+
&crate::Membership::<u64, crate::EmptyNode>::new(
472475
vec![btreeset! {1,2}],
473476
btreemap! {1=> crate::EmptyNode{}, 2=>crate::EmptyNode{}},
474477
),
@@ -672,7 +675,7 @@ mod tests {
672675
}
673676

674677
crate::declare_raft_types!(
675-
pub TestingConfig: D = u64, R = u64, NodeId = u64, Node = crate::EmptyNode
678+
pub TestingConfig: D = u64, R = u64, NodeId = u64, Node = crate::EmptyNode, Entry = crate::Entry<TestingConfig>
676679
);
677680

678681
#[test]

openraft/src/core/install_snapshot.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use crate::error::SnapshotMismatch;
88
use crate::raft::InstallSnapshotRequest;
99
use crate::raft::InstallSnapshotResponse;
1010
use crate::raft::InstallSnapshotTx;
11-
use crate::Entry;
1211
use crate::ErrorSubject;
1312
use crate::ErrorVerb;
1413
use crate::MessageSummary;
@@ -36,7 +35,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
3635
tracing::debug!(req = display(req.summary()));
3736

3837
let res = self.engine.vote_handler().handle_message_vote(&req.vote);
39-
self.run_engine_commands::<Entry<C>>(&[]).await?;
38+
self.run_engine_commands(&[]).await?;
4039
if res.is_err() {
4140
tracing::info!(
4241
my_vote = display(self.engine.state.vote_ref()),
@@ -178,7 +177,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
178177
self.received_snapshot.insert(meta.snapshot_id.clone(), snapshot_data);
179178

180179
self.engine.following_handler().install_snapshot(meta);
181-
self.run_engine_commands::<Entry<C>>(&[]).await?;
180+
self.run_engine_commands(&[]).await?;
182181

183182
Ok(())
184183
}

0 commit comments

Comments
 (0)