Emqx alarm (#5994)

* chore(alarm): normalize_message outside emqx_alarm

* chore(alarm): don't cache config in emqx_alarm; remove dirty_write/read; add desc/example to alarm; add more test

* chore(alarm_api): alarm_api with hocon schema

* fix: activted's nullable is true

* fix(swagger): translate map to object

* fix(cluster_rpc): debug failed cluster_rpc test

* fix: Update schema description

Co-authored-by: Zaiming (Stone) Shi <zmstone@gmail.com>

Co-authored-by: Zaiming (Stone) Shi <zmstone@gmail.com>
This commit is contained in:
zhongwencool 2021-10-28 18:03:51 +08:00 committed by GitHub
parent 2c69c00906
commit e62fde321c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 291 additions and 205 deletions

View File

@ -48,6 +48,12 @@
%% Queue topic %% Queue topic
-define(QUEUE, <<"$queue/">>). -define(QUEUE, <<"$queue/">>).
%%--------------------------------------------------------------------
%% alarms
%%--------------------------------------------------------------------
-define(ACTIVATED_ALARM, emqx_activated_alarm).
-define(DEACTIVATED_ALARM, emqx_deactivated_alarm).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Message and Delivery %% Message and Delivery
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -17,7 +17,6 @@
-module(emqx_alarm). -module(emqx_alarm).
-behaviour(gen_server). -behaviour(gen_server).
-behaviour(emqx_config_handler).
-include("emqx.hrl"). -include("emqx.hrl").
-include("logger.hrl"). -include("logger.hrl").
@ -27,22 +26,19 @@
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-export([post_config_update/4]). -export([start_link/0
-export([ start_link/0
, stop/0
]). ]).
-export([format/1]).
%% API %% API
-export([ activate/1 -export([ activate/1
, activate/2 , activate/2
, activate/3
, deactivate/1 , deactivate/1
, deactivate/2 , deactivate/2
, deactivate/3
, delete_all_deactivated_alarms/0 , delete_all_deactivated_alarms/0
, get_alarms/0 , get_alarms/0
, get_alarms/1 , get_alarms/1
, format/1
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -56,34 +52,19 @@
-record(activated_alarm, { -record(activated_alarm, {
name :: binary() | atom(), name :: binary() | atom(),
details :: map() | list(), details :: map() | list(),
message :: binary(), message :: binary(),
activate_at :: integer() activate_at :: integer()
}). }).
-record(deactivated_alarm, { -record(deactivated_alarm, {
activate_at :: integer(), activate_at :: integer(),
name :: binary() | atom(), name :: binary() | atom(),
details :: map() | list(), details :: map() | list(),
message :: binary(), message :: binary(),
deactivate_at :: integer() | infinity deactivate_at :: integer() | infinity
}). }).
-record(state, {
timer :: reference()
}).
-define(ACTIVATED_ALARM, emqx_activated_alarm).
-define(DEACTIVATED_ALARM, emqx_deactivated_alarm).
-ifdef(TEST). -ifdef(TEST).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
@ -114,20 +95,23 @@ mnesia(boot) ->
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
stop() ->
gen_server:stop(?MODULE).
activate(Name) -> activate(Name) ->
activate(Name, #{}). activate(Name, #{}).
activate(Name, Details) -> activate(Name, Details) ->
gen_server:call(?MODULE, {activate_alarm, Name, Details}). activate(Name, Details, <<"">>).
activate(Name, Details, Message) ->
gen_server:call(?MODULE, {activate_alarm, Name, Details, Message}).
deactivate(Name) -> deactivate(Name) ->
gen_server:call(?MODULE, {deactivate_alarm, Name, no_details}). deactivate(Name, no_details, <<"">>).
deactivate(Name, Details) -> deactivate(Name, Details) ->
gen_server:call(?MODULE, {deactivate_alarm, Name, Details}). deactivate(Name, Details, <<"">>).
deactivate(Name, Details, Message) ->
gen_server:call(?MODULE, {deactivate_alarm, Name, Details, Message}).
delete_all_deactivated_alarms() -> delete_all_deactivated_alarms() ->
gen_server:call(?MODULE, delete_all_deactivated_alarms). gen_server:call(?MODULE, delete_all_deactivated_alarms).
@ -144,10 +128,6 @@ get_alarms(activated) ->
get_alarms(deactivated) -> get_alarms(deactivated) ->
gen_server:call(?MODULE, {get_alarms, deactivated}). gen_server:call(?MODULE, {get_alarms, deactivated}).
post_config_update(_, #{validity_period := Period0}, _OldConf, _AppEnv) ->
?MODULE ! {update_timer, Period0},
ok.
format(#activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) -> format(#activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) ->
Now = erlang:system_time(microsecond), Now = erlang:system_time(microsecond),
#{ #{
@ -168,9 +148,7 @@ format(#deactivated_alarm{name = Name, message = Message, activate_at = At, deta
activate_at => to_rfc3339(At), activate_at => to_rfc3339(At),
deactivate_at => to_rfc3339(DAt), deactivate_at => to_rfc3339(DAt),
details => Details details => Details
}; }.
format(_) ->
{error, unknow_alarm}.
to_rfc3339(Timestamp) -> to_rfc3339(Timestamp) ->
list_to_binary(calendar:system_time_to_rfc3339(Timestamp div 1000, [{unit, millisecond}])). list_to_binary(calendar:system_time_to_rfc3339(Timestamp div 1000, [{unit, millisecond}])).
@ -180,85 +158,72 @@ to_rfc3339(Timestamp) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
_ = mria:wait_for_tables([?ACTIVATED_ALARM, ?DEACTIVATED_ALARM]), ok = mria:wait_for_tables([?ACTIVATED_ALARM, ?DEACTIVATED_ALARM]),
deactivate_all_alarms(), deactivate_all_alarms(),
ok = emqx_config_handler:add_handler([alarm], ?MODULE), {ok, #{}, get_validity_period()}.
{ok, #state{timer = ensure_timer(undefined, get_validity_period())}}.
%% suppress dialyzer warning due to dirty read/write race condition. handle_call({activate_alarm, Name, Details, Message}, _From, State) ->
%% TODO: change from dirty_read/write to transactional. Res = mria:transaction(mria:local_content_shard(),
%% TODO: handle mnesia write errors. fun create_activate_alarm/3,
-dialyzer([{nowarn_function, [handle_call/3]}]). [Name, Details, Message]),
handle_call({activate_alarm, Name, Details}, _From, State) -> case Res of
case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of {atomic, Alarm} ->
[#activated_alarm{name = Name}] ->
{reply, {error, already_existed}, State};
[] ->
Alarm = #activated_alarm{name = Name,
details = Details,
message = normalize_message(Name, Details),
activate_at = erlang:system_time(microsecond)},
mria:dirty_write(?ACTIVATED_ALARM, Alarm),
do_actions(activate, Alarm, emqx:get_config([alarm, actions])), do_actions(activate, Alarm, emqx:get_config([alarm, actions])),
{reply, ok, State} {reply, ok, State, get_validity_period()};
{aborted, Reason} ->
{reply, Reason, State, get_validity_period()}
end; end;
handle_call({deactivate_alarm, Name, Details}, _From, State) -> handle_call({deactivate_alarm, Name, Details, Message}, _From, State) ->
case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
[] -> [] ->
{reply, {error, not_found}, State}; {reply, {error, not_found}, State};
[Alarm] -> [Alarm] ->
deactivate_alarm(Details, Alarm), deactivate_alarm(Alarm, Details, Message),
{reply, ok, State} {reply, ok, State, get_validity_period()}
end; end;
handle_call(delete_all_deactivated_alarms, _From, State) -> handle_call(delete_all_deactivated_alarms, _From, State) ->
clear_table(?DEACTIVATED_ALARM), clear_table(?DEACTIVATED_ALARM),
{reply, ok, State}; {reply, ok, State, get_validity_period()};
handle_call({get_alarms, all}, _From, State) -> handle_call({get_alarms, all}, _From, State) ->
{atomic, Alarms} = {atomic, Alarms} =
mria:ro_transaction( mria:ro_transaction(
?COMMON_SHARD, mria:local_content_shard(),
fun() -> fun() ->
[normalize(Alarm) || [normalize(Alarm) ||
Alarm <- ets:tab2list(?ACTIVATED_ALARM) Alarm <- ets:tab2list(?ACTIVATED_ALARM)
++ ets:tab2list(?DEACTIVATED_ALARM)] ++ ets:tab2list(?DEACTIVATED_ALARM)]
end), end),
{reply, Alarms, State}; {reply, Alarms, State, get_validity_period()};
handle_call({get_alarms, activated}, _From, State) -> handle_call({get_alarms, activated}, _From, State) ->
Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?ACTIVATED_ALARM)], Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?ACTIVATED_ALARM)],
{reply, Alarms, State}; {reply, Alarms, State, get_validity_period()};
handle_call({get_alarms, deactivated}, _From, State) -> handle_call({get_alarms, deactivated}, _From, State) ->
Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?DEACTIVATED_ALARM)], Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?DEACTIVATED_ALARM)],
{reply, Alarms, State}; {reply, Alarms, State, get_validity_period()};
handle_call(Req, _From, State) -> handle_call(Req, From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call_req => Req, from => From}),
{reply, ignored, State}. {reply, ignored, State, get_validity_period()}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast_req => Msg}),
{noreply, State}. {noreply, State, get_validity_period()}.
handle_info({timeout, _TRef, delete_expired_deactivated_alarm}, handle_info(timeout, State) ->
#state{timer = TRef} = State) ->
Period = get_validity_period(), Period = get_validity_period(),
delete_expired_deactivated_alarms(erlang:system_time(microsecond) - Period * 1000), delete_expired_deactivated_alarms(erlang:system_time(microsecond) - Period * 1000),
{noreply, State#state{timer = ensure_timer(TRef, Period)}}; {noreply, State, Period};
handle_info({update_timer, Period}, #state{timer = TRef} = State) ->
?SLOG(warning, #{msg => "validity_timer_updated", period => Period}),
{noreply, State#state{timer = ensure_timer(TRef, Period)}};
handle_info(Info, State) -> handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}), ?SLOG(error, #{msg => "unexpected_info", info_req => Info}),
{noreply, State}. {noreply, State, get_validity_period()}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
ok = emqx_config_handler:remove_handler([alarm]),
ok. ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
@ -271,8 +236,21 @@ code_change(_OldVsn, State, _Extra) ->
get_validity_period() -> get_validity_period() ->
emqx:get_config([alarm, validity_period]). emqx:get_config([alarm, validity_period]).
deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name, create_activate_alarm(Name, Details, Message) ->
details = Details0, message = Msg0}) -> case mnesia:read(?ACTIVATED_ALARM, Name) of
[#activated_alarm{name = Name}] ->
mnesia:abort({error, already_existed});
[] ->
Alarm = #activated_alarm{name = Name,
details = Details,
message = normalize_message(Name, iolist_to_binary(Message)),
activate_at = erlang:system_time(microsecond)},
ok = mnesia:write(?ACTIVATED_ALARM, Alarm, write),
Alarm
end.
deactivate_alarm(#activated_alarm{activate_at = ActivateAt, name = Name,
details = Details0, message = Msg0}, Details, Message) ->
SizeLimit = emqx:get_config([alarm, size_limit]), SizeLimit = emqx:get_config([alarm, size_limit]),
case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of
true -> true ->
@ -286,7 +264,7 @@ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name
HistoryAlarm = make_deactivated_alarm(ActivateAt, Name, Details0, Msg0, HistoryAlarm = make_deactivated_alarm(ActivateAt, Name, Details0, Msg0,
erlang:system_time(microsecond)), erlang:system_time(microsecond)),
DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details, DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details,
normalize_message(Name, Details), normalize_message(Name, iolist_to_binary(Message)),
erlang:system_time(microsecond)), erlang:system_time(microsecond)),
mria:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), mria:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
mria:dirty_delete(?ACTIVATED_ALARM, Name), mria:dirty_delete(?ACTIVATED_ALARM, Name),
@ -329,13 +307,6 @@ clear_table(TableName) ->
ok ok
end. end.
ensure_timer(OldTRef, Period) ->
_ = case is_reference(OldTRef) of
true -> erlang:cancel_timer(OldTRef);
false -> ok
end,
emqx_misc:start_timer(Period, delete_expired_deactivated_alarm).
delete_expired_deactivated_alarms(Checkpoint) -> delete_expired_deactivated_alarms(Checkpoint) ->
delete_expired_deactivated_alarms(mnesia:dirty_first(?DEACTIVATED_ALARM), Checkpoint). delete_expired_deactivated_alarms(mnesia:dirty_first(?DEACTIVATED_ALARM), Checkpoint).
@ -368,16 +339,12 @@ do_actions(deactivate, Alarm = #deactivated_alarm{name = Name}, [log | More]) ->
do_actions(deactivate, Alarm, More); do_actions(deactivate, Alarm, More);
do_actions(Operation, Alarm, [publish | More]) -> do_actions(Operation, Alarm, [publish | More]) ->
Topic = topic(Operation), Topic = topic(Operation),
{ok, Payload} = encode_to_json(Alarm), {ok, Payload} = emqx_json:safe_encode(normalize(Alarm)),
Message = emqx_message:make(?MODULE, 0, Topic, Payload, #{sys => true}, Message = emqx_message:make(?MODULE, 0, Topic, Payload, #{sys => true},
#{properties => #{'Content-Type' => <<"application/json">>}}), #{properties => #{'Content-Type' => <<"application/json">>}}),
%% TODO log failed publishes
_ = emqx_broker:safe_publish(Message), _ = emqx_broker:safe_publish(Message),
do_actions(Operation, Alarm, More). do_actions(Operation, Alarm, More).
encode_to_json(Alarm) ->
emqx_json:safe_encode(normalize(Alarm)).
topic(activate) -> topic(activate) ->
emqx_topic:systop(<<"alarms/activate">>); emqx_topic:systop(<<"alarms/activate">>);
topic(deactivate) -> topic(deactivate) ->
@ -405,25 +372,6 @@ normalize(#deactivated_alarm{activate_at = ActivateAt,
deactivate_at => DeactivateAt, deactivate_at => DeactivateAt,
activated => false}. activated => false}.
normalize_message(Name, no_details) -> normalize_message(Name, <<"">>) ->
list_to_binary(io_lib:format("~p", [Name])); list_to_binary(io_lib:format("~p", [Name]));
normalize_message(runq_overload, #{node := Node, runq_length := Len}) -> normalize_message(_Name, Message) -> Message.
list_to_binary(io_lib:format("VM is overloaded on node: ~p: ~p", [Node, Len]));
normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) ->
list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark]));
normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) ->
list_to_binary(io_lib:format("Process memory usage is higher than ~p%", [HighWatermark]));
normalize_message(high_cpu_usage, #{usage := Usage}) ->
list_to_binary(io_lib:format("~ts cpu usage", [Usage]));
normalize_message(too_many_processes, #{usage := Usage}) ->
list_to_binary(io_lib:format("~ts process usage", [Usage]));
normalize_message(cluster_rpc_apply_failed, #{tnx_id := TnxId}) ->
list_to_binary(io_lib:format("cluster_rpc_apply_failed:~w", [TnxId]));
normalize_message(partition, #{occurred := Node}) ->
list_to_binary(io_lib:format("Partition occurs at node ~ts", [Node]));
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
list_to_binary(io_lib:format("Resource ~ts(~ts) is down", [Type, ID]));
normalize_message(<<"conn_congestion/", Info/binary>>, _) ->
list_to_binary(io_lib:format("connection congested: ~ts", [Info]));
normalize_message(_Name, _UnknownDetails) ->
<<"Unknown alarm">>.

View File

@ -57,14 +57,18 @@ init(_) ->
{ok, []}. {ok, []}.
handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
HighWatermark = emqx_os_mon:get_sysmem_high_watermark(),
Message = to_bin("System memory usage is higher than ~p%", [HighWatermark]),
emqx_alarm:activate(high_system_memory_usage, emqx_alarm:activate(high_system_memory_usage,
#{high_watermark => emqx_os_mon:get_sysmem_high_watermark()}), #{high_watermark => HighWatermark}, Message),
{ok, State}; {ok, State};
handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) -> handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
HighWatermark = emqx_os_mon:get_procmem_high_watermark(),
Message = to_bin("Process memory usage is higher than ~p%", [HighWatermark]),
emqx_alarm:activate(high_process_memory_usage, emqx_alarm:activate(high_process_memory_usage,
#{pid => list_to_binary(pid_to_list(Pid)), #{pid => list_to_binary(pid_to_list(Pid)),
high_watermark => emqx_os_mon:get_procmem_high_watermark()}), high_watermark => HighWatermark}, Message),
{ok, State}; {ok, State};
handle_event({clear_alarm, system_memory_high_watermark}, State) -> handle_event({clear_alarm, system_memory_high_watermark}, State) ->
@ -76,7 +80,9 @@ handle_event({clear_alarm, process_memory_high_watermark}, State) ->
{ok, State}; {ok, State};
handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) -> handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) ->
emqx_alarm:activate(runq_overload, Info), #{node := Node, runq_length := Len} = Info,
Message = to_bin("VM is overloaded on node: ~p: ~p", [Node, Len]),
emqx_alarm:activate(runq_overload, Info, Message),
{ok, State}; {ok, State};
handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) -> handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) ->
@ -96,3 +102,6 @@ terminate(swap, _State) ->
{emqx_alarm_handler, []}; {emqx_alarm_handler, []};
terminate(_, _) -> terminate(_, _) ->
ok. ok.
to_bin(Format, Args) ->
io_lib:format(Format, Args).

View File

@ -78,13 +78,15 @@ cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
do_alarm_congestion(Socket, Transport, Channel, Reason) -> do_alarm_congestion(Socket, Transport, Channel, Reason) ->
ok = update_alarm_sent_at(Reason), ok = update_alarm_sent_at(Reason),
AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel), AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel),
emqx_alarm:activate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails), Message = io_lib:format("connection congested: ~ts", [AlarmDetails]),
emqx_alarm:activate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails, Message),
ok. ok.
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
ok = remove_alarm_sent_at(Reason), ok = remove_alarm_sent_at(Reason),
AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel), AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel),
emqx_alarm:deactivate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails), Message = io_lib:format("connection congested: ~ts", [AlarmDetails]),
emqx_alarm:deactivate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails, Message),
ok. ok.
is_tcp_congested(Socket, Transport) -> is_tcp_congested(Socket, Transport) ->

View File

@ -96,12 +96,26 @@ handle_info({timeout, _Timer, check}, State) ->
_ = case emqx_vm:cpu_util() of %% TODO: should be improved? _ = case emqx_vm:cpu_util() of %% TODO: should be improved?
0 -> ok; 0 -> ok;
Busy when Busy >= CPUHighWatermark -> Busy when Busy >= CPUHighWatermark ->
emqx_alarm:activate(high_cpu_usage, #{usage => io_lib:format("~p%", [Busy]), Usage = io_lib:format("~p%", [Busy]),
Message = [Usage, " cpu usage"],
emqx_alarm:activate(high_cpu_usage,
#{
usage => Usage,
high_watermark => CPUHighWatermark, high_watermark => CPUHighWatermark,
low_watermark => CPULowWatermark}), low_watermark => CPULowWatermark
},
Message),
start_check_timer(); start_check_timer();
Busy when Busy =< CPULowWatermark -> Busy when Busy =< CPULowWatermark ->
emqx_alarm:deactivate(high_cpu_usage), Usage = io_lib:format("~p%", [Busy]),
Message = [Usage, " cpu usage"],
emqx_alarm:deactivate(high_cpu_usage,
#{
usage => Usage,
high_watermark => CPUHighWatermark,
low_watermark => CPULowWatermark
},
Message),
start_check_timer(); start_check_timer();
_Busy -> _Busy ->
start_check_timer() start_check_timer()

View File

@ -51,6 +51,7 @@
-export([ validate_heap_size/1 -export([ validate_heap_size/1
, parse_user_lookup_fun/1 , parse_user_lookup_fun/1
, validate_alarm_actions/1
]). ]).
% workaround: prevent being recognized as unused functions % workaround: prevent being recognized as unused functions
@ -889,17 +890,34 @@ fields("sysmon_os") ->
fields("alarm") -> fields("alarm") ->
[ {"actions", [ {"actions",
sc(hoconsc:array(atom()), sc(hoconsc:array(atom()),
#{ default => [log, publish] #{ default => [log, publish],
validator => fun ?MODULE:validate_alarm_actions/1,
example => [log, publish],
desc =>
"""The actions triggered when the alarm is activated.<\br>
Currently supports two actions, 'log' and 'publish'.
'log' is to write the alarm to log (console or file).
'publish' is to publish the alarm as an MQTT message to the system topics:
<code>$SYS/brokers/emqx@xx.xx.xx.x/alarms/activate</code> and <code>$SYS/brokers/emqx@xx.xx.xx.x/alarms/deactivate</code>"""
}) })
} }
, {"size_limit", , {"size_limit",
sc(integer(), sc(range(1, 3000),
#{ default => 1000 #{ default => 1000,
example => 1000,
desc =>
"""The maximum total number of deactivated alarms to keep as history.<br>
When this limit is exceeded, the oldest deactivated alarms are deleted to cap the total number.
"""
}) })
} }
, {"validity_period", , {"validity_period",
sc(duration(), sc(duration(),
#{ default => "24h" #{ default => "24h",
example => "24h",
desc =>
"""Retention time of deactivated alarms. Alarms are not deleted immediately when deactivated, but after the retention time.
"""
}) })
} }
]. ].
@ -1345,6 +1363,14 @@ validate_heap_size(Siz) ->
true -> error(io_lib:format("force_shutdown_policy: heap-size ~ts is too large", [Siz])); true -> error(io_lib:format("force_shutdown_policy: heap-size ~ts is too large", [Siz]));
false -> ok false -> ok
end. end.
validate_alarm_actions(Actions) ->
UnSupported = lists:filter(fun(Action) -> Action =/= log andalso Action =/= publish end, Actions),
case UnSupported of
[] -> ok;
Error -> {error, Error}
end.
parse_user_lookup_fun(StrConf) -> parse_user_lookup_fun(StrConf) ->
[ModStr, FunStr] = string:tokens(str(StrConf), ":"), [ModStr, FunStr] = string:tokens(str(StrConf), ":"),
Mod = list_to_atom(ModStr), Mod = list_to_atom(ModStr),

View File

@ -170,9 +170,11 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_partition_event({partition, {occurred, Node}}) -> handle_partition_event({partition, {occurred, Node}}) ->
emqx_alarm:activate(partition, #{occurred => Node}); Message = io_lib:format("Partition occurs at node ~ts", [Node]),
handle_partition_event({partition, {healed, _Node}}) -> emqx_alarm:activate(partition, #{occurred => Node}, Message);
emqx_alarm:deactivate(partition). handle_partition_event({partition, {healed, Node}}) ->
Message = io_lib:format("Partition healed at node ~ts", [Node]),
emqx_alarm:deactivate(partition, no_details, Message).
suppress(Key, SuccFun, State = #{events := Events}) -> suppress(Key, SuccFun, State = #{events := Events}) ->
case lists:member(Key, Events) of case lists:member(Key, Events) of

View File

@ -62,12 +62,23 @@ handle_info({timeout, _Timer, check}, State) ->
ProcessCount = erlang:system_info(process_count), ProcessCount = erlang:system_info(process_count),
case ProcessCount / erlang:system_info(process_limit) of case ProcessCount / erlang:system_info(process_limit) of
Percent when Percent >= ProcHighWatermark -> Percent when Percent >= ProcHighWatermark ->
emqx_alarm:activate(too_many_processes, #{ Usage = io_lib:format("~p%", [Percent*100]),
usage => io_lib:format("~p%", [Percent*100]), Message = [Usage, " process usage"],
emqx_alarm:activate(too_many_processes,
#{
usage => Usage,
high_watermark => ProcHighWatermark, high_watermark => ProcHighWatermark,
low_watermark => ProcLowWatermark}); low_watermark => ProcLowWatermark},
Message);
Percent when Percent < ProcLowWatermark -> Percent when Percent < ProcLowWatermark ->
emqx_alarm:deactivate(too_many_processes); Usage = io_lib:format("~p%", [Percent*100]),
Message = [Usage, " process usage"],
emqx_alarm:deactivate(too_many_processes,
#{
usage => Usage,
high_watermark => ProcHighWatermark,
low_watermark => ProcLowWatermark},
Message);
_Precent -> _Precent ->
ok ok
end, end,

View File

@ -32,16 +32,12 @@ init_per_testcase(t_size_limit, Config) ->
<<"size_limit">> => 2 <<"size_limit">> => 2
}), }),
Config; Config;
init_per_testcase(t_validity_period, Config) -> init_per_testcase(_, Config) ->
emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]), emqx_common_test_helpers:start_apps([]),
{ok, _} = emqx:update_config([alarm], #{ {ok, _} = emqx:update_config([alarm], #{
<<"validity_period">> => <<"1s">> <<"validity_period">> => <<"1s">>
}), }),
Config;
init_per_testcase(_, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config. Config.
end_per_testcase(_, _Config) -> end_per_testcase(_, _Config) ->
@ -86,17 +82,77 @@ t_size_limit(_) ->
?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))), ?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))),
emqx_alarm:delete_all_deactivated_alarms(). emqx_alarm:delete_all_deactivated_alarms().
t_validity_period(_) -> t_validity_period(_Config) ->
ok = emqx_alarm:activate(a), ok = emqx_alarm:activate(a, #{msg => "Request frequency is too high"}, <<"Reach Rate Limit">>),
ok = emqx_alarm:deactivate(a), ok = emqx_alarm:deactivate(a, #{msg => "Request frequency returns to normal"}),
?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))), ?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))),
%% call with unknown msg
?assertEqual(ignored, gen_server:call(emqx_alarm, unknown_alarm)),
ct:sleep(3000), ct:sleep(3000),
?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))). ?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))).
t_validity_period_1(_Config) ->
ok = emqx_alarm:activate(a, #{msg => "Request frequency is too high"}, <<"Reach Rate Limit">>),
ok = emqx_alarm:deactivate(a, #{msg => "Request frequency returns to normal"}),
?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))),
%% info with unknown msg
erlang:send(emqx_alarm, unknown_alarm),
ct:sleep(3000),
?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))).
t_validity_period_2(_Config) ->
ok = emqx_alarm:activate(a, #{msg => "Request frequency is too high"}, <<"Reach Rate Limit">>),
ok = emqx_alarm:deactivate(a, #{msg => "Request frequency returns to normal"}),
?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))),
%% cast with unknown msg
gen_server:cast(emqx_alarm, unknown_alarm),
ct:sleep(3000),
?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))).
-record(activated_alarm, {
name :: binary() | atom(),
details :: map() | list(),
message :: binary(),
activate_at :: integer()
}).
-record(deactivated_alarm, {
activate_at :: integer(),
name :: binary() | atom(),
details :: map() | list(),
message :: binary(),
deactivate_at :: integer() | infinity
}).
t_format(_Config) ->
Name = test_alarm,
Message = "test_msg",
At = erlang:system_time(microsecond),
Details = "test_details",
Node = node(),
Activate = #activated_alarm{name = Name, message = Message, activate_at = At, details = Details},
#{
node := Node,
name := Name,
message := Message,
duration := 0,
details := Details
} = emqx_alarm:format(Activate),
Deactivate = #deactivated_alarm{name = Name, message = Message, activate_at = At, details = Details,
deactivate_at = At},
#{
node := Node,
name := Name,
message := Message,
duration := 0,
details := Details
} = emqx_alarm:format(Deactivate),
ok.
get_alarm(Name, [Alarm = #{name := Name} | _More]) -> get_alarm(Name, [Alarm = #{name := Name} | _More]) ->
Alarm; Alarm;
get_alarm(Name, [_Alarm | More]) -> get_alarm(Name, [_Alarm | More]) ->
get_alarm(Name, More); get_alarm(Name, More);
get_alarm(_Name, []) -> get_alarm(_Name, []) ->
{error, not_found}. {error, not_found}.

View File

@ -320,21 +320,23 @@ apply_mfa(TnxId, {M, F, A}) ->
end, end,
Meta = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)}, Meta = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)},
IsSuccess = is_success(Res), IsSuccess = is_success(Res),
log_and_alarm(IsSuccess, Res, Meta), log_and_alarm(IsSuccess, Res, Meta, TnxId),
{IsSuccess, Res}. {IsSuccess, Res}.
is_success(ok) -> true; is_success(ok) -> true;
is_success({ok, _}) -> true; is_success({ok, _}) -> true;
is_success(_) -> false. is_success(_) -> false.
log_and_alarm(true, Res, Meta) -> log_and_alarm(true, Res, Meta, TnxId) ->
OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => Res}, OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => Res},
?SLOG(debug, OkMeta), ?SLOG(debug, OkMeta),
emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta#{result => ?TO_BIN(Res)}); Message = ["cluster_rpc_apply_failed:", integer_to_binary(TnxId)],
log_and_alarm(false, Res, Meta) -> emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta#{result => ?TO_BIN(Res)}, Message);
log_and_alarm(false, Res, Meta, TnxId) ->
NotOkMeta = Meta#{msg => <<"failed to apply MFA">>, result => Res}, NotOkMeta = Meta#{msg => <<"failed to apply MFA">>, result => Res},
?SLOG(error, NotOkMeta), ?SLOG(error, NotOkMeta),
emqx_alarm:activate(cluster_rpc_apply_failed, NotOkMeta#{result => ?TO_BIN(Res)}). Message = ["cluster_rpc_apply_failed:", integer_to_binary(TnxId)],
emqx_alarm:activate(cluster_rpc_apply_failed, NotOkMeta#{result => ?TO_BIN(Res)}, Message).
wait_for_all_nodes_commit(TnxId, Delay, Remain) -> wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
case lagging_node(TnxId) of case lagging_node(TnxId) of

View File

@ -43,8 +43,8 @@ init_per_suite(Config) ->
ok = ekka:start(), ok = ekka:start(),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]), meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
meck:expect(emqx_alarm, activate, 2, ok), meck:expect(emqx_alarm, activate, 3, ok),
meck:expect(emqx_alarm, deactivate, 2, ok), meck:expect(emqx_alarm, deactivate, 3, ok),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
@ -122,17 +122,21 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
emqx_cluster_rpc:reset(), emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(), {atomic, []} = emqx_cluster_rpc:status(),
Now = erlang:system_time(millisecond), Now = erlang:system_time(millisecond),
ct:pal("111:~p~n", [ets:tab2list(cluster_rpc_commit)]),
{M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]}, {M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]},
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), {ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000), ct:pal("222:~p~n", [ets:tab2list(cluster_rpc_commit)]),
{ok, 2, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000),
ct:pal("333:~p~n", [ets:tab2list(cluster_rpc_commit)]),
ct:pal("444:~p~n", [emqx_cluster_rpc:status()]),
{atomic, [Status|L]} = emqx_cluster_rpc:status(), {atomic, [Status|L]} = emqx_cluster_rpc:status(),
?assertEqual([], L), ?assertEqual([], L),
?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)),
?assertEqual(node(), maps:get(node, Status)), ?assertEqual(node(), maps:get(node, Status)),
sleep(2300), ct:sleep(2300),
{atomic, [Status1]} = emqx_cluster_rpc:status(), {atomic, [Status1]} = emqx_cluster_rpc:status(),
?assertEqual(Status, Status1), ?assertEqual(Status, Status1),
sleep(3600), ct:sleep(3600),
{atomic, NewStatus} = emqx_cluster_rpc:status(), {atomic, NewStatus} = emqx_cluster_rpc:status(),
?assertEqual(3, length(NewStatus)), ?assertEqual(3, length(NewStatus)),
Pid = self(), Pid = self(),
@ -161,7 +165,7 @@ t_del_stale_mfa(_Config) ->
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
TnxId end || _ <- Keys2], TnxId end || _ <- Keys2],
?assertEqual(Keys2, Ids2), ?assertEqual(Keys2, Ids2),
sleep(1200), ct:sleep(1200),
[begin [begin
?assertEqual({aborted, not_found}, emqx_cluster_rpc:query(I)) ?assertEqual({aborted, not_found}, emqx_cluster_rpc:query(I))
end || I <- lists:seq(1, 50)], end || I <- lists:seq(1, 50)],
@ -177,7 +181,7 @@ t_skip_failed_commit(_Config) ->
emqx_cluster_rpc:reset(), emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(), {atomic, []} = emqx_cluster_rpc:status(),
{ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000), {ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000),
sleep(180), ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(), {atomic, List1} = emqx_cluster_rpc:status(),
Node = node(), Node = node(),
?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}], ?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}],
@ -250,8 +254,3 @@ failed_on_other_recover_after_5_second(Pid, CreatedAt) ->
false -> ok false -> ok
end end
end. end.
sleep(Ms) ->
receive _ -> ok
after Ms -> timeout
end.

View File

@ -362,7 +362,7 @@ typename_to_spec("timeout()", _Mod) -> #{<<"oneOf">> => [#{type => string, examp
#{type => integer, example => 100}], example => infinity}; #{type => integer, example => 100}], example => infinity};
typename_to_spec("bytesize()", _Mod) -> #{type => string, example => <<"32MB">>}; typename_to_spec("bytesize()", _Mod) -> #{type => string, example => <<"32MB">>};
typename_to_spec("wordsize()", _Mod) -> #{type => string, example => <<"1024KB">>}; typename_to_spec("wordsize()", _Mod) -> #{type => string, example => <<"1024KB">>};
typename_to_spec("map()", _Mod) -> #{type => string, example => <<>>}; typename_to_spec("map()", _Mod) -> #{type => object, example => #{}};
typename_to_spec("comma_separated_list()", _Mod) -> #{type => string, example => <<"item1,item2">>}; typename_to_spec("comma_separated_list()", _Mod) -> #{type => string, example => <<"item1,item2">>};
typename_to_spec("comma_separated_atoms()", _Mod) -> #{type => string, example => <<"item1,item2">>}; typename_to_spec("comma_separated_atoms()", _Mod) -> #{type => string, example => <<"item1,item2">>};
typename_to_spec("pool_type()", _Mod) -> #{type => string, enum => [random, hash], example => hash}; typename_to_spec("pool_type()", _Mod) -> #{type => string, enum => [random, hash], example => hash};

View File

@ -172,7 +172,7 @@ t_complicated_type(_Config) ->
[#{example => infinity, type => string}, #{example => 100, type => integer}]}}, [#{example => infinity, type => string}, #{example => 100, type => integer}]}},
{<<"bytesize">>, #{example => <<"32MB">>, type => string}}, {<<"bytesize">>, #{example => <<"32MB">>, type => string}},
{<<"wordsize">>, #{example => <<"1024KB">>, type => string}}, {<<"wordsize">>, #{example => <<"1024KB">>, type => string}},
{<<"maps">>, #{example => <<>>, type => string}}, {<<"maps">>, #{example => #{}, type => object}},
{<<"comma_separated_list">>, #{example => <<"item1,item2">>, type => string}}, {<<"comma_separated_list">>, #{example => <<"item1,item2">>, type => string}},
{<<"comma_separated_atoms">>, #{example => <<"item1,item2">>, type => string}}, {<<"comma_separated_atoms">>, #{example => <<"item1,item2">>, type => string}},
{<<"log_level">>, {<<"log_level">>,

View File

@ -18,65 +18,76 @@
-behaviour(minirest_api). -behaviour(minirest_api).
-export([api_spec/0]). -include_lib("emqx/include/emqx.hrl").
-include_lib("typerefl/include/types.hrl").
-export([api_spec/0, paths/0, schema/1, fields/1]).
-export([alarms/2]). -export([alarms/2]).
%% internal export (for query) %% internal export (for query)
-export([ query/4 -export([query/4]).
]).
%% notice: from emqx_alarms
-define(ACTIVATED_ALARM, emqx_activated_alarm).
-define(DEACTIVATED_ALARM, emqx_deactivated_alarm).
-import(emqx_mgmt_util, [ object_array_schema/2
, schema/1
, properties/1
]).
api_spec() -> api_spec() ->
{[alarms_api()], []}. emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
properties() -> paths() ->
properties([ ["/alarms"].
{node, string, <<"Alarm in node">>},
{name, string, <<"Alarm name">>},
{message, string, <<"Alarm readable information">>},
{details, object},
{duration, integer, <<"Alarms duration time; UNIX time stamp, millisecond">>},
{activate_at, string, <<"Alarms activate time, RFC 3339">>},
{deactivate_at, string, <<"Nullable, alarms deactivate time, RFC 3339">>}
]).
alarms_api() -> schema("/alarms") ->
Metadata = #{ #{
operationId => alarms,
get => #{ get => #{
description => <<"EMQ X alarms">>, description => <<"EMQ X alarms">>,
parameters => emqx_mgmt_util:page_params() ++ [#{ parameters => [
name => activated, hoconsc:ref(emqx_dashboard_swagger, page),
in => query, hoconsc:ref(emqx_dashboard_swagger, limit),
description => <<"All alarms, if not specified">>, {activated, hoconsc:mk(boolean(), #{in => query,
required => false, desc => <<"All alarms, if not specified">>,
schema => #{type => boolean, default => true} nullable => true})}
}], ],
responses => #{ responses => #{
<<"200">> => 200 => [
object_array_schema(properties(), <<"List all alarms">>)}}, {data, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, alarm)), #{})},
{meta, hoconsc:mk(hoconsc:ref(?MODULE, meta), #{})}
]
}
},
delete => #{ delete => #{
description => <<"Remove all deactivated alarms">>, description => <<"Remove all deactivated alarms">>,
responses => #{ responses => #{
<<"200">> => 200 => <<"Remove all deactivated alarms ok">>
schema(<<"Remove all deactivated alarms ok">>)}}}, }
{"/alarms", Metadata, alarms}. }
}.
fields(alarm) ->
[
{node, hoconsc:mk(binary(), #{desc => <<"Alarm in node">>, example => atom_to_list(node())})},
{name, hoconsc:mk(binary(), #{desc => <<"Alarm name">>, example => <<"high_system_memory_usage">>})},
{message, hoconsc:mk(binary(), #{desc => <<"Alarm readable information">>,
example => <<"System memory usage is higher than 70%">>})},
{details, hoconsc:mk(map(), #{desc => <<"Alarm details information">>,
example => #{<<"high_watermark">> => 70}})},
{duration, hoconsc:mk(integer(), #{desc => <<"Alarms duration time; UNIX time stamp, millisecond">>,
example => 297056})},
{activate_at, hoconsc:mk(binary(), #{desc => <<"Alarms activate time, RFC 3339">>,
example => <<"2021-10-25T11:52:52.548+08:00">>})},
{deactivate_at, hoconsc:mk(binary(), #{desc => <<"Nullable, alarms deactivate time, RFC 3339">>,
example => <<"2021-10-31T10:52:52.548+08:00">>})}
];
fields(meta) ->
emqx_dashboard_swagger:fields(page) ++
emqx_dashboard_swagger:fields(limit) ++
[{count, hoconsc:mk(integer(), #{example => 1})}].
%%%============================================================================================== %%%==============================================================================================
%% parameters trans %% parameters trans
alarms(get, #{query_string := Qs}) -> alarms(get, #{query_string := Qs}) ->
Table = Table =
case maps:get(<<"activated">>, Qs, <<"true">>) of case maps:get(<<"activated">>, Qs, true) of
<<"true">> -> ?ACTIVATED_ALARM; true -> ?ACTIVATED_ALARM;
<<"false">> -> ?DEACTIVATED_ALARM false -> ?DEACTIVATED_ALARM
end, end,
Response = emqx_mgmt_api:cluster_query(Qs, Table, [], {?MODULE, query}), Response = emqx_mgmt_api:cluster_query(Qs, Table, [], {?MODULE, query}),
emqx_mgmt_util:generate_response(Response); emqx_mgmt_util:generate_response(Response);