From 4dfa2115d51dd9eb017035455665d02d71af315f Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Fri, 1 Dec 2023 12:19:21 +0100 Subject: [PATCH 1/6] Transform throttle log statements into telemetry events --- guides/telemetry.md | 10 ++++ src/amoc_throttle/amoc_throttle_process.erl | 55 ++++++++++++--------- 2 files changed, 41 insertions(+), 24 deletions(-) diff --git a/guides/telemetry.md b/guides/telemetry.md index d895c666..b60fa8d8 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -78,6 +78,16 @@ measurements: #{count := 1} metadata: #{monotonic_time := integer(), name := atom()} ``` +### Throttle process internals + +Events related to internals of the throttle processes, these might expose unstable conditions you +might want to log or reconfigure: +```erlang +event_name: [amoc, throttle, process] +measurements: #{msg := binary(), process := pid()} +metadata: #{monotonic_time := integer(), name := atom(), printable_state => map()} +``` + ## Coordinator Indicates when a coordinating event was raised, like a process being added for coordination or a timeout being triggered diff --git a/src/amoc_throttle/amoc_throttle_process.erl b/src/amoc_throttle/amoc_throttle_process.erl index a144823a..4271bb66 100644 --- a/src/amoc_throttle/amoc_throttle_process.erl +++ b/src/amoc_throttle/amoc_throttle_process.erl @@ -21,8 +21,6 @@ handle_continue/2, format_status/2]). --include_lib("kernel/include/logger.hrl"). - -define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute -record(state, {can_run_fn = true :: boolean(), @@ -76,7 +74,7 @@ get_state(Pid) -> -spec init(list()) -> {ok, state(), timeout()}. init([Name, Interval, Rate]) -> - InitialState = initial_state(Interval, Rate), + InitialState = initial_state(Name, Interval, Rate), StateWithTimer = maybe_start_timer(InitialState), {ok, StateWithTimer#state{name = Name}, timeout(InitialState)}. @@ -86,7 +84,7 @@ handle_info({'DOWN', _, process, _, _}, State) -> handle_info(delay_between_executions, State) -> {noreply, State#state{can_run_fn = true}, {continue, maybe_run_fn}}; handle_info(timeout, State) -> - log_state("is inactive", State), + internal_event(<<"is inactive">>, State), {noreply, State, {continue, maybe_run_fn}}. -spec handle_cast(term(), state()) -> @@ -100,9 +98,9 @@ handle_cast(resume_process, State) -> handle_cast({schedule, RunnerPid}, #state{schedule_reversed = SchRev, name = Name} = State) -> amoc_throttle_controller:telemetry_event(Name, request), {noreply, State#state{schedule_reversed = [RunnerPid | SchRev]}, {continue, maybe_run_fn}}; -handle_cast({update, Interval, Rate}, State) -> - NewState = merge_state(initial_state(Interval, Rate), State), - log_state("state update", NewState), +handle_cast({update, Interval, Rate}, #state{name = Name} = State) -> + NewState = merge_state(initial_state(Name, Interval, Rate), State), + internal_event(<<"state update">>, NewState), {noreply, NewState, {continue, maybe_run_fn}}. -spec handle_call(term(), term(), state()) -> @@ -128,23 +126,26 @@ format_status(_Opt, [_PDict, State]) -> %% internal functions %%------------------------------------------------------------------------------ -initial_state(Interval, 0) -> - ?LOG_ERROR("invalid rate, must be higher than zero"), - initial_state(Interval, 1); -initial_state(Interval, Rate) when Rate > 0 -> - case Rate < 5 of - true -> ?LOG_ERROR("too low rate, please reduce NoOfProcesses"); - false -> ok - end, - Delay = case {Interval, Interval div Rate, Interval rem Rate} of +initial_state(Name, Interval, Rate) when Rate >= 0 -> + NewRate = case {Rate =:= 0, Rate < 5} of + {true, _} -> + internal_event(<<"invalid rate, must be higher than zero">>, Name), + 1; + {_, true} -> + internal_event(<<"too low rate, please reduce NoOfProcesses">>, Name), + Rate; + {_, false} -> + Rate + end, + Delay = case {Interval, Interval div NewRate, Interval rem NewRate} of {0, _, _} -> 0; %% limit only No of simultaneous executions {_, I, _} when I < 10 -> - ?LOG_ERROR("too high rate, please increase NoOfProcesses"), + internal_event(<<"too high rate, please increase NoOfProcesses">>, Name), 10; {_, DelayBetweenExecutions, 0} -> DelayBetweenExecutions; {_, DelayBetweenExecutions, _} -> DelayBetweenExecutions + 1 end, - #state{interval = Interval, n = Rate, max_n = Rate, delay_between_executions = Delay}. + #state{interval = Interval, n = NewRate, max_n = NewRate, delay_between_executions = Delay}. merge_state(#state{interval = I, delay_between_executions = D, n = N, max_n = MaxN}, #state{n = OldN, max_n = OldMaxN} = OldState) -> @@ -205,21 +206,27 @@ inc_n(#state{n = N, max_n = MaxN} = State) -> NewN = N + 1, case MaxN < NewN of true -> - PrintableState = printable_state(State), - ?LOG_ERROR("~nthrottle process ~p: invalid N (~p)~n", [self(), PrintableState]), + internal_event(<<"throttle proccess has invalid N">>, State), State#state{n = MaxN}; false -> State#state{n = NewN} end. -log_state(Msg, State) -> +internal_event(Msg, #state{name = Name} = State) -> PrintableState = printable_state(State), - ?LOG_DEBUG("~nthrottle process ~p: ~s (~p)~n", [self(), Msg, PrintableState]). + telemetry:execute([amoc, throttle, process], + #{msg => Msg, process => self()}, + #{printable_state => PrintableState, + monotonic_time => erlang:monotonic_time(), name => Name}); +internal_event(Msg, Name) when is_atom(Name) -> + telemetry:execute([amoc, throttle, process], + #{msg => Msg, process => self()}, + #{monotonic_time => erlang:monotonic_time(), name => Name}). printable_state(#state{} = State) -> Fields = record_info(fields, state), [_ | Values] = tuple_to_list(State#state{schedule = [], schedule_reversed = []}), StateMap = maps:from_list(lists:zip(Fields, Values)), StateMap#{ - schedule:=length(State#state.schedule), - schedule_reversed:=length(State#state.schedule_reversed)}. + schedule := length(State#state.schedule), + schedule_reversed := length(State#state.schedule_reversed)}. From 0c0b0ee58934b4887056b94b9b8478d46dc8dc97 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Fri, 1 Dec 2023 12:45:44 +0100 Subject: [PATCH 2/6] Transform config log statements into telemetry events --- guides/telemetry.md | 10 ++++++++++ src/amoc_config/amoc_config.erl | 9 ++++++--- src/amoc_config/amoc_config_env.erl | 13 +++++-------- src/amoc_config/amoc_config_verification.erl | 14 +++++++++----- 4 files changed, 30 insertions(+), 16 deletions(-) diff --git a/guides/telemetry.md b/guides/telemetry.md index b60fa8d8..0eaa1676 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -98,3 +98,13 @@ event_name: [amoc, coordinator, start | stop | add | reset | timeout] measurements: #{count := 1} metadata: #{monotonic_time := integer(), name := atom()} ``` + +## Config + +### Internal events +There are related to bad configuration events, they might deserve logging +```erlang +event_name: [amoc, config, get | verify | env] +measurements: #{} +metadata: #{log_class => syslog_level(), _ => _} +``` diff --git a/src/amoc_config/amoc_config.erl b/src/amoc_config/amoc_config.erl index 5e0b90df..c20f3375 100644 --- a/src/amoc_config/amoc_config.erl +++ b/src/amoc_config/amoc_config.erl @@ -3,7 +3,6 @@ %% @doc TODO -module(amoc_config). --include_lib("kernel/include/logger.hrl"). -include("amoc_config.hrl"). -export([get/1, get/2]). @@ -20,13 +19,17 @@ get(Name) -> get(Name, Default) when is_atom(Name) -> case ets:lookup(amoc_config, Name) of [] -> - ?LOG_ERROR("no scenario setting ~p", [Name]), + telemetry:execute([amoc, config, get], #{}, + #{log_class => error, msg => <<"no scenario setting">>, + scenario => Name}), throw({invalid_setting, Name}); [#module_parameter{name = Name, value = undefined}] -> Default; [#module_parameter{name = Name, value = Value}] -> Value; InvalidLookupRet -> - ?LOG_ERROR("invalid lookup return value ~p ~p", [Name, InvalidLookupRet]), + telemetry:execute([amoc, config, get], #{}, + #{log_class => error, msg => <<"invalid lookup return value">>, + scenario => Name, return => InvalidLookupRet}), throw({invalid_lookup_ret_value, InvalidLookupRet}) end. diff --git a/src/amoc_config/amoc_config_env.erl b/src/amoc_config/amoc_config_env.erl index d5a38e8d..dc0fb811 100644 --- a/src/amoc_config/amoc_config_env.erl +++ b/src/amoc_config/amoc_config_env.erl @@ -14,8 +14,6 @@ -export([get/1, get/2]). --include_lib("kernel/include/logger.hrl"). - -define(DEFAULT_PARSER_MODULE, amoc_config_parser). -callback(parse_value(string()) -> {ok, amoc_config:value()} | {error, any()}). @@ -41,12 +39,11 @@ get_os_env(Name, Default) -> case parse_value(Value, Default) of {ok, Term} -> Term; {error, Error} -> - ?LOG_ERROR("cannot parse environment variable, using default value.~n" - " parsing error: '~p'~n" - " variable name: '$~s'~n" - " variable value: '~s'~n" - " default value: '~p'~n", - [Error, EnvName, Value, Default]), + telemetry:execute( + [amoc, config, env], #{error => 1}, + #{log_class => error, error => Error, variable_name => EnvName, + variable_value => Value, default_value => Default, + msg => <<"cannot parse environment variable, using default value">>}), Default end. diff --git a/src/amoc_config/amoc_config_verification.erl b/src/amoc_config/amoc_config_verification.erl index 94514c05..71dc93d2 100644 --- a/src/amoc_config/amoc_config_verification.erl +++ b/src/amoc_config/amoc_config_verification.erl @@ -8,7 +8,6 @@ %% API -export([process_scenario_config/2]). --include_lib("kernel/include/logger.hrl"). -include("amoc_config.hrl"). %% @doc Applies the processing as provided by the `required_variable' list to the provided scenario config @@ -40,12 +39,17 @@ verify(Fun, Value) -> {true, NewValue} -> {true, NewValue}; {false, Reason} -> {false, {verification_failed, Reason}}; Ret -> - ?LOG_ERROR("invalid verification method ~p(~p), return value : ~p", - [Fun, Value, Ret]), + telemetry:execute([amoc, config, verify], #{error => 1}, + #{log_class => error, verification_method => Fun, + verification_arg => Value, verification_return => Ret, + msg => <<"invalid verification method">>}), {false, {invalid_verification_return_value, Ret}} catch C:E:S -> - ?LOG_ERROR("invalid verification method ~p(~p), exception: ~p ~p ~p", - [Fun, Value, C, E, S]), + telemetry:execute([amoc, config, verify], #{error => 1}, + #{log_class => error, verification_method => Fun, + verification_arg => Value, + kind => C, reason => E, stacktrace => S, + msg => <<"invalid verification method">>}), {false, {exception_during_verification, {C, E, S}}} end. From 33f8bb09993d62b9afb9b0e9bf9dc39488cd86eb Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Fri, 1 Dec 2023 12:50:38 +0100 Subject: [PATCH 3/6] Transform cluster log statements into telemetry events --- guides/telemetry.md | 10 ++++++++++ src/amoc_distribution/amoc_cluster.erl | 12 +++++++----- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/guides/telemetry.md b/guides/telemetry.md index 0eaa1676..b80acfdb 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -108,3 +108,13 @@ event_name: [amoc, config, get | verify | env] measurements: #{} metadata: #{log_class => syslog_level(), _ => _} ``` + +## Cluster + +### Internal events +There are related to clustering events +```erlang +event_name: [amoc, cluster, connect_nodes | nodedown | master_node_down] +measurements: #{count => non_neg_integer()}, +metadata: #{node => node(), nodes => nodes(), state => map()} +``` diff --git a/src/amoc_distribution/amoc_cluster.erl b/src/amoc_distribution/amoc_cluster.erl index 3415d8d7..e736eca3 100644 --- a/src/amoc_distribution/amoc_cluster.erl +++ b/src/amoc_distribution/amoc_cluster.erl @@ -5,8 +5,6 @@ -behaviour(gen_server). -define(SERVER, ?MODULE). --include_lib("kernel/include/logger.hrl"). - %% ------------------------------------------------------------------ %% API Function Exports %% ------------------------------------------------------------------ @@ -134,7 +132,8 @@ handle_call(_Request, _From, State) -> -spec handle_cast(any(), state()) -> {noreply, state()}. handle_cast({connect_nodes, Nodes}, State) -> - ?LOG_INFO("{connect_nodes, ~p}, state: ~p", [Nodes, state_to_map(State)]), + telemetry:execute([amoc, cluster, connect_nodes], #{count => length(Nodes)}, + #{nodes => Nodes, state => state_to_map(State)}), NewState = handle_connect_nodes(Nodes, State), schedule_timer(NewState), {noreply, NewState}; @@ -149,11 +148,14 @@ handle_info(timeout, State) -> schedule_timer(NewState), {noreply, NewState}; handle_info({nodedown, Node}, #state{master = Node} = State) -> - ?LOG_ERROR("Master node ~p is down. Halting.", [Node]), + telemetry:execute([amoc, cluster, master_node_down], + #{count => 1}, + #{node => Node, state => state_to_map(State)}), erlang:halt(), {noreply, State}; handle_info({nodedown, Node}, State) -> - ?LOG_ERROR("node ~p is down.", [Node]), + telemetry:execute([amoc, cluster, nodedown], #{count => 1}, + #{node => Node, state => state_to_map(State)}), {noreply, merge(connection_lost, [Node], State)}; handle_info(_Info, State) -> {noreply, State}. From 49ac5035265f5017e329d2e5909931be59a8d1f4 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Tue, 12 Dec 2023 00:25:45 +0100 Subject: [PATCH 4/6] Extract more telemetry helpers --- guides/telemetry.md | 14 ++++++---- src/amoc_config/amoc_config.erl | 11 ++++---- src/amoc_config/amoc_config_env.erl | 10 +++---- src/amoc_config/amoc_config_verification.erl | 18 ++++++------ src/amoc_distribution/amoc_cluster.erl | 15 +++++----- src/amoc_telemetry.erl | 16 +++++++++-- src/amoc_throttle/amoc_throttle_process.erl | 29 +++++++++++--------- 7 files changed, 65 insertions(+), 48 deletions(-) diff --git a/guides/telemetry.md b/guides/telemetry.md index b80acfdb..f822f540 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -84,8 +84,12 @@ Events related to internals of the throttle processes, these might expose unstab might want to log or reconfigure: ```erlang event_name: [amoc, throttle, process] -measurements: #{msg := binary(), process := pid()} -metadata: #{monotonic_time := integer(), name := atom(), printable_state => map()} +measurements: #{logger:level() => 1} +metadata: #{log_level := logger:level(), + msg := binary(), + rate => non_neg_integer(), + state => map(), + _ => _} ``` ## Coordinator @@ -105,8 +109,8 @@ metadata: #{monotonic_time := integer(), name := atom()} There are related to bad configuration events, they might deserve logging ```erlang event_name: [amoc, config, get | verify | env] -measurements: #{} -metadata: #{log_class => syslog_level(), _ => _} +measurements: #{logger:level() => 1} +metadata: #{log_level => logger:level(), setting => atom(), msg => binary(), _ => _} ``` ## Cluster @@ -116,5 +120,5 @@ There are related to clustering events ```erlang event_name: [amoc, cluster, connect_nodes | nodedown | master_node_down] measurements: #{count => non_neg_integer()}, -metadata: #{node => node(), nodes => nodes(), state => map()} +metadata: #{nodes => nodes(), state => map()} ``` diff --git a/src/amoc_config/amoc_config.erl b/src/amoc_config/amoc_config.erl index c20f3375..0f1792ec 100644 --- a/src/amoc_config/amoc_config.erl +++ b/src/amoc_config/amoc_config.erl @@ -19,17 +19,16 @@ get(Name) -> get(Name, Default) when is_atom(Name) -> case ets:lookup(amoc_config, Name) of [] -> - telemetry:execute([amoc, config, get], #{}, - #{log_class => error, msg => <<"no scenario setting">>, - scenario => Name}), + amoc_telemetry:execute_log( + error, [config, get], #{setting => Name}, <<"no scenario setting">>), throw({invalid_setting, Name}); [#module_parameter{name = Name, value = undefined}] -> Default; [#module_parameter{name = Name, value = Value}] -> Value; InvalidLookupRet -> - telemetry:execute([amoc, config, get], #{}, - #{log_class => error, msg => <<"invalid lookup return value">>, - scenario => Name, return => InvalidLookupRet}), + amoc_telemetry:execute_log( + error, [config, get], #{setting => Name, return => InvalidLookupRet}, + <<"invalid lookup return value">>), throw({invalid_lookup_ret_value, InvalidLookupRet}) end. diff --git a/src/amoc_config/amoc_config_env.erl b/src/amoc_config/amoc_config_env.erl index dc0fb811..f3512d2f 100644 --- a/src/amoc_config/amoc_config_env.erl +++ b/src/amoc_config/amoc_config_env.erl @@ -39,11 +39,11 @@ get_os_env(Name, Default) -> case parse_value(Value, Default) of {ok, Term} -> Term; {error, Error} -> - telemetry:execute( - [amoc, config, env], #{error => 1}, - #{log_class => error, error => Error, variable_name => EnvName, - variable_value => Value, default_value => Default, - msg => <<"cannot parse environment variable, using default value">>}), + amoc_telemetry:execute_log( + error, [config, env], + #{error => Error, variable_name => EnvName, + variable_value => Value, default_value => Default}, + <<"cannot parse environment variable, using default value">>), Default end. diff --git a/src/amoc_config/amoc_config_verification.erl b/src/amoc_config/amoc_config_verification.erl index 71dc93d2..7e86cbb4 100644 --- a/src/amoc_config/amoc_config_verification.erl +++ b/src/amoc_config/amoc_config_verification.erl @@ -39,17 +39,17 @@ verify(Fun, Value) -> {true, NewValue} -> {true, NewValue}; {false, Reason} -> {false, {verification_failed, Reason}}; Ret -> - telemetry:execute([amoc, config, verify], #{error => 1}, - #{log_class => error, verification_method => Fun, - verification_arg => Value, verification_return => Ret, - msg => <<"invalid verification method">>}), + amoc_telemetry:execute_log( + error, [config, verify], + #{verification_method => Fun, verification_arg => Value, verification_return => Ret}, + <<"invalid verification method">>), {false, {invalid_verification_return_value, Ret}} catch C:E:S -> - telemetry:execute([amoc, config, verify], #{error => 1}, - #{log_class => error, verification_method => Fun, - verification_arg => Value, - kind => C, reason => E, stacktrace => S, - msg => <<"invalid verification method">>}), + amoc_telemetry:execute_log( + error, [config, verify], + #{verification_method => Fun, verification_arg => Value, + kind => C, reason => E, stacktrace => S}, + <<"invalid verification method">>), {false, {exception_during_verification, {C, E, S}}} end. diff --git a/src/amoc_distribution/amoc_cluster.erl b/src/amoc_distribution/amoc_cluster.erl index e736eca3..e9d95171 100644 --- a/src/amoc_distribution/amoc_cluster.erl +++ b/src/amoc_distribution/amoc_cluster.erl @@ -132,8 +132,7 @@ handle_call(_Request, _From, State) -> -spec handle_cast(any(), state()) -> {noreply, state()}. handle_cast({connect_nodes, Nodes}, State) -> - telemetry:execute([amoc, cluster, connect_nodes], #{count => length(Nodes)}, - #{nodes => Nodes, state => state_to_map(State)}), + execute_nodes(connect_nodes, Nodes, state_to_map(State)), NewState = handle_connect_nodes(Nodes, State), schedule_timer(NewState), {noreply, NewState}; @@ -148,14 +147,11 @@ handle_info(timeout, State) -> schedule_timer(NewState), {noreply, NewState}; handle_info({nodedown, Node}, #state{master = Node} = State) -> - telemetry:execute([amoc, cluster, master_node_down], - #{count => 1}, - #{node => Node, state => state_to_map(State)}), + execute_nodes(master_node_down, [Node], state_to_map(State)), erlang:halt(), {noreply, State}; handle_info({nodedown, Node}, State) -> - telemetry:execute([amoc, cluster, nodedown], #{count => 1}, - #{node => Node, state => state_to_map(State)}), + execute_nodes(nodedown, [Node], state_to_map(State)), {noreply, merge(connection_lost, [Node], State)}; handle_info(_Info, State) -> {noreply, State}. @@ -284,3 +280,8 @@ maybe_set_master(Node, #state{new_connection_action = Action}) -> %% to avoid a possibility of the amoc_cluster deadlock while %% running the Action call set_master_node/2 asynchronously spawn(fun() -> set_master_node(Node, Action) end). + +-spec execute_nodes(atom(), [node()], #{any() => any()}) -> ok. +execute_nodes(Name, Nodes, State) -> + PrefixedName = [amoc, cluster, Name], + telemetry:execute(PrefixedName, #{count => length(Nodes)}, #{nodes => Nodes, state => State}). diff --git a/src/amoc_telemetry.erl b/src/amoc_telemetry.erl index 56ec5df1..26534c94 100644 --- a/src/amoc_telemetry.erl +++ b/src/amoc_telemetry.erl @@ -2,7 +2,7 @@ %% @copyright 2023 Erlang Solutions Ltd. -module(amoc_telemetry). --export([execute/3]). +-export([execute/3, execute_log/4]). -spec execute(EventName, Measurements, Metadata) -> ok when EventName :: telemetry:event_name(), @@ -10,6 +10,16 @@ Metadata :: telemetry:event_metadata(). execute(Name, Measurements, Metadata) -> TimeStamp = erlang:monotonic_time(), - NameWithAmocPrefix = [amoc | Name], + PrefixedName = [amoc | Name], MetadataWithTS = Metadata#{monotonic_time => TimeStamp}, - telemetry:execute(NameWithAmocPrefix, Measurements, MetadataWithTS). + telemetry:execute(PrefixedName, Measurements, MetadataWithTS). + +-spec execute_log(Level, EventName, Metadata, Msg) -> ok when + Level :: logger:level(), + EventName :: telemetry:event_name(), + Metadata :: telemetry:event_metadata(), + Msg :: binary(). +execute_log(Level, Name, Metadata, Message) -> + PrefixedName = [amoc | Name], + MetadataWithLog = Metadata#{log_level => Level, msg => Message}, + telemetry:execute(PrefixedName, #{Level => 1}, MetadataWithLog). diff --git a/src/amoc_throttle/amoc_throttle_process.erl b/src/amoc_throttle/amoc_throttle_process.erl index 4271bb66..43268bec 100644 --- a/src/amoc_throttle/amoc_throttle_process.erl +++ b/src/amoc_throttle/amoc_throttle_process.erl @@ -129,10 +129,10 @@ format_status(_Opt, [_PDict, State]) -> initial_state(Name, Interval, Rate) when Rate >= 0 -> NewRate = case {Rate =:= 0, Rate < 5} of {true, _} -> - internal_event(<<"invalid rate, must be higher than zero">>, Name), + internal_error(<<"invalid rate, must be higher than zero">>, Name, Rate), 1; {_, true} -> - internal_event(<<"too low rate, please reduce NoOfProcesses">>, Name), + internal_error(<<"too low rate, please reduce NoOfProcesses">>, Name, Rate), Rate; {_, false} -> Rate @@ -140,7 +140,7 @@ initial_state(Name, Interval, Rate) when Rate >= 0 -> Delay = case {Interval, Interval div NewRate, Interval rem NewRate} of {0, _, _} -> 0; %% limit only No of simultaneous executions {_, I, _} when I < 10 -> - internal_event(<<"too high rate, please increase NoOfProcesses">>, Name), + internal_error(<<"too high rate, please increase NoOfProcesses">>, Name, Rate), 10; {_, DelayBetweenExecutions, 0} -> DelayBetweenExecutions; {_, DelayBetweenExecutions, _} -> DelayBetweenExecutions + 1 @@ -202,26 +202,29 @@ async_runner(Fun) -> timeout(State) -> State#state.interval + ?DEFAULT_MSG_TIMEOUT. -inc_n(#state{n = N, max_n = MaxN} = State) -> +inc_n(#state{name = Name, n = N, max_n = MaxN} = State) -> NewN = N + 1, case MaxN < NewN of true -> - internal_event(<<"throttle proccess has invalid N">>, State), + PrintableState = printable_state(State), + Msg = <<"throttle proccess has invalid N">>, + amoc_telemetry:execute_log( + error, [throttle, process], #{name => Name, n => NewN, state => PrintableState}, Msg), State#state{n = MaxN}; false -> State#state{n = NewN} end. +-spec internal_event(binary(), state()) -> any(). internal_event(Msg, #state{name = Name} = State) -> PrintableState = printable_state(State), - telemetry:execute([amoc, throttle, process], - #{msg => Msg, process => self()}, - #{printable_state => PrintableState, - monotonic_time => erlang:monotonic_time(), name => Name}); -internal_event(Msg, Name) when is_atom(Name) -> - telemetry:execute([amoc, throttle, process], - #{msg => Msg, process => self()}, - #{monotonic_time => erlang:monotonic_time(), name => Name}). + amoc_telemetry:execute_log( + debug, [throttle, process], #{self => self(), name => Name, state => PrintableState}, Msg). + +-spec internal_error(binary(), atom(), non_neg_integer()) -> any(). +internal_error(Msg, Name, Rate) -> + amoc_telemetry:execute_log( + error, [throttle, process], #{name => Name, rate => Rate}, Msg). printable_state(#state{} = State) -> Fields = record_info(fields, state), From ef3eb42a489028e48d9f8badacd3689d854668a4 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Fri, 15 Dec 2023 23:02:32 +0100 Subject: [PATCH 5/6] Reuse exported telemetry functions --- guides/telemetry.md | 8 ++++++-- src/amoc_distribution/amoc_cluster.erl | 14 +++++++------- src/amoc_telemetry.erl | 3 +-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/guides/telemetry.md b/guides/telemetry.md index f822f540..db21f8be 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -85,7 +85,8 @@ might want to log or reconfigure: ```erlang event_name: [amoc, throttle, process] measurements: #{logger:level() => 1} -metadata: #{log_level := logger:level(), +metadata: #{monotonic_time := integer(), + log_level := logger:level(), msg := binary(), rate => non_neg_integer(), state => map(), @@ -110,7 +111,10 @@ There are related to bad configuration events, they might deserve logging ```erlang event_name: [amoc, config, get | verify | env] measurements: #{logger:level() => 1} -metadata: #{log_level => logger:level(), setting => atom(), msg => binary(), _ => _} +metadata: #{monotonic_time := integer(), + log_level => logger:level(), + setting => atom(), + msg => binary(), _ => _} ``` ## Cluster diff --git a/src/amoc_distribution/amoc_cluster.erl b/src/amoc_distribution/amoc_cluster.erl index e9d95171..8266603c 100644 --- a/src/amoc_distribution/amoc_cluster.erl +++ b/src/amoc_distribution/amoc_cluster.erl @@ -132,7 +132,7 @@ handle_call(_Request, _From, State) -> -spec handle_cast(any(), state()) -> {noreply, state()}. handle_cast({connect_nodes, Nodes}, State) -> - execute_nodes(connect_nodes, Nodes, state_to_map(State)), + raise_nodes_event(connect_nodes, Nodes, state_to_map(State)), NewState = handle_connect_nodes(Nodes, State), schedule_timer(NewState), {noreply, NewState}; @@ -147,11 +147,11 @@ handle_info(timeout, State) -> schedule_timer(NewState), {noreply, NewState}; handle_info({nodedown, Node}, #state{master = Node} = State) -> - execute_nodes(master_node_down, [Node], state_to_map(State)), + raise_nodes_event(master_node_down, [Node], state_to_map(State)), erlang:halt(), {noreply, State}; handle_info({nodedown, Node}, State) -> - execute_nodes(nodedown, [Node], state_to_map(State)), + raise_nodes_event(nodedown, [Node], state_to_map(State)), {noreply, merge(connection_lost, [Node], State)}; handle_info(_Info, State) -> {noreply, State}. @@ -281,7 +281,7 @@ maybe_set_master(Node, #state{new_connection_action = Action}) -> %% running the Action call set_master_node/2 asynchronously spawn(fun() -> set_master_node(Node, Action) end). --spec execute_nodes(atom(), [node()], #{any() => any()}) -> ok. -execute_nodes(Name, Nodes, State) -> - PrefixedName = [amoc, cluster, Name], - telemetry:execute(PrefixedName, #{count => length(Nodes)}, #{nodes => Nodes, state => State}). +-spec raise_nodes_event(atom(), [node()], #{any() => any()}) -> ok. +raise_nodes_event(Name, Nodes, State) -> + amoc_telemetry:execute( + [cluster, Name], #{count => length(Nodes)}, #{nodes => Nodes, state => State}). diff --git a/src/amoc_telemetry.erl b/src/amoc_telemetry.erl index 26534c94..29e848da 100644 --- a/src/amoc_telemetry.erl +++ b/src/amoc_telemetry.erl @@ -20,6 +20,5 @@ execute(Name, Measurements, Metadata) -> Metadata :: telemetry:event_metadata(), Msg :: binary(). execute_log(Level, Name, Metadata, Message) -> - PrefixedName = [amoc | Name], MetadataWithLog = Metadata#{log_level => Level, msg => Message}, - telemetry:execute(PrefixedName, #{Level => 1}, MetadataWithLog). + execute(Name, #{Level => 1}, MetadataWithLog). From 46ecf8f4798f1d04d172450ac9636a61808bdcb4 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Fri, 15 Dec 2023 23:12:10 +0100 Subject: [PATCH 6/6] Add interval in the throttle_process internal events --- guides/telemetry.md | 1 + src/amoc_throttle/amoc_throttle_process.erl | 15 +++++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/guides/telemetry.md b/guides/telemetry.md index db21f8be..d4668b81 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -89,6 +89,7 @@ metadata: #{monotonic_time := integer(), log_level := logger:level(), msg := binary(), rate => non_neg_integer(), + interval => non_neg_integer(), state => map(), _ => _} ``` diff --git a/src/amoc_throttle/amoc_throttle_process.erl b/src/amoc_throttle/amoc_throttle_process.erl index 43268bec..9a526384 100644 --- a/src/amoc_throttle/amoc_throttle_process.erl +++ b/src/amoc_throttle/amoc_throttle_process.erl @@ -129,10 +129,12 @@ format_status(_Opt, [_PDict, State]) -> initial_state(Name, Interval, Rate) when Rate >= 0 -> NewRate = case {Rate =:= 0, Rate < 5} of {true, _} -> - internal_error(<<"invalid rate, must be higher than zero">>, Name, Rate), + Msg = <<"invalid rate, must be higher than zero">>, + internal_error(Msg, Name, Rate, Interval), 1; {_, true} -> - internal_error(<<"too low rate, please reduce NoOfProcesses">>, Name, Rate), + Msg = <<"too low rate, please reduce NoOfProcesses">>, + internal_error(Msg, Name, Rate, Interval), Rate; {_, false} -> Rate @@ -140,7 +142,8 @@ initial_state(Name, Interval, Rate) when Rate >= 0 -> Delay = case {Interval, Interval div NewRate, Interval rem NewRate} of {0, _, _} -> 0; %% limit only No of simultaneous executions {_, I, _} when I < 10 -> - internal_error(<<"too high rate, please increase NoOfProcesses">>, Name, Rate), + Message = <<"too high rate, please increase NoOfProcesses">>, + internal_error(Message, Name, Rate, Interval), 10; {_, DelayBetweenExecutions, 0} -> DelayBetweenExecutions; {_, DelayBetweenExecutions, _} -> DelayBetweenExecutions + 1 @@ -221,10 +224,10 @@ internal_event(Msg, #state{name = Name} = State) -> amoc_telemetry:execute_log( debug, [throttle, process], #{self => self(), name => Name, state => PrintableState}, Msg). --spec internal_error(binary(), atom(), non_neg_integer()) -> any(). -internal_error(Msg, Name, Rate) -> +-spec internal_error(binary(), atom(), amoc_throttle:rate(), amoc_throttle:interval()) -> any(). +internal_error(Msg, Name, Rate, Interval) -> amoc_telemetry:execute_log( - error, [throttle, process], #{name => Name, rate => Rate}, Msg). + error, [throttle, process], #{name => Name, rate => Rate, interval => Interval}, Msg). printable_state(#state{} = State) -> Fields = record_info(fields, state),