Skip to content

Commit b43c085

Browse files
committed
change: track committed log id instead of just a commit index
1 parent 50964d0 commit b43c085

15 files changed

+117
-83
lines changed

async-raft/src/core/admin.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
121121

122122
// The last membership config is not committed yet.
123123
// Can not process the next one.
124-
if self.core.commit_index < self.core.effective_membership.log_id.index {
124+
if self.core.committed < self.core.effective_membership.log_id {
125125
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
126126
ChangeMembershipError::InProgress {
127127
membership_log_id: self.core.effective_membership.log_id,

async-raft/src/core/append_entries.rs

+16-16
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
5858
// Then it will update commit_index to 3 and apply {2,3}
5959

6060
// TODO(xp): cleanup commit index at sender side.
61-
let valid_commit_index = msg_entries.last().map(|x| x.log_id).unwrap_or_else(|| msg.prev_log_id).index;
62-
let valid_commit_index = std::cmp::min(msg.leader_commit, valid_commit_index);
61+
let valid_commit_index = msg_entries.last().map(|x| x.log_id).unwrap_or_else(|| msg.prev_log_id);
62+
let valid_committed = std::cmp::min(msg.leader_commit, valid_commit_index);
6363

6464
tracing::debug!("start to check and update to latest term/leader");
6565
{
@@ -94,7 +94,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
9494
// +----------------+------------------------+
9595
// ` 0 ` last_applied ` last_log_id
9696

97-
return self.append_apply_log_entries(&msg.prev_log_id, msg_entries, valid_commit_index).await;
97+
return self.append_apply_log_entries(&msg.prev_log_id, msg_entries, valid_committed).await;
9898
}
9999

100100
#[tracing::instrument(level = "debug", skip(self))]
@@ -227,13 +227,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
227227
&mut self,
228228
prev_log_id: &LogId,
229229
entries: &[Entry<D>],
230-
commit_index: u64,
230+
committed: LogId,
231231
) -> RaftResult<AppendEntriesResponse> {
232232
let matching = self.does_log_id_match(prev_log_id).await?;
233233
tracing::debug!(
234-
"check prev_log_id {} match: commit_index: {}, matches: {}",
234+
"check prev_log_id {} match: committed: {}, matches: {}",
235235
prev_log_id,
236-
self.commit_index,
236+
self.committed,
237237
matching,
238238
);
239239

@@ -263,7 +263,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
263263
let (n_matching, entries) = self.skip_matching_entries(entries).await?;
264264

265265
tracing::debug!(
266-
commit_index = self.commit_index,
266+
%self.committed,
267267
n_matching,
268268
entries = %entries.summary(),
269269
"skip matching entries",
@@ -278,7 +278,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
278278

279279
// commit index must not > last_log_id.index
280280
// This is guaranteed by caller.
281-
self.commit_index = commit_index;
281+
self.committed = committed;
282282

283283
self.replicate_to_state_machine_if_needed().await?;
284284

@@ -306,7 +306,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
306306
let log_id = entries[i].log_id;
307307
let index = log_id.index;
308308

309-
if index <= self.commit_index {
309+
if index <= self.committed.index {
310310
continue;
311311
}
312312

@@ -333,7 +333,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
333333
let index = remote_log_id.index;
334334

335335
// Committed entries are always safe and are consistent to a valid leader.
336-
if index <= self.commit_index {
336+
if index <= self.committed.index {
337337
return Ok(true);
338338
}
339339

@@ -410,10 +410,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
410410
}
411411

412412
// If we don't have any new entries to replicate, then do nothing.
413-
if self.commit_index <= self.last_applied.index {
413+
if self.committed <= self.last_applied {
414414
tracing::debug!(
415-
"commit_index({}) <= last_applied({}), return",
416-
self.commit_index,
415+
"committed({}) <= last_applied({}), return",
416+
self.committed,
417417
self.last_applied
418418
);
419419
return Ok(());
@@ -423,7 +423,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
423423

424424
let entries = self
425425
.storage
426-
.get_log_entries(self.last_applied.index + 1..=self.commit_index)
426+
.get_log_entries(self.last_applied.index + 1..=self.committed.index)
427427
.await
428428
.map_err(|e| self.map_storage_error(e))?;
429429

@@ -452,11 +452,11 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
452452
/// from the AppendEntries RPC handler.
453453
#[tracing::instrument(level = "debug", skip(self))]
454454
async fn initial_replicate_to_state_machine(&mut self) -> Result<(), RaftError> {
455-
let stop = std::cmp::min(self.commit_index, self.last_log_id.index) + 1;
455+
let stop = std::cmp::min(self.committed.index, self.last_log_id.index) + 1;
456456
let start = self.last_applied.index + 1;
457457
let storage = self.storage.clone();
458458

459-
tracing::debug!(start, stop, self.commit_index, %self.last_log_id, "start stop");
459+
tracing::debug!(start, stop, %self.committed, %self.last_log_id, "start stop");
460460

461461
// when self.commit_index is not initialized, e.g. the first heartbeat from leader always has a commit_index to
462462
// be 0, because the leader needs one round of heartbeat to find out the commit index.

async-raft/src/core/client.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
139139
leader_id: self.core.id,
140140
prev_log_id: node.matched,
141141
entries: vec![],
142-
leader_commit: self.core.commit_index,
142+
leader_commit: self.core.committed,
143143
};
144144
let target = *id;
145145
let network = self.core.network.clone();
@@ -273,8 +273,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
273273
self.awaiting_committed.push(req);
274274
} else {
275275
// Else, there are no voting nodes for replication, so the payload is now committed.
276-
self.core.commit_index = entry_arc.log_id.index;
277-
tracing::debug!(self.core.commit_index, "update commit index, no need to replicate");
276+
self.core.committed = entry_arc.log_id;
277+
tracing::debug!(%self.core.committed, "update committed, no need to replicate");
278+
278279
self.leader_report_metrics();
279280
self.client_request_post_commit(req).await;
280281
}
@@ -283,7 +284,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
283284
let _ = node.repl_stream.repl_tx.send((
284285
RaftEvent::Replicate {
285286
entry: entry_arc.clone(),
286-
commit_index: self.core.commit_index,
287+
committed: self.core.committed,
287288
},
288289
tracing::debug_span!("CH"),
289290
));

async-raft/src/core/install_snapshot.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
204204
.map_err(|e| self.map_storage_error(e))?;
205205

206206
tracing::debug!("update after apply or install-snapshot: {:?}", changes);
207-
println!("update after apply or install-snapshot: {:?}", changes);
208207

209208
// After installing snapshot, no inconsistent log is removed.
210209
// This does not affect raft consistency.
@@ -219,16 +218,16 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
219218
// snapshot is installed
220219
self.last_applied = last_applied;
221220

222-
if self.commit_index < self.last_applied.index {
223-
self.commit_index = self.last_applied.index;
221+
if self.committed < self.last_applied {
222+
self.committed = self.last_applied;
224223
}
225224
if self.last_log_id < self.last_applied {
226225
self.last_log_id = self.last_applied;
227226
}
228227

229228
// There could be unknown membership in the snapshot.
230229
let membership = self.storage.get_membership_config().await.map_err(|err| self.map_storage_error(err))?;
231-
println!("storage membership: {:?}", membership);
230+
tracing::debug!("storage membership: {:?}", membership);
232231

233232
self.update_membership(membership)?;
234233

async-raft/src/core/mod.rs

+9-8
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,12 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt
103103
/// The target state of the system.
104104
target_state: State,
105105

106-
/// The index of the last known committed entry.
106+
/// The log id of the last known committed entry.
107107
///
108-
/// I.e.:
108+
/// Committed means:
109109
/// - a log that is replicated to a quorum of the cluster and it is of the term of the leader.
110110
/// - A quorum could be a joint quorum.
111-
commit_index: u64,
111+
committed: LogId,
112112

113113
/// The log id of the highest log entry which has been applied to the local state machine.
114114
last_applied: LogId,
@@ -182,14 +182,14 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
182182
network,
183183
storage,
184184
target_state: State::Follower,
185-
commit_index: 0,
186-
last_applied: LogId { term: 0, index: 0 },
185+
committed: LogId::new(0, 0),
186+
last_applied: LogId::new(0, 0),
187187
current_term: 0,
188188
current_leader: None,
189189
voted_for: None,
190-
last_log_id: LogId { term: 0, index: 0 },
190+
last_log_id: LogId::new(0, 0),
191191
snapshot_state: None,
192-
snapshot_last_log_id: LogId { term: 0, index: 0 },
192+
snapshot_last_log_id: LogId::new(0, 0),
193193
has_completed_initial_replication_to_sm: false,
194194
last_heartbeat: None,
195195
next_election_timeout: None,
@@ -213,10 +213,11 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
213213
self.voted_for = state.hard_state.voted_for;
214214
self.effective_membership = state.last_membership.clone();
215215
self.last_applied = state.last_applied;
216+
216217
// NOTE: this is repeated here for clarity. It is unsafe to initialize the node's commit
217218
// index to any other value. The commit index must be determined by a leader after
218219
// successfully committing a new log to the cluster.
219-
self.commit_index = 0;
220+
self.committed = LogId::new(0, 0);
220221

221222
// Fetch the most recent snapshot in the system.
222223
if let Some(snapshot) = self.storage.get_current_snapshot().await.map_err(|err| self.map_storage_error(err))? {

async-raft/src/core/replication.rs

+12-12
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
4040
self.core.current_term,
4141
self.core.config.clone(),
4242
self.core.last_log_id,
43-
self.core.commit_index,
43+
self.core.committed,
4444
self.core.network.clone(),
4545
self.core.storage.clone(),
4646
self.replication_tx.clone(),
@@ -116,24 +116,24 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
116116
self.update_leader_metrics(target, matched);
117117
}
118118

119-
if matched.index <= self.core.commit_index {
119+
if matched <= self.core.committed {
120120
self.leader_report_metrics();
121121
return Ok(());
122122
}
123123

124-
let commit_index = self.calc_commit_index();
124+
let commit_index = self.calc_commit_log_id();
125125

126126
// Determine if we have a new commit index, accounting for joint consensus.
127127
// If a new commit index has been established, then update a few needed elements.
128128

129-
if commit_index > self.core.commit_index {
130-
self.core.commit_index = commit_index;
129+
if commit_index > self.core.committed {
130+
self.core.committed = commit_index;
131131

132132
// Update all replication streams based on new commit index.
133133
for node in self.nodes.values() {
134134
let _ = node.repl_stream.repl_tx.send((
135-
RaftEvent::UpdateCommitIndex {
136-
commit_index: self.core.commit_index,
135+
RaftEvent::UpdateCommittedLogId {
136+
committed: self.core.committed,
137137
},
138138
tracing::debug_span!("CH"),
139139
));
@@ -144,7 +144,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
144144
.awaiting_committed
145145
.iter()
146146
.enumerate()
147-
.take_while(|(_idx, elem)| elem.entry.log_id.index <= self.core.commit_index)
147+
.take_while(|(_idx, elem)| elem.entry.log_id <= self.core.committed)
148148
.last()
149149
.map(|(idx, _)| idx);
150150

@@ -168,16 +168,16 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
168168
}
169169

170170
#[tracing::instrument(level = "trace", skip(self))]
171-
fn calc_commit_index(&self) -> u64 {
171+
fn calc_commit_log_id(&self) -> LogId {
172172
let repl_indexes = self.get_match_log_indexes();
173173

174174
let committed = self.core.effective_membership.membership.greatest_majority_value(&repl_indexes);
175175

176-
*committed.unwrap_or(&self.core.commit_index)
176+
*committed.unwrap_or(&self.core.committed)
177177
}
178178

179179
/// Collect indexes of the greatest matching log on every replica(include the leader itself)
180-
fn get_match_log_indexes(&self) -> BTreeMap<NodeId, u64> {
180+
fn get_match_log_indexes(&self) -> BTreeMap<NodeId, LogId> {
181181
let node_ids = self.core.effective_membership.membership.all_nodes();
182182

183183
let mut res = BTreeMap::new();
@@ -194,7 +194,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
194194
// and that new leader may overrides any lower term logs.
195195
// Thus it is not considered as committed.
196196
if matched.term == self.core.current_term {
197-
res.insert(*id, matched.index);
197+
res.insert(*id, matched);
198198
}
199199
}
200200

async-raft/src/raft.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -501,8 +501,8 @@ pub struct AppendEntriesRequest<D: AppData> {
501501
#[serde(bound = "D: AppData")]
502502
pub entries: Vec<Entry<D>>,
503503

504-
/// The leader's commit index.
505-
pub leader_commit: u64,
504+
/// The leader's committed log id.
505+
pub leader_commit: LogId,
506506
}
507507

508508
impl<D: AppData> MessageSummary for AppendEntriesRequest<D> {

async-raft/src/raft_types.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ pub struct LogId {
1414

1515
impl From<(u64, u64)> for LogId {
1616
fn from(v: (u64, u64)) -> Self {
17-
LogId { term: v.0, index: v.1 }
17+
LogId::new(v.0, v.1)
1818
}
1919
}
2020

@@ -26,6 +26,10 @@ impl Display for LogId {
2626

2727
impl LogId {
2828
pub fn new(term: u64, index: u64) -> Self {
29+
if term == 0 || index == 0 {
30+
assert_eq!(index, 0, "zero-th log entry must be (0,0), but {}, {}", term, index);
31+
assert_eq!(term, 0, "zero-th log entry must be (0,0), but {}, {}", term, index);
32+
}
2933
LogId { term, index }
3034
}
3135
}

0 commit comments

Comments
 (0)