Skip to content

Commit 884f0da

Browse files
committed
Feature: add trait RaftLogStorageExt to provide additional raft-log methods
The `RaftLogReaderExt::blocking_append()` method enables the caller to append logs to storage in a blocking manner, eliminating the need to create and await a callback. This method simplifies the process of writing tests.
1 parent 273232c commit 884f0da

File tree

9 files changed

+61
-39
lines changed

9 files changed

+61
-39
lines changed

openraft/src/storage/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub use log_store_ext::RaftLogReaderExt;
1717
use macros::add_async_trait;
1818
pub use snapshot_signature::SnapshotSignature;
1919
pub use v2::RaftLogStorage;
20+
pub use v2::RaftLogStorageExt;
2021
pub use v2::RaftStateMachine;
2122

2223
use crate::display_ext::DisplayOption;

openraft/src/storage/v2.rs

+3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
//! [`RaftStorage`](`crate::storage::RaftStorage`). [`RaftLogStorage`] is responsible for storing
33
//! logs, and [`RaftStateMachine`] is responsible for storing state machine and snapshot.
44
5+
mod raft_log_storage_ext;
6+
57
use macros::add_async_trait;
8+
pub use raft_log_storage_ext::RaftLogStorageExt;
69

710
use crate::storage::callback::LogFlushed;
811
use crate::storage::v2::sealed::Sealed;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use anyerror::AnyError;
2+
use macros::add_async_trait;
3+
4+
use crate::storage::LogFlushed;
5+
use crate::storage::RaftLogStorage;
6+
use crate::type_config::alias::AsyncRuntimeOf;
7+
use crate::AsyncRuntime;
8+
use crate::OptionalSend;
9+
use crate::RaftTypeConfig;
10+
use crate::StorageError;
11+
use crate::StorageIOError;
12+
13+
/// Extension trait for RaftLogStorage to provide utility methods.
14+
///
15+
/// All methods in this trait are provided with default implementation.
16+
#[add_async_trait]
17+
pub trait RaftLogStorageExt<C>: RaftLogStorage<C>
18+
where C: RaftTypeConfig
19+
{
20+
/// Blocking mode append log entries to the storage.
21+
///
22+
/// It blocks until the callback is called by the underlying storage implementation.
23+
async fn blocking_append<I>(&mut self, entries: I) -> Result<(), StorageError<C::NodeId>>
24+
where
25+
I: IntoIterator<Item = C::Entry> + OptionalSend,
26+
I::IntoIter: OptionalSend,
27+
{
28+
let (tx, rx) = AsyncRuntimeOf::<C>::oneshot();
29+
30+
let callback = LogFlushed::new(None, tx);
31+
self.append(entries, callback).await?;
32+
rx.await
33+
.map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?
34+
.map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?;
35+
36+
Ok(())
37+
}
38+
}
39+
40+
impl<C, T> RaftLogStorageExt<C> for T
41+
where
42+
T: RaftLogStorage<C>,
43+
C: RaftTypeConfig,
44+
{
45+
}

openraft/src/testing/mod.rs

-26
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,13 @@ mod suite;
33

44
use std::collections::BTreeSet;
55

6-
use anyerror::AnyError;
76
pub use store_builder::StoreBuilder;
87
pub use suite::Suite;
98

109
use crate::entry::RaftEntry;
11-
use crate::log_id::RaftLogId;
12-
use crate::storage::LogFlushed;
13-
use crate::storage::RaftLogStorage;
14-
use crate::AsyncRuntime;
1510
use crate::CommittedLeaderId;
1611
use crate::LogId;
1712
use crate::RaftTypeConfig;
18-
use crate::StorageError;
19-
use crate::StorageIOError;
2013

2114
/// Builds a log id, for testing purposes.
2215
pub fn log_id<NID: crate::NodeId>(term: u64, node_id: NID, index: u64) -> LogId<NID> {
@@ -43,22 +36,3 @@ pub fn membership_ent<C: RaftTypeConfig>(
4336
crate::Membership::new(config, None),
4437
)
4538
}
46-
47-
/// Append to log and wait for the log to be flushed.
48-
pub async fn blocking_append<C: RaftTypeConfig, LS: RaftLogStorage<C>, I>(
49-
log_store: &mut LS,
50-
entries: I,
51-
) -> Result<(), StorageError<C::NodeId>>
52-
where
53-
I: IntoIterator<Item = C::Entry>,
54-
{
55-
let entries = entries.into_iter().collect::<Vec<_>>();
56-
let last_log_id = entries.last().map(|e| *e.get_log_id()).unwrap();
57-
58-
let (tx, rx) = <C::AsyncRuntime as AsyncRuntime>::oneshot();
59-
let cb = LogFlushed::new(Some(last_log_id), tx);
60-
log_store.append(entries, cb).await?;
61-
rx.await.unwrap().map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?;
62-
63-
Ok(())
64-
}

tests/tests/append_entries/t11_append_inconsistent_log.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use anyhow::Result;
55
use maplit::btreeset;
66
use openraft::storage::RaftLogReaderExt;
77
use openraft::storage::RaftLogStorage;
8-
use openraft::testing;
8+
use openraft::storage::RaftLogStorageExt;
99
use openraft::testing::blank_ent;
1010
use openraft::Config;
1111
use openraft::ServerState;
@@ -57,9 +57,8 @@ async fn append_inconsistent_log() -> Result<()> {
5757
r2.shutdown().await?;
5858

5959
for i in log_index + 1..=100 {
60-
testing::blocking_append(&mut sto0, [blank_ent(2, 0, i)]).await?;
61-
62-
testing::blocking_append(&mut sto2, [blank_ent(3, 0, i)]).await?;
60+
sto0.blocking_append([blank_ent(2, 0, i)]).await?;
61+
sto2.blocking_append([blank_ent(3, 0, i)]).await?;
6362
}
6463

6564
sto0.save_vote(&Vote::new(2, 0)).await?;

tests/tests/elect/t10_elect_compare_last_log.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::time::Duration;
44
use anyhow::Result;
55
use maplit::btreeset;
66
use openraft::storage::RaftLogStorage;
7-
use openraft::testing;
7+
use openraft::storage::RaftLogStorageExt;
88
use openraft::testing::blank_ent;
99
use openraft::testing::membership_ent;
1010
use openraft::Config;
@@ -37,7 +37,7 @@ async fn elect_compare_last_log() -> Result<()> {
3737
{
3838
sto0.save_vote(&Vote::new(10, 0)).await?;
3939

40-
testing::blocking_append(&mut sto0, [
40+
sto0.blocking_append([
4141
//
4242
blank_ent(0, 0, 0),
4343
membership_ent(2, 0, 1, vec![btreeset! {0,1}]),
@@ -49,7 +49,7 @@ async fn elect_compare_last_log() -> Result<()> {
4949
{
5050
sto1.save_vote(&Vote::new(10, 0)).await?;
5151

52-
testing::blocking_append(&mut sto1, [
52+
sto1.blocking_append([
5353
blank_ent(0, 0, 0),
5454
membership_ent(1, 0, 1, vec![btreeset! {0,1}]),
5555
blank_ent(1, 0, 2),

tests/tests/membership/t99_new_leader_auto_commit_uniform_config.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::sync::Arc;
22

33
use anyhow::Result;
44
use maplit::btreeset;
5-
use openraft::testing;
5+
use openraft::storage::RaftLogStorageExt;
66
use openraft::testing::log_id;
77
use openraft::Config;
88
use openraft::Entry;
@@ -39,7 +39,7 @@ async fn new_leader_auto_commit_uniform_config() -> Result<()> {
3939
router.remove_node(0);
4040

4141
{
42-
testing::blocking_append(&mut sto, [Entry {
42+
sto.blocking_append([Entry {
4343
log_id: log_id(1, 0, log_index + 1),
4444
payload: EntryPayload::Membership(Membership::new(
4545
vec![btreeset! {0}, btreeset! {0,1,2}],

tests/tests/snapshot_building/t10_build_snapshot.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use openraft::network::RaftNetwork;
88
use openraft::network::RaftNetworkFactory;
99
use openraft::raft::AppendEntriesRequest;
1010
use openraft::storage::RaftLogReaderExt;
11-
use openraft::testing;
11+
use openraft::storage::RaftLogStorageExt;
1212
use openraft::testing::blank_ent;
1313
use openraft::CommittedLeaderId;
1414
use openraft::Config;
@@ -80,7 +80,7 @@ async fn build_snapshot() -> Result<()> {
8080

8181
// Add a new node and assert that it received the same snapshot.
8282
let (mut sto1, sm1) = router.new_store();
83-
testing::blocking_append(&mut sto1, [blank_ent(0, 0, 0), Entry {
83+
sto1.blocking_append([blank_ent(0, 0, 0), Entry {
8484
log_id: LogId::new(CommittedLeaderId::new(1, 0), 1),
8585
payload: EntryPayload::Membership(Membership::new(vec![btreeset! {0}], None)),
8686
}])

tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use openraft::network::RaftNetworkFactory;
99
use openraft::raft::AppendEntriesRequest;
1010
use openraft::raft::InstallSnapshotRequest;
1111
use openraft::storage::RaftLogStorage;
12+
use openraft::storage::RaftLogStorageExt;
1213
use openraft::storage::RaftStateMachine;
13-
use openraft::testing;
1414
use openraft::testing::blank_ent;
1515
use openraft::testing::log_id;
1616
use openraft::testing::membership_ent;
@@ -60,7 +60,7 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> {
6060

6161
// When the node starts, it will become candidate and increment its vote to (5,0)
6262
sto0.save_vote(&Vote::new(4, 0)).await?;
63-
testing::blocking_append(&mut sto0, [
63+
sto0.blocking_append([
6464
// manually insert the initializing log
6565
membership_ent(0, 0, 0, vec![btreeset! {0}]),
6666
])

0 commit comments

Comments
 (0)