Skip to content

Commit 7b1ea3d

Browse files
authored
Merge pull request #175 from esl/users/pool_supervision_tree
Users supervision tree
2 parents e63ae4b + 623a89f commit 7b1ea3d

File tree

11 files changed

+550
-151
lines changed

11 files changed

+550
-151
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ jobs:
1111
name: ${{ matrix.test-type }} test on OTP ${{matrix.otp_vsn}}
1212
strategy:
1313
matrix:
14-
otp_vsn: ['26.1', '25.3', '24.3']
14+
otp_vsn: ['26.2', '25.3', '24.3']
1515
rebar_vsn: ['3.22.0']
1616
test-type: ['regular', 'integration']
1717
runs-on: 'ubuntu-22.04'
1818
steps:
19-
- uses: actions/checkout@v3
19+
- uses: actions/checkout@v4
2020
- uses: erlef/setup-beam@v1
2121
with:
2222
otp-version: ${{ matrix.otp_vsn }}

elvis.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
{elvis_style, invalid_dynamic_call, #{ignore => [amoc_code_server_SUITE]}},
2323
{elvis_style, dont_repeat_yourself, #{min_complexity => 50}},
2424
{elvis_style, no_debug_call, disable},
25-
{elvis_style, no_block_expressions, #{ignore => [amoc_code_server_SUITE]}},
25+
{elvis_style, no_block_expressions, #{ignore => [amoc_code_server_SUITE, controller_SUITE]}},
2626
{elvis_style, no_throw, disable},
2727
{elvis_style, no_import, disable}
2828
]},

guides/distributed-run.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,16 @@ amoc_dist:remove(50, Force).
3636
```elixir
3737
:amoc_dist.remove(50, force).
3838
```
39-
Remove 50 sessions.
39+
Remove 50 sessions.
4040

4141
Where ``Force`` is a boolean of value:
4242

4343
* ``true`` - to kill the user processes using ``supervisor:terminate_child/2`` function
4444
* ``false`` - to send ``exit(User,shutdown)`` signal to the user process (can be ignored by the user)
4545

46-
All the users are `temporary` children of the `simple_one_for_one` supervisor with the `shutdown` key set to `2000`.
46+
All the users are `temporary` children of a `simple_one_for_one` supervisor with the `shutdown` key set to `2000`.
47+
48+
Note that removal operation is asynchronous, and if we call `amoc_controller:remove_users/2` two times in a row, it may select the same users for removal.
4749

4850
Also all the user processes trap exits.
4951

guides/local-run.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ amoc:remove(10, true).
2828
:amoc.remove(10, true).
2929
```
3030

31+
Note that removal operation is asynchronous, and if we call `amoc_controller:remove_users/2` two times in a row, it may select the same users for removal.
32+
33+
Also note that all the user processes trap exits.
34+
3135
#### Many independent Amoc nodes
3236

3337
Sometimes a need arises to run several Amoc nodes independently from each other.

integration_test/extra_code_paths/path1/dummy_helper.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ test_amoc_dist() ->
6666
get_users_info(SlaveNodes) ->
6767
Users = [{Node, Id} ||
6868
Node <- SlaveNodes,
69-
{Id, _Pid} <- rpc:call(Node, ets, tab2list, [amoc_users])],
69+
{_Pid, Id} <- rpc:call(Node, amoc_users_sup, get_all_children, [])],
7070
Ids = lists:usort([Id || {_, Id} <- Users]),
7171
Nodes = lists:usort([Node || {Node, _} <- Users]),
7272
N = length(Ids),

src/amoc_controller.erl

Lines changed: 67 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
%% @copyright 2023 Erlang Solutions Ltd.
1+
%% @copyright 2024 Erlang Solutions Ltd.
22
%% @doc Main controller of a node, responsible for the scenario and the users
33
%%
44
%% Note that this module should be rarely used, APIs are fully exposed by `amoc' and `amoc_dist'
@@ -8,16 +8,14 @@
88
-behaviour(gen_server).
99

1010
-define(SERVER, ?MODULE).
11-
-define(USERS_TABLE, amoc_users).
1211

1312
-required_variable(#{name => interarrival, default_value => 50,
14-
verification => {?MODULE, positive_integer, 1},
13+
verification => {?MODULE, non_neg_integer, 1},
1514
description => "a delay between creating the processes for two "
1615
"consecutive users (ms, def: 50ms)",
1716
update => {?MODULE, maybe_update_interarrival_timer, 2}}).
1817

1918
-record(state, {scenario :: amoc:scenario() | undefined,
20-
no_of_users = 0 :: user_count(),
2119
last_user_id = 0 :: last_user_id(),
2220
status = idle :: idle | running | terminating | finished |
2321
{error, any()} | disabled,
@@ -67,7 +65,9 @@
6765
%% ------------------------------------------------------------------
6866
%% Parameters verification functions
6967
%% ------------------------------------------------------------------
70-
-export([maybe_update_interarrival_timer/2, positive_integer/1]).
68+
-export([maybe_update_interarrival_timer/2, non_neg_integer/1]).
69+
70+
-export([zero_users_running/0]).
7171

7272
%% ------------------------------------------------------------------
7373
%% gen_server Function Exports
@@ -77,6 +77,7 @@
7777
%% ------------------------------------------------------------------
7878
%% API Function Definitions
7979
%% ------------------------------------------------------------------
80+
8081
%% @private
8182
-spec start_link() -> {ok, pid()}.
8283
start_link() ->
@@ -121,17 +122,24 @@ disable() ->
121122
gen_server:call(?SERVER, disable).
122123

123124
%% @private
124-
-spec positive_integer(any()) -> boolean().
125-
positive_integer(Interarrival) ->
126-
is_integer(Interarrival) andalso Interarrival > 0.
125+
-spec non_neg_integer(any()) -> boolean().
126+
non_neg_integer(Interarrival) ->
127+
is_integer(Interarrival) andalso Interarrival >= 0.
127128

128129
%% @private
129130
-spec maybe_update_interarrival_timer(interarrival, term()) -> ok.
130131
maybe_update_interarrival_timer(interarrival, _) ->
131132
gen_server:cast(?SERVER, maybe_update_interarrival_timer).
133+
134+
%% @private
135+
-spec zero_users_running() -> ok.
136+
zero_users_running() ->
137+
gen_server:cast(?SERVER, zero_users_running).
138+
132139
%% ------------------------------------------------------------------
133140
%% gen_server Function Definitions
134141
%% ------------------------------------------------------------------
142+
135143
%% @private
136144
-spec init([]) -> {ok, state()}.
137145
init([]) ->
@@ -174,6 +182,9 @@ handle_call(_Request, _From, State) ->
174182
-spec handle_cast(any(), state()) -> {noreply, state()}.
175183
handle_cast(maybe_update_interarrival_timer, State) ->
176184
{noreply, maybe_update_interarrival_timer(State)};
185+
handle_cast(zero_users_running, State) ->
186+
NewSate = handle_zero_users_running(State),
187+
{noreply, NewSate};
177188
handle_cast(_Msg, State) ->
178189
{noreply, State}.
179190

@@ -182,8 +193,8 @@ handle_cast(_Msg, State) ->
182193
handle_info(start_user, State) ->
183194
NewSate = handle_start_user(State),
184195
{noreply, NewSate};
185-
handle_info({'DOWN', _, process, Pid, _}, State) ->
186-
NewSate = handle_stop_user(Pid, State),
196+
handle_info(start_all_users, State) ->
197+
NewSate = handle_start_all_users(State),
187198
{noreply, NewSate};
188199
handle_info(_Msg, State) ->
189200
{noreply, State}.
@@ -209,12 +220,15 @@ handle_start_scenario(_Scenario, _Settings, #state{status = Status} = State) ->
209220
{{error, {invalid_status, Status}}, State}.
210221

211222
-spec handle_stop_scenario(state()) -> {handle_call_res(), state()}.
212-
handle_stop_scenario(#state{no_of_users = 0, status = running} = State) ->
213-
terminate_scenario(State),
214-
{ok, State#state{status = finished}};
215223
handle_stop_scenario(#state{status = running} = State) ->
216-
terminate_all_users(),
217-
{ok, State#state{status = terminating}};
224+
case amoc_users_sup:count_no_of_users() of
225+
0 ->
226+
terminate_scenario(State),
227+
{ok, State#state{status = finished}};
228+
_ ->
229+
amoc_users_sup:terminate_all_children(),
230+
{ok, State#state{status = terminating}}
231+
end;
218232
handle_stop_scenario(#state{status = Status} = State) ->
219233
{{error, {invalid_status, Status}}, State}.
220234

@@ -240,29 +254,25 @@ handle_add(StartId, EndId, #state{last_user_id = LastId,
240254
NewUsers = lists:seq(StartId, EndId),
241255
NewScheduledUsers = lists:append(ScheduledUsers, NewUsers),
242256
NewTRef = maybe_start_timer(TRef),
243-
{ok, State#state{create_users = NewScheduledUsers, tref = NewTRef,
244-
last_user_id = EndId}};
257+
{ok, State#state{create_users = NewScheduledUsers, tref = NewTRef, last_user_id = EndId}};
245258
handle_add(_StartId, _EndId, #state{status = running} = State) ->
246259
{{error, invalid_range}, State};
247260
handle_add(_StartId, _EndId, #state{status = Status} = State) ->
248261
{{error, {invalid_status, Status}}, State}.
249262

250263
-spec handle_remove(user_count(), boolean(), state()) -> handle_call_res().
251264
handle_remove(Count, ForceRemove, #state{status = running, scenario = Scenario}) ->
252-
amoc_telemetry:execute([controller, users], #{count => Count},
265+
CountRemove = amoc_users_sup:stop_children(Count, ForceRemove),
266+
amoc_telemetry:execute([controller, users], #{count => CountRemove},
253267
#{scenario => Scenario, type => remove}),
254-
Pids = case ets:match_object(?USERS_TABLE, '$1', Count) of
255-
{Objects, _} -> [Pid || {_Id, Pid} <- Objects];
256-
'$end_of_table' -> []
257-
end,
258-
amoc_users_sup:stop_children(Pids, ForceRemove),
259-
{ok, length(Pids)};
268+
{ok, CountRemove};
260269
handle_remove(_Count, _ForceRemove, #state{status = Status}) ->
261270
{error, {invalid_status, Status}}.
262271

263272
-spec handle_status(state()) -> amoc_status().
264273
handle_status(#state{status = running, scenario = Scenario,
265-
no_of_users = N, last_user_id = LastId}) ->
274+
last_user_id = LastId}) ->
275+
N = amoc_users_sup:count_no_of_users(),
266276
{running, #{scenario => Scenario, currently_running_users => N, highest_user_id => LastId}};
267277
handle_status(#state{status = terminating, scenario = Scenario}) ->
268278
{terminating, Scenario};
@@ -279,33 +289,26 @@ handle_disable(#state{status = Status} = State) ->
279289

280290
-spec handle_start_user(state()) -> state().
281291
handle_start_user(#state{create_users = [UserId | T],
282-
no_of_users = N,
283292
scenario = Scenario,
284293
scenario_state = ScenarioState} = State) ->
285-
start_user(Scenario, UserId, ScenarioState),
286-
State#state{create_users = T, no_of_users = N + 1};
294+
amoc_users_sup:start_child(Scenario, UserId, ScenarioState),
295+
State#state{create_users = T};
287296
handle_start_user(#state{create_users = [], tref = TRef} = State) ->
288297
State#state{tref = maybe_stop_timer(TRef)}.
289298

290-
-spec handle_stop_user(pid(), state()) -> state().
291-
handle_stop_user(Pid, State) ->
292-
case ets:match(?USERS_TABLE, {'$1', Pid}, 1) of
293-
{[[UserId]], _} ->
294-
ets:delete(?USERS_TABLE, UserId),
295-
dec_no_of_users(State);
296-
_ ->
297-
State
298-
end.
299+
-spec handle_start_all_users(state()) -> state().
300+
handle_start_all_users(#state{create_users = AllUsers,
301+
scenario = Scenario,
302+
scenario_state = ScenarioState,
303+
tref = TRef} = State) ->
304+
amoc_users_sup:start_children(Scenario, AllUsers, ScenarioState),
305+
State#state{create_users = [], tref = maybe_stop_timer(TRef)}.
299306

300307
%% ------------------------------------------------------------------
301308
%% helpers
302309
%% ------------------------------------------------------------------
303310
-spec start_tables() -> ok.
304311
start_tables() -> %% ETS creation
305-
?USERS_TABLE = ets:new(?USERS_TABLE, [named_table,
306-
ordered_set,
307-
protected,
308-
{read_concurrency, true}]),
309312
amoc_config_utils:create_amoc_config_ets(),
310313
ok.
311314

@@ -321,11 +324,12 @@ init_scenario(Scenario, Settings) ->
321324
terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) ->
322325
amoc_scenario:terminate(Scenario, ScenarioState).
323326

324-
-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
325-
maybe_start_timer(undefined) ->
326-
{ok, TRef} = timer:send_interval(interarrival(), start_user),
327-
TRef;
328-
maybe_start_timer(TRef) -> TRef.
327+
-spec handle_zero_users_running(state()) -> state().
328+
handle_zero_users_running(#state{status = terminating} = State) ->
329+
terminate_scenario(State),
330+
State#state{status = finished};
331+
handle_zero_users_running(State) ->
332+
State.
329333

330334
-spec maybe_stop_timer(timer:tref() | undefined) -> undefined.
331335
maybe_stop_timer(undefined) ->
@@ -334,43 +338,28 @@ maybe_stop_timer(TRef) ->
334338
{ok, cancel} = timer:cancel(TRef),
335339
undefined.
336340

337-
-spec start_user(amoc:scenario(), amoc_scenario:user_id(), any()) -> ok.
338-
start_user(Scenario, Id, ScenarioState) ->
339-
{ok, Pid} = supervisor:start_child(amoc_users_sup, [Scenario, Id, ScenarioState]),
340-
ets:insert(?USERS_TABLE, {Id, Pid}),
341-
erlang:monitor(process, Pid),
342-
ok.
343-
344-
-spec terminate_all_users() -> any().
345-
terminate_all_users() ->
346-
%stop all the users
347-
Match = ets:match_object(?USERS_TABLE, '$1', 200),
348-
terminate_all_users(Match).
349-
350-
%% ets:continuation/0 type is unfortunately not exported from the ets module.
351-
-spec terminate_all_users({tuple(), term()} | '$end_of_table') -> ok.
352-
terminate_all_users({Objects, Continuation}) ->
353-
Pids = [Pid || {_Id, Pid} <- Objects],
354-
amoc_users_sup:stop_children(Pids, true),
355-
Match = ets:match_object(Continuation),
356-
terminate_all_users(Match);
357-
terminate_all_users('$end_of_table') -> ok.
358-
359-
-spec dec_no_of_users(state()) -> state().
360-
dec_no_of_users(#state{no_of_users = 1, status = terminating} = State) ->
361-
terminate_scenario(State),
362-
State#state{no_of_users = 0, status = finished};
363-
dec_no_of_users(#state{no_of_users = N} = State) ->
364-
State#state{no_of_users = N - 1}.
365-
366-
-spec interarrival() -> interarrival().
367-
interarrival() ->
341+
-spec get_interarrival() -> interarrival().
342+
get_interarrival() ->
368343
amoc_config:get(interarrival).
369344

370345
-spec maybe_update_interarrival_timer(state()) -> state().
371346
maybe_update_interarrival_timer(#state{tref = undefined} = State) ->
372347
State;
373348
maybe_update_interarrival_timer(#state{tref = TRef} = State) ->
374349
{ok, cancel} = timer:cancel(TRef),
375-
{ok, NewTRef} = timer:send_interval(interarrival(), start_user),
350+
Value = get_interarrival(),
351+
NewTRef = do_interarrival(Value),
376352
State#state{tref = NewTRef}.
353+
354+
-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
355+
maybe_start_timer(undefined) ->
356+
Value = get_interarrival(),
357+
do_interarrival(Value);
358+
maybe_start_timer(TRef) -> TRef.
359+
360+
do_interarrival(0) ->
361+
self() ! start_all_users,
362+
undefined;
363+
do_interarrival(Value) ->
364+
{ok, NewTRef} = timer:send_interval(Value, start_user),
365+
NewTRef.

src/amoc_sup.erl

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%% @private
2-
%% @copyright 2023 Erlang Solutions Ltd.
2+
%% @copyright 2024 Erlang Solutions Ltd.
33
-module(amoc_sup).
44

55
-behaviour(supervisor).
@@ -26,12 +26,9 @@ start_link() ->
2626
%% Supervisor callbacks
2727
%% ===================================================================
2828
-spec init(term()) ->
29-
{ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(),
30-
MaxR :: non_neg_integer(), MaxT :: pos_integer()},
31-
[ChildSpec :: supervisor:child_spec()]
32-
}}.
29+
{ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
3330
init([]) ->
34-
{ok, {{one_for_one, 5, 10},
31+
{ok, {#{strategy => one_for_all, intensity => 0},
3532
[
3633
?SUP(amoc_users_sup, supervisor),
3734
?SUP(amoc_throttle_sup, supervisor),

src/users/amoc_user.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%% @private
2-
%% @copyright 2023 Erlang Solutions Ltd.
2+
%% @copyright 2024 Erlang Solutions Ltd.
33
-module(amoc_user).
44

55
%% API
@@ -10,15 +10,15 @@
1010
-type state() :: term().
1111

1212
-spec start_link(amoc:scenario(), amoc_scenario:user_id(), state()) ->
13-
{ok, pid()}.
13+
{ok, pid()} | {error, term()}.
1414
start_link(Scenario, Id, State) ->
1515
proc_lib:start_link(?MODULE, init, [self(), Scenario, Id, State]).
1616

1717
-spec stop() -> no_return().
1818
stop() ->
1919
stop(self(), false).
2020

21-
-spec stop(pid(), boolean()) -> no_return() | ok | {error, any()}.
21+
-spec stop(pid(), boolean()) -> ok.
2222
stop(Pid, Force) when is_pid(Pid) ->
2323
amoc_users_sup:stop_child(Pid, Force).
2424

0 commit comments

Comments
 (0)