Skip to content

Commit 24e3813

Browse files
committed
change: Entry: merge term and index to log_id: LogId
1 parent 9e4fb64 commit 24e3813

File tree

8 files changed

+70
-103
lines changed

8 files changed

+70
-103
lines changed

async-raft/src/core/append_entries.rs

+12-21
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use crate::raft::Entry;
99
use crate::raft::EntryPayload;
1010
use crate::AppData;
1111
use crate::AppDataResponse;
12-
use crate::LogId;
1312
use crate::RaftNetwork;
1413
use crate::RaftStorage;
1514
use crate::Update;
@@ -105,12 +104,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
105104
};
106105

107106
// The target entry was found. Compare its term with target term to ensure everything is consistent.
108-
if target_entry.term == msg.prev_log.term {
107+
if target_entry.log_id.term == msg.prev_log.term {
109108
// We've found a point of agreement with the leader. If we have any logs present
110109
// with an index greater than this, then we must delete them per §5.3.
111-
if self.last_log.index > target_entry.index {
110+
if self.last_log.index > target_entry.log_id.index {
112111
self.storage
113-
.delete_logs_from(target_entry.index + 1, None)
112+
.delete_logs_from(target_entry.log_id.index + 1, None)
114113
.await
115114
.map_err(|err| self.map_fatal_storage_error(err))?;
116115
let membership =
@@ -131,13 +130,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
131130
.get_log_entries(start, msg.prev_log.index)
132131
.await
133132
.map_err(|err| self.map_fatal_storage_error(err))?;
134-
let opt = match old_entries.iter().find(|entry| entry.term == msg.prev_log.term) {
135-
Some(entry) => Some(ConflictOpt {
136-
log_id: LogId {
137-
term: entry.term,
138-
index: entry.index,
139-
},
140-
}),
133+
let opt = match old_entries.iter().find(|entry| entry.log_id.term == msg.prev_log.term) {
134+
Some(entry) => Some(ConflictOpt { log_id: entry.log_id }),
141135
None => Some(ConflictOpt { log_id: self.last_log }),
142136
};
143137
if report_metrics {
@@ -188,10 +182,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
188182
// Replicate entries to log (same as append, but in follower mode).
189183
self.storage.replicate_to_log(entries).await.map_err(|err| self.map_fatal_storage_error(err))?;
190184
if let Some(entry) = entries.last() {
191-
self.last_log = LogId {
192-
term: entry.term,
193-
index: entry.index,
194-
};
185+
self.last_log = entry.log_id;
195186
}
196187
Ok(())
197188
}
@@ -204,7 +195,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
204195
async fn replicate_to_state_machine_if_needed(&mut self, entries: Vec<Entry<D>>) {
205196
// Update cache. Always.
206197
for entry in entries {
207-
self.entries_cache.insert(entry.index, entry);
198+
self.entries_cache.insert(entry.log_id.index, entry);
208199
}
209200
// Perform initial replication to state machine if needed.
210201
if !self.has_completed_initial_replication_to_sm {
@@ -222,7 +213,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
222213
}
223214
// If we have no cached entries, then do nothing.
224215
let first_idx = match self.entries_cache.iter().next() {
225-
Some((_, entry)) => entry.index,
216+
Some((_, entry)) => entry.log_id.index,
226217
None => return,
227218
};
228219

@@ -231,9 +222,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
231222
let entries: Vec<_> = (first_idx..=self.commit_index)
232223
.filter_map(|idx| {
233224
if let Some(entry) = self.entries_cache.remove(&idx) {
234-
last_entry_seen = Some(entry.index);
225+
last_entry_seen = Some(entry.log_id.index);
235226
match entry.payload {
236-
EntryPayload::Normal(inner) => Some((entry.index, inner.data)),
227+
EntryPayload::Normal(inner) => Some((entry.log_id.index, inner.data)),
237228
_ => None,
238229
}
239230
} else {
@@ -285,12 +276,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
285276
let mut new_last_applied: Option<u64> = None;
286277
let entries = storage.get_log_entries(start, stop).await?;
287278
if let Some(entry) = entries.last() {
288-
new_last_applied = Some(entry.index);
279+
new_last_applied = Some(entry.log_id.index);
289280
}
290281
let data_entries: Vec<_> = entries
291282
.iter()
292283
.filter_map(|entry| match &entry.payload {
293-
EntryPayload::Normal(inner) => Some((&entry.index, &inner.data)),
284+
EntryPayload::Normal(inner) => Some((&entry.log_id.index, &inner.data)),
294285
_ => None,
295286
})
296287
.collect();

async-raft/src/core/client.rs

+13-10
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::raft::EntryPayload;
2525
use crate::replication::RaftEvent;
2626
use crate::AppData;
2727
use crate::AppDataResponse;
28+
use crate::LogId;
2829
use crate::RaftNetwork;
2930
use crate::RaftStorage;
3031

@@ -252,16 +253,18 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
252253
#[tracing::instrument(level = "trace", skip(self, payload))]
253254
pub(super) async fn append_payload_to_log(&mut self, payload: EntryPayload<D>) -> RaftResult<Entry<D>> {
254255
let entry = Entry {
255-
index: self.core.last_log.index + 1,
256-
term: self.core.current_term,
256+
log_id: LogId {
257+
index: self.core.last_log.index + 1,
258+
term: self.core.current_term,
259+
},
257260
payload,
258261
};
259262
self.core
260263
.storage
261264
.append_entry_to_log(&entry)
262265
.await
263266
.map_err(|err| self.core.map_fatal_storage_error(err))?;
264-
self.core.last_log.index = entry.index;
267+
self.core.last_log.index = entry.log_id.index;
265268
Ok(entry)
266269
}
267270

@@ -285,7 +288,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
285288
}
286289
} else {
287290
// Else, there are no voting nodes for replication, so the payload is now committed.
288-
self.core.commit_index = entry_arc.index;
291+
self.core.commit_index = entry_arc.log_id.index;
289292
self.leader_report_metrics();
290293
self.client_request_post_commit(req).await;
291294
}
@@ -308,10 +311,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
308311
ClientOrInternalResponseTx::Client(tx) => {
309312
match &req.entry.payload {
310313
EntryPayload::Normal(inner) => {
311-
match self.apply_entry_to_state_machine(&req.entry.index, &inner.data).await {
314+
match self.apply_entry_to_state_machine(&req.entry.log_id.index, &inner.data).await {
312315
Ok(data) => {
313316
let _ = tx.send(Ok(ClientWriteResponse {
314-
index: req.entry.index,
317+
index: req.entry.log_id.index,
315318
data,
316319
}));
317320
}
@@ -331,9 +334,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
331334
}
332335
}
333336
ClientOrInternalResponseTx::Internal(tx) => {
334-
self.core.last_applied = req.entry.index;
337+
self.core.last_applied = req.entry.log_id.index;
335338
self.leader_report_metrics();
336-
let _ = tx.send(Ok(req.entry.index));
339+
let _ = tx.send(Ok(req.entry.log_id.index));
337340
}
338341
}
339342

@@ -358,12 +361,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
358361
.await
359362
.map_err(|err| self.core.map_fatal_storage_error(err))?;
360363
if let Some(entry) = entries.last() {
361-
self.core.last_applied = entry.index;
364+
self.core.last_applied = entry.log_id.index;
362365
}
363366
let data_entries: Vec<_> = entries
364367
.iter()
365368
.filter_map(|entry| match &entry.payload {
366-
EntryPayload::Normal(inner) => Some((&entry.index, &inner.data)),
369+
EntryPayload::Normal(inner) => Some((&entry.log_id.index, &inner.data)),
367370
_ => None,
368371
})
369372
.collect();

async-raft/src/core/replication.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
185185
.awaiting_committed
186186
.iter()
187187
.enumerate()
188-
.take_while(|(_idx, elem)| elem.entry.index <= self.core.commit_index)
188+
.take_while(|(_idx, elem)| elem.entry.log_id.index <= self.core.commit_index)
189189
.last()
190190
.map(|(idx, _)| idx);
191191

async-raft/src/raft.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -405,10 +405,8 @@ pub struct ConflictOpt {
405405
/// A Raft log entry.
406406
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
407407
pub struct Entry<D: AppData> {
408-
/// This entry's term.
409-
pub term: u64,
410-
/// This entry's index.
411-
pub index: u64,
408+
pub log_id: LogId,
409+
412410
/// This entry's payload.
413411
#[serde(bound = "D: AppData")]
414412
pub payload: EntryPayload<D>,
@@ -428,8 +426,7 @@ impl<D: AppData> Entry<D> {
428426
/// latest membership covered by the snapshot.
429427
pub fn new_snapshot_pointer(index: u64, term: u64, id: String, membership: MembershipConfig) -> Self {
430428
Entry {
431-
term,
432-
index,
429+
log_id: LogId { term, index },
433430
payload: EntryPayload::SnapshotPointer(EntrySnapshotPointer { id, membership }),
434431
}
435432
}

async-raft/src/replication/mod.rs

+11-11
Original file line numberDiff line numberDiff line change
@@ -264,24 +264,24 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
264264
return;
265265
}
266266
};
267-
let last_index_and_term = self.outbound_buffer.last().map(|last| (last.as_ref().index, last.as_ref().term));
267+
let last_log_id = self.outbound_buffer.last().map(|last| last.as_ref().log_id);
268268

269269
// Once we've successfully sent a payload of entries, don't send them again.
270270
self.outbound_buffer.clear();
271271

272-
tracing::debug!("append_entries last: {:?}", last_index_and_term);
272+
tracing::debug!("append_entries last: {:?}", last_log_id);
273273

274274
// Handle success conditions.
275275
if res.success {
276-
tracing::debug!("append entries succeeded to {:?}", last_index_and_term);
276+
tracing::debug!("append entries succeeded to {:?}", last_log_id);
277277

278278
// If this was a proper replication event (last index & term were provided), then update state.
279-
if let Some((index, term)) = last_index_and_term {
280-
self.next_index = index + 1; // This should always be the next expected index.
281-
self.matched = (term, index).into();
279+
if let Some(log_id) = last_log_id {
280+
self.next_index = log_id.index + 1; // This should always be the next expected index.
281+
self.matched = log_id;
282282
let _ = self.raft_tx.send(ReplicaEvent::UpdateMatchIndex {
283283
target: self.target,
284-
matched: (term, index).into(),
284+
matched: log_id,
285285
});
286286

287287
// If running at line rate, and our buffered outbound requests have accumulated too
@@ -338,7 +338,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
338338
.storage
339339
.get_log_entries(conflict.log_id.index, conflict.log_id.index + 1)
340340
.await
341-
.map(|entries| entries.get(0).map(|entry| entry.term))
341+
.map(|entries| entries.get(0).map(|entry| entry.log_id.term))
342342
{
343343
Ok(Some(term)) => {
344344
self.matched.term = term; // If we have the specified log, ensure we use its term.
@@ -425,7 +425,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
425425

426426
RaftEvent::Replicate { entry, commit_index } => {
427427
self.commit_index = commit_index;
428-
self.last_log_index = entry.index;
428+
self.last_log_index = entry.log_id.index;
429429
if self.target_state == TargetReplState::LineRate {
430430
self.replication_buffer.push(entry);
431431
}
@@ -568,8 +568,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
568568
.core
569569
.outbound_buffer
570570
.first()
571-
.map(|entry| entry.as_ref().index)
572-
.or_else(|| self.core.replication_buffer.first().map(|entry| entry.index));
571+
.map(|entry| entry.as_ref().log_id.index)
572+
.or_else(|| self.core.replication_buffer.first().map(|entry| entry.log_id.index));
573573

574574
// When converting to `LaggingState`, `outbound_buffer` and `replication_buffer` is cleared,
575575
// in which there may be uncommitted logs.

async-raft/tests/conflict_with_empty_entries.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,11 @@ async fn conflict_with_empty_entries() -> Result<()> {
7474
prev_log: LogId { term: 1, index: 0 },
7575
entries: vec![
7676
Entry {
77-
term: 1,
78-
index: 1,
77+
log_id: (1, 1).into(),
7978
payload: EntryPayload::Blank,
8079
},
8180
Entry {
82-
term: 1,
83-
index: 2,
81+
log_id: (1, 2).into(),
8482
payload: EntryPayload::Normal(EntryNormal {
8583
data: ClientRequest {
8684
client: "foo".to_string(),

memstore/src/lib.rs

+9-13
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use async_raft::storage::HardState;
1919
use async_raft::storage::InitialState;
2020
use async_raft::AppData;
2121
use async_raft::AppDataResponse;
22-
use async_raft::LogId;
2322
use async_raft::NodeId;
2423
use async_raft::RaftStorage;
2524
use async_raft::SnapshotId;
@@ -188,16 +187,13 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
188187
let sm = self.sm.read().await;
189188
match &mut *hs {
190189
Some(inner) => {
191-
let (last_log_index, last_log_term) = match log.values().rev().next() {
192-
Some(log) => (log.index, log.term),
193-
None => (0, 0),
190+
let last_log_id = match log.values().rev().next() {
191+
Some(log) => log.log_id,
192+
None => (0, 0).into(),
194193
};
195194
let last_applied_log = sm.last_applied_log;
196195
Ok(InitialState {
197-
last_log: LogId {
198-
term: last_log_term,
199-
index: last_log_index,
200-
},
196+
last_log: last_log_id,
201197
last_applied_log,
202198
hard_state: inner.clone(),
203199
membership,
@@ -251,15 +247,15 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
251247
#[tracing::instrument(level = "trace", skip(self, entry))]
252248
async fn append_entry_to_log(&self, entry: &Entry<ClientRequest>) -> Result<()> {
253249
let mut log = self.log.write().await;
254-
log.insert(entry.index, entry.clone());
250+
log.insert(entry.log_id.index, entry.clone());
255251
Ok(())
256252
}
257253

258254
#[tracing::instrument(level = "trace", skip(self, entries))]
259255
async fn replicate_to_log(&self, entries: &[Entry<ClientRequest>]) -> Result<()> {
260256
let mut log = self.log.write().await;
261257
for entry in entries {
262-
log.insert(entry.index, entry.clone());
258+
log.insert(entry.log_id.index, entry.clone());
263259
}
264260
Ok(())
265261
}
@@ -311,7 +307,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
311307
membership_config = log
312308
.values()
313309
.rev()
314-
.skip_while(|entry| entry.index > last_applied_log)
310+
.skip_while(|entry| entry.log_id.index > last_applied_log)
315311
.find_map(|entry| match &entry.payload {
316312
EntryPayload::ConfigChange(cfg) => Some(cfg.membership.clone()),
317313
_ => None,
@@ -333,7 +329,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
333329
let mut current_snapshot = self.current_snapshot.write().await;
334330
term = log
335331
.get(&last_applied_log)
336-
.map(|entry| entry.term)
332+
.map(|entry| entry.log_id.term)
337333
.ok_or_else(|| anyhow::anyhow!(ERR_INCONSISTENT_LOG))?;
338334
*log = log.split_off(&last_applied_log);
339335
log.insert(
@@ -402,7 +398,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
402398
let membership_config = log
403399
.values()
404400
.rev()
405-
.skip_while(|entry| entry.index > index)
401+
.skip_while(|entry| entry.log_id.index > index)
406402
.find_map(|entry| match &entry.payload {
407403
EntryPayload::ConfigChange(cfg) => Some(cfg.membership.clone()),
408404
_ => None,

0 commit comments

Comments
 (0)