Skip to content

Commit 70e3318

Browse files
committed
Change: SnapshotMeta.last_log_id from LogId to Option of LogId
`SnapshotMeta.last_log_id` should be the same type as `StateMachine.last_applied`. By making `SnapshotMeta.last_log_id` an Option of LogId, a snapshot can be build on an empty state-machine(in which `last_applied` is None).
1 parent 56ded06 commit 70e3318

File tree

13 files changed

+78
-91
lines changed

13 files changed

+78
-91
lines changed

examples/raft-kv-memstore/src/store/mod.rs

+5-11
Original file line numberDiff line numberDiff line change
@@ -147,23 +147,17 @@ impl RaftSnapshotBuilder<ExampleTypeConfig, Cursor<Vec<u8>>> for Arc<ExampleStor
147147
last_membership = state_machine.last_membership.clone();
148148
}
149149

150-
let last_applied_log = match last_applied_log {
151-
None => {
152-
panic!("can not compact empty state machine");
153-
}
154-
Some(x) => x,
155-
};
156-
157150
let snapshot_idx = {
158151
let mut l = self.snapshot_idx.lock().unwrap();
159152
*l += 1;
160153
*l
161154
};
162155

163-
let snapshot_id = format!(
164-
"{}-{}-{}",
165-
last_applied_log.leader_id, last_applied_log.index, snapshot_idx
166-
);
156+
let snapshot_id = if let Some(last) = last_applied_log {
157+
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
158+
} else {
159+
format!("--{}", snapshot_idx)
160+
};
167161

168162
let meta = SnapshotMeta {
169163
last_log_id: last_applied_log,

examples/raft-kv-rocksdb/src/store.rs

+5-11
Original file line numberDiff line numberDiff line change
@@ -380,21 +380,15 @@ impl RaftSnapshotBuilder<ExampleTypeConfig, Cursor<Vec<u8>>> for Arc<ExampleStor
380380
last_membership = state_machine.last_membership.clone();
381381
}
382382

383-
let last_applied_log = match last_applied_log {
384-
None => {
385-
panic!("can not compact empty state machine");
386-
}
387-
Some(x) => x,
388-
};
389-
390383
// TODO: we probably want thius to be atomic.
391384
let snapshot_idx: u64 = self.get_snapshot_index_()? + 1;
392385
self.set_snapshot_indesx_(snapshot_idx)?;
393386

394-
let snapshot_id = format!(
395-
"{}-{}-{}",
396-
last_applied_log.leader_id, last_applied_log.index, snapshot_idx
397-
);
387+
let snapshot_id = if let Some(last) = last_applied_log {
388+
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
389+
} else {
390+
format!("--{}", snapshot_idx)
391+
};
398392

399393
let meta = SnapshotMeta {
400394
last_log_id: last_applied_log,

memstore/src/lib.rs

+5-11
Original file line numberDiff line numberDiff line change
@@ -202,13 +202,6 @@ impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<MemStore> {
202202
last_membership = sm.last_membership.clone();
203203
}
204204

205-
let last_applied_log = match last_applied_log {
206-
None => {
207-
panic!("can not compact empty state machine");
208-
}
209-
Some(x) => x,
210-
};
211-
212205
let snapshot_size = data.len();
213206

214207
let snapshot_idx = {
@@ -217,10 +210,11 @@ impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<MemStore> {
217210
*l
218211
};
219212

220-
let snapshot_id = format!(
221-
"{}-{}-{}",
222-
last_applied_log.leader_id, last_applied_log.index, snapshot_idx
223-
);
213+
let snapshot_id = if let Some(last) = last_applied_log {
214+
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
215+
} else {
216+
format!("--{}", snapshot_idx)
217+
};
224218

225219
let meta = SnapshotMeta {
226220
last_log_id: last_applied_log,

openraft/src/core/install_snapshot.rs

+22-18
Original file line numberDiff line numberDiff line change
@@ -228,19 +228,21 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
228228
// Unlike normal append-entries RPC, if conflicting logs are found, it is not **necessary** to delete them.
229229
// See: [Snapshot-replication](https://datafuselabs.github.io/openraft/replication.html#snapshot-replication)
230230
{
231-
let local = self.storage.try_get_log_entry(snap_last_log_id.index).await?;
232-
233-
if let Some(local_log) = local {
234-
if local_log.log_id != snap_last_log_id {
235-
tracing::info!(
236-
local_log_id = display(&local_log.log_id),
237-
snap_last_log_id = display(&snap_last_log_id),
238-
"found conflict log id, when installing snapshot"
239-
);
231+
if let Some(last) = snap_last_log_id {
232+
let local = self.storage.try_get_log_entry(last.index).await?;
233+
234+
if let Some(local_log) = local {
235+
if local_log.log_id != last {
236+
tracing::info!(
237+
local_log_id = display(&local_log.log_id),
238+
snap_last_log_id = display(&last),
239+
"found conflict log id, when installing snapshot"
240+
);
241+
}
242+
243+
self.engine.truncate_logs(last.index);
244+
self.run_engine_commands::<Entry<C>>(&[]).await?;
240245
}
241-
242-
self.engine.truncate_logs(snap_last_log_id.index);
243-
self.run_engine_commands::<Entry<C>>(&[]).await?;
244246
}
245247
}
246248

@@ -251,17 +253,19 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
251253

252254
let last_applied = changes.last_applied;
253255

254-
if st.committed < Some(last_applied) {
255-
st.committed = Some(last_applied);
256+
if st.committed < last_applied {
257+
st.committed = last_applied;
256258
}
257259

258-
debug_assert!(st.last_purged_log_id() <= Some(last_applied));
260+
debug_assert!(st.last_purged_log_id() <= last_applied);
259261

260262
// A local log that is <= last_applied may be inconsistent with the leader.
261263
// It has to purge all of them to prevent these log form being replicated, when this node becomes leader.
262-
self.engine.snapshot_last_log_id = Some(last_applied); // update and make last applied log removable
263-
self.engine.purge_log(last_applied);
264-
self.run_engine_commands::<Entry<C>>(&[]).await?;
264+
self.engine.snapshot_last_log_id = last_applied; // update and make last applied log removable
265+
if let Some(last) = last_applied {
266+
self.engine.purge_log(last);
267+
self.run_engine_commands::<Entry<C>>(&[]).await?;
268+
}
265269

266270
self.engine.update_committed_membership(req.meta.last_membership);
267271
self.run_engine_commands::<Entry<C>>(&[]).await?;

openraft/src/core/raft_core.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ pub struct RaftCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<
148148
pub(crate) leader_data: Option<LeaderData<C>>,
149149

150150
/// The node's current snapshot state.
151-
pub(crate) snapshot_state: Option<SnapshotState<S::SnapshotData>>,
151+
pub(crate) snapshot_state: Option<SnapshotState<C, S::SnapshotData>>,
152152

153153
/// The time to elect if a follower does not receive any append-entry message.
154154
pub(crate) next_election_time: VoteWiseTime<C::NodeId>,
@@ -252,7 +252,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
252252

253253
// Fetch the most recent snapshot in the system.
254254
if let Some(snapshot) = self.storage.get_current_snapshot().await? {
255-
self.engine.snapshot_last_log_id = Some(snapshot.meta.last_log_id);
255+
self.engine.snapshot_last_log_id = snapshot.meta.last_log_id;
256256
self.engine.metrics_flags.set_data_changed();
257257
}
258258

@@ -836,7 +836,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
836836
#[tracing::instrument(level = "trace", skip(self))]
837837
pub(crate) fn update_snapshot_state(&mut self, update: SnapshotUpdate<C::NodeId>) {
838838
if let SnapshotUpdate::SnapshotComplete(log_id) = update {
839-
self.engine.snapshot_last_log_id = Some(log_id);
839+
self.engine.snapshot_last_log_id = log_id;
840840
self.engine.metrics_flags.set_data_changed();
841841
}
842842
// If snapshot state is anything other than streaming, then drop it.
@@ -894,7 +894,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
894894
update: SnapshotUpdate::SnapshotComplete(snapshot.meta.last_log_id),
895895
});
896896
// This will always succeed.
897-
let _ = chan_tx.send(snapshot.meta.last_log_id.index);
897+
let _ = chan_tx.send(snapshot.meta.last_log_id);
898898
}
899899
Err(err) => {
900900
tracing::error!({error=%err}, "error while generating snapshot");
@@ -1514,16 +1514,16 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
15141514
let current_snapshot_opt = self.storage.get_current_snapshot().await?;
15151515

15161516
if let Some(snapshot) = current_snapshot_opt {
1517-
if let Some(must_inc) = must_include {
1518-
if snapshot.meta.last_log_id >= must_inc {
1517+
if must_include.is_some() {
1518+
if snapshot.meta.last_log_id >= must_include {
15191519
let _ = tx.send(snapshot);
15201520
return Ok(());
15211521
}
15221522
} else {
15231523
// If snapshot exists, ensure its distance from the leader's last log index is <= half
15241524
// of the configured snapshot threshold, else create a new snapshot.
15251525
if snapshot_is_within_half_of_threshold(
1526-
&snapshot.meta.last_log_id.index,
1526+
&snapshot.meta.last_log_id.unwrap_or_default().index,
15271527
&self.engine.state.last_log_id().unwrap_or_default().index,
15281528
&threshold,
15291529
) {

openraft/src/core/snapshot_state.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@ use tokio::sync::broadcast;
33

44
use crate::LogId;
55
use crate::NodeId;
6+
use crate::RaftTypeConfig;
67

78
/// The current snapshot state of the Raft node.
8-
pub(crate) enum SnapshotState<S> {
9+
pub(crate) enum SnapshotState<C: RaftTypeConfig, SD> {
910
/// The Raft node is compacting itself.
1011
Snapshotting {
1112
/// A handle to abort the compaction process early if needed.
1213
handle: AbortHandle,
1314
/// A sender for notifying any other tasks of the completion of this compaction.
14-
sender: broadcast::Sender<u64>,
15+
sender: broadcast::Sender<Option<LogId<C::NodeId>>>,
1516
},
1617
/// The Raft node is streaming in a snapshot from the leader.
1718
Streaming {
@@ -20,15 +21,15 @@ pub(crate) enum SnapshotState<S> {
2021
/// The ID of the snapshot being written.
2122
id: String,
2223
/// A handle to the snapshot writer.
23-
snapshot: Box<S>,
24+
snapshot: Box<SD>,
2425
},
2526
}
2627

2728
/// An update on a snapshot creation process.
2829
#[derive(Debug, Clone)]
2930
pub(crate) enum SnapshotUpdate<NID: NodeId> {
3031
/// Snapshot creation has finished successfully and covers the given index.
31-
SnapshotComplete(LogId<NID>),
32+
SnapshotComplete(Option<LogId<NID>>),
3233

3334
/// Snapshot creation failed.
3435
SnapshotFailed,

openraft/src/raft_types.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,6 @@ impl MetricsChangeFlags {
202202
/// E.g. when applying a log to state machine, or installing a state machine from snapshot.
203203
#[derive(Debug, Clone, PartialEq, Eq)]
204204
pub struct StateMachineChanges<C: RaftTypeConfig> {
205-
pub last_applied: LogId<C::NodeId>,
205+
pub last_applied: Option<LogId<C::NodeId>>,
206206
pub is_snapshot: bool,
207207
}

openraft/src/replication/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -779,12 +779,12 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
779779
// If we just sent the final chunk of the snapshot, then transition to lagging state.
780780
if done {
781781
tracing::debug!(
782-
"done install snapshot: snapshot last_log_id: {}, matched: {:?}",
782+
"done install snapshot: snapshot last_log_id: {:?}, matched: {:?}",
783783
snapshot.meta.last_log_id,
784784
self.matched,
785785
);
786786

787-
self.update_matched(Some(snapshot.meta.last_log_id));
787+
self.update_matched(snapshot.meta.last_log_id);
788788

789789
return Ok(());
790790
}

openraft/src/storage/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ where
3232
N: Node,
3333
{
3434
/// Log entries upto which this snapshot includes, inclusive.
35-
pub last_log_id: LogId<NID>,
35+
pub last_log_id: Option<LogId<NID>>,
3636

3737
/// The last applied membership config.
3838
pub last_membership: EffectiveMembership<NID, N>,
@@ -49,7 +49,7 @@ where
4949
{
5050
pub fn signature(&self) -> SnapshotSignature<NID> {
5151
SnapshotSignature {
52-
last_log_id: Some(self.last_log_id),
52+
last_log_id: self.last_log_id,
5353
last_membership_log_id: self.last_membership.log_id,
5454
snapshot_id: self.snapshot_id.clone(),
5555
}

openraft/src/testing/suite.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1054,7 +1054,7 @@ where
10541054
let mut b = store.get_snapshot_builder().await;
10551055
let snap = b.build_snapshot().await?;
10561056
let meta = snap.meta;
1057-
assert_eq!(log_id(0, 0), meta.last_log_id);
1057+
assert_eq!(Some(log_id(0, 0)), meta.last_log_id);
10581058
assert_eq!(Some(log_id(0, 0)), meta.last_membership.log_id);
10591059
assert_eq!(
10601060
Membership::new(vec![btreeset! {1,2}], None),
@@ -1078,7 +1078,7 @@ where
10781078
let mut b = store.get_snapshot_builder().await;
10791079
let snap = b.build_snapshot().await?;
10801080
let meta = snap.meta;
1081-
assert_eq!(log_id(2, 2), meta.last_log_id);
1081+
assert_eq!(Some(log_id(2, 2)), meta.last_log_id);
10821082
assert_eq!(Some(log_id(2, 2)), meta.last_membership.log_id);
10831083
assert_eq!(
10841084
Membership::new(vec![btreeset! {3,4}], None),

openraft/tests/fixtures/mod.rs

+15-9
Original file line numberDiff line numberDiff line change
@@ -853,23 +853,29 @@ where
853853

854854
match index_test {
855855
ValueTest::Exact(index) => assert_eq!(
856-
&snap.meta.last_log_id.index, index,
857-
"expected node {} to have snapshot with index {}, got {}",
858-
id, index, snap.meta.last_log_id.index
856+
snap.meta.last_log_id.index(),
857+
Some(*index),
858+
"expected node {} to have snapshot with index {}, got {:?}",
859+
id,
860+
index,
861+
snap.meta.last_log_id
859862
),
860863
ValueTest::Range(range) => assert!(
861-
range.contains(&snap.meta.last_log_id.index),
862-
"expected node {} to have snapshot within range {:?}, got {}",
864+
range.contains(&snap.meta.last_log_id.index().unwrap_or_default()),
865+
"expected node {} to have snapshot within range {:?}, got {:?}",
863866
id,
864867
range,
865-
snap.meta.last_log_id.index
868+
snap.meta.last_log_id
866869
),
867870
}
868871

869872
assert_eq!(
870-
&snap.meta.last_log_id.leader_id.term, term,
871-
"expected node {} to have snapshot with term {}, got {}",
872-
id, term, snap.meta.last_log_id.leader_id.term
873+
&snap.meta.last_log_id.unwrap_or_default().leader_id.term,
874+
term,
875+
"expected node {} to have snapshot with term {}, got {:?}",
876+
id,
877+
term,
878+
snap.meta.last_log_id
873879
);
874880
}
875881

openraft/tests/snapshot/t20_api_install_snapshot.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ async fn snapshot_arguments() -> Result<()> {
5353
vote: Vote::new_committed(1, 0),
5454
meta: SnapshotMeta {
5555
snapshot_id: "ss1".into(),
56-
last_log_id: LogId {
56+
last_log_id: Some(LogId {
5757
leader_id: LeaderId::new(1, 0),
5858
index: 0,
59-
},
59+
}),
6060
last_membership: Default::default(),
6161
},
6262
offset: 0,

rocksstore/src/lib.rs

+5-11
Original file line numberDiff line numberDiff line change
@@ -378,21 +378,15 @@ impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<RocksStore> {
378378
last_membership = state_machine.last_membership;
379379
}
380380

381-
let last_applied_log = match last_applied_log {
382-
None => {
383-
panic!("can not compact empty state machine");
384-
}
385-
Some(x) => x,
386-
};
387-
388381
// TODO: we probably want thius to be atomic.
389382
let snapshot_idx: u64 = self.get_snapshot_index_()? + 1;
390383
self.set_snapshot_indesx_(snapshot_idx)?;
391384

392-
let snapshot_id = format!(
393-
"{}-{}-{}",
394-
last_applied_log.leader_id, last_applied_log.index, snapshot_idx
395-
);
385+
let snapshot_id = if let Some(last) = last_applied_log {
386+
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
387+
} else {
388+
format!("--{}", snapshot_idx)
389+
};
396390

397391
let meta = SnapshotMeta {
398392
last_log_id: last_applied_log,

0 commit comments

Comments
 (0)