
Replication bug fixes #516


Merged · 2 commits · Mar 12, 2025
130 changes: 72 additions & 58 deletions src/ra_server.erl
@@ -480,23 +480,26 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
State1 = put_peer(PeerId, Peer, State0),
Effects00 = maybe_promote_peer(PeerId, State1, []),
{State2, Effects0} = evaluate_quorum(State1, Effects00),
- {State, Effects1} = process_pending_consistent_queries(State2,
+ {State3, Effects1} = process_pending_consistent_queries(State2,
Effects0),
- Effects = [{next_event, info, pipeline_rpcs} | Effects1],
- case State of
+ Effects2 = [{next_event, info, pipeline_rpcs} | Effects1],
+ case State3 of
#{cluster := #{Id := _}} ->
% leader is in the cluster
- {leader, State, Effects};
+ {leader, State3, Effects2};
#{commit_index := CI,
cluster_index_term := {CITIndex, _}}
when CI >= CITIndex ->
% leader is not in the cluster and the new cluster
% config has been committed
% time to say goodbye
?INFO("~ts: leader not in new cluster - goodbye", [LogId]),
+ %% need to make pipelined rpcs here as cannot use next event
+ {State, _, Effects} =
+ make_pipelined_rpc_effects(State3, Effects2),
{stop, State, Effects};
_ ->
- {leader, State, Effects}
+ {leader, State3, Effects2}
end
end;
handle_leader({PeerId, #append_entries_reply{term = Term}},
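The renamed bindings above also carry a behavioural fix: a {next_event, info, pipeline_rpcs} effect is only processed if the server keeps running, so a leader that stops after committing its own removal would never send the final append_entries RPCs telling followers the new config is committed. A minimal sketch of the pattern, with hypothetical helper names:

    %% Sketch (hypothetical names): effects deferred via next_event are
    %% lost once the server stops, so a terminating leader must build
    %% its final RPC effects eagerly before returning {stop, ...}.
    leave_or_stay(still_member, State) ->
        %% safe: the server keeps running and handles pipeline_rpcs later
        {leader, State, [{next_event, info, pipeline_rpcs}]};
    leave_or_stay(removed, State0) ->
        %% stand-in for make_pipelined_rpc_effects/2: emit the casts now
        {State, Effects} = build_final_rpcs(State0),
        {stop, State, Effects}.

    build_final_rpcs(State) ->
        {State, []}.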
@@ -1225,18 +1228,22 @@ handle_follower(#append_entries_rpc{term = Term,
{NextState, State,
[cast_reply(Id, LeaderId, Reply) | Effects]};
false ->
- %% We need to ensure we make progress in case
- %% the last applied index is lower than the last
- %% valid index
+ %% We need to ensure we make progress in case the
+ %% leader is having to resend already received
+ %% entries in order to validate, e.g. after a
+ %% term_mismatch, hence we reply with success but
+ %% only up to the last index we already had
LastValidatedIdx = max(LastApplied, LastValidIdx),
?DEBUG("~ts: append_entries_rpc with last index ~b "
" including ~b entries did not validate local log. "
"Requesting resend from index ~b",
[LogId, PLIdx, length(Entries0),
LastValidatedIdx + 1]),
{Reply, State} =
mismatch_append_entries_reply(Term, LastValidatedIdx,
State0#{log => Log2}),
"Local last index ~b",
[LogId, PLIdx, length(Entries0), LocalLastIdx]),
{LVTerm, State} = fetch_term(LastValidatedIdx, State0),
Reply = #append_entries_reply{term = CurTerm,
success = true,
next_index = LastValidatedIdx + 1,
last_index = LastValidatedIdx,
last_term = LVTerm},
{follower, State,
[cast_reply(Id, LeaderId, Reply)]}
end;
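This is the core replication fix of the PR: instead of answering a non-validating append_entries_rpc with a mismatch reply (forcing the leader to rewind), the follower acknowledges success up to the highest index it already holds, so a leader resending already-received entries (e.g. after a term_mismatch) keeps making progress. A self-contained sketch of the reply shape; the record is redeclared here purely for illustration, the real definition lives in ra's header files:

    -record(append_entries_reply,
            {term, success, next_index, last_index, last_term}).

    reply_up_to_validated(CurTerm, LastApplied, LastValidIdx, LVTerm) ->
        LastValidatedIdx = max(LastApplied, LastValidIdx),
        #append_entries_reply{term = CurTerm,
                              success = true,
                              next_index = LastValidatedIdx + 1,
                              last_index = LastValidatedIdx,
                              last_term = LVTerm}.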
@@ -1246,28 +1253,20 @@ handle_follower(#append_entries_rpc{term = Term,
LeaderCommit),
%% assert we're not writing below the last applied index
?assertNot(FstIdx < LastApplied),
- State2 = lists:foldl(fun pre_append_log_follower/2,
- State1, Entries),
+ State = lists:foldl(fun pre_append_log_follower/2,
+ State1, Entries),
case ra_log:write(Entries, Log1) of
{ok, Log2} ->
- {NextState, State, Effects} =
- evaluate_commit_index_follower(State2#{log => Log2},
- Effects0),
- {NextState, State, Effects};
+ evaluate_commit_index_follower(State#{log => Log2},
+ Effects0);
{error, wal_down} ->
- %% at this point we know the wal process exited
+ %% At this point we know the wal process exited
%% but we dont know exactly which in flight messages
%% made it to the wal before it crashed.
- %% TODO: we cannot discover what the last index
- %% the WAL wrote was anymore as the WAL does
- %% not write the mem tables. We could implement something
- %% alternative where the WAL writes the last index, term
- %% it wrote for each UID into an ETS table and query
- %% this.
{await_condition,
- State2#{log => Log1,
- condition =>
- #{predicate_fun => fun wal_down_condition/2}},
+ State#{log => Log1,
+ condition =>
+ #{predicate_fun => fun wal_down_condition/2}},
Effects0};
{error, _} = Err ->
exit(Err)
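When ra_log:write/2 fails with wal_down the follower parks in await_condition instead of crashing, re-evaluating a predicate on subsequent events; the only change in this hunk is threading the renamed State binding through. A hypothetical sketch of what such a predicate could look like (the real wal_down_condition/2 elsewhere in ra_server.erl may differ, and the registered name ra_log_wal is an assumption):

    wal_down_condition_sketch(_Evt, State) ->
        case whereis(ra_log_wal) of
            undefined -> {false, State};          %% still down: keep waiting
            Pid when is_pid(Pid) -> {true, State} %% back up: resume follower
        end.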
@@ -1289,7 +1288,6 @@ handle_follower(#append_entries_rpc{term = Term,
transition_to => follower}}},
Effects};
{term_mismatch, OtherTerm, Log0} ->
- %% NB: this is the commit index before update
LastApplied = maps:get(last_applied, State00),
?INFO("~ts: term mismatch - follower had entry at ~b with term ~b "
"but not with term ~b. "
@@ -1303,7 +1301,7 @@ handle_follower(#append_entries_rpc{term = Term,
% is rewind back and use the last applied as the last index
% and last applied + 1 as the next expected.
% This _may_ overwrite some valid entries but is probably the
- % simplest way to proceed
+ % simplest and most reliable way to proceed
{Reply, State} = mismatch_append_entries_reply(Term, LastApplied,
State0),
Effects = [cast_reply(Id, LeaderId, Reply) | Effects0],
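On a term mismatch the follower can only trust what it has already applied (applied implies committed), so it rewinds the leader to LastApplied. A sketch of the implied reply, reusing the record sketched earlier; the exact construction inside mismatch_append_entries_reply/3 may differ, and LATerm (the term at LastApplied) is assumed to be fetched from the local log:

    mismatch_reply_sketch(Term, LastApplied, LATerm) ->
        %% success = false with last_index = LastApplied makes the
        %% leader resend everything from LastApplied + 1
        #append_entries_reply{term = Term,
                              success = false,
                              next_index = LastApplied + 1,
                              last_index = LastApplied,
                              last_term = LATerm}.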
@@ -1342,20 +1340,23 @@ handle_follower(#heartbeat_rpc{leader_id = LeaderId,
cfg := #cfg{id = Id}} = State) ->
Reply = heartbeat_reply(CurTerm, QueryIndex),
{follower, State, [cast_reply(Id, LeaderId, Reply)]};
- handle_follower({ra_log_event, {written, _, _} = Evt},
- State0 = #{log := Log0,
- cfg := #cfg{id = Id},
- leader_id := LeaderId,
- current_term := Term})
- when LeaderId =/= undefined ->
+ handle_follower({ra_log_event, Evt}, #{log := Log0,
+ cfg := #cfg{id = Id},
+ leader_id := LeaderId,
+ current_term := Term} = State0) ->
+ % forward events to ra_log
+ % if the last written changes then send an append entries reply
+ LW = ra_log:last_written(Log0),
{Log, Effects} = ra_log:handle_event(Evt, Log0),
State = State0#{log => Log},
- Reply = append_entries_reply(Term, true, State),
- {follower, State, [cast_reply(Id, LeaderId, Reply) | Effects]};
- handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
- % simply forward all other events to ra_log
- {Log, Effects} = ra_log:handle_event(Evt, Log0),
- {follower, State#{log => Log}, Effects};
+ case LW =/= ra_log:last_written(Log) of
+ true when LeaderId =/= undefined ->
+ %% last written has changed so we need to send an AER reply
+ Reply = append_entries_reply(Term, true, State),
+ {follower, State, [cast_reply(Id, LeaderId, Reply) | Effects]};
+ _ ->
+ {follower, State, Effects}
+ end;
handle_follower(#pre_vote_rpc{},
#{cfg := #cfg{log_id = LogId},
membership := Membership} = State) when Membership =/= voter ->
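The two ra_log_event clauses are collapsed into one: every event is now forwarded to ra_log:handle_event/2, and an append_entries_reply is cast only when ra_log:last_written/1 actually advanced and a leader is known, rather than on every {written, ...} event. The condition boils down to the following (hypothetical helper):

    should_reply(LWBefore, Log, LeaderId) ->
        %% reply only when something newly became durable and we know
        %% which leader to notify
        ra_log:last_written(Log) =/= LWBefore
            andalso LeaderId =/= undefined.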
@@ -2081,11 +2082,15 @@ make_pipelined_rpc_effects(#{cfg := #cfg{id = Id,
end,
%% ensure we don't pass a batch size that would allow
%% the peer to go over the max pipeline count
- BatchSize = min(MaxBatchSize,
- MaxPipelineCount - NumInFlight),
+ %% we'd only really get here if Force=true so setting
+ %% a single entry batch size should be fine
+ BatchSize = max(1,
+ min(MaxBatchSize,
+ MaxPipelineCount - NumInFlight)),
{NewNextIdx, Eff, S} =
make_rpc_effect(PeerId, Peer0, BatchSize, S0,
EntryCache),
+ ?assert(NewNextIdx >= NextIdx),
Peer = Peer0#{next_index => NewNextIdx,
commit_index_sent => CommitIndex},
NewNumInFlight = NewNextIdx - MatchIdx - 1,
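Once the pipeline is full, MaxPipelineCount - NumInFlight reaches zero, so the old expression produced a zero-entry batch and therefore no progress; since this branch is only reached with Force=true, the max(1, ...) floor guarantees a forced send still carries one entry. A worked example with hypothetical limits:

    batch_size(MaxBatchSize, MaxPipelineCount, NumInFlight) ->
        max(1, min(MaxBatchSize, MaxPipelineCount - NumInFlight)).

    %% batch_size(64, 4096, 1000) -> 64  (clamped by MaxBatchSize)
    %% batch_size(64, 4096, 4095) -> 1   (room for exactly one entry)
    %% batch_size(64, 4096, 4096) -> 1   (full pipeline: previously 0)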
@@ -2144,12 +2149,17 @@ make_rpc_effect(PeerId, #{next_index := Next}, MaxBatchSize,
PrevTerm, MaxBatchSize,
State#{log => Log},
EntryCache);
- {LastIdx, _} ->
+ {SnapIdx, _} ->
+ ?DEBUG("~ts: sending snapshot to ~w as their next index ~b "
+ "is lower than snapshot index ~b", [log_id(State),
+ PeerId, Next,
+ SnapIdx]),
+ ?assert(PrevIdx < SnapIdx),
SnapState = ra_log:snapshot_state(Log),
%% don't increment the next index here as we will do
%% that once the snapshot is fully replicated
%% and we don't pipeline entries until after snapshot
- {LastIdx,
+ {SnapIdx,
{send_snapshot, PeerId, {SnapState, Id, Term}},
State#{log => Log}}
end
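Entries at or below the snapshot index have been truncated from the log, so a peer whose next_index falls in that range can only be caught up by shipping the snapshot, and next_index is deliberately left untouched until the snapshot is fully replicated. The branch decision reduces to something like this (hypothetical helper):

    catch_up_action(PeerNextIdx, SnapIdx) when PeerNextIdx =< SnapIdx ->
        send_snapshot;  %% the needed entries no longer exist in the log
    catch_up_action(_PeerNextIdx, _SnapIdx) ->
        append_entries.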
@@ -2180,7 +2190,10 @@ log_read(From0, To, Cache, Log0) ->
{From, Entries0} = log_fold_cache(From0, To, Cache, []),
ra_log:fold(From, To, fun (E, A) -> [E | A] end, Entries0, Log0).

- log_fold_cache(From, To, [{From, _, _} = Entry | Rem], Acc) ->
+ %% this cache is a bit so and so as it will only really work when each follower
+ %% begins with the same from index
+ log_fold_cache(From, To, [{From, _, _} = Entry | Rem], Acc)
+ when From =< To ->
log_fold_cache(From + 1, To, Rem, [Entry | Acc]);
log_fold_cache(From, _To, _Cache, Acc) ->
{From, Acc}.
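The new guard stops the cache fold from consuming entries past To. A quick test-style check with hypothetical cache entries:

    log_fold_cache_guard_test() ->
        %% without `when From =< To` the first call would wrongly
        %% return {6, [{5, 1, a}]}, an entry beyond the requested range
        {5, []} = log_fold_cache(5, 4, [{5, 1, a}], []),
        {6, [{5, 1, a}]} = log_fold_cache(5, 5, [{5, 1, a}], []),
        ok.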
@@ -2853,16 +2866,16 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyMode}},
State = case State0 of
#{cluster_index_term := {CI, CT}}
when Idx > CI andalso Term >= CT ->
?DEBUG("~ts: applying ra cluster change to ~w",
[log_id(State0), maps:keys(NewCluster)]),
?DEBUG("~ts: applying ra cluster change at index ~b to ~w",
[log_id(State0), Idx, maps:keys(NewCluster)]),
%% we are recovering and should apply the cluster change
State0#{cluster => NewCluster,
membership => get_membership(NewCluster, State0),
cluster_change_permitted => true,
cluster_index_term => {Idx, Term}};
_ ->
?DEBUG("~ts: committing ra cluster change to ~w",
[log_id(State0), maps:keys(NewCluster)]),
?DEBUG("~ts: committing ra cluster change at index ~b to ~w",
[log_id(State0), Idx, maps:keys(NewCluster)]),
%% else just enable further cluster changes again
State0#{cluster_change_permitted => true}
end,
@@ -3070,17 +3083,23 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
% cluster
case Cmd of
{'$ra_cluster_change', _, Cluster, _} ->
?DEBUG("~ts: ~ts: follower applying ra cluster change to ~w",
[log_id(State), ?FUNCTION_NAME, maps:keys(Cluster)]),
State#{cluster => Cluster,
cluster_index_term => {Idx, Term}};
_ ->
% revert back to previous cluster
{PrevIdx, PrevTerm, PrevCluster} = maps:get(previous_cluster, State),
?DEBUG("~ts: ~ts: follower reverting cluster change to ~w",
[log_id(State), ?FUNCTION_NAME, maps:keys(PrevCluster)]),
State1 = State#{cluster => PrevCluster,
cluster_index_term => {PrevIdx, PrevTerm}},
pre_append_log_follower(Entry, State1)
end;
pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}},
State) ->
?DEBUG("~ts: ~ts: follower applying ra cluster change to ~w",
[log_id(State), ?FUNCTION_NAME, maps:keys(Cluster)]),
State#{cluster => Cluster,
membership => get_membership(Cluster, State),
cluster_index_term => {Idx, Term}};
@@ -3124,11 +3143,6 @@ mismatch_append_entries_reply(Term, LastAppliedIdx, State0) ->
State}.

append_entries_reply(Term, Success, #{log := Log} = State) ->
- % we can't use the the last received idx
- % as it may not have been persisted yet
- % also we can't use the last writted Idx as then
- % the follower may resent items that are currently waiting to
- % be written.
{LWIdx, LWTerm} = ra_log:last_written(Log),
{LastIdx, _} = last_idx_term(State),
#append_entries_reply{term = Term,
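The deleted comment had gone stale: it argued against using the last written index, yet the surviving code (unchanged here) derives the reply from ra_log:last_written/1, the durable tail, precisely because the last received index may not be persisted yet. Illustration with hypothetical indices:

    %% a follower that has received up to 110 but flushed only up to
    %% 100 must acknowledge 100: the unflushed suffix can still be lost
    ack_index(LastReceivedIdx, {LastWrittenIdx, _Term})
      when LastWrittenIdx =< LastReceivedIdx ->
        LastWrittenIdx.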
11 changes: 7 additions & 4 deletions src/ra_server_proc.erl
@@ -1142,17 +1142,20 @@ handle_enter(RaftState, OldRaftState,
{ServerState, Effects} = ra_server:handle_state_enter(RaftState,
OldRaftState,
ServerState0),
+ LastApplied = do_state_query(last_applied, State),
case RaftState == leader orelse OldRaftState == leader of
true ->
%% ensure transitions from and to leader are logged at a higher
%% level
?NOTICE("~ts: ~s -> ~s in term: ~b machine version: ~b",
?NOTICE("~ts: ~s -> ~s in term: ~b machine version: ~b, last applied ~b",
[log_id(State), OldRaftState, RaftState,
- current_term(State), machine_version(State)]);
+ current_term(State), machine_version(State),
+ LastApplied]);
false ->
?DEBUG("~ts: ~s -> ~s in term: ~b machine version: ~b",
?DEBUG("~ts: ~s -> ~s in term: ~b machine version: ~b, last applied ~b",
[log_id(State), OldRaftState, RaftState,
- current_term(State), machine_version(State)])
+ current_term(State), machine_version(State),
+ LastApplied])
end,
handle_effects(RaftState, Effects, cast,
State#state{server_state = ServerState}).
40 changes: 40 additions & 0 deletions test/coordination_SUITE.erl
@@ -43,6 +43,7 @@ all_tests() ->
start_cluster_majority,
start_cluster_minority,
grow_cluster,
+ shrink_cluster_with_snapshot,
send_local_msg,
local_log_effect,
leaderboard,
@@ -382,6 +383,45 @@ grow_cluster(Config) ->
stop_peers(Peers),
ok.

+ shrink_cluster_with_snapshot(Config) ->
+ %% this test removes leaders to ensure the remaining cluster can
+ %% resume activity ok
+ PrivDir = ?config(data_dir, Config),
+ ClusterName = ?config(cluster_name, Config),
+ Peers = start_peers([s1,s2,s3], PrivDir),
+ ServerIds = server_ids(ClusterName, Peers),
+ [A, B, C] = ServerIds,
+
+ Machine = {module, ?MODULE, #{}},
+ {ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, ServerIds),
+ {ok, _, Leader1} = ra:members(ServerIds),
+
+ %% run some activity to create a snapshot
+ [_ = ra:process_command(Leader1, {banana, I})
+ || I <- lists:seq(1, 5000)],
+
+ Fun = fun F(L0) ->
+ {ok, _, L} = ra:process_command(L0, banana),
+ F(L)
+ end,
+ Pid = spawn(fun () -> Fun(Leader1) end),
+ timer:sleep(100),
+
+ exit(Pid, kill),
+ {ok, _, _} = ra:remove_member(Leader1, Leader1),
+
+ timer:sleep(500),
+
+ {ok, _, Leader2} = ra:members(ServerIds),
+
+ ct:pal("old leader ~p, new leader ~p", [Leader1, Leader2]),
+ {ok, O, _} = ra:member_overview(Leader2),
+ ct:pal("overview2 ~p", [O]),
+ stop_peers(Peers),
+ ?assertMatch(#{cluster_change_permitted := true}, O),
+ ok.
+
send_local_msg(Config) ->
PrivDir = ?config(data_dir, Config),
ClusterName = ?config(cluster_name, Config),
10 changes: 6 additions & 4 deletions test/ra_2_SUITE.erl
@@ -361,8 +361,9 @@ diverged_follower(Config) ->
ra:stop_server(?SYS, F2),

%% use pipeline as wont be able to commit
- ra:pipeline_command(LeaderId1, {enq, d3}, make_ref()),
- ra:pipeline_command(LeaderId1, {enq, d4}, make_ref()),
+ [
+ ra:pipeline_command(LeaderId1, {enq, I}, make_ref())
+ || I <- lists:seq(3, 100)],

%% stop leader
ra:stop_server(?SYS, LeaderId1),
@@ -393,6 +394,7 @@ diverged_follower(Config) ->

[[m1, m2, m3, m4, m5]] == lists:usort(States)
end, 50),
+ flush(),

ra:delete_cluster(Peers),
ok.
@@ -898,8 +900,8 @@ init(_) ->
state_enter(eol = S, State) ->
ct:pal("state_enter ~w ~w", [self(), S]),
[{send_msg, P, eol, ra_event} || {P, _} <- queue:to_list(State), is_pid(P)];
- state_enter(S, _) ->
- ct:pal("state_enter ~w ~w", [self(), S]),
+ state_enter(_S, _) ->
+ % ct:pal("state_enter ~w ~w", [self(), S]),
[].

flush() ->