@@ -15,6 +15,7 @@ use maplit::btreeset;
15
15
use pin_utils:: pin_mut;
16
16
use tokio:: io:: AsyncRead ;
17
17
use tokio:: io:: AsyncSeek ;
18
+ use tokio:: io:: AsyncWriteExt ;
18
19
use tokio:: sync:: mpsc;
19
20
use tokio:: sync:: oneshot;
20
21
use tokio:: sync:: watch;
@@ -30,12 +31,14 @@ use crate::config::RuntimeConfig;
30
31
use crate :: config:: SnapshotPolicy ;
31
32
use crate :: core:: building_state;
32
33
use crate :: core:: snapshot_state;
34
+ use crate :: core:: snapshot_state:: SnapshotRequestId ;
35
+ use crate :: core:: streaming_state:: Streaming ;
33
36
use crate :: core:: ServerState ;
34
37
use crate :: core:: SnapshotResult ;
35
38
use crate :: display_ext:: DisplaySlice ;
36
39
use crate :: engine:: Command ;
37
40
use crate :: engine:: Engine ;
38
- use crate :: engine:: SendResult ;
41
+ use crate :: engine:: Respond ;
39
42
use crate :: entry:: FromAppData ;
40
43
use crate :: entry:: RaftEntry ;
41
44
use crate :: entry:: RaftPayload ;
@@ -46,6 +49,7 @@ use crate::error::ForwardToLeader;
46
49
use crate :: error:: InitializeError ;
47
50
use crate :: error:: QuorumNotEnough ;
48
51
use crate :: error:: RPCError ;
52
+ use crate :: error:: SnapshotMismatch ;
49
53
use crate :: error:: Timeout ;
50
54
use crate :: log_id:: LogIdOptionExt ;
51
55
use crate :: log_id:: RaftLogId ;
@@ -62,8 +66,11 @@ use crate::raft::AppendEntriesTx;
62
66
use crate :: raft:: ClientWriteResponse ;
63
67
use crate :: raft:: ClientWriteTx ;
64
68
use crate :: raft:: ExternalCommand ;
69
+ use crate :: raft:: InstallSnapshotRequest ;
70
+ use crate :: raft:: InstallSnapshotResponse ;
71
+ use crate :: raft:: InstallSnapshotTx ;
65
72
use crate :: raft:: RaftMsg ;
66
- use crate :: raft:: RaftRespTx ;
73
+ use crate :: raft:: ResultSender ;
67
74
use crate :: raft:: VoteRequest ;
68
75
use crate :: raft:: VoteResponse ;
69
76
use crate :: raft:: VoteTx ;
@@ -87,8 +94,10 @@ use crate::RaftNetworkFactory;
87
94
use crate :: RaftStorage ;
88
95
use crate :: RaftTypeConfig ;
89
96
use crate :: SnapshotId ;
97
+ use crate :: SnapshotSegmentId ;
90
98
use crate :: StorageError ;
91
99
use crate :: StorageHelper ;
100
+ use crate :: StorageIOError ;
92
101
use crate :: Update ;
93
102
use crate :: Vote ;
94
103
@@ -218,7 +227,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
218
227
#[ tracing:: instrument( level = "trace" , skip( self , tx) ) ]
219
228
pub ( super ) async fn handle_check_is_leader_request (
220
229
& mut self ,
221
- tx : RaftRespTx < ( ) , CheckIsLeaderError < C :: NodeId , C :: Node > > ,
230
+ tx : ResultSender < ( ) , CheckIsLeaderError < C :: NodeId , C :: Node > > ,
222
231
) -> Result < ( ) , StorageError < C :: NodeId > > {
223
232
// Setup sentinel values to track when we've received majority confirmation of leadership.
224
233
@@ -370,7 +379,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
370
379
& mut self ,
371
380
changes : ChangeMembers < C :: NodeId , C :: Node > ,
372
381
retain : bool ,
373
- tx : RaftRespTx < ClientWriteResponse < C > , ClientWriteError < C :: NodeId , C :: Node > > ,
382
+ tx : ResultSender < ClientWriteResponse < C > , ClientWriteError < C :: NodeId , C :: Node > > ,
374
383
) {
375
384
let res = self . engine . state . membership_state . change_handler ( ) . apply ( changes, retain) ;
376
385
let new_membership = match res {
@@ -519,17 +528,111 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
519
528
pub ( crate ) fn handle_initialize (
520
529
& mut self ,
521
530
member_nodes : BTreeMap < C :: NodeId , C :: Node > ,
522
- tx : RaftRespTx < ( ) , InitializeError < C :: NodeId , C :: Node > > ,
531
+ tx : ResultSender < ( ) , InitializeError < C :: NodeId , C :: Node > > ,
523
532
) {
524
533
let membership = Membership :: from ( member_nodes) ;
525
534
526
535
let entry = C :: Entry :: new_membership ( LogId :: default ( ) , membership) ;
527
536
let res = self . engine . initialize ( entry) ;
528
- self . engine . output . push_command ( Command :: SendInitializeResult {
529
- send : SendResult :: new ( res, tx) ,
537
+ self . engine . output . push_command ( Command :: Respond {
538
+ resp : Respond :: new ( res, tx) ,
530
539
} ) ;
531
540
}
532
541
542
+ /// Invoked by leader to send chunks of a snapshot to a follower.
543
+ ///
544
+ /// Leaders always send chunks in order. It is important to note that, according to the Raft
545
+ /// spec, a log may only have one snapshot at any time. As snapshot contents are application
546
+ /// specific, the Raft log will only store a pointer to the snapshot file along with the
547
+ /// index & term.
548
+ #[ tracing:: instrument( level = "debug" , skip_all) ]
549
+ pub ( crate ) async fn handle_install_snapshot_request (
550
+ & mut self ,
551
+ req : InstallSnapshotRequest < C > ,
552
+ tx : InstallSnapshotTx < C :: NodeId > ,
553
+ ) -> Result < ( ) , StorageError < C :: NodeId > > {
554
+ // TODO: move receiving to another thread.
555
+ tracing:: debug!( req = display( req. summary( ) ) ) ;
556
+
557
+ let snapshot_meta = req. meta . clone ( ) ;
558
+ let done = req. done ;
559
+ let offset = req. offset ;
560
+
561
+ let req_id = SnapshotRequestId :: new ( * req. vote . leader_id ( ) , snapshot_meta. snapshot_id . clone ( ) , offset) ;
562
+
563
+ let res = self . engine . vote_handler ( ) . accept_vote ( & req. vote , tx, |state, _rejected| {
564
+ Ok ( InstallSnapshotResponse {
565
+ vote : * state. vote_ref ( ) ,
566
+ } )
567
+ } ) ;
568
+
569
+ let tx = match res {
570
+ Ok ( tx) => tx,
571
+ Err ( _) => return Ok ( ( ) ) ,
572
+ } ;
573
+
574
+ let curr_id = self . snapshot_state . streaming . as_ref ( ) . map ( |s| & s. snapshot_id ) ;
575
+
576
+ // Changed to another stream. re-init snapshot state.
577
+ if curr_id != Some ( & req. meta . snapshot_id ) {
578
+ if req. offset > 0 {
579
+ let mismatch = SnapshotMismatch {
580
+ expect : SnapshotSegmentId {
581
+ id : snapshot_meta. snapshot_id . clone ( ) ,
582
+ offset : 0 ,
583
+ } ,
584
+ got : SnapshotSegmentId {
585
+ id : snapshot_meta. snapshot_id . clone ( ) ,
586
+ offset,
587
+ } ,
588
+ } ;
589
+
590
+ self . engine . output . push_command ( Command :: Respond {
591
+ resp : Respond :: new ( Err ( mismatch. into ( ) ) , tx) ,
592
+ } ) ;
593
+
594
+ return Ok ( ( ) ) ;
595
+ }
596
+
597
+ let snapshot_data = self . storage . begin_receiving_snapshot ( ) . await ?;
598
+ self . snapshot_state . streaming = Some ( Streaming :: new ( req. meta . snapshot_id . clone ( ) , snapshot_data) ) ;
599
+ }
600
+
601
+ tracing:: info!( "Received snapshot request: {:?}" , req_id) ;
602
+
603
+ let streaming = self . snapshot_state . streaming . as_mut ( ) . unwrap ( ) ;
604
+
605
+ // Receive the data.
606
+ streaming. receive ( req) . await ?;
607
+
608
+ if done {
609
+ let streaming = self . snapshot_state . streaming . take ( ) . unwrap ( ) ;
610
+ let mut data = streaming. snapshot_data ;
611
+
612
+ data. as_mut ( )
613
+ . shutdown ( )
614
+ . await
615
+ . map_err ( |e| StorageIOError :: write_snapshot ( snapshot_meta. signature ( ) , & e) ) ?;
616
+
617
+ self . received_snapshot . insert ( snapshot_meta. snapshot_id . clone ( ) , data) ;
618
+ }
619
+
620
+ if done {
621
+ self . engine . following_handler ( ) . install_snapshot ( snapshot_meta) ;
622
+ }
623
+
624
+ self . engine . output . push_command ( Command :: Respond {
625
+ resp : Respond :: new (
626
+ Ok ( InstallSnapshotResponse {
627
+ vote : * self . engine . state . vote_ref ( ) ,
628
+ } ) ,
629
+ tx,
630
+ ) ,
631
+ } ) ;
632
+
633
+ Ok ( ( ) )
634
+ }
635
+
533
636
fn handle_building_snapshot_result (
534
637
& mut self ,
535
638
result : SnapshotResult < C :: NodeId , C :: Node > ,
@@ -619,7 +722,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
619
722
620
723
/// Reject a request due to the Raft node being in a state which prohibits the request.
621
724
#[ tracing:: instrument( level = "trace" , skip( self , tx) ) ]
622
- pub ( crate ) fn reject_with_forward_to_leader < T , E > ( & self , tx : RaftRespTx < T , E > )
725
+ pub ( crate ) fn reject_with_forward_to_leader < T , E > ( & self , tx : ResultSender < T , E > )
623
726
where E : From < ForwardToLeader < C :: NodeId , C :: Node > > {
624
727
let mut leader_id = self . current_leader ( ) ;
625
728
let leader_node = self . get_leader_node ( leader_id) ;
@@ -926,8 +1029,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
926
1029
tracing:: debug!( req = display( req. summary( ) ) , func = func_name!( ) ) ;
927
1030
928
1031
let resp = self . engine . handle_vote_req ( req) ;
929
- self . engine . output . push_command ( Command :: SendVoteResult {
930
- send : SendResult :: new ( Ok ( resp) , tx) ,
1032
+ self . engine . output . push_command ( Command :: Respond {
1033
+ resp : Respond :: new ( Ok ( resp) , tx) ,
931
1034
} ) ;
932
1035
}
933
1036
@@ -1368,25 +1471,14 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
1368
1471
debug_assert ! ( got. is_some( ) , "there has to be a buffered snapshot data" ) ;
1369
1472
}
1370
1473
Command :: InstallSnapshot { snapshot_meta } => {
1371
- let snapshot_data = self . received_snapshot . remove ( & snapshot_meta. snapshot_id ) ;
1474
+ // Safe unwrap: it is guaranteed that the snapshot data is buffered. Otherwise it is a bug.
1475
+ let data = self . received_snapshot . remove ( & snapshot_meta. snapshot_id ) . unwrap ( ) ;
1372
1476
1373
- if let Some ( data) = snapshot_data {
1374
- self . storage . install_snapshot ( & snapshot_meta, data) . await ?;
1375
- tracing:: debug!( "Done install_snapshot, meta: {:?}" , snapshot_meta) ;
1376
- } else {
1377
- unreachable ! ( "buffered snapshot not found: snapshot meta: {:?}" , snapshot_meta)
1378
- }
1379
- }
1380
- Command :: SendVoteResult { send } => {
1381
- send. send ( ) ;
1382
- }
1383
- Command :: SendAppendEntriesResult { send } => {
1384
- send. send ( ) ;
1385
- }
1386
- Command :: SendInstallSnapshotResult { send } => {
1387
- send. send ( ) ;
1477
+ tracing:: info!( "Start to install_snapshot, meta: {:?}" , snapshot_meta) ;
1478
+ self . storage . install_snapshot ( & snapshot_meta, data) . await ?;
1479
+ tracing:: info!( "Done install_snapshot, meta: {:?}" , snapshot_meta) ;
1388
1480
}
1389
- Command :: SendInitializeResult { send } => {
1481
+ Command :: Respond { resp : send } => {
1390
1482
send. send ( ) ;
1391
1483
}
1392
1484
}
0 commit comments