Skip to content

Commit 0c870cc

Browse files
committed
change: reduce one unnecessary snapshot serialization
- Change: `get_current_snapshot()`: remove double-serialization: convert MemStoreSnapshot to CurrentSnapshotData instead of serializing MemStoreSnapshot: Before: ``` MemStoreSnapshot.data = serialize(state-machine) CurrentSnapshotData.data = serialize(MemStoreSnapshot) ``` After: ``` MemStoreSnapshot.data = serialize(state-machine) CurrentSnapshotData.data = MemStoreSnapshot.data ``` when `finalize_snapshot_installation`, extract snapshot meta info from `InstallSnapshotRequest`. Reduce one unnecessary deserialization. - Change: InstallSnapshotRequest: merge `snapshot_id`, `last_log_id`, `membership` into one field `meta`. - Refactor: use SnapshotMeta(`snapshot_id`, `last_log_id`, `membership`) as a container of metadata of a snapshot. Reduce parameters. - Refactor: remove redundent param `delete_through` from `finalize_snapshot_installation`.
1 parent dba2403 commit 0c870cc

File tree

6 files changed

+81
-116
lines changed

6 files changed

+81
-116
lines changed

async-raft/src/core/install_snapshot.rs

+15-24
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
7474
return self.begin_installing_snapshot(req).await;
7575
}
7676
Some(SnapshotState::Streaming { snapshot, id, offset }) => {
77-
if req.snapshot_id == id {
78-
return self.continue_installing_snapshot(req, offset, id, snapshot).await;
77+
if req.meta.snapshot_id == id {
78+
return self.continue_installing_snapshot(req, offset, snapshot).await;
7979
}
8080

8181
if req.offset == 0 {
@@ -85,7 +85,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
8585
Err(RaftError::SnapshotMismatch {
8686
expect: SnapshotSegmentId { id: id.clone(), offset },
8787
got: SnapshotSegmentId {
88-
id: req.snapshot_id.clone(),
88+
id: req.meta.snapshot_id.clone(),
8989
offset: req.offset,
9090
},
9191
})
@@ -96,19 +96,19 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
9696
#[tracing::instrument(level = "trace", skip(self, req))]
9797
async fn begin_installing_snapshot(&mut self, req: InstallSnapshotRequest) -> RaftResult<InstallSnapshotResponse> {
9898
// Create a new snapshot and begin writing its contents.
99-
let id = req.snapshot_id.clone();
99+
let id = req.meta.snapshot_id.clone();
100100
let mut snapshot = self.storage.create_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?;
101101
snapshot.as_mut().write_all(&req.data).await?;
102102

103103
// If this was a small snapshot, and it is already done, then finish up.
104104
if req.done {
105-
self.finalize_snapshot_installation(req, id, snapshot).await?;
105+
self.finalize_snapshot_installation(req, snapshot).await?;
106106
return Ok(InstallSnapshotResponse {
107107
term: self.current_term,
108108
});
109109
}
110110

111-
// Else, retain snapshot components for later segments & respod.
111+
// Else, retain snapshot components for later segments & respond.
112112
self.snapshot_state = Some(SnapshotState::Streaming {
113113
offset: req.data.len() as u64,
114114
id,
@@ -124,9 +124,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
124124
&mut self,
125125
req: InstallSnapshotRequest,
126126
mut offset: u64,
127-
id: String,
128127
mut snapshot: Box<S::Snapshot>,
129128
) -> RaftResult<InstallSnapshotResponse> {
129+
let id = req.meta.snapshot_id.clone();
130+
130131
// Always seek to the target offset if not an exact match.
131132
if req.offset != offset {
132133
if let Err(err) = snapshot.as_mut().seek(SeekFrom::Start(req.offset)).await {
@@ -145,7 +146,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
145146

146147
// If the snapshot stream is done, then finalize.
147148
if req.done {
148-
self.finalize_snapshot_installation(req, id, snapshot).await?;
149+
self.finalize_snapshot_installation(req, snapshot).await?;
149150
} else {
150151
self.snapshot_state = Some(SnapshotState::Streaming { offset, id, snapshot });
151152
}
@@ -161,30 +162,20 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
161162
async fn finalize_snapshot_installation(
162163
&mut self,
163164
req: InstallSnapshotRequest,
164-
id: String,
165165
mut snapshot: Box<S::Snapshot>,
166166
) -> RaftResult<()> {
167167
snapshot.as_mut().shutdown().await.map_err(|err| self.map_fatal_storage_error(err.into()))?;
168-
let delete_through = if self.last_log_id.index > req.last_log_id.index {
169-
Some(req.last_log_id.index)
170-
} else {
171-
None
172-
};
168+
173169
self.storage
174-
.finalize_snapshot_installation(
175-
req.last_log_id.index,
176-
req.last_log_id.term,
177-
delete_through,
178-
id,
179-
snapshot,
180-
)
170+
.finalize_snapshot_installation(&req.meta, snapshot)
181171
.await
182172
.map_err(|err| self.map_fatal_storage_error(err))?;
173+
183174
let membership = self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?;
184175
self.update_membership(membership)?;
185-
self.last_log_id = req.last_log_id;
186-
self.last_applied = req.last_log_id.index;
187-
self.snapshot_last_log_id = req.last_log_id;
176+
self.last_log_id = req.meta.last_log_id;
177+
self.last_applied = req.meta.last_log_id.index;
178+
self.snapshot_last_log_id = req.meta.last_log_id;
188179
self.report_metrics(Update::Ignore);
189180
Ok(())
190181
}

async-raft/src/raft.rs

+13-20
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ use crate::error::RaftError;
2222
use crate::error::RaftResult;
2323
use crate::metrics::RaftMetrics;
2424
use crate::metrics::Wait;
25-
use crate::raft_types::SnapshotId;
2625
use crate::AppData;
2726
use crate::AppDataResponse;
2827
use crate::LogId;
2928
use crate::NodeId;
3029
use crate::RaftNetwork;
3130
use crate::RaftStorage;
31+
use crate::SnapshotMeta;
3232

3333
struct RaftInner<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
3434
tx_api: mpsc::UnboundedSender<RaftMsg<D, R>>,
@@ -422,21 +422,14 @@ pub struct Entry<D: AppData> {
422422
}
423423

424424
impl<D: AppData> Entry<D> {
425-
/// Create a new snapshot pointer from the given data.
426-
///
427-
/// ### index & term
428-
/// The index and term of the entry being replaced by this snapshot pointer entry.
429-
///
430-
/// ### id
431-
/// The ID of the associated snapshot.
432-
///
433-
/// ### membership
434-
/// The cluster membership config which is contained in the snapshot, which will always be the
435-
/// latest membership covered by the snapshot.
436-
pub fn new_snapshot_pointer(index: u64, term: u64, id: String, membership: MembershipConfig) -> Self {
425+
/// Create a new snapshot pointer from the given snapshot meta.
426+
pub fn new_snapshot_pointer(meta: &SnapshotMeta) -> Self {
437427
Entry {
438-
log_id: LogId { term, index },
439-
payload: EntryPayload::SnapshotPointer(EntrySnapshotPointer { id, membership }),
428+
log_id: meta.last_log_id,
429+
payload: EntryPayload::SnapshotPointer(EntrySnapshotPointer {
430+
id: meta.snapshot_id.clone(),
431+
membership: meta.membership.clone(),
432+
}),
440433
}
441434
}
442435
}
@@ -581,15 +574,15 @@ pub struct InstallSnapshotRequest {
581574
pub term: u64,
582575
/// The leader's ID. Useful in redirecting clients.
583576
pub leader_id: u64,
584-
/// The Id of a snapshot.
585-
/// Every two snapshots should have different snapshot id.
586-
pub snapshot_id: SnapshotId,
587-
/// The snapshot replaces all log entries up through and including this log.
588-
pub last_log_id: LogId,
577+
578+
/// Metadata of a snapshot: snapshot_id, last_log_ed membership etc.
579+
pub meta: SnapshotMeta,
580+
589581
/// The byte offset where this chunk of data is positioned in the snapshot file.
590582
pub offset: u64,
591583
/// The raw bytes of the snapshot chunk, starting at `offset`.
592584
pub data: Vec<u8>,
585+
593586
/// Will be `true` if this is the last chunk in the snapshot.
594587
pub done: bool,
595588
}

async-raft/src/replication/mod.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -829,7 +829,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
829829

830830
#[tracing::instrument(level = "trace", skip(self, snapshot))]
831831
async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData<S::Snapshot>) -> RaftResult<()> {
832-
let snapshot_id = snapshot.meta.snapshot_id.clone();
833832
let mut offset = 0;
834833
self.core.next_index = snapshot.meta.last_log_id.index + 1;
835834
self.core.matched = snapshot.meta.last_log_id;
@@ -842,8 +841,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
842841
let req = InstallSnapshotRequest {
843842
term: self.core.term,
844843
leader_id: self.core.id,
845-
snapshot_id: snapshot_id.clone(),
846-
last_log_id: snapshot.meta.last_log_id,
844+
meta: snapshot.meta.clone(),
847845
offset,
848846
data: Vec::from(&buf[..nread]),
849847
done,

async-raft/src/storage.rs

+3-11
Original file line numberDiff line numberDiff line change
@@ -219,10 +219,9 @@ where
219219

220220
/// Finalize the installation of a snapshot which has finished streaming from the cluster leader.
221221
///
222-
/// Delete all entries in the log through `delete_through`, unless `None`, in which case
223-
/// all entries of the log are to be deleted.
222+
/// Delete all entries in the log through `meta.last_log_id.index`.
224223
///
225-
/// Write a new snapshot pointer to the log at the given `index`. The snapshot pointer should be
224+
/// Write a new snapshot pointer to the log at the given `meta.last_log_id.index`. The snapshot pointer should be
226225
/// constructed via the `Entry::new_snapshot_pointer` constructor and the other parameters
227226
/// provided to this method.
228227
///
@@ -235,14 +234,7 @@ where
235234
/// made to the snapshot.
236235
///
237236
/// Errors returned from this method will cause Raft to go into shutdown.
238-
async fn finalize_snapshot_installation(
239-
&self,
240-
index: u64,
241-
term: u64,
242-
delete_through: Option<u64>,
243-
id: String,
244-
snapshot: Box<Self::Snapshot>,
245-
) -> Result<()>;
237+
async fn finalize_snapshot_installation(&self, meta: &SnapshotMeta, snapshot: Box<Self::Snapshot>) -> Result<()>;
246238

247239
/// Get a readable handle to the current snapshot, along with its metadata.
248240
///

async-raft/tests/api_install_snapshot.rs

+10-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use anyhow::Result;
66
use async_raft::raft::InstallSnapshotRequest;
77
use async_raft::Config;
88
use async_raft::LogId;
9+
use async_raft::SnapshotMeta;
910
use async_raft::State;
1011
use fixtures::RaftRouter;
1112
use maplit::hashset;
@@ -46,8 +47,11 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
4647
let req0 = InstallSnapshotRequest {
4748
term: 1,
4849
leader_id: 0,
49-
snapshot_id: "ss1".into(),
50-
last_log_id: LogId { term: 1, index: 0 },
50+
meta: SnapshotMeta {
51+
snapshot_id: "ss1".into(),
52+
last_log_id: LogId { term: 1, index: 0 },
53+
membership: Default::default(),
54+
},
5155
offset: 0,
5256
data: vec![1, 2, 3],
5357
done: false,
@@ -62,7 +66,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
6266
{
6367
let mut req = req0.clone();
6468
req.offset = 3;
65-
req.snapshot_id = "ss2".into();
69+
req.meta.snapshot_id = "ss2".into();
6670
let res = n.0.install_snapshot(req).await;
6771
assert_eq!("expect: ss1+3, got: ss2+3", res.unwrap_err().to_string());
6872
}
@@ -71,20 +75,20 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
7175
{
7276
let mut req = req0.clone();
7377
req.offset = 0;
74-
req.snapshot_id = "ss2".into();
78+
req.meta.snapshot_id = "ss2".into();
7579
n.0.install_snapshot(req).await?;
7680

7781
let mut req = req0.clone();
7882
req.offset = 3;
79-
req.snapshot_id = "ss2".into();
83+
req.meta.snapshot_id = "ss2".into();
8084
n.0.install_snapshot(req).await?;
8185
}
8286

8387
tracing::info!("-- continue write with mismatched offset is allowed");
8488
{
8589
let mut req = req0.clone();
8690
req.offset = 8;
87-
req.snapshot_id = "ss2".into();
91+
req.meta.snapshot_id = "ss2".into();
8892
n.0.install_snapshot(req).await?;
8993
}
9094
Ok(())

0 commit comments

Comments
 (0)