Skip to content

Commit f4460a5

Browse files
committed
Fixes
1 parent 0d95609 commit f4460a5

File tree

4 files changed

+197
-85
lines changed

4 files changed

+197
-85
lines changed

src/ra_server.erl

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,18 +1181,14 @@ handle_follower(#append_entries_rpc{term = Term,
11811181
State0 = update_term(Term, State00#{leader_id => LeaderId}),
11821182
case has_log_entry_or_snapshot(PLIdx, PLTerm, Log00) of
11831183
{entry_ok, Log0} ->
1184-
{LocalLastIdx, _} = ra_log:last_index_term(Log0),
11851184
% filter entries already seen
11861185
{Log1, Entries, LastValidIdx} =
11871186
drop_existing(Log0, Entries0, PLIdx),
1188-
?DEBUG_IF(length(Entries) < length(Entries0),
1189-
"~ts DUPES last index ~b incoming last ~b ~b ~b",
1190-
[log_id(State0), LocalLastIdx, PLIdx,
1191-
length(Entries0), length(Entries)]),
11921187
case Entries of
11931188
[] ->
11941189
%% all entries have already been written
11951190
ok = incr_counter(Cfg, ?C_RA_SRV_AER_RECEIVED_FOLLOWER_EMPTY, 1),
1191+
{LocalLastIdx, _} = ra_log:last_index_term(Log1),
11961192
{LogIsValidated, Log2} =
11971193
case Entries0 of
11981194
[] when LocalLastIdx > PLIdx ->
@@ -1229,18 +1225,22 @@ handle_follower(#append_entries_rpc{term = Term,
12291225
{NextState, State,
12301226
[cast_reply(Id, LeaderId, Reply) | Effects]};
12311227
false ->
1232-
%% We need to ensure we make progress in case
1233-
%% the last applied index is lower than the last
1234-
%% valid index
1228+
%% We need to ensure we make progress in case the
1229+
%% leader is having to resend already received
1230+
%% entries in order to validate, e.g. after a
1231+
%% term_mismatch, hence we reply with success but
1232+
%% only up to the last index we already had
12351233
LastValidatedIdx = max(LastApplied, LastValidIdx),
12361234
?DEBUG("~ts: append_entries_rpc with last index ~b "
12371235
" including ~b entries did not validate local log. "
1238-
"Requesting resend from index ~b, last index ~b",
1239-
[LogId, PLIdx, length(Entries0),
1240-
LastValidatedIdx + 1, LocalLastIdx]),
1241-
{Reply, State} =
1242-
mismatch_append_entries_reply(Term, LastValidatedIdx,
1243-
State0#{log => Log2}),
1236+
"Local last index ~b",
1237+
[LogId, PLIdx, length(Entries0), LocalLastIdx]),
1238+
{LVTerm, State} = fetch_term(LastValidatedIdx, State0),
1239+
Reply = #append_entries_reply{term = CurTerm,
1240+
success = true,
1241+
next_index = LastValidatedIdx + 1,
1242+
last_index = LastValidatedIdx,
1243+
last_term = LVTerm},
12441244
{follower, State,
12451245
[cast_reply(Id, LeaderId, Reply)]}
12461246
end;
@@ -1250,14 +1250,12 @@ handle_follower(#append_entries_rpc{term = Term,
12501250
LeaderCommit),
12511251
%% assert we're not writing below the last applied index
12521252
?assertNot(FstIdx < LastApplied),
1253-
State2 = lists:foldl(fun pre_append_log_follower/2,
1254-
State1, Entries),
1253+
State = lists:foldl(fun pre_append_log_follower/2,
1254+
State1, Entries),
12551255
case ra_log:write(Entries, Log1) of
12561256
{ok, Log2} ->
1257-
{NextState, State, Effects} =
1258-
evaluate_commit_index_follower(State2#{log => Log2},
1259-
Effects0),
1260-
{NextState, State, Effects};
1257+
evaluate_commit_index_follower(State#{log => Log2},
1258+
Effects0);
12611259
{error, wal_down} ->
12621260
%% at this point we know the wal process exited
12631261
%% but we dont know exactly which in flight messages
@@ -1269,9 +1267,9 @@ handle_follower(#append_entries_rpc{term = Term,
12691267
%% it wrote for each UID into an ETS table and query
12701268
%% this.
12711269
{await_condition,
1272-
State2#{log => Log1,
1273-
condition =>
1274-
#{predicate_fun => fun wal_down_condition/2}},
1270+
State#{log => Log1,
1271+
condition =>
1272+
#{predicate_fun => fun wal_down_condition/2}},
12751273
Effects0};
12761274
{error, _} = Err ->
12771275
exit(Err)
@@ -1346,23 +1344,23 @@ handle_follower(#heartbeat_rpc{leader_id = LeaderId,
13461344
cfg := #cfg{id = Id}} = State) ->
13471345
Reply = heartbeat_reply(CurTerm, QueryIndex),
13481346
{follower, State, [cast_reply(Id, LeaderId, Reply)]};
1349-
handle_follower({ra_log_event, {written, TERM, _} = Evt},
1350-
State0 = #{log := Log0,
1351-
cfg := #cfg{id = Id},
1352-
leader_id := LeaderId,
1353-
current_term := TERM = Term})
1354-
when LeaderId =/= undefined ->
1347+
handle_follower({ra_log_event, Evt}, #{log := Log0,
1348+
cfg := #cfg{id = Id},
1349+
leader_id := LeaderId,
1350+
current_term := Term} = State0) ->
1351+
% forward events to ra_log
1352+
% if the last written changes then send an append entries reply
1353+
LW = ra_log:last_written(Log0),
13551354
{Log, Effects} = ra_log:handle_event(Evt, Log0),
13561355
State = State0#{log => Log},
1357-
%% only reply with success if the written event relates to the current
1358-
%% term. this avoids accidentally confirming overwritten indexes too
1359-
%% early
1360-
Reply = append_entries_reply(Term, true, State),
1361-
{follower, State, [cast_reply(Id, LeaderId, Reply) | Effects]};
1362-
handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
1363-
% simply forward all other events to ra_log
1364-
{Log, Effects} = ra_log:handle_event(Evt, Log0),
1365-
{follower, State#{log => Log}, Effects};
1356+
case LW =/= ra_log:last_written(Log) of
1357+
true when LeaderId =/= undefined ->
1358+
%% last written has changed so we need to send an AER reply
1359+
Reply = append_entries_reply(Term, true, State),
1360+
{follower, State, [cast_reply(Id, LeaderId, Reply) | Effects]};
1361+
_ ->
1362+
{follower, State, Effects}
1363+
end;
13661364
handle_follower(#pre_vote_rpc{},
13671365
#{cfg := #cfg{log_id = LogId},
13681366
membership := Membership} = State) when Membership =/= voter ->

src/ra_server_proc.erl

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -560,11 +560,7 @@ leader(info, {update_peer, PeerId, Update}, State0) ->
560560
State = update_peer(PeerId, Update, State0),
561561
{keep_state, State, []};
562562
leader(_, tick_timeout, State0) ->
563-
% {State1, RpcEffs} = make_rpcs(State0),
564-
% ?DEBUG_IF(length(RpcEffs) > 0, "~ts tick_timeout made ~b rpcs",
565-
% [log_id(State0), length(RpcEffs)]),
566-
State1 = State0,
567-
RpcEffs = [],
563+
{State1, RpcEffs} = make_rpcs(State0),
568564
ServerState0 = State1#state.server_state,
569565
Effects = ra_server:tick(ServerState0),
570566
ServerState = ra_server:log_tick(ServerState0),
@@ -1355,7 +1351,6 @@ handle_effect(_RaftState, {send_rpc, To, Rpc}, _,
13551351
incr_counter(Conf, ?C_RA_SRV_MSGS_SENT, 1),
13561352
Self ! {update_peer, To, #{status => normal}}
13571353
end),
1358-
?DEBUG("~ts suspended ~w", [log_id(State0), To]),
13591354
{update_peer(To, #{status => suspended}, State0), Actions};
13601355
noconnect ->
13611356
%% for noconnects just allow it to pipeline and catch up later

test/ra_log_memory.erl

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
fetch_term/2,
2121
flush/2,
2222
next_index/1,
23+
snapshot_state/1,
24+
set_snapshot_state/2,
2325
install_snapshot/3,
2426
read_snapshot/1,
2527
recover_snapshot/1,
@@ -40,13 +42,13 @@
4042
-include("src/ra.hrl").
4143

4244
-type ra_log_memory_meta() :: #{atom() => term()}.
43-
-type snapshot() :: {snapshot_meta(), term()}.
4445

4546
-record(state, {last_index = 0 :: ra_index(),
4647
last_written = {0, 0} :: ra_idxterm(), % only here to fake the async api of the file based one
47-
entries = #{0 => {0, undefined}} :: #{ra_term() => {ra_index(), term()}},
48+
entries = #{0 => {0, undefined}} ::
49+
#{ra_term() => {ra_index(), term()}},
4850
meta = #{} :: ra_log_memory_meta(),
49-
snapshot :: option(snapshot())}).
51+
snapshot :: option({ra_snapshot:meta(), term()})}).
5052

5153
-type ra_log_memory_state() :: #state{} | ra_log:state().
5254

@@ -168,13 +170,18 @@ last_written(#state{last_written = LastWritten}) ->
168170

169171
-spec handle_event(ra_log:event_body(), ra_log_memory_state()) ->
170172
{ra_log_memory_state(), list()}.
171-
handle_event({written, Term, {_From, Idx}}, State0) ->
173+
handle_event({written, Term, {_From, Idx} = Range0}, State0) ->
172174
case fetch_term(Idx, State0) of
173175
{Term, State} ->
174176
{State#state{last_written = {Idx, Term}}, []};
175177
_ ->
176-
% if the term doesn't match we just ignore it
177-
{State0, []}
178+
case ra_range:limit(Idx, Range0) of
179+
undefined ->
180+
% if the term doesn't match we just ignore it
181+
{State0, []};
182+
Range ->
183+
handle_event({written, Term, Range}, State0)
184+
end
178185
end;
179186
handle_event(_Evt, State0) ->
180187
{State0, []}.
@@ -203,17 +210,17 @@ fetch_term(Idx, #state{entries = Log} = State) ->
203210

204211
flush(_Idx, Log) -> Log.
205212

206-
-spec install_snapshot(SnapshotMeta :: ra_snapshot:meta(),
207-
SnapshotData :: term(),
208-
State :: ra_log_memory_state()) ->
209-
{ra_log_memory_state(), term(), list()}.
210-
install_snapshot(Meta, Data, #state{entries = Log0} = State) ->
211-
Index = maps:get(index, Meta),
213+
install_snapshot({Index, Term}, Data, #state{entries = Log0} = State) ->
214+
% Index = maps:get(index, Meta),
215+
% Term = maps:get(term, Meta),
212216
% discard log
213217
Log = maps:filter(fun (K, _) -> K > Index end, Log0),
214-
{State#state{entries = Log, snapshot = {Meta, Data}}, Data, []};
215-
install_snapshot(_Meta, Data, State) ->
216-
{State, Data, []}.
218+
{State#state{entries = Log,
219+
last_index = Index,
220+
last_written = {Index, Term},
221+
snapshot = Data}, []};
222+
install_snapshot(_Meta, _Data, State) ->
223+
{State, []}.
217224

218225
-spec read_snapshot(State :: ra_log_memory_state()) ->
219226
{ok, ra_snapshot:meta(), term()}.
@@ -224,9 +231,14 @@ read_snapshot(#state{snapshot = {Meta, Data}}) ->
224231
undefined | {ok, ra_snapshot:meta(), term()}.
225232
recover_snapshot(#state{snapshot = undefined}) ->
226233
undefined;
227-
recover_snapshot(#state{snapshot = {Meta, Data}}) ->
228-
{Meta, Data}.
234+
recover_snapshot(#state{snapshot = {Meta, MacState}}) ->
235+
{Meta, MacState}.
229236

237+
set_snapshot_state(SnapState, State) ->
238+
State#state{snapshot = SnapState}.
239+
240+
snapshot_state(State) ->
241+
State#state.snapshot.
230242

231243
-spec read_meta(Key :: ra_log:ra_meta_key(), State :: ra_log_memory_state()) ->
232244
option(term()).
@@ -235,10 +247,11 @@ read_meta(Key, #state{meta = Meta}) ->
235247

236248
-spec snapshot_index_term(State :: ra_log_memory_state()) ->
237249
ra_idxterm().
238-
snapshot_index_term(#state{snapshot = {#{index := Idx, term := Term}, _}}) ->
239-
{Idx, Term};
240250
snapshot_index_term(#state{snapshot = undefined}) ->
241-
undefined.
251+
undefined;
252+
snapshot_index_term(#state{snapshot = {#{index := Idx,
253+
term := Term}, _}}) ->
254+
{Idx, Term}.
242255

243256
-spec update_release_cursor(ra_index(), ra_cluster(),
244257
ra_machine:version(), term(),

0 commit comments

Comments
 (0)