@@ -311,7 +311,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
311
311
ClientOrInternalResponseTx :: Client ( tx) => {
312
312
match & req. entry . payload {
313
313
EntryPayload :: Normal ( inner) => {
314
- match self . apply_entry_to_state_machine ( & req. entry . log_id . index , & inner. data ) . await {
314
+ match self . apply_entry_to_state_machine ( & req. entry . log_id , & inner. data ) . await {
315
315
Ok ( data) => {
316
316
let _ = tx. send ( Ok ( ClientWriteResponse {
317
317
index : req. entry . log_id . index ,
@@ -346,23 +346,28 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
346
346
347
347
/// Apply the given log entry to the state machine.
348
348
#[ tracing:: instrument( level = "trace" , skip( self , entry) ) ]
349
- pub ( super ) async fn apply_entry_to_state_machine ( & mut self , index : & u64 , entry : & D ) -> RaftResult < R > {
349
+ pub ( super ) async fn apply_entry_to_state_machine ( & mut self , log_id : & LogId , entry : & D ) -> RaftResult < R > {
350
350
// First, we just ensure that we apply any outstanding up to, but not including, the index
351
351
// of the given entry. We need to be able to return the data response from applying this
352
352
// entry to the state machine.
353
353
//
354
354
// Note that this would only ever happen if a node had unapplied logs from before becoming leader.
355
+
356
+ let index = log_id. index ;
357
+
355
358
let expected_next_index = self . core . last_applied + 1 ;
356
- if index != & expected_next_index {
359
+ if index != expected_next_index {
357
360
let entries = self
358
361
. core
359
362
. storage
360
- . get_log_entries ( expected_next_index, * index)
363
+ . get_log_entries ( expected_next_index, index)
361
364
. await
362
365
. map_err ( |err| self . core . map_fatal_storage_error ( err) ) ?;
366
+
363
367
if let Some ( entry) = entries. last ( ) {
364
368
self . core . last_applied = entry. log_id . index ;
365
369
}
370
+
366
371
let data_entries: Vec < _ > = entries
367
372
. iter ( )
368
373
. filter_map ( |entry| match & entry. payload {
@@ -388,7 +393,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
388
393
}
389
394
}
390
395
// Apply this entry to the state machine and return its data response.
391
- let res = self . core . storage . apply_entry_to_state_machine ( index, entry) . await . map_err ( |err| {
396
+ let res = self . core . storage . apply_entry_to_state_machine ( & index, entry) . await . map_err ( |err| {
392
397
if err. downcast_ref :: < S :: ShutdownError > ( ) . is_some ( ) {
393
398
// If this is an instance of the storage impl's shutdown error, then trigger shutdown.
394
399
self . core . map_fatal_storage_error ( err)
@@ -397,7 +402,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
397
402
RaftError :: RaftStorage ( err)
398
403
}
399
404
} ) ;
400
- self . core . last_applied = * index;
405
+ self . core . last_applied = index;
401
406
self . leader_report_metrics ( ) ;
402
407
res
403
408
}
0 commit comments