|
| 1 | +use std::collections::HashSet; |
| 2 | + |
1 | 3 | use tokio::sync::oneshot;
|
2 | 4 |
|
3 | 5 | use crate::config::SnapshotPolicy;
|
@@ -173,35 +175,25 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
|
173 | 175 | }
|
174 | 176 |
|
175 | 177 | // Determine the new commit index of the current membership config nodes.
|
176 |
| - let mut indices_c0 = self |
177 |
| - .nodes |
178 |
| - .iter() |
179 |
| - .filter(|(id, _)| self.core.membership.members.contains(id)) |
180 |
| - .map(|(_, node)| (node.match_index, node.match_term)) |
181 |
| - .collect::<Vec<_>>(); |
182 |
| - if !self.is_stepping_down { |
183 |
| - indices_c0.push((self.core.last_log_index, self.core.last_log_term)); |
184 |
| - } |
| 178 | + let indices_c0 = self.get_match_indexes(&self.core.membership.members); |
185 | 179 | tracing::debug!("indices_c0: {:?}", indices_c0);
|
| 180 | + |
186 | 181 | let commit_index_c0 =
|
187 | 182 | calculate_new_commit_index(indices_c0, self.core.commit_index, self.core.current_term);
|
188 |
| - |
189 | 183 | tracing::debug!("commit_index_c0: {}", commit_index_c0);
|
190 | 184 |
|
191 | 185 | tracing::debug!("c1: {:?}", self.core.membership.members_after_consensus);
|
192 |
| - tracing::debug!("nodes: {:?}", self.nodes.keys().collect::<Vec<_>>()); |
| 186 | + tracing::debug!( |
| 187 | + "follower nodes: {:?}", |
| 188 | + self.nodes.keys().collect::<Vec<_>>() |
| 189 | + ); |
193 | 190 |
|
194 | 191 | // If we are in joint consensus, then calculate the new commit index of the new membership config nodes.
|
195 | 192 | let mut commit_index_c1 = commit_index_c0; // Defaults to just matching C0.
|
196 | 193 | if let Some(members) = &self.core.membership.members_after_consensus {
|
197 |
| - let indices_c1 = self |
198 |
| - .nodes |
199 |
| - .iter() |
200 |
| - .filter(|(id, _)| members.contains(id)) |
201 |
| - .map(|(_, node)| (node.match_index, node.match_term)) |
202 |
| - .collect(); |
203 |
| - |
| 194 | + let indices_c1 = self.get_match_indexes(members); |
204 | 195 | tracing::debug!("indices_c1: {:?}", indices_c1);
|
| 196 | + |
205 | 197 | commit_index_c1 = calculate_new_commit_index(
|
206 | 198 | indices_c1,
|
207 | 199 | self.core.commit_index,
|
@@ -252,6 +244,39 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
|
252 | 244 | Ok(())
|
253 | 245 | }
|
254 | 246 |
|
| 247 | + /// Extract the matching index/term of the replication state of specified nodes. |
| 248 | + fn get_match_indexes(&self, node_ids: &HashSet<NodeId>) -> Vec<(u64, u64)> { |
| 249 | + tracing::debug!("to get match indexes of nodes: {:?}", node_ids); |
| 250 | + |
| 251 | + let mut rst = Vec::with_capacity(node_ids.len()); |
| 252 | + for id in node_ids.iter() { |
| 253 | + // this node is me, the leader |
| 254 | + if *id == self.core.id { |
| 255 | + // TODO: can it be sure that self.core.last_log_term is the term of this leader? |
| 256 | + rst.push((self.core.last_log_index, self.core.last_log_term)); |
| 257 | + continue; |
| 258 | + } |
| 259 | + |
| 260 | + // this node is a follower |
| 261 | + let repl_state = self.nodes.get(id); |
| 262 | + if let Some(x) = repl_state { |
| 263 | + rst.push((x.match_index, x.match_term)); |
| 264 | + continue; |
| 265 | + } |
| 266 | + |
| 267 | + // this node is a non-voter |
| 268 | + let repl_state = self.non_voters.get(id); |
| 269 | + if let Some(x) = repl_state { |
| 270 | + rst.push((x.state.match_index, x.state.match_term)); |
| 271 | + continue; |
| 272 | + } |
| 273 | + panic!("node {} not found in nodes or non-voters", id); |
| 274 | + } |
| 275 | + |
| 276 | + tracing::debug!("match indexes of nodes: {:?}: {:?}", node_ids, rst); |
| 277 | + rst |
| 278 | + } |
| 279 | + |
255 | 280 | /// Handle events from replication streams requesting for snapshot info.
|
256 | 281 | #[tracing::instrument(level = "trace", skip(self, tx))]
|
257 | 282 | async fn handle_needs_snapshot(
|
|
0 commit comments