@@ -480,23 +480,26 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
480
480
State1 = put_peer (PeerId , Peer , State0 ),
481
481
Effects00 = maybe_promote_peer (PeerId , State1 , []),
482
482
{State2 , Effects0 } = evaluate_quorum (State1 , Effects00 ),
483
- {State , Effects1 } = process_pending_consistent_queries (State2 ,
483
+ {State3 , Effects1 } = process_pending_consistent_queries (State2 ,
484
484
Effects0 ),
485
- Effects = [{next_event , info , pipeline_rpcs } | Effects1 ],
486
- case State of
485
+ Effects2 = [{next_event , info , pipeline_rpcs } | Effects1 ],
486
+ case State3 of
487
487
#{cluster := #{Id := _ }} ->
488
488
% leader is in the cluster
489
- {leader , State , Effects };
489
+ {leader , State3 , Effects2 };
490
490
#{commit_index := CI ,
491
491
cluster_index_term := {CITIndex , _ }}
492
492
when CI >= CITIndex ->
493
493
% leader is not in the cluster and the new cluster
494
494
% config has been committed
495
495
% time to say goodbye
496
496
? INFO (" ~ts : leader not in new cluster - goodbye" , [LogId ]),
497
+ % % need to make pipelined rpcs here as cannot use next event
498
+ {State , _ , Effects } =
499
+ make_pipelined_rpc_effects (State3 , Effects2 ),
497
500
{stop , State , Effects };
498
501
_ ->
499
- {leader , State , Effects }
502
+ {leader , State3 , Effects2 }
500
503
end
501
504
end ;
502
505
handle_leader ({PeerId , # append_entries_reply {term = Term }},
@@ -2086,11 +2089,15 @@ make_pipelined_rpc_effects(#{cfg := #cfg{id = Id,
2086
2089
end ,
2087
2090
% % ensure we don't pass a batch size that would allow
2088
2091
% % the peer to go over the max pipeline count
2089
- BatchSize = min (MaxBatchSize ,
2090
- MaxPipelineCount - NumInFlight ),
2092
+ % % we'd only really get here if Force=true so setting
2093
+ % % a single entry batch size should be fine
2094
+ BatchSize = max (1 ,
2095
+ min (MaxBatchSize ,
2096
+ MaxPipelineCount - NumInFlight )),
2091
2097
{NewNextIdx , Eff , S } =
2092
2098
make_rpc_effect (PeerId , Peer0 , BatchSize , S0 ,
2093
2099
EntryCache ),
2100
+ ? assert (NewNextIdx >= NextIdx ),
2094
2101
Peer = Peer0 #{next_index => NewNextIdx ,
2095
2102
commit_index_sent => CommitIndex },
2096
2103
NewNumInFlight = NewNextIdx - MatchIdx - 1 ,
@@ -2149,12 +2156,17 @@ make_rpc_effect(PeerId, #{next_index := Next}, MaxBatchSize,
2149
2156
PrevTerm , MaxBatchSize ,
2150
2157
State #{log => Log },
2151
2158
EntryCache );
2152
- {LastIdx , _ } ->
2159
+ {SnapIdx , _ } ->
2160
+ ? DEBUG (" ~ts : sending snapshot to ~w as their next index ~b "
2161
+ " is lower than snapshot index ~b " , [log_id (State ),
2162
+ PeerId , Next ,
2163
+ SnapIdx ]),
2164
+ ? assert (PrevIdx < SnapIdx ),
2153
2165
SnapState = ra_log :snapshot_state (Log ),
2154
2166
% % don't increment the next index here as we will do
2155
2167
% % that once the snapshot is fully replicated
2156
2168
% % and we don't pipeline entries until after snapshot
2157
- {LastIdx ,
2169
+ {SnapIdx ,
2158
2170
{send_snapshot , PeerId , {SnapState , Id , Term }},
2159
2171
State #{log => Log }}
2160
2172
end
@@ -2861,16 +2873,16 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyMode}},
2861
2873
State = case State0 of
2862
2874
#{cluster_index_term := {CI , CT }}
2863
2875
when Idx > CI andalso Term >= CT ->
2864
- ? DEBUG (" ~ts : applying ra cluster change to ~w " ,
2865
- [log_id (State0 ), maps :keys (NewCluster )]),
2876
+ ? DEBUG (" ~ts : applying ra cluster change at index ~b to ~w " ,
2877
+ [log_id (State0 ), Idx , maps :keys (NewCluster )]),
2866
2878
% % we are recovering and should apply the cluster change
2867
2879
State0 #{cluster => NewCluster ,
2868
2880
membership => get_membership (NewCluster , State0 ),
2869
2881
cluster_change_permitted => true ,
2870
2882
cluster_index_term => {Idx , Term }};
2871
2883
_ ->
2872
- ? DEBUG (" ~ts : committing ra cluster change to ~w " ,
2873
- [log_id (State0 ), maps :keys (NewCluster )]),
2884
+ ? DEBUG (" ~ts : committing ra cluster change at index ~b to ~w " ,
2885
+ [log_id (State0 ), Idx , maps :keys (NewCluster )]),
2874
2886
% % else just enable further cluster changes again
2875
2887
State0 #{cluster_change_permitted => true }
2876
2888
end ,
@@ -3078,17 +3090,23 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
3078
3090
% cluster
3079
3091
case Cmd of
3080
3092
{'$ra_cluster_change' , _ , Cluster , _ } ->
3093
+ ? DEBUG (" ~ts : ~ts : follower applying ra cluster change to ~w " ,
3094
+ [log_id (State ), ? FUNCTION_NAME , maps :keys (Cluster )]),
3081
3095
State #{cluster => Cluster ,
3082
3096
cluster_index_term => {Idx , Term }};
3083
3097
_ ->
3084
3098
% revert back to previous cluster
3085
3099
{PrevIdx , PrevTerm , PrevCluster } = maps :get (previous_cluster , State ),
3100
+ ? DEBUG (" ~ts : ~ts : follower reverting cluster change to ~w " ,
3101
+ [log_id (State ), ? FUNCTION_NAME , maps :keys (PrevCluster )]),
3086
3102
State1 = State #{cluster => PrevCluster ,
3087
3103
cluster_index_term => {PrevIdx , PrevTerm }},
3088
3104
pre_append_log_follower (Entry , State1 )
3089
3105
end ;
3090
3106
pre_append_log_follower ({Idx , Term , {'$ra_cluster_change' , _ , Cluster , _ }},
3091
3107
State ) ->
3108
+ ? DEBUG (" ~ts : ~ts : follower applying ra cluster change to ~w " ,
3109
+ [log_id (State ), ? FUNCTION_NAME , maps :keys (Cluster )]),
3092
3110
State #{cluster => Cluster ,
3093
3111
membership => get_membership (Cluster , State ),
3094
3112
cluster_index_term => {Idx , Term }};
0 commit comments