Skip to content

Commit c61b4c4

Browse files
committed
change: remove ConflictOpt, which is a wrapper of log_id; add matched log id in AppendEntriesResponse
1 parent 6155117 commit c61b4c4

File tree

7 files changed

+72
-103
lines changed

7 files changed

+72
-103
lines changed

async-raft/src/core/append_entries.rs

+14-7
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use crate::core::UpdateCurrentLeader;
55
use crate::error::RaftResult;
66
use crate::raft::AppendEntriesRequest;
77
use crate::raft::AppendEntriesResponse;
8-
use crate::raft::ConflictOpt;
98
use crate::raft::Entry;
109
use crate::raft::EntryPayload;
1110
use crate::ActiveMembership;
@@ -36,8 +35,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
3635
tracing::debug!({self.current_term, rpc_term=msg.term}, "AppendEntries RPC term is less than current term");
3736
return Ok(AppendEntriesResponse {
3837
term: self.current_term,
39-
success: false,
40-
conflict_opt: None,
38+
matched: None,
39+
conflict: None,
4140
});
4241
}
4342

@@ -247,11 +246,19 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
247246

248247
return Ok(AppendEntriesResponse {
249248
term: self.current_term,
250-
success: false,
251-
conflict_opt: Some(ConflictOpt { log_id: *prev_log_id }),
249+
matched: None,
250+
conflict: Some(*prev_log_id),
252251
});
253252
}
254253

254+
// If prev_log_id matches local entry, then every inconsistent entries will be removed.
255+
// Thus the last known matching log id has to be the last entry id.
256+
let matched = Some(if entries.is_empty() {
257+
*prev_log_id
258+
} else {
259+
entries[entries.len() - 1].log_id
260+
});
261+
255262
// The entries left are all inconsistent log or absent
256263
let (n_matching, entries) = self.skip_matching_entries(entries).await?;
257264

@@ -279,8 +286,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
279286

280287
Ok(AppendEntriesResponse {
281288
term: self.current_term,
282-
success: true,
283-
conflict_opt: None,
289+
matched,
290+
conflict: None,
284291
})
285292
}
286293

async-raft/src/raft.rs

+15-17
Original file line numberDiff line numberDiff line change
@@ -520,26 +520,23 @@ impl<D: AppData> MessageSummary for AppendEntriesRequest<D> {
520520
pub struct AppendEntriesResponse {
521521
/// The responding node's current term, for leader to update itself.
522522
pub term: u64,
523-
/// Will be true if follower contained entry matching `prev_log_index` and `prev_log_term`.
524-
pub success: bool,
525-
/// A value used to implement the _conflicting term_ optimization outlined in §5.3.
523+
524+
/// The last matching log id on follower.
525+
///
526+
/// It is a successful append-entry iff `matched` is `Some()`.
527+
pub matched: Option<LogId>,
528+
529+
/// The log id that is different from the leader on follower.
526530
///
527-
/// This value will only be present, and should only be considered, when `success` is `false`.
528-
pub conflict_opt: Option<ConflictOpt>,
531+
/// `conflict` is None if `matched` is `Some()`, because if there is a matching entry, all following inconsistent
532+
/// entries will be deleted.
533+
pub conflict: Option<LogId>,
529534
}
530535

531-
/// A struct used to implement the _conflicting term_ optimization outlined in §5.3 for log replication.
532-
///
533-
/// This value will only be present, and should only be considered, when an `AppendEntriesResponse`
534-
/// object has a `success` value of `false`.
535-
///
536-
/// This implementation of Raft uses this value to more quickly synchronize a leader with its
537-
/// followers which may be some distance behind in replication, may have conflicting entries, or
538-
/// which may be new to the cluster.
539-
#[derive(Debug, Serialize, Deserialize, PartialEq)]
540-
pub struct ConflictOpt {
541-
/// The most recent entry which does not conflict with the received request.
542-
pub log_id: LogId,
536+
impl AppendEntriesResponse {
537+
pub fn success(&self) -> bool {
538+
self.matched.is_some()
539+
}
543540
}
544541

545542
/// A Raft log entry.
@@ -683,6 +680,7 @@ impl MembershipConfig {
683680
pub struct VoteRequest {
684681
/// The candidate's current term.
685682
pub term: u64,
683+
686684
/// The candidate's ID.
687685
pub candidate_id: u64,
688686

async-raft/src/replication/mod.rs

+7-16
Original file line numberDiff line numberDiff line change
@@ -307,12 +307,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
307307
break (prev_log_id, logs);
308308
};
309309

310-
let last_log_id = if logs.is_empty() {
311-
prev_log_id
312-
} else {
313-
logs[logs.len() - 1].log_id
314-
};
315-
316310
// Build the heartbeat frame to be sent to the follower.
317311
let payload = AppendEntriesRequest {
318312
term: self.term,
@@ -350,13 +344,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
350344
}
351345
};
352346

353-
tracing::debug!(%last_log_id, "append_entries resp: {:?}", append_resp);
347+
tracing::debug!("append_entries resp: {:?}", append_resp);
354348

355349
// Handle success conditions.
356-
if append_resp.success {
357-
self.matched = last_log_id;
350+
if append_resp.success() {
351+
self.matched = append_resp.matched.unwrap();
358352
// TODO(xp): if matched does not change, do not bother the core.
359-
self.update_max_possible_matched_index(last_log_id.index);
353+
self.update_max_possible_matched_index(self.matched.index);
360354
self.update_matched();
361355

362356
return Ok(());
@@ -375,21 +369,18 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
375369
}
376370

377371
// Replication was not successful, handle conflict optimization record, else decrement `next_index`.
378-
let conflict = append_resp.conflict_opt.unwrap();
372+
let conflict = append_resp.conflict.unwrap();
379373

380374
tracing::debug!(
381375
?conflict,
382376
append_resp.term,
383377
"append entries failed, handling conflict opt"
384378
);
385379

386-
assert_eq!(
387-
conflict.log_id, prev_log_id,
388-
"if conflict, it is always the prev_log_id"
389-
);
380+
assert_eq!(conflict, prev_log_id, "if conflict, it is always the prev_log_id");
390381

391382
// Continue to find the matching log id on follower.
392-
self.max_possible_matched_index = conflict.log_id.index - 1;
383+
self.max_possible_matched_index = conflict.index - 1;
393384
Ok(())
394385
}
395386

async-raft/tests/append_conflicts.rs

+20-36
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::sync::Arc;
22

33
use anyhow::Result;
44
use async_raft::raft::AppendEntriesRequest;
5-
use async_raft::raft::ConflictOpt;
65
use async_raft::raft::Entry;
76
use async_raft::Config;
87
use async_raft::LogId;
@@ -55,8 +54,8 @@ async fn append_conflicts() -> Result<()> {
5554
};
5655

5756
let resp = r0.append_entries(req.clone()).await?;
58-
assert!(resp.success);
59-
assert_eq!(None, resp.conflict_opt);
57+
assert!(resp.success());
58+
assert_eq!(None, resp.conflict);
6059

6160
check_logs(&sto0, vec![0]).await?;
6261

@@ -72,16 +71,16 @@ async fn append_conflicts() -> Result<()> {
7271
};
7372

7473
let resp = r0.append_entries(req.clone()).await?;
75-
assert!(resp.success);
76-
assert_eq!(None, resp.conflict_opt);
74+
assert!(resp.success());
75+
assert_eq!(None, resp.conflict);
7776

7877
check_logs(&sto0, vec![0, 1, 1, 1, 1]).await?;
7978

8079
tracing::info!("--- case 0: prev_log_id.index == 0, last_log_id mismatch");
8180

8281
let resp = r0.append_entries(req.clone()).await?;
83-
assert!(resp.success);
84-
assert_eq!(None, resp.conflict_opt);
82+
assert!(resp.success());
83+
assert_eq!(None, resp.conflict);
8584

8685
check_logs(&sto0, vec![0, 1, 1, 1, 1]).await?;
8786

@@ -97,8 +96,8 @@ async fn append_conflicts() -> Result<()> {
9796
};
9897

9998
let resp = r0.append_entries(req).await?;
100-
assert!(resp.success);
101-
assert_eq!(None, resp.conflict_opt);
99+
assert!(resp.success());
100+
assert_eq!(None, resp.conflict);
102101

103102
check_logs(&sto0, vec![0, 1, 1, 1, 1]).await?;
104103

@@ -114,8 +113,8 @@ async fn append_conflicts() -> Result<()> {
114113
};
115114

116115
let resp = r0.append_entries(req).await?;
117-
assert!(resp.success);
118-
assert_eq!(None, resp.conflict_opt);
116+
assert!(resp.success());
117+
assert_eq!(None, resp.conflict);
119118

120119
check_logs(&sto0, vec![0, 1, 1, 2]).await?;
121120

@@ -129,13 +128,8 @@ async fn append_conflicts() -> Result<()> {
129128
};
130129

131130
let resp = r0.append_entries(req).await?;
132-
assert!(!resp.success);
133-
assert_eq!(
134-
Some(ConflictOpt {
135-
log_id: LogId { term: 1, index: 2000 }
136-
}),
137-
resp.conflict_opt
138-
);
131+
assert!(!resp.success());
132+
assert_eq!(Some(LogId::new(1, 2000)), resp.conflict);
139133

140134
check_logs(&sto0, vec![0, 1, 1, 2]).await?;
141135

@@ -150,14 +144,9 @@ async fn append_conflicts() -> Result<()> {
150144
};
151145

152146
let resp = r0.append_entries(req).await?;
153-
assert!(!resp.success);
147+
assert!(!resp.success());
154148
// returns the id just before prev_log_id.index
155-
assert_eq!(
156-
Some(ConflictOpt {
157-
log_id: LogId { term: 3, index: 3 }
158-
}),
159-
resp.conflict_opt
160-
);
149+
assert_eq!(Some(LogId::new(3, 3)), resp.conflict);
161150

162151
check_logs(&sto0, vec![0, 1, 1]).await?;
163152

@@ -172,8 +161,8 @@ async fn append_conflicts() -> Result<()> {
172161
};
173162

174163
let resp = r0.append_entries(req).await?;
175-
assert!(resp.success);
176-
assert_eq!(None, resp.conflict_opt);
164+
assert!(resp.success());
165+
assert_eq!(None, resp.conflict);
177166

178167
// check prepared store
179168
check_logs(&sto0, vec![0, 1, 1, 2, 2, 2]).await?;
@@ -188,8 +177,8 @@ async fn append_conflicts() -> Result<()> {
188177
};
189178

190179
let resp = r0.append_entries(req).await?;
191-
assert!(resp.success);
192-
assert_eq!(None, resp.conflict_opt);
180+
assert!(resp.success());
181+
assert_eq!(None, resp.conflict);
193182

194183
check_logs(&sto0, vec![0, 1, 1, 2, 3]).await?;
195184

@@ -205,13 +194,8 @@ async fn append_conflicts() -> Result<()> {
205194
};
206195

207196
let resp = r0.append_entries(req).await?;
208-
assert!(!resp.success);
209-
assert_eq!(
210-
Some(ConflictOpt {
211-
log_id: LogId { term: 1, index: 200 }
212-
}),
213-
resp.conflict_opt
214-
);
197+
assert!(!resp.success());
198+
assert_eq!(Some(LogId::new(1, 200)), resp.conflict);
215199

216200
Ok(())
217201
}

async-raft/tests/append_updates_membership.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ async fn append_updates_membership() -> Result<()> {
7171
};
7272

7373
let resp = r0.append_entries(req.clone()).await?;
74-
assert!(resp.success);
75-
assert_eq!(None, resp.conflict_opt);
74+
assert!(resp.success());
75+
assert_eq!(None, resp.conflict);
7676

7777
r0.wait(timeout()).members(btreeset! {1,2,3,4}, "append-entries update membership").await?;
7878
}
@@ -88,8 +88,8 @@ async fn append_updates_membership() -> Result<()> {
8888
};
8989

9090
let resp = r0.append_entries(req.clone()).await?;
91-
assert!(resp.success);
92-
assert_eq!(None, resp.conflict_opt);
91+
assert!(resp.success());
92+
assert_eq!(None, resp.conflict);
9393

9494
r0.wait(timeout()).members(btreeset! {1,2}, "deleting inconsistent lgos updates membership").await?;
9595
}

async-raft/tests/compaction.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ async fn compaction() -> Result<()> {
138138
})
139139
.await?;
140140

141-
assert!(res.success);
142-
assert_eq!(None, res.conflict_opt);
141+
assert!(res.success());
142+
assert_eq!(None, res.conflict);
143143
}
144144

145145
Ok(())

async-raft/tests/conflict_with_empty_entries.rs

+10-21
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::sync::Arc;
22

33
use anyhow::Result;
44
use async_raft::raft::AppendEntriesRequest;
5-
use async_raft::raft::ConflictOpt;
65
use async_raft::raft::Entry;
76
use async_raft::raft::EntryPayload;
87
use async_raft::Config;
@@ -57,15 +56,10 @@ async fn conflict_with_empty_entries() -> Result<()> {
5756
};
5857

5958
let resp = router.send_append_entries(0, rpc).await?;
60-
assert!(!resp.success);
61-
assert!(resp.conflict_opt.is_some());
62-
let c = resp.conflict_opt.unwrap();
63-
assert_eq!(
64-
ConflictOpt {
65-
log_id: LogId { term: 1, index: 5 }
66-
},
67-
c
68-
);
59+
assert!(!resp.success());
60+
assert!(resp.conflict.is_some());
61+
let c = resp.conflict.unwrap();
62+
assert_eq!(LogId { term: 1, index: 5 }, c);
6963

7064
// Feed 2 logs
7165

@@ -91,8 +85,8 @@ async fn conflict_with_empty_entries() -> Result<()> {
9185
};
9286

9387
let resp = router.send_append_entries(0, rpc).await?;
94-
assert!(resp.success);
95-
assert!(resp.conflict_opt.is_none());
88+
assert!(resp.success());
89+
assert!(resp.conflict.is_none());
9690

9791
// Expect a conflict with prev_log_index == 3
9892

@@ -105,15 +99,10 @@ async fn conflict_with_empty_entries() -> Result<()> {
10599
};
106100

107101
let resp = router.send_append_entries(0, rpc).await?;
108-
assert!(!resp.success);
109-
assert!(resp.conflict_opt.is_some());
110-
let c = resp.conflict_opt.unwrap();
111-
assert_eq!(
112-
ConflictOpt {
113-
log_id: LogId { term: 1, index: 3 }
114-
},
115-
c
116-
);
102+
assert!(!resp.success());
103+
assert!(resp.conflict.is_some());
104+
let c = resp.conflict.unwrap();
105+
assert_eq!(LogId::new(1, 3), c);
117106

118107
Ok(())
119108
}

0 commit comments

Comments
 (0)