Skip to content

Commit 85859d0

Browse files
committed
change: CurrentSnapshotData: merge term and index into included.
1 parent 933e0b3 commit 85859d0

File tree

6 files changed

+19
-21
lines changed

6 files changed

+19
-21
lines changed

async-raft/src/core/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
211211
if let Some(snapshot) =
212212
self.storage.get_current_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?
213213
{
214-
self.snapshot_index = snapshot.index;
214+
self.snapshot_index = snapshot.included.index;
215215
}
216216

217217
let has_log = self.last_log_index != u64::min_value();
@@ -452,8 +452,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
452452
match res {
453453
Ok(res) => match res {
454454
Ok(snapshot) => {
455-
let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.index));
456-
let _ = chan_tx.send(snapshot.index); // This will always succeed.
455+
let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.included.index));
456+
let _ = chan_tx.send(snapshot.included.index); // This will always succeed.
457457
}
458458
Err(err) => {
459459
tracing::error!({error=%err}, "error while generating snapshot");

async-raft/src/core/replication.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
288288
if let Some(snapshot) = current_snapshot_opt {
289289
// If snapshot exists, ensure its distance from the leader's last log index is <= half
290290
// of the configured snapshot threshold, else create a new snapshot.
291-
if snapshot_is_within_half_of_threshold(&snapshot.index, &self.core.last_log_index, &threshold) {
291+
if snapshot_is_within_half_of_threshold(&snapshot.included.index, &self.core.last_log_index, &threshold) {
292292
let _ = tx.send(snapshot);
293293
return Ok(());
294294
}

async-raft/src/replication/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -832,8 +832,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
832832
async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData<S::Snapshot>) -> RaftResult<()> {
833833
let snapshot_id = snapshot.snapshot_id.clone();
834834
let mut offset = 0;
835-
self.core.next_index = snapshot.index + 1;
836-
self.core.matched = (snapshot.term, snapshot.index).into();
835+
self.core.next_index = snapshot.included.index + 1;
836+
self.core.matched = snapshot.included;
837837
let mut buf = Vec::with_capacity(self.core.config.snapshot_max_chunk_size as usize);
838838
loop {
839839
// Build the RPC.
@@ -844,7 +844,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
844844
term: self.core.term,
845845
leader_id: self.core.id,
846846
snapshot_id: snapshot_id.clone(),
847-
last_included: (snapshot.term, snapshot.index).into(),
847+
last_included: snapshot.included,
848848
offset,
849849
data: Vec::from(&buf[..nread]),
850850
done,

async-raft/src/storage.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@ use crate::raft::MembershipConfig;
1515
use crate::raft_types::SnapshotId;
1616
use crate::AppData;
1717
use crate::AppDataResponse;
18+
use crate::LogId;
1819
use crate::NodeId;
1920

2021
/// The data associated with the current snapshot.
2122
pub struct CurrentSnapshotData<S>
2223
where S: AsyncRead + AsyncSeek + Send + Unpin + 'static
2324
{
24-
/// The snapshot entry's term.
25-
pub term: u64,
26-
/// The snapshot entry's index.
27-
pub index: u64,
25+
// Log entries upto which this snapshot includes, inclusive.
26+
pub included: LogId,
27+
2828
/// The latest membership configuration covered by the snapshot.
2929
pub membership: MembershipConfig,
3030

async-raft/tests/fixtures/mod.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -485,22 +485,22 @@ impl RaftRouter {
485485
.unwrap_or_else(|| panic!("no snapshot present for node {}", id));
486486
match index_test {
487487
ValueTest::Exact(index) => assert_eq!(
488-
&snap.index, index,
488+
&snap.included.index, index,
489489
"expected node {} to have snapshot with index {}, got {}",
490-
id, index, snap.index
490+
id, index, snap.included.index
491491
),
492492
ValueTest::Range(range) => assert!(
493-
range.contains(&snap.index),
493+
range.contains(&snap.included.index),
494494
"expected node {} to have snapshot within range {:?}, got {}",
495495
id,
496496
range,
497-
snap.index
497+
snap.included.index
498498
),
499499
}
500500
assert_eq!(
501-
&snap.term, term,
501+
&snap.included.term, term,
502502
"expected node {} to have snapshot with term {}, got {}",
503-
id, term, snap.term
503+
id, term, snap.included.term
504504
);
505505
assert_eq!(
506506
&snap.membership, cfg,

memstore/src/lib.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -352,8 +352,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
352352

353353
tracing::trace!({ snapshot_size = snapshot_bytes.len() }, "log compaction complete");
354354
Ok(CurrentSnapshotData {
355-
term,
356-
index: last_applied_log,
355+
included: (term, last_applied_log).into(),
357356
membership: membership_config.clone(),
358357
snapshot_id,
359358
snapshot: Box::new(Cursor::new(snapshot_bytes)),
@@ -423,8 +422,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
423422
Some(snapshot) => {
424423
let reader = serde_json::to_vec(&snapshot)?;
425424
Ok(Some(CurrentSnapshotData {
426-
index: snapshot.index,
427-
term: snapshot.term,
425+
included: (snapshot.term, snapshot.index).into(),
428426
membership: snapshot.membership.clone(),
429427
snapshot_id: snapshot.snapshot_id.clone(),
430428
snapshot: Box::new(Cursor::new(reader)),

0 commit comments

Comments
 (0)