Skip to content

Commit 1cb588f

Browse files
committed
ra_server_proc: Fix handling of local query replies
[Why] The reply of a local query was formatted as `{reply, From, Reply}`. However, the code was called in two different contexts: 1. When the local query was executed immediately, the reply tuple was interpreted by `gen_statem` and thus the reply was sent by the local Ra server process. Exactly what we wanted. 2. When the local query execution depended on a condition and might have to be delayed, it could be executed right away (and the reply tuple was interpreted by `gen_statem`), or the execution could be delayed and at the time of execution, the reply tuple could be interpreted as a Ra effect. Unfortunately, the same tuple has a very different meaning depending on who interprets it: * `gen_statem` will send the reply regardless of the Raft state of the Ra server process. * The Ra effect will only send the reply if the Ra server process is acting as the leader. The delayed query was always processed by the Ra server that received it, regardless of its state. This means that if the Ra server is not a leader and the reply tuple is interpreted as a Ra effect, the caller will never get an answer. This led to some timeouts in Khepri and nasty bugs in RabbitMQ. In particular, it caused the `peer_discovery_classic_config_SUITE` to fail quite frequently in RabbitMQ CI. [How] First, the patch ensures the reply tuple is always interpreted as a Ra effect. Then, it sets the `{member, ...}` replier option in the reply effect to the Ra server that executes the query. This way, the reply is always emitted, and by the correct Ra server, regardless of its Raft state.
1 parent 64eee62 commit 1cb588f

File tree

1 file changed

+30
-16
lines changed

1 file changed

+30
-16
lines changed

src/ra_server_proc.erl

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -530,8 +530,9 @@ leader(EventType, flush_commands,
530530
end;
531531
leader({call, _From} = EventType, {local_query, QueryFun}, State) ->
532532
leader(EventType, {local_query, QueryFun, #{}}, State);
533-
leader({call, From}, {local_query, QueryFun, Options}, State) ->
534-
perform_or_delay_local_query(leader, From, QueryFun, Options, State);
533+
leader({call, From} = EventType, {local_query, QueryFun, Options}, State) ->
534+
perform_or_delay_local_query(
535+
leader, EventType, From, QueryFun, Options, State);
535536
leader({call, From}, {state_query, Spec}, State) ->
536537
Reply = {ok, do_state_query(Spec, State), id(State)},
537538
{keep_state, State, [{reply, From, Reply}]};
@@ -626,8 +627,9 @@ candidate(cast, {command, _Priority,
626627
{keep_state, State, []};
627628
candidate({call, _From} = EventType, {local_query, QueryFun}, State) ->
628629
candidate(EventType, {local_query, QueryFun, #{}}, State);
629-
candidate({call, From}, {local_query, QueryFun, Options}, State) ->
630-
perform_or_delay_local_query(candidate, From, QueryFun, Options, State);
630+
candidate({call, From} = EventType, {local_query, QueryFun, Options}, State) ->
631+
perform_or_delay_local_query(
632+
candidate, EventType, From, QueryFun, Options, State);
631633
candidate({call, From}, {state_query, Spec}, State) ->
632634
Reply = {ok, do_state_query(Spec, State), id(State)},
633635
{keep_state, State, [{reply, From, Reply}]};
@@ -674,8 +676,9 @@ pre_vote(cast, {command, _Priority,
674676
{keep_state, State, []};
675677
pre_vote({call, _From} = EventType, {local_query, QueryFun}, State) ->
676678
pre_vote(EventType, {local_query, QueryFun, #{}}, State);
677-
pre_vote({call, From}, {local_query, QueryFun, Options}, State) ->
678-
perform_or_delay_local_query(pre_vote, From, QueryFun, Options, State);
679+
pre_vote({call, From} = EventType, {local_query, QueryFun, Options}, State) ->
680+
perform_or_delay_local_query(
681+
pre_vote, EventType, From, QueryFun, Options, State);
679682
pre_vote({call, From}, {state_query, Spec}, State) ->
680683
Reply = {ok, do_state_query(Spec, State), id(State)},
681684
{keep_state, State, [{reply, From, Reply}]};
@@ -756,8 +759,9 @@ follower(cast, {command, _Priority,
756759
{keep_state, State, []};
757760
follower({call, _From} = EventType, {local_query, QueryFun}, State) ->
758761
follower(EventType, {local_query, QueryFun, #{}}, State);
759-
follower({call, From}, {local_query, QueryFun, Options}, State) ->
760-
perform_or_delay_local_query(follower, From, QueryFun, Options, State);
762+
follower({call, From} = EventType, {local_query, QueryFun, Options}, State) ->
763+
perform_or_delay_local_query(
764+
follower, EventType, From, QueryFun, Options, State);
761765
follower({call, From}, {state_query, Spec}, State) ->
762766
Reply = {ok, do_state_query(Spec, State), id(State)},
763767
{keep_state, State, [{reply, From, Reply}]};
@@ -880,9 +884,10 @@ receive_snapshot(EventType, {local_call, Msg}, State) ->
880884
receive_snapshot(EventType, Msg, State);
881885
receive_snapshot({call, _From} = EventType, {local_query, QueryFun}, State) ->
882886
receive_snapshot(EventType, {local_query, QueryFun, #{}}, State);
883-
receive_snapshot({call, From}, {local_query, QueryFun, Options}, State) ->
887+
receive_snapshot({call, From} = EventType, {local_query, QueryFun, Options},
888+
State) ->
884889
perform_or_delay_local_query(
885-
receive_snapshot, From, QueryFun, Options, State);
890+
receive_snapshot, EventType, From, QueryFun, Options, State);
886891
receive_snapshot({call, From}, {state_query, Spec}, State) ->
887892
Reply = {ok, do_state_query(Spec, State), id(State)},
888893
{keep_state, State, [{reply, From, Reply}]};
@@ -973,9 +978,10 @@ await_condition(EventType, {local_call, Msg}, State) ->
973978
await_condition(EventType, Msg, State);
974979
await_condition({call, _From} = EventType, {local_query, QueryFun}, State) ->
975980
await_condition(EventType, {local_query, QueryFun, #{}}, State);
976-
await_condition({call, From}, {local_query, QueryFun, Options}, State) ->
981+
await_condition({call, From} = EventType, {local_query, QueryFun, Options},
982+
State) ->
977983
perform_or_delay_local_query(
978-
await_condition, From, QueryFun, Options, State);
984+
await_condition, EventType, From, QueryFun, Options, State);
979985
await_condition({call, From}, {state_query, Spec}, State) ->
980986
Reply = {ok, do_state_query(Spec, State), id(State)},
981987
{keep_state, State, [{reply, From, Reply}]};
@@ -1208,9 +1214,17 @@ handle_await_condition(Msg, State) ->
12081214
handle_raft_state(?FUNCTION_NAME, Msg, State).
12091215

12101216
perform_or_delay_local_query(
1217+
RaftState, EventType, From, QueryFun, Options, State0) ->
1218+
{NextState, State1, Effects} = do_perform_or_delay_local_query(
1219+
RaftState, From, QueryFun, Options,
1220+
State0),
1221+
{State, Actions} = handle_effects(RaftState, Effects, EventType, State1),
1222+
{NextState, State, Actions}.
1223+
1224+
do_perform_or_delay_local_query(
12111225
RaftState, From, QueryFun, Options,
12121226
#state{conf = Conf,
1213-
server_state = ServerState,
1227+
server_state = #{cfg := #cfg{id = ThisMember}} = ServerState,
12141228
pending_queries = PendingQueries} = State) ->
12151229
%% The caller might decide it wants the query to be executed only after a
12161230
%% specific index has been applied on the local node. It can specify that
@@ -1226,7 +1240,7 @@ perform_or_delay_local_query(
12261240
undefined ->
12271241
Leader = determine_leader(RaftState, State),
12281242
Reply = perform_local_query(QueryFun, Leader, ServerState, Conf),
1229-
{keep_state, State, [{reply, From, Reply}]};
1243+
{keep_state, State, [{reply, From, Reply, {member, ThisMember}}]};
12301244
Condition ->
12311245
PendingQuery = {Condition, From, QueryFun},
12321246
PendingQueries1 = [PendingQuery | PendingQueries],
@@ -1266,7 +1280,7 @@ perform_pending_queries(RaftState, LastApplied,
12661280

12671281
perform_pending_queries1(
12681282
{{applied, {TargetIndex, TargetTerm}}, From, QueryFun} = PendingQuery,
1269-
{PendingQueries0, Actions0, ServerState0},
1283+
{PendingQueries0, Actions0, #{cfg := #cfg{id = ThisMember}} = ServerState0},
12701284
#{last_applied := LastApplied, leader := Leader, conf := Conf})
12711285
when TargetIndex =< LastApplied ->
12721286
{Term, ServerState} = ra_server:fetch_term(TargetIndex, ServerState0),
@@ -1280,7 +1294,7 @@ perform_pending_queries1(
12801294
%% evaluated. The reply will be discarded by Erlang because the
12811295
%% process alias in `From' is inactive after the timeout.
12821296
Reply = perform_local_query(QueryFun, Leader, ServerState, Conf),
1283-
Actions = [{reply, From, Reply} | Actions0],
1297+
Actions = [{reply, From, Reply, {member, ThisMember}} | Actions0],
12841298
{PendingQueries0, Actions, ServerState};
12851299
_ ->
12861300
PendingQueries = [PendingQuery | PendingQueries0],

0 commit comments

Comments
 (0)