Skip to content

Commit 324d9bc

Browse files
committed
Replication bug fixes
1. Fix bug where occasionally one follower would receive more entries then expected resulting in triggering 2. 2. Fix bug when follower received an append_entries_rpc with no new entries in it would trigger an incorrect resend scenario when throughput is high. 3. Fix bug where a late written event could accidentally confirm unwritten entries in a higher term.
1 parent 64eee62 commit 324d9bc

File tree

3 files changed

+199
-72
lines changed

3 files changed

+199
-72
lines changed

src/ra_server.erl

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1225,18 +1225,22 @@ handle_follower(#append_entries_rpc{term = Term,
12251225
{NextState, State,
12261226
[cast_reply(Id, LeaderId, Reply) | Effects]};
12271227
false ->
1228-
%% We need to ensure we make progress in case
1229-
%% the last applied index is lower than the last
1230-
%% valid index
1228+
%% We need to ensure we make progress in case the
1229+
%% leader is having to resend already received
1230+
%% entries in order to validate, e.g. after a
1231+
%% term_mismatch, hence we reply with success but
1232+
%% only up to the last index we already had
12311233
LastValidatedIdx = max(LastApplied, LastValidIdx),
12321234
?DEBUG("~ts: append_entries_rpc with last index ~b "
12331235
" including ~b entries did not validate local log. "
1234-
"Requesting resend from index ~b",
1235-
[LogId, PLIdx, length(Entries0),
1236-
LastValidatedIdx + 1]),
1237-
{Reply, State} =
1238-
mismatch_append_entries_reply(Term, LastValidatedIdx,
1239-
State0#{log => Log2}),
1236+
"Local last index ~b",
1237+
[LogId, PLIdx, length(Entries0), LocalLastIdx]),
1238+
{LVTerm, State} = fetch_term(LastValidatedIdx, State0),
1239+
Reply = #append_entries_reply{term = CurTerm,
1240+
success = true,
1241+
next_index = LastValidatedIdx + 1,
1242+
last_index = LastValidatedIdx,
1243+
last_term = LVTerm},
12401244
{follower, State,
12411245
[cast_reply(Id, LeaderId, Reply)]}
12421246
end;
@@ -1246,14 +1250,12 @@ handle_follower(#append_entries_rpc{term = Term,
12461250
LeaderCommit),
12471251
%% assert we're not writing below the last applied index
12481252
?assertNot(FstIdx < LastApplied),
1249-
State2 = lists:foldl(fun pre_append_log_follower/2,
1250-
State1, Entries),
1253+
State = lists:foldl(fun pre_append_log_follower/2,
1254+
State1, Entries),
12511255
case ra_log:write(Entries, Log1) of
12521256
{ok, Log2} ->
1253-
{NextState, State, Effects} =
1254-
evaluate_commit_index_follower(State2#{log => Log2},
1255-
Effects0),
1256-
{NextState, State, Effects};
1257+
evaluate_commit_index_follower(State#{log => Log2},
1258+
Effects0);
12571259
{error, wal_down} ->
12581260
%% at this point we know the wal process exited
12591261
%% but we dont know exactly which in flight messages
@@ -1265,9 +1267,9 @@ handle_follower(#append_entries_rpc{term = Term,
12651267
%% it wrote for each UID into an ETS table and query
12661268
%% this.
12671269
{await_condition,
1268-
State2#{log => Log1,
1269-
condition =>
1270-
#{predicate_fun => fun wal_down_condition/2}},
1270+
State#{log => Log1,
1271+
condition =>
1272+
#{predicate_fun => fun wal_down_condition/2}},
12711273
Effects0};
12721274
{error, _} = Err ->
12731275
exit(Err)
@@ -1342,20 +1344,23 @@ handle_follower(#heartbeat_rpc{leader_id = LeaderId,
13421344
cfg := #cfg{id = Id}} = State) ->
13431345
Reply = heartbeat_reply(CurTerm, QueryIndex),
13441346
{follower, State, [cast_reply(Id, LeaderId, Reply)]};
1345-
handle_follower({ra_log_event, {written, _, _} = Evt},
1346-
State0 = #{log := Log0,
1347-
cfg := #cfg{id = Id},
1348-
leader_id := LeaderId,
1349-
current_term := Term})
1350-
when LeaderId =/= undefined ->
1347+
handle_follower({ra_log_event, Evt}, #{log := Log0,
1348+
cfg := #cfg{id = Id},
1349+
leader_id := LeaderId,
1350+
current_term := Term} = State0) ->
1351+
% forward events to ra_log
1352+
% if the last written changes then send an append entries reply
1353+
LW = ra_log:last_written(Log0),
13511354
{Log, Effects} = ra_log:handle_event(Evt, Log0),
13521355
State = State0#{log => Log},
1353-
Reply = append_entries_reply(Term, true, State),
1354-
{follower, State, [cast_reply(Id, LeaderId, Reply) | Effects]};
1355-
handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
1356-
% simply forward all other events to ra_log
1357-
{Log, Effects} = ra_log:handle_event(Evt, Log0),
1358-
{follower, State#{log => Log}, Effects};
1356+
case LW =/= ra_log:last_written(Log) of
1357+
true when LeaderId =/= undefined ->
1358+
%% last written has changed so we need to send an AER reply
1359+
Reply = append_entries_reply(Term, true, State),
1360+
{follower, State, [cast_reply(Id, LeaderId, Reply) | Effects]};
1361+
_ ->
1362+
{follower, State, Effects}
1363+
end;
13591364
handle_follower(#pre_vote_rpc{},
13601365
#{cfg := #cfg{log_id = LogId},
13611366
membership := Membership} = State) when Membership =/= voter ->
@@ -2180,7 +2185,10 @@ log_read(From0, To, Cache, Log0) ->
21802185
{From, Entries0} = log_fold_cache(From0, To, Cache, []),
21812186
ra_log:fold(From, To, fun (E, A) -> [E | A] end, Entries0, Log0).
21822187

2183-
log_fold_cache(From, To, [{From, _, _} = Entry | Rem], Acc) ->
2188+
%% this cache is a bit so and so as it will only really work when each follower
2189+
%% begins with the same from index
2190+
log_fold_cache(From, To, [{From, _, _} = Entry | Rem], Acc)
2191+
when From =< To ->
21842192
log_fold_cache(From + 1, To, Rem, [Entry | Acc]);
21852193
log_fold_cache(From, _To, _Cache, Acc) ->
21862194
{From, Acc}.

test/ra_log_memory.erl

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
fetch_term/2,
2121
flush/2,
2222
next_index/1,
23+
snapshot_state/1,
24+
set_snapshot_state/2,
2325
install_snapshot/3,
2426
read_snapshot/1,
2527
recover_snapshot/1,
@@ -40,13 +42,13 @@
4042
-include("src/ra.hrl").
4143

4244
-type ra_log_memory_meta() :: #{atom() => term()}.
43-
-type snapshot() :: {snapshot_meta(), term()}.
4445

4546
-record(state, {last_index = 0 :: ra_index(),
4647
last_written = {0, 0} :: ra_idxterm(), % only here to fake the async api of the file based one
47-
entries = #{0 => {0, undefined}} :: #{ra_term() => {ra_index(), term()}},
48+
entries = #{0 => {0, undefined}} ::
49+
#{ra_term() => {ra_index(), term()}},
4850
meta = #{} :: ra_log_memory_meta(),
49-
snapshot :: option(snapshot())}).
51+
snapshot :: option({ra_snapshot:meta(), term()})}).
5052

5153
-type ra_log_memory_state() :: #state{} | ra_log:state().
5254

@@ -168,13 +170,18 @@ last_written(#state{last_written = LastWritten}) ->
168170

169171
-spec handle_event(ra_log:event_body(), ra_log_memory_state()) ->
170172
{ra_log_memory_state(), list()}.
171-
handle_event({written, Term, {_From, Idx}}, State0) ->
173+
handle_event({written, Term, {_From, Idx} = Range0}, State0) ->
172174
case fetch_term(Idx, State0) of
173175
{Term, State} ->
174176
{State#state{last_written = {Idx, Term}}, []};
175177
_ ->
176-
% if the term doesn't match we just ignore it
177-
{State0, []}
178+
case ra_range:limit(Idx, Range0) of
179+
undefined ->
180+
% if the term doesn't match we just ignore it
181+
{State0, []};
182+
Range ->
183+
handle_event({written, Term, Range}, State0)
184+
end
178185
end;
179186
handle_event(_Evt, State0) ->
180187
{State0, []}.
@@ -203,17 +210,17 @@ fetch_term(Idx, #state{entries = Log} = State) ->
203210

204211
flush(_Idx, Log) -> Log.
205212

206-
-spec install_snapshot(SnapshotMeta :: ra_snapshot:meta(),
207-
SnapshotData :: term(),
208-
State :: ra_log_memory_state()) ->
209-
{ra_log_memory_state(), term(), list()}.
210-
install_snapshot(Meta, Data, #state{entries = Log0} = State) ->
211-
Index = maps:get(index, Meta),
213+
install_snapshot({Index, Term}, Data, #state{entries = Log0} = State) ->
214+
% Index = maps:get(index, Meta),
215+
% Term = maps:get(term, Meta),
212216
% discard log
213217
Log = maps:filter(fun (K, _) -> K > Index end, Log0),
214-
{State#state{entries = Log, snapshot = {Meta, Data}}, Data, []};
215-
install_snapshot(_Meta, Data, State) ->
216-
{State, Data, []}.
218+
{State#state{entries = Log,
219+
last_index = Index,
220+
last_written = {Index, Term},
221+
snapshot = Data}, []};
222+
install_snapshot(_Meta, _Data, State) ->
223+
{State, []}.
217224

218225
-spec read_snapshot(State :: ra_log_memory_state()) ->
219226
{ok, ra_snapshot:meta(), term()}.
@@ -224,9 +231,14 @@ read_snapshot(#state{snapshot = {Meta, Data}}) ->
224231
undefined | {ok, ra_snapshot:meta(), term()}.
225232
recover_snapshot(#state{snapshot = undefined}) ->
226233
undefined;
227-
recover_snapshot(#state{snapshot = {Meta, Data}}) ->
228-
{Meta, Data}.
234+
recover_snapshot(#state{snapshot = {Meta, MacState}}) ->
235+
{Meta, MacState}.
229236

237+
set_snapshot_state(SnapState, State) ->
238+
State#state{snapshot = SnapState}.
239+
240+
snapshot_state(State) ->
241+
State#state.snapshot.
230242

231243
-spec read_meta(Key :: ra_log:ra_meta_key(), State :: ra_log_memory_state()) ->
232244
option(term()).
@@ -235,10 +247,11 @@ read_meta(Key, #state{meta = Meta}) ->
235247

236248
-spec snapshot_index_term(State :: ra_log_memory_state()) ->
237249
ra_idxterm().
238-
snapshot_index_term(#state{snapshot = {#{index := Idx, term := Term}, _}}) ->
239-
{Idx, Term};
240250
snapshot_index_term(#state{snapshot = undefined}) ->
241-
undefined.
251+
undefined;
252+
snapshot_index_term(#state{snapshot = {#{index := Idx,
253+
term := Term}, _}}) ->
254+
{Idx, Term}.
242255

243256
-spec update_release_cursor(ra_index(), ra_cluster(),
244257
ra_machine:version(), term(),

0 commit comments

Comments
 (0)