@@ -44,7 +44,6 @@ use crate::error::ChangeMembershipError;
44
44
use crate :: error:: CheckIsLeaderError ;
45
45
use crate :: error:: ClientWriteError ;
46
46
use crate :: error:: EmptyMembership ;
47
- use crate :: error:: ExtractFatal ;
48
47
use crate :: error:: Fatal ;
49
48
use crate :: error:: ForwardToLeader ;
50
49
use crate :: error:: InProgress ;
@@ -234,28 +233,27 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
234
233
pub ( super ) async fn handle_check_is_leader_request (
235
234
& mut self ,
236
235
tx : RaftRespTx < ( ) , CheckIsLeaderError < C :: NodeId , C :: Node > > ,
237
- ) {
236
+ ) -> Result < ( ) , StorageError < C :: NodeId > > {
238
237
// Setup sentinel values to track when we've received majority confirmation of leadership.
239
238
240
239
let em = self . engine . state . membership_state . effective ( ) ;
241
240
let mut granted = btreeset ! { self . id} ;
242
241
243
242
if em. is_quorum ( granted. iter ( ) ) {
244
243
let _ = tx. send ( Ok ( ( ) ) ) ;
245
- return ;
244
+ return Ok ( ( ) ) ;
246
245
}
247
246
248
247
// Spawn parallel requests, all with the standard timeout for heartbeats.
249
248
let mut pending = FuturesUnordered :: new ( ) ;
250
249
251
- let voter_progresses = if let Some ( l) = & self . engine . internal_server_state . leading ( ) {
250
+ let voter_progresses = {
251
+ let l = & self . engine . internal_server_state . leading ( ) . unwrap ( ) ;
252
252
l. progress
253
253
. iter ( )
254
254
. filter ( |( id, _v) | l. progress . is_voter ( id) == Some ( true ) )
255
255
. copied ( )
256
256
. collect :: < Vec < _ > > ( )
257
- } else {
258
- unreachable ! ( "it has to be a leader!!!" ) ;
259
257
} ;
260
258
261
259
for ( target, progress) in voter_progresses {
@@ -329,10 +327,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
329
327
// request.
330
328
if let AppendEntriesResponse :: HigherVote ( vote) = data {
331
329
let res = self . engine . vote_handler ( ) . handle_message_vote ( & vote) ;
332
- if let Err ( e) = self . run_engine_commands :: < Entry < C > > ( & [ ] ) . await . extract_fatal ( ) {
333
- let _ = tx. send ( Err ( e. into ( ) ) ) ;
334
- return ;
335
- }
330
+ self . run_engine_commands :: < Entry < C > > ( & [ ] ) . await ?;
331
+
336
332
if let Err ( e) = res {
337
333
// simply ignore stale responses
338
334
tracing:: warn!( target = display( target) , "vote {vote} rejected: {e}" ) ;
@@ -341,7 +337,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
341
337
// we are no longer leader so error out early
342
338
if !self . engine . state . is_leader ( & self . engine . config . id ) {
343
339
self . reject_with_forward_to_leader ( tx) ;
344
- return ;
340
+ return Ok ( ( ) ) ;
345
341
}
346
342
}
347
343
@@ -350,7 +346,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
350
346
let mem = & self . engine . state . membership_state . effective ( ) ;
351
347
if mem. is_quorum ( granted. iter ( ) ) {
352
348
let _ = tx. send ( Ok ( ( ) ) ) ;
353
- return ;
349
+ return Ok ( ( ) ) ;
354
350
}
355
351
}
356
352
@@ -362,6 +358,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
362
358
got : granted,
363
359
}
364
360
. into ( ) ) ) ;
361
+ Ok ( ( ) )
365
362
}
366
363
367
364
/// Add a new node to the cluster as a learner, bringing it up-to-speed, and then responding
@@ -1193,7 +1190,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
1193
1190
}
1194
1191
RaftMsg :: CheckIsLeaderRequest { tx } => {
1195
1192
if self . engine . state . is_leader ( & self . engine . config . id ) {
1196
- self . handle_check_is_leader_request ( tx) . await ;
1193
+ self . handle_check_is_leader_request ( tx) . await ? ;
1197
1194
} else {
1198
1195
self . reject_with_forward_to_leader ( tx) ;
1199
1196
}
0 commit comments