Skip to content

Commit 8e0b0df

Browse files
committed
feature: report snapshot metrics to RaftMetrics::snapshot, which is a LogId: (term, index) that a snapshot includes
- Add: `Wait.snapshot()` to watch snapshot changes. - Test: replace `sleep()` with `wait_for_snapshot()` to speed up tests.
1 parent 5eb9d3a commit 8e0b0df

File tree

8 files changed

+110
-22
lines changed

8 files changed

+110
-22
lines changed

async-raft/src/core/install_snapshot.rs

+1
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
186186
self.last_log_term = req.last_included.term;
187187
self.last_applied = req.last_included.index;
188188
self.snapshot_last_included = req.last_included;
189+
self.report_metrics(Update::Ignore);
189190
Ok(())
190191
}
191192
}

async-raft/src/core/mod.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
213213
self.storage.get_current_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?
214214
{
215215
self.snapshot_last_included = snapshot.included;
216+
self.report_metrics(Update::Ignore);
216217
}
217218

218219
let has_log = self.last_log_index != u64::min_value();
@@ -287,6 +288,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
287288
last_applied: self.last_applied,
288289
current_leader: self.current_leader,
289290
membership_config: self.membership.clone(),
291+
snapshot: self.snapshot_last_included,
290292
leader_metrics,
291293
});
292294

@@ -409,11 +411,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
409411
#[tracing::instrument(level = "trace", skip(self))]
410412
fn update_snapshot_state(&mut self, update: SnapshotUpdate) {
411413
if let SnapshotUpdate::SnapshotComplete(log_id) = update {
412-
self.snapshot_last_included = log_id
414+
self.snapshot_last_included = log_id;
415+
self.report_metrics(Update::Ignore);
413416
}
414417
// If snapshot state is anything other than streaming, then drop it.
415418
if let Some(state @ SnapshotState::Streaming { .. }) = self.snapshot_state.take() {
416-
self.snapshot_state = Some(state)
419+
self.snapshot_state = Some(state);
417420
}
418421
}
419422

async-raft/src/metrics.rs

+16
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use tokio::time::Duration;
1818

1919
use crate::core::State;
2020
use crate::raft::MembershipConfig;
21+
use crate::LogId;
2122
use crate::NodeId;
2223
use crate::RaftError;
2324
use crate::ReplicationMetrics;
@@ -40,6 +41,10 @@ pub struct RaftMetrics {
4041
/// The current membership config of the cluster.
4142
pub membership_config: MembershipConfig,
4243

44+
/// The id of the last log included in snapshot.
45+
/// If there is no snapshot, it is (0,0).
46+
pub snapshot: LogId,
47+
4348
/// The metrics about the leader. It is Some() only when this node is leader.
4449
pub leader_metrics: Option<LeaderMetrics>,
4550
}
@@ -62,6 +67,7 @@ impl RaftMetrics {
6267
last_applied: 0,
6368
current_leader: None,
6469
membership_config,
70+
snapshot: LogId { term: 0, index: 0 },
6571
leader_metrics: None,
6672
}
6773
}
@@ -190,4 +196,14 @@ impl Wait {
190196
)
191197
.await
192198
}
199+
200+
/// Wait for `snapshot` to become `want_snapshot` or timeout.
201+
#[tracing::instrument(level = "debug", skip(self), fields(msg=msg.to_string().as_str()))]
202+
pub async fn snapshot(&self, want_snapshot: LogId, msg: impl ToString) -> Result<RaftMetrics, WaitError> {
203+
self.metrics(
204+
|x| x.snapshot == want_snapshot,
205+
&format!("{} .snapshot -> {:?}", msg.to_string(), want_snapshot),
206+
)
207+
.await
208+
}
193209
}

async-raft/src/metrics_wait_test.rs

+44
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use tokio::time::sleep;
77
use crate::metrics::Wait;
88
use crate::metrics::WaitError;
99
use crate::raft::MembershipConfig;
10+
use crate::LogId;
1011
use crate::RaftMetrics;
1112
use crate::State;
1213

@@ -99,6 +100,48 @@ async fn test_wait() -> anyhow::Result<()> {
99100
assert_eq!(Some(hashset![1, 2]), got.membership_config.members_after_consensus);
100101
}
101102

103+
tracing::info!("--- wait for snapshot, Ok");
104+
{
105+
let (init, w, tx) = init_wait_test();
106+
107+
let h = tokio::spawn(async move {
108+
sleep(Duration::from_millis(10)).await;
109+
let mut update = init.clone();
110+
update.snapshot = LogId { term: 1, index: 2 };
111+
let rst = tx.send(update);
112+
assert!(rst.is_ok());
113+
});
114+
let got = w.snapshot(LogId { term: 1, index: 2 }, "snapshot").await?;
115+
h.await?;
116+
117+
assert_eq!(LogId { term: 1, index: 2 }, got.snapshot);
118+
}
119+
120+
tracing::info!("--- wait for snapshot, only index matches");
121+
{
122+
let (init, w, tx) = init_wait_test();
123+
124+
let h = tokio::spawn(async move {
125+
sleep(Duration::from_millis(10)).await;
126+
let mut update = init.clone();
127+
update.snapshot = LogId { term: 3, index: 2 };
128+
let rst = tx.send(update);
129+
assert!(rst.is_ok());
130+
// delay otherwise the channel will be closed thus the error is shutdown.
131+
sleep(Duration::from_millis(200)).await;
132+
});
133+
let got = w.snapshot(LogId { term: 1, index: 2 }, "snapshot").await;
134+
h.await?;
135+
match got.unwrap_err() {
136+
WaitError::Timeout(t, _) => {
137+
assert_eq!(Duration::from_millis(100), t);
138+
}
139+
_ => {
140+
panic!("expect WaitError::Timeout");
141+
}
142+
}
143+
}
144+
102145
{
103146
// timeout
104147
let (_init, w, _tx) = init_wait_test();
@@ -136,6 +179,7 @@ fn init_wait_test() -> (RaftMetrics, Wait, watch::Sender<RaftMetrics>) {
136179
members: Default::default(),
137180
members_after_consensus: None,
138181
},
182+
snapshot: LogId { term: 0, index: 0 },
139183
leader_metrics: None,
140184
};
141185
let (tx, rx) = watch::channel(init.clone());

async-raft/tests/compaction.rs

+11-13
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
mod fixtures;
22

33
use std::sync::Arc;
4-
use std::time::Duration;
54

65
use anyhow::Result;
76
use async_raft::raft::MembershipConfig;
87
use async_raft::Config;
8+
use async_raft::LogId;
99
use async_raft::SnapshotPolicy;
1010
use async_raft::State;
1111
use fixtures::RaftRouter;
1212
use maplit::hashset;
13-
use tokio::time::sleep;
1413

1514
/// Compaction test.
1615
///
@@ -25,10 +24,12 @@ use tokio::time::sleep;
2524
async fn compaction() -> Result<()> {
2625
fixtures::init_tracing();
2726

27+
let snapshot_threshold: u64 = 50;
28+
2829
// Setup test dependencies.
2930
let config = Arc::new(
3031
Config::build("test".into())
31-
.snapshot_policy(SnapshotPolicy::LogsSinceLast(500))
32+
.snapshot_policy(SnapshotPolicy::LogsSinceLast(snapshot_threshold))
3233
.validate()
3334
.expect("failed to build Raft config"),
3435
);
@@ -53,22 +54,19 @@ async fn compaction() -> Result<()> {
5354

5455
// Send enough requests to the cluster that compaction on the node should be triggered.
5556
// Puts us exactly at the configured snapshot policy threshold.
56-
router.client_request_many(0, "0", 499).await;
57-
want += 499;
57+
router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await;
58+
want = snapshot_threshold;
5859

5960
router.wait_for_log(&hashset![0], want, None, "write").await?;
6061
router.assert_stable_cluster(Some(1), Some(want)).await;
61-
62-
// TODO: add snapshot info into metrics.
63-
// Then watch metrics instead of waiting.
64-
sleep(Duration::from_secs(10)).await;
62+
router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?;
6563
router
6664
.assert_storage_state(
6765
1,
68-
500,
66+
want,
6967
Some(0),
70-
500,
71-
Some((500.into(), 1, MembershipConfig {
68+
want,
69+
Some((want.into(), 1, MembershipConfig {
7270
members: hashset![0],
7371
members_after_consensus: None,
7472
})),
@@ -85,7 +83,7 @@ async fn compaction() -> Result<()> {
8583
want += 1;
8684

8785
router.wait_for_log(&hashset![0, 1], want, None, "add follower").await?;
88-
let expected_snap = Some((500.into(), 1, MembershipConfig {
86+
let expected_snap = Some((snapshot_threshold.into(), 1, MembershipConfig {
8987
members: hashset![0u64],
9088
members_after_consensus: None,
9189
}));

async-raft/tests/fixtures/mod.rs

+16
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use async_raft::raft::VoteRequest;
2525
use async_raft::raft::VoteResponse;
2626
use async_raft::storage::RaftStorage;
2727
use async_raft::Config;
28+
use async_raft::LogId;
2829
use async_raft::NodeId;
2930
use async_raft::Raft;
3031
use async_raft::RaftMetrics;
@@ -232,6 +233,21 @@ impl RaftRouter {
232233
Ok(())
233234
}
234235

236+
/// Wait for specified nodes until their snapshot becomes `want`.
237+
#[tracing::instrument(level = "info", skip(self))]
238+
pub async fn wait_for_snapshot(
239+
&self,
240+
node_ids: &HashSet<u64>,
241+
want: LogId,
242+
timeout: Option<Duration>,
243+
msg: &str,
244+
) -> Result<()> {
245+
for i in node_ids.iter() {
246+
self.wait(i, timeout).await?.snapshot(want, msg).await?;
247+
}
248+
Ok(())
249+
}
250+
235251
/// Get the ID of the current leader.
236252
pub async fn leader(&self) -> Option<NodeId> {
237253
let isolated = self.isolated_nodes.read().await;

async-raft/tests/snapshot_ge_half_threshold.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
mod fixtures;
22

33
use std::sync::Arc;
4-
use std::time::Duration;
54

65
use anyhow::Result;
76
use async_raft::raft::MembershipConfig;
87
use async_raft::Config;
8+
use async_raft::LogId;
99
use async_raft::SnapshotPolicy;
1010
use async_raft::State;
1111
use fixtures::RaftRouter;
1212
use maplit::hashset;
13-
use tokio::time::sleep;
1413

1514
/// A leader should create and send snapshot when snapshot is old and is not that old to trigger a snapshot, i.e.:
1615
/// `threshold/2 < leader.last_log_index - snapshot.applied_index < threshold`
@@ -63,9 +62,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
6362
router.wait_for_log(&hashset![0], want, None, "send log to trigger snapshot").await?;
6463
router.assert_stable_cluster(Some(1), Some(want)).await;
6564

66-
// TODO: add snapshot info into metrics.
67-
// Then watch metrics instead of waiting.
68-
sleep(Duration::from_secs(5)).await;
65+
router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?;
6966
router
7067
.assert_storage_state(
7168
1,
@@ -96,6 +93,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
9693
members: hashset![0u64],
9794
members_after_consensus: None,
9895
}));
96+
router.wait_for_snapshot(&hashset![1], LogId { term: 1, index: want }, None, "").await?;
9997
router.assert_storage_state(1, want, None /* non-voter does not vote */, want, expected_snap).await;
10098
}
10199

memstore/src/lib.rs

+14-2
Original file line numberDiff line numberDiff line change
@@ -377,9 +377,21 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
377377
{ snapshot_size = snapshot.get_ref().len() },
378378
"decoding snapshot for installation"
379379
);
380-
let raw = serde_json::to_string_pretty(snapshot.get_ref().as_slice())?;
381-
println!("JSON SNAP:\n{}", raw);
380+
381+
{
382+
let t = snapshot.get_ref().as_slice();
383+
let y = std::str::from_utf8(t).unwrap();
384+
tracing::debug!("JSON SNAP:\n{}", y);
385+
}
386+
382387
let new_snapshot: MemStoreSnapshot = serde_json::from_slice(snapshot.get_ref().as_slice())?;
388+
389+
{
390+
let t = &new_snapshot.data;
391+
let y = std::str::from_utf8(t).unwrap();
392+
tracing::debug!("JSON SNAP DATA:\n{}", y);
393+
}
394+
383395
// Update log.
384396
{
385397
// Go backwards through the log to find the most recent membership config <= the `through` index.

0 commit comments

Comments
 (0)