Skip to content

Commit 58d8e3a

Browse files
committed
change: AppendEntriesRequest: merge prev_log_{term,index} into prev_log: LogId
1 parent 9c5f3d7 commit 58d8e3a

File tree

5 files changed

+19
-23
lines changed

5 files changed

+19
-23
lines changed

async-raft/src/core/append_entries.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
2020
/// See `receiver implementation: AppendEntries RPC` in raft-essentials.md in this repo.
2121
#[tracing::instrument(
2222
level="trace", skip(self, msg),
23-
fields(term=msg.term, leader_id=msg.leader_id, prev_log_index=msg.prev_log_index, prev_log_term=msg.prev_log_term, leader_commit=msg.leader_commit),
23+
fields(term=msg.term, leader_id=msg.leader_id, prev_log_index=msg.prev_log.index, prev_log_term=msg.prev_log.term, leader_commit=msg.leader_commit),
2424
)]
2525
pub(super) async fn handle_append_entries_request(
2626
&mut self,
@@ -61,9 +61,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
6161

6262
// If RPC's `prev_log_index` is 0, or the RPC's previous log info matches the local
6363
// log info, then replication is g2g.
64-
let msg_prev_index_is_min = msg.prev_log_index == u64::min_value();
64+
let msg_prev_index_is_min = msg.prev_log.index == u64::min_value();
6565
let msg_index_and_term_match =
66-
(msg.prev_log_index == self.last_log.index) && (msg.prev_log_term == self.last_log.term);
66+
(msg.prev_log.index == self.last_log.index) && (msg.prev_log.term == self.last_log.term);
6767
if msg_prev_index_is_min || msg_index_and_term_match {
6868
self.append_log_entries(&msg.entries).await?;
6969
self.replicate_to_state_machine_if_needed(msg.entries).await;
@@ -85,7 +85,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
8585
// result.
8686
let entries = self
8787
.storage
88-
.get_log_entries(msg.prev_log_index, msg.prev_log_index + 1)
88+
.get_log_entries(msg.prev_log.index, msg.prev_log.index + 1)
8989
.await
9090
.map_err(|err| self.map_fatal_storage_error(err))?;
9191
let target_entry = match entries.first() {
@@ -108,7 +108,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
108108
};
109109

110110
// The target entry was found. Compare its term with target term to ensure everything is consistent.
111-
if target_entry.term == msg.prev_log_term {
111+
if target_entry.term == msg.prev_log.term {
112112
// We've found a point of agreement with the leader. If we have any logs present
113113
// with an index greater than this, then we must delete them per §5.3.
114114
if self.last_log.index > target_entry.index {
@@ -124,17 +124,17 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
124124
// The target entry does not have the same term. Fetch the last 50 logs, and use the last
125125
// entry of that payload which is still in the target term for conflict optimization.
126126
else {
127-
let start = if msg.prev_log_index >= 50 {
128-
msg.prev_log_index - 50
127+
let start = if msg.prev_log.index >= 50 {
128+
msg.prev_log.index - 50
129129
} else {
130130
0
131131
};
132132
let old_entries = self
133133
.storage
134-
.get_log_entries(start, msg.prev_log_index)
134+
.get_log_entries(start, msg.prev_log.index)
135135
.await
136136
.map_err(|err| self.map_fatal_storage_error(err))?;
137-
let opt = match old_entries.iter().find(|entry| entry.term == msg.prev_log_term) {
137+
let opt = match old_entries.iter().find(|entry| entry.term == msg.prev_log.term) {
138138
Some(entry) => Some(ConflictOpt {
139139
term: entry.term,
140140
index: entry.index,

async-raft/src/core/client.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
165165
let rpc = AppendEntriesRequest {
166166
term: self.core.current_term,
167167
leader_id: self.core.id,
168-
prev_log_index: node.matched.index,
169-
prev_log_term: node.matched.term,
168+
prev_log: node.matched,
170169
entries: vec![],
171170
leader_commit: self.core.commit_index,
172171
};

async-raft/src/raft.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -361,10 +361,10 @@ pub struct AppendEntriesRequest<D: AppData> {
361361
pub term: u64,
362362
/// The leader's ID. Useful in redirecting clients.
363363
pub leader_id: u64,
364-
/// The index of the log entry immediately preceding the new entries.
365-
pub prev_log_index: u64,
366-
/// The term of the `prev_log_index` entry.
367-
pub prev_log_term: u64,
364+
365+
/// The log entry immediately preceding the new entries.
366+
pub prev_log: LogId,
367+
368368
/// The new log entries to store.
369369
///
370370
/// This may be empty when the leader is sending heartbeats. Entries

async-raft/src/replication/mod.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
239239
let payload = AppendEntriesRequest {
240240
term: self.term,
241241
leader_id: self.id,
242-
prev_log_index: self.matched.index,
243-
prev_log_term: self.matched.term,
242+
prev_log: self.matched,
244243
leader_commit: self.commit_index,
245244
entries: self.outbound_buffer.iter().map(|entry| entry.as_ref().clone()).collect(),
246245
};

async-raft/tests/conflict_with_empty_entries.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use async_raft::raft::Entry;
77
use async_raft::raft::EntryNormal;
88
use async_raft::raft::EntryPayload;
99
use async_raft::Config;
10+
use async_raft::LogId;
1011
use async_raft::RaftNetwork;
1112
use fixtures::RaftRouter;
1213
use memstore::ClientRequest;
@@ -49,8 +50,7 @@ async fn conflict_with_empty_entries() -> Result<()> {
4950
let rpc = AppendEntriesRequest::<memstore::ClientRequest> {
5051
term: 1,
5152
leader_id: 1,
52-
prev_log_index: 5,
53-
prev_log_term: 1,
53+
prev_log: LogId { term: 1, index: 5 },
5454
entries: vec![],
5555
leader_commit: 5,
5656
};
@@ -66,8 +66,7 @@ async fn conflict_with_empty_entries() -> Result<()> {
6666
let rpc = AppendEntriesRequest::<memstore::ClientRequest> {
6767
term: 1,
6868
leader_id: 1,
69-
prev_log_index: 0,
70-
prev_log_term: 1,
69+
prev_log: LogId { term: 1, index: 0 },
7170
entries: vec![
7271
Entry {
7372
term: 1,
@@ -98,8 +97,7 @@ async fn conflict_with_empty_entries() -> Result<()> {
9897
let rpc = AppendEntriesRequest::<memstore::ClientRequest> {
9998
term: 1,
10099
leader_id: 1,
101-
prev_log_index: 3,
102-
prev_log_term: 1,
100+
prev_log: LogId { term: 1, index: 3 },
103101
entries: vec![],
104102
leader_commit: 5,
105103
};

0 commit comments

Comments
 (0)