Commit 1c46a71

change: RaftStore::get_log_entries use range as arg; add try_get_log_entry() that does not return error even when defensive check is on

1 parent 07d71c6 commit 1c46a71
10 files changed: +176 -59 lines

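The core of the change: RaftStorage::get_log_entries() now takes a single standard `RangeBounds<u64>` value instead of a `(start, stop)` pair of `u64`s. A rough sketch of what call sites look like before and after this commit (the storage handle `sto` here is hypothetical):

    // Before: half-open interval spelled as two u64 arguments.
    let one = sto.get_log_entries(index, index + 1).await?;

    // After: ordinary Rust range syntax.
    let one = sto.get_log_entries(index..=index).await?; // exactly the entry at `index`
    let run = sto.get_log_entries(start..stop).await?;   // half-open [start, stop), as before
    let all = sto.get_log_entries(..).await?;            // the whole log

The `..=` form makes single-entry reads explicit, and `..` replaces magic upper bounds such as the `10_000` previously used in the tests below.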
async-raft/src/core/append_entries.rs (+23 -4)
@@ -87,11 +87,25 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
         //// Begin Log Consistency Check ////
         tracing::debug!("begin log consistency check");

+        if self.last_log_id.index < msg.prev_log_id.index {
+            if report_metrics {
+                self.report_metrics(Update::Ignore);
+            }
+
+            return Ok(AppendEntriesResponse {
+                term: self.current_term,
+                success: false,
+                conflict_opt: Some(ConflictOpt {
+                    log_id: self.last_log_id,
+                }),
+            });
+        }
+
         // Previous log info doesn't immediately line up, so perform log consistency check and proceed based on its
         // result.
         let entries = self
             .storage
-            .get_log_entries(msg.prev_log_id.index, msg.prev_log_id.index + 1)
+            .get_log_entries(msg.prev_log_id.index..=msg.prev_log_id.index)
             .await
             .map_err(|err| self.map_fatal_storage_error(err))?;
         let target_entry = match entries.first() {

@@ -136,7 +150,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
         };
         let old_entries = self
             .storage
-            .get_log_entries(start, msg.prev_log_id.index)
+            .get_log_entries(start..msg.prev_log_id.index)
             .await
             .map_err(|err| self.map_fatal_storage_error(err))?;
         let opt = match old_entries.iter().find(|entry| entry.log_id.term == msg.prev_log_id.term) {

@@ -236,7 +250,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
         // TODO(xp): logs in storage must be consecutive.
         let entries = self
             .storage
-            .get_log_entries(self.last_applied.index + 1, self.commit_index + 1)
+            .get_log_entries(self.last_applied.index + 1..=self.commit_index)
             .await
             .map_err(|e| self.map_fatal_storage_error(e))?;

@@ -290,11 +304,16 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
             return;
         }

+        assert!(start <= stop);
+        if start == stop {
+            return;
+        }
+
        // Fetch the series of entries which must be applied to the state machine, then apply them.
        let handle = tokio::spawn(
            async move {
                let mut new_last_applied: Option<LogId> = None;
-               let entries = storage.get_log_entries(start, stop).await?;
+               let entries = storage.get_log_entries(start..stop).await?;
                if let Some(entry) = entries.last() {
                    new_last_applied = Some(entry.log_id);
                }

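The new guard at the top of the consistency check is a fast path for a lagging follower: if the local log ends before `prev_log_id.index`, there is nothing to probe in storage, so the follower immediately answers `success: false` with a ConflictOpt pointing at its own last log id, letting the leader back up in one round trip. A standalone sketch of that decision, with a simplified, hypothetical type:

    // Hypothetical, simplified type for illustration only.
    struct LogId {
        term: u64,
        index: u64,
    }

    /// Returns the index the leader should retry from, or None when the
    /// full log consistency check still has to run.
    fn short_log_fast_path(last_log_id: &LogId, prev_log_id: &LogId) -> Option<u64> {
        if last_log_id.index < prev_log_id.index {
            Some(last_log_id.index) // conflict hint: our log ends here
        } else {
            None
        }
    }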
async-raft/src/core/client.rs (+3 -7)
@@ -79,12 +79,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
         // Thus if a new leader sees only the first one, it needs to append the final config log to let
         // the change-membership operation to finish.

-        let last_logs = self
-            .core
-            .storage
-            .get_log_entries(last_index, last_index + 1)
-            .await
-            .map_err(RaftError::RaftStorage)?;
+        let last_logs =
+            self.core.storage.get_log_entries(last_index..=last_index).await.map_err(RaftError::RaftStorage)?;
         let last_log = &last_logs[0];

         let req = match last_log.payload {

@@ -418,7 +414,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
         let entries = self
             .core
             .storage
-            .get_log_entries(expected_next_index, index)
+            .get_log_entries(expected_next_index..index)
             .await
             .map_err(|err| self.core.map_fatal_storage_error(err))?;

async-raft/src/replication/mod.rs (+17 -16)
@@ -362,16 +362,23 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
         }

         // Fetch the entry at conflict index and use the term specified there.
-        match self
-            .storage
-            .get_log_entries(conflict.log_id.index, conflict.log_id.index + 1)
-            .await
-            .map(|entries| entries.get(0).map(|entry| entry.log_id.term))
-        {
-            Ok(Some(term)) => {
+        let ent = self.storage.try_get_log_entry(conflict.log_id.index).await;
+        let ent = match ent {
+            Ok(x) => x,
+            Err(err) => {
+                tracing::error!(error=?err, "error fetching log entry due to returned AppendEntries RPC conflict_opt");
+                let _ = self.raft_core_tx.send((ReplicaEvent::Shutdown, tracing::debug_span!("CH")));
+                self.target_state = TargetReplState::Shutdown;
+                return;
+            }
+        };
+
+        let ent_term = ent.map(|entry| entry.log_id.term);
+        match ent_term {
+            Some(term) => {
                 self.matched.term = term; // If we have the specified log, ensure we use its term.
             }
-            Ok(None) => {
+            None => {
                 // This condition would only ever be reached if the log has been removed due to
                 // log compaction (barring critical storage failure), so transition to snapshotting.
                 self.target_state = TargetReplState::Snapshotting;

@@ -384,12 +391,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
                 ));
                 return;
             }
-            Err(err) => {
-                tracing::error!(error=%err, "error fetching log entry due to returned AppendEntries RPC conflict_opt");
-                let _ = self.raft_core_tx.send((ReplicaEvent::Shutdown, tracing::debug_span!("CH")));
-                self.target_state = TargetReplState::Shutdown;
-                return;
-            }
         };

         // Check snapshot policy and handle conflict as needed.

@@ -676,7 +677,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
     /// Ensure there are no gaps in the outbound buffer due to transition from lagging.
     #[tracing::instrument(level = "trace", skip(self))]
     async fn frontload_outbound_buffer(&mut self, start: u64, stop: u64) {
-        let entries = match self.replication_core.storage.get_log_entries(start, stop).await {
+        let entries = match self.replication_core.storage.get_log_entries(start..stop).await {
            Ok(entries) => entries,
            Err(err) => {
                tracing::error!(error=%err, "error while frontloading outbound buffer");

@@ -785,7 +786,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
         // Bringing the target up-to-date by fetching the largest possible payload of entries
         // from storage within permitted configuration & ensure no snapshot pointer was returned.
         let entries =
-            match self.replication_core.storage.get_log_entries(self.replication_core.next_index, stop_idx).await {
+            match self.replication_core.storage.get_log_entries(self.replication_core.next_index..stop_idx).await {
                Ok(entries) => entries,
                Err(err) => {
                    tracing::error!(error=%err, "error fetching logs from storage");

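The probe for the conflicting entry switches from a one-element get_log_entries() call to the new try_get_log_entry(). The distinction matters because a missing entry at the conflict index is an expected outcome here (the log may have been compacted away), not a storage fault, so it should surface as `Ok(None)` instead of tripping a defensive-mode error. A sketch of the behavioral difference, assuming a MemStore-like `store` with defensive checks enabled and an index `idx` that has been compacted away:

    // Old probe: defensive mode rejects a range that does not hit any logs.
    assert!(store.get_log_entries(idx..=idx).await.is_err());

    // New probe: absence is an ordinary value, not an error.
    match store.try_get_log_entry(idx).await? {
        Some(entry) => println!("entry {} has term {}", idx, entry.log_id.term),
        None => println!("entry {} is gone; switch to snapshotting", idx),
    }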
async-raft/src/storage.rs (+8 -1)
@@ -159,7 +159,14 @@
     /// The start value is inclusive in the search and the stop value is non-inclusive: `[start, stop)`.
     ///
     /// Errors returned from this method will cause Raft to go into shutdown.
-    async fn get_log_entries(&self, start: u64, stop: u64) -> Result<Vec<Entry<D>>>;
+    async fn get_log_entries<RNG: RangeBounds<u64> + Clone + Debug + Send + Sync>(
+        &self,
+        range: RNG,
+    ) -> Result<Vec<Entry<D>>>;
+
+    /// Try to get a log entry.
+    /// It does not return an error if defensive mode is on and the log entry at `log_index` is not found.
+    async fn try_get_log_entry(&self, log_index: u64) -> Result<Option<Entry<D>>>;

     /// Delete all logs in a `range`.
     ///

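For implementers, the generic `RangeBounds<u64>` bound means any standard range type works and can usually be forwarded straight to the backing store. A minimal sketch assuming a BTreeMap-indexed log (this mirrors the MemStore implementation below, minus the defensive checks):

    use std::collections::BTreeMap;
    use std::ops::RangeBounds;

    // Hypothetical in-memory log; String stands in for the real Entry<D> type.
    struct Log {
        entries: BTreeMap<u64, String>,
    }

    impl Log {
        // BTreeMap::range accepts any RangeBounds<u64>, so no manual
        // start/stop arithmetic is needed.
        fn get_log_entries<RNG: RangeBounds<u64>>(&self, range: RNG) -> Vec<String> {
            self.entries.range(range).map(|(_, v)| v.clone()).collect()
        }
    }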
async-raft/tests/fixtures/mod.rs (+1 -1)
@@ -555,7 +555,7 @@ impl RaftRouter {
     ) {
         let rt = self.routing_table.read().await;
         for (id, (_node, storage)) in rt.iter() {
-            let last_log = storage.get_log_entries(0, 10_000).await.unwrap().last().unwrap().log_id.index;
+            let last_log = storage.get_log_entries(..).await.unwrap().last().unwrap().log_id.index;
             assert_eq!(
                 last_log, expect_last_log,
                 "expected node {} to have last_log {}, got {}",

async-raft/tests/initialization.rs (+1 -1)
@@ -54,7 +54,7 @@ async fn initialization() -> Result<()> {

     for i in 0..3 {
         let sto = router.get_storage_handle(&1).await?;
-        let first = sto.get_log_entries(1, 2).await?.first().cloned();
+        let first = sto.get_log_entries(1..2).await?.first().cloned();

         tracing::info!("--- check membership is replicated: id: {}, first log: {:?}", i, first);
         let mem = match first.unwrap().payload {

async-raft/tests/members_leader_fix_partial.rs (+1 -1)
@@ -71,7 +71,7 @@ async fn members_leader_fix_partial() -> Result<()> {
     )
     .await?;

-    let final_log = sto.get_log_entries(want, want + 1).await?[0].clone();
+    let final_log = sto.get_log_entries(want..=want).await?[0].clone();

     let m = match final_log.payload {
         EntryPayload::ConfigChange(ref m) => m.membership.clone(),

async-raft/tests/snapshot_uses_prev_snap_membership.rs (+2 -2)
@@ -74,7 +74,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
     router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, to(), "snapshot").await?;

     {
-        let logs = sto0.get_log_entries(0, 1000).await?;
+        let logs = sto0.get_log_entries(..).await?;
         assert_eq!(1, logs.len(), "only one snapshot pointer log");
     }
     let m = sto0.get_membership_config().await?;

@@ -117,7 +117,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
     tracing::info!("--- check membership");
     {
         {
-            let logs = sto0.get_log_entries(0, 1000).await?;
+            let logs = sto0.get_log_entries(..).await?;
             assert_eq!(1, logs.len(), "only one snapshot pointer log");
         }
         let m = sto0.get_membership_config().await?;

memstore/src/lib.rs (+89 -10)
@@ -5,6 +5,7 @@ mod test;

 use std::cmp::max;
 use std::collections::BTreeMap;
+use std::collections::Bound;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::io::Cursor;

@@ -307,18 +308,84 @@ impl MemStore {
         Ok(())
     }

-    pub async fn defensive_nonempty_range<RT, RNG: RangeBounds<RT> + Clone + Debug + Send + Iterator>(
+    pub async fn defensive_nonempty_range<RNG: RangeBounds<u64> + Clone + Debug + Send>(
         &self,
         range: RNG,
     ) -> anyhow::Result<()> {
         if !*self.defensive.read().await {
             return Ok(());
         }
-        for _ in range.clone() {
+        let start = match range.start_bound() {
+            Bound::Included(i) => Some(*i),
+            Bound::Excluded(i) => Some(*i + 1),
+            Bound::Unbounded => None,
+        };
+
+        let end = match range.end_bound() {
+            Bound::Included(i) => Some(*i),
+            Bound::Excluded(i) => Some(*i - 1),
+            Bound::Unbounded => None,
+        };
+
+        if start.is_none() || end.is_none() {
+            return Ok(());
+        }
+
+        if start > end {
+            return Err(anyhow::anyhow!("range must be nonempty: {:?}", range));
+        }
+
+        Ok(())
+    }
+
+    pub async fn defensive_range_hits_logs<T: AppData, RNG: RangeBounds<u64> + Debug + Send>(
+        &self,
+        range: RNG,
+        logs: &[Entry<T>],
+    ) -> anyhow::Result<()> {
+        if !*self.defensive.read().await {
             return Ok(());
         }

-        Err(anyhow::anyhow!("range must be nonempty: {:?}", range))
+        {
+            let want_first = match range.start_bound() {
+                Bound::Included(i) => Some(*i),
+                Bound::Excluded(i) => Some(*i + 1),
+                Bound::Unbounded => None,
+            };
+
+            let first = logs.first().map(|x| x.log_id.index);
+
+            if want_first.is_some() && first != want_first {
+                return Err(anyhow::anyhow!(
+                    "{:?} want first: {:?}, but {:?}",
+                    range,
+                    want_first,
+                    first
+                ));
+            }
+        }
+
+        {
+            let want_last = match range.end_bound() {
+                Bound::Included(i) => Some(*i),
+                Bound::Excluded(i) => Some(*i - 1),
+                Bound::Unbounded => None,
+            };
+
+            let last = logs.last().map(|x| x.log_id.index);
+
+            if want_last.is_some() && last != want_last {
+                return Err(anyhow::anyhow!(
+                    "{:?} want last: {:?}, but {:?}",
+                    range,
+                    want_last,
+                    last
+                ));
+            }
+        }
+
+        Ok(())
     }

     pub async fn defensive_apply_log_id_gt_last<D: AppData>(&self, entries: &[&Entry<D>]) -> anyhow::Result<()> {

@@ -477,14 +544,26 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
     }

     #[tracing::instrument(level = "trace", skip(self))]
-    async fn get_log_entries(&self, start: u64, stop: u64) -> Result<Vec<Entry<ClientRequest>>> {
-        // Invalid request, return empty vec.
-        if start > stop {
-            tracing::error!("get_log_entries: invalid request, start({}) > stop({})", start, stop);
-            return Ok(vec![]);
-        }
+    async fn get_log_entries<RNG: RangeBounds<u64> + Clone + Debug + Send + Sync>(
+        &self,
+        range: RNG,
+    ) -> Result<Vec<Entry<ClientRequest>>> {
+        self.defensive_nonempty_range(range.clone()).await?;
+
+        let res = {
+            let log = self.log.read().await;
+            log.range(range.clone()).map(|(_, val)| val.clone()).collect::<Vec<_>>()
+        };
+
+        self.defensive_range_hits_logs(range, &res).await?;
+
+        Ok(res)
+    }
+
+    #[tracing::instrument(level = "trace", skip(self))]
+    async fn try_get_log_entry(&self, log_index: u64) -> Result<Option<Entry<ClientRequest>>> {
         let log = self.log.read().await;
-        Ok(log.range(start..stop).map(|(_, val)| val.clone()).collect())
+        Ok(log.get(&log_index).cloned())
     }

     #[tracing::instrument(level = "trace", skip(self, range), fields(range=?range))]

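Both defensive helpers reduce an arbitrary `RangeBounds<u64>` to inclusive endpoints: an excluded start becomes `start + 1`, an excluded end becomes `end - 1`, and an unbounded side opts out of the check entirely. The normalization in isolation, as a runnable sketch:

    use std::ops::{Bound, RangeBounds};

    /// Normalize a range to inclusive (first, last) endpoints;
    /// None means unbounded on that side, i.e. the check is skipped.
    fn inclusive_bounds<R: RangeBounds<u64>>(range: &R) -> (Option<u64>, Option<u64>) {
        let first = match range.start_bound() {
            Bound::Included(i) => Some(*i),
            Bound::Excluded(i) => Some(*i + 1),
            Bound::Unbounded => None,
        };
        let last = match range.end_bound() {
            Bound::Included(i) => Some(*i),
            Bound::Excluded(i) => Some(*i - 1),
            Bound::Unbounded => None,
        };
        (first, last)
    }

    fn main() {
        assert_eq!(inclusive_bounds(&(3..7)), (Some(3), Some(6))); // 3..7 covers 3..=6
        assert_eq!(inclusive_bounds(&(3..=7)), (Some(3), Some(7)));
        assert_eq!(inclusive_bounds(&(..)), (None, None)); // unbounded: nothing to verify
    }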