Skip to content

Commit a76f41a

Browse files
committed
Change: Refactor storage APIs
This is a larger refactoring of the storage APIs to allow clearer ownership of data.
RaftStorage is now refactored into:
- RaftLogReader, to read data from the log in parallel tasks independent of the main Raft loop
- RaftStorage, to modify the log and the state machine (it also implements RaftLogReader); intended to be used in the main Raft loop
- RaftSnapshotBuilder, to build the snapshot in the background, independent of the main Raft loop
The RaftStorage API offers methods to create a new RaftLogReader or RaftSnapshotBuilder from it.
RaftNetwork is also refactored into:
- RaftNetwork, responsible for sending RPCs
- RaftNetworkFactory, responsible for creating instances of RaftNetwork for sending data to a particular node
All these traits now take &mut self, so it's possible to heavily optimize a lot of stuff by removing any synchronization/lookups and, in general, to streamline the implementation of the "real" store and network. All traits are built in such a way that they can be implemented for Arc<T> (or similar) to basically model the original behaviour with a &self receiver.
DefensiveCheck was also split into DefensiveCheckBase and DefensiveCheck to allow creating separate checkers for a checked LogReader. There is a single TODO in LogReaderExt for one missing check.
Tests run.
Further optimizations of the API to better streamline applying log entries to the state machine (if the state machine is complex and can process requests in the background) are still pending. Further optimizations of the LogReader API to "invert" the semantics, i.e. to send a functor that processes the log entry to the log instead of materializing the log on the heap, are also pending. The async_trait optimization is pending. Configuration of the types to use for optimizing various synchronization operations (like one-shot channels) is pending.
1 parent 2055a23 commit a76f41a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+816
-557
lines changed

memstore/src/lib.rs

+107-84
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use openraft::async_trait::async_trait;
1616
use openraft::raft::Entry;
1717
use openraft::raft::EntryPayload;
1818
use openraft::storage::LogState;
19+
use openraft::storage::RaftLogReader;
20+
use openraft::storage::RaftSnapshotBuilder;
1921
use openraft::storage::Snapshot;
2022
use openraft::AppData;
2123
use openraft::AppDataResponse;
@@ -117,35 +119,24 @@ impl MemStore {
117119
current_snapshot,
118120
}
119121
}
122+
123+
pub async fn new_arc() -> Arc<Self> {
124+
Arc::new(Self::new().await)
125+
}
120126
}
121127

122128
#[async_trait]
123-
impl RaftStorageDebug<MemStoreStateMachine> for MemStore {
129+
impl RaftStorageDebug<MemStoreStateMachine> for Arc<MemStore> {
124130
/// Get a handle to the state machine for testing purposes.
125-
async fn get_state_machine(&self) -> MemStoreStateMachine {
131+
async fn get_state_machine(&mut self) -> MemStoreStateMachine {
126132
self.sm.write().await.clone()
127133
}
128134
}
129135

130136
#[async_trait]
131-
impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
132-
type SnapshotData = Cursor<Vec<u8>>;
133-
134-
#[tracing::instrument(level = "trace", skip(self))]
135-
async fn save_vote(&self, vote: &Vote) -> Result<(), StorageError> {
136-
tracing::debug!(?vote, "save_vote");
137-
let mut h = self.vote.write().await;
138-
139-
*h = Some(*vote);
140-
Ok(())
141-
}
142-
143-
async fn read_vote(&self) -> Result<Option<Vote>, StorageError> {
144-
Ok(*self.vote.read().await)
145-
}
146-
137+
impl RaftLogReader<ClientRequest, ClientResponse> for Arc<MemStore> {
147138
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
148-
&self,
139+
&mut self,
149140
range: RB,
150141
) -> Result<Vec<Entry<ClientRequest>>, StorageError> {
151142
let res = {
@@ -156,7 +147,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
156147
Ok(res)
157148
}
158149

159-
async fn get_log_state(&self) -> Result<LogState, StorageError> {
150+
async fn get_log_state(&mut self) -> Result<LogState, StorageError> {
160151
let log = self.log.read().await;
161152
let last = log.iter().rev().next().map(|(_, ent)| ent.log_id);
162153

@@ -172,14 +163,91 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
172163
last_log_id: last,
173164
})
174165
}
166+
}
167+
168+
#[async_trait]
169+
impl RaftSnapshotBuilder<ClientRequest, ClientResponse, Cursor<Vec<u8>>> for Arc<MemStore> {
170+
#[tracing::instrument(level = "trace", skip(self))]
171+
async fn build_snapshot(&mut self) -> Result<Snapshot<Cursor<Vec<u8>>>, StorageError> {
172+
let (data, last_applied_log);
173+
174+
{
175+
// Serialize the data of the state machine.
176+
let sm = self.sm.read().await;
177+
data = serde_json::to_vec(&*sm)
178+
.map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, AnyError::new(&e)))?;
179+
180+
last_applied_log = sm.last_applied_log;
181+
}
182+
183+
let last_applied_log = match last_applied_log {
184+
None => {
185+
panic!("can not compact empty state machine");
186+
}
187+
Some(x) => x,
188+
};
189+
190+
let snapshot_size = data.len();
191+
192+
let snapshot_idx = {
193+
let mut l = self.snapshot_idx.lock().unwrap();
194+
*l += 1;
195+
*l
196+
};
197+
198+
let snapshot_id = format!(
199+
"{}-{}-{}",
200+
last_applied_log.leader_id, last_applied_log.index, snapshot_idx
201+
);
202+
203+
let meta = SnapshotMeta {
204+
last_log_id: last_applied_log,
205+
snapshot_id,
206+
};
207+
208+
let snapshot = MemStoreSnapshot {
209+
meta: meta.clone(),
210+
data: data.clone(),
211+
};
212+
213+
{
214+
let mut current_snapshot = self.current_snapshot.write().await;
215+
*current_snapshot = Some(snapshot);
216+
}
217+
218+
tracing::info!(snapshot_size, "log compaction complete");
219+
220+
Ok(Snapshot {
221+
meta,
222+
snapshot: Box::new(Cursor::new(data)),
223+
})
224+
}
225+
}
175226

176-
async fn last_applied_state(&self) -> Result<(Option<LogId>, Option<EffectiveMembership>), StorageError> {
227+
#[async_trait]
228+
impl RaftStorage<ClientRequest, ClientResponse> for Arc<MemStore> {
229+
type SnapshotData = Cursor<Vec<u8>>;
230+
231+
#[tracing::instrument(level = "trace", skip(self))]
232+
async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> {
233+
tracing::debug!(?vote, "save_vote");
234+
let mut h = self.vote.write().await;
235+
236+
*h = Some(*vote);
237+
Ok(())
238+
}
239+
240+
async fn read_vote(&mut self) -> Result<Option<Vote>, StorageError> {
241+
Ok(*self.vote.read().await)
242+
}
243+
244+
async fn last_applied_state(&mut self) -> Result<(Option<LogId>, Option<EffectiveMembership>), StorageError> {
177245
let sm = self.sm.read().await;
178246
Ok((sm.last_applied_log, sm.last_membership.clone()))
179247
}
180248

181249
#[tracing::instrument(level = "debug", skip(self))]
182-
async fn delete_conflict_logs_since(&self, log_id: LogId) -> Result<(), StorageError> {
250+
async fn delete_conflict_logs_since(&mut self, log_id: LogId) -> Result<(), StorageError> {
183251
tracing::debug!("delete_log: [{:?}, +oo)", log_id);
184252

185253
{
@@ -195,7 +263,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
195263
}
196264

197265
#[tracing::instrument(level = "debug", skip(self))]
198-
async fn purge_logs_upto(&self, log_id: LogId) -> Result<(), StorageError> {
266+
async fn purge_logs_upto(&mut self, log_id: LogId) -> Result<(), StorageError> {
199267
tracing::debug!("delete_log: [{:?}, +oo)", log_id);
200268

201269
{
@@ -217,7 +285,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
217285
}
218286

219287
#[tracing::instrument(level = "trace", skip(self, entries))]
220-
async fn append_to_log(&self, entries: &[&Entry<ClientRequest>]) -> Result<(), StorageError> {
288+
async fn append_to_log(&mut self, entries: &[&Entry<ClientRequest>]) -> Result<(), StorageError> {
221289
let mut log = self.log.write().await;
222290
for entry in entries {
223291
log.insert(entry.log_id.index, (*entry).clone());
@@ -227,7 +295,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
227295

228296
#[tracing::instrument(level = "trace", skip(self, entries))]
229297
async fn apply_to_state_machine(
230-
&self,
298+
&mut self,
231299
entries: &[&Entry<ClientRequest>],
232300
) -> Result<Vec<ClientResponse>, StorageError> {
233301
let mut res = Vec::with_capacity(entries.len());
@@ -262,69 +330,13 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
262330
}
263331

264332
#[tracing::instrument(level = "trace", skip(self))]
265-
async fn build_snapshot(&self) -> Result<Snapshot<Self::SnapshotData>, StorageError> {
266-
let (data, last_applied_log);
267-
268-
{
269-
// Serialize the data of the state machine.
270-
let sm = self.sm.read().await;
271-
data = serde_json::to_vec(&*sm)
272-
.map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, AnyError::new(&e)))?;
273-
274-
last_applied_log = sm.last_applied_log;
275-
}
276-
277-
let last_applied_log = match last_applied_log {
278-
None => {
279-
panic!("can not compact empty state machine");
280-
}
281-
Some(x) => x,
282-
};
283-
284-
let snapshot_size = data.len();
285-
286-
let snapshot_idx = {
287-
let mut l = self.snapshot_idx.lock().unwrap();
288-
*l += 1;
289-
*l
290-
};
291-
292-
let snapshot_id = format!(
293-
"{}-{}-{}",
294-
last_applied_log.leader_id, last_applied_log.index, snapshot_idx
295-
);
296-
297-
let meta = SnapshotMeta {
298-
last_log_id: last_applied_log,
299-
snapshot_id,
300-
};
301-
302-
let snapshot = MemStoreSnapshot {
303-
meta: meta.clone(),
304-
data: data.clone(),
305-
};
306-
307-
{
308-
let mut current_snapshot = self.current_snapshot.write().await;
309-
*current_snapshot = Some(snapshot);
310-
}
311-
312-
tracing::info!(snapshot_size, "log compaction complete");
313-
314-
Ok(Snapshot {
315-
meta,
316-
snapshot: Box::new(Cursor::new(data)),
317-
})
318-
}
319-
320-
#[tracing::instrument(level = "trace", skip(self))]
321-
async fn begin_receiving_snapshot(&self) -> Result<Box<Self::SnapshotData>, StorageError> {
333+
async fn begin_receiving_snapshot(&mut self) -> Result<Box<Self::SnapshotData>, StorageError> {
322334
Ok(Box::new(Cursor::new(Vec::new())))
323335
}
324336

325337
#[tracing::instrument(level = "trace", skip(self, snapshot))]
326338
async fn install_snapshot(
327-
&self,
339+
&mut self,
328340
meta: &SnapshotMeta,
329341
snapshot: Box<Self::SnapshotData>,
330342
) -> Result<StateMachineChanges, StorageError> {
@@ -368,7 +380,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
368380
}
369381

370382
#[tracing::instrument(level = "trace", skip(self))]
371-
async fn get_current_snapshot(&self) -> Result<Option<Snapshot<Self::SnapshotData>>, StorageError> {
383+
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<Self::SnapshotData>>, StorageError> {
372384
match &*self.current_snapshot.read().await {
373385
Some(snapshot) => {
374386
let data = snapshot.data.clone();
@@ -380,4 +392,15 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
380392
None => Ok(None),
381393
}
382394
}
395+
396+
type LogReader = Self;
397+
type SnapshotBuilder = Self;
398+
399+
async fn get_log_reader(&mut self) -> Self::LogReader {
400+
self.clone()
401+
}
402+
403+
async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder {
404+
self.clone()
405+
}
383406
}

memstore/src/test.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,6 @@ use crate::MemStore;
2626
/// ```
2727
#[test]
2828
pub fn test_mem_store() -> Result<(), StorageError> {
29-
Suite::test_all(MemStore::new)?;
29+
Suite::test_all(MemStore::new_arc)?;
3030
Ok(())
3131
}

openraft/src/core/admin.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ use crate::LogId;
2929
use crate::Membership;
3030
use crate::Node;
3131
use crate::NodeId;
32-
use crate::RaftNetwork;
32+
use crate::RaftNetworkFactory;
3333
use crate::RaftStorage;
3434
use crate::StorageError;
3535

36-
impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LearnerState<'a, D, R, N, S> {
36+
impl<'a, D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorage<D, R>> LearnerState<'a, D, R, N, S> {
3737
/// Handle the admin `init_with_config` command.
3838
#[tracing::instrument(level = "debug", skip(self))]
3939
pub(super) async fn handle_init_with_config(&mut self, members: EitherNodesOrIds) -> Result<(), InitializeError> {
@@ -67,7 +67,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
6767
}
6868
}
6969

70-
impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S> {
70+
impl<'a, D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S> {
7171
// Add the node as a learner; return true if the node is already a member or learner.
7272
#[tracing::instrument(level = "debug", skip(self))]
7373
async fn add_learner_into_membership(
@@ -139,10 +139,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
139139
}
140140

141141
if blocking {
142-
let state = self.spawn_replication_stream(target, Some(tx));
142+
let state = self.spawn_replication_stream(target, Some(tx)).await;
143143
self.nodes.insert(target, state);
144144
} else {
145-
let state = self.spawn_replication_stream(target, None);
145+
let state = self.spawn_replication_stream(target, None).await;
146146
self.nodes.insert(target, state);
147147

148148
// non-blocking mode, do not know about the replication stat.

openraft/src/core/append_entries.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ use crate::AppDataResponse;
1212
use crate::EffectiveMembership;
1313
use crate::LogId;
1414
use crate::MessageSummary;
15-
use crate::RaftNetwork;
15+
use crate::RaftNetworkFactory;
1616
use crate::RaftStorage;
1717
use crate::StorageError;
1818
use crate::Update;
1919

20-
impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S> {
20+
impl<D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S> {
2121
/// An RPC invoked by the leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
2222
///
2323
/// See `receiver implementation: AppendEntries RPC` in raft-essentials.md in this repo.
@@ -238,7 +238,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
238238
/// The entries in the request that match local ones do not need to be appended again.
239239
/// Filter them out.
240240
pub async fn skip_matching_entries<'s, 'e>(
241-
&'s self,
241+
&'s mut self,
242242
entries: &'e [Entry<D>],
243243
) -> Result<(usize, &'e [Entry<D>]), StorageError> {
244244
let l = entries.len();
@@ -271,7 +271,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
271271
///
272272
/// This is a way to check whether the entries in an append-entries request are consecutive with the local logs.
273273
/// Raft only accepts consecutive logs to be appended.
274-
pub async fn does_log_id_match(&self, remote_log_id: Option<LogId>) -> Result<Option<LogId>, StorageError> {
274+
pub async fn does_log_id_match(&mut self, remote_log_id: Option<LogId>) -> Result<Option<LogId>, StorageError> {
275275
let log_id = match remote_log_id {
276276
None => {
277277
return Ok(None);
@@ -368,12 +368,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
368368

369369
let entries_refs: Vec<_> = entries.iter().collect();
370370

371-
apply_to_state_machine(self.storage.clone(), &entries_refs, self.config.max_applied_log_to_keep).await?;
371+
apply_to_state_machine(&mut self.storage, &entries_refs, self.config.max_applied_log_to_keep).await?;
372372

373373
self.last_applied = Some(last_log_id);
374374

375375
self.report_metrics(Update::AsIs);
376-
self.trigger_log_compaction_if_needed(false);
376+
self.trigger_log_compaction_if_needed(false).await;
377377

378378
Ok(())
379379
}

0 commit comments

Comments
 (0)