feature(alarm): new design for alarm

This commit is contained in:
zhouzb 2020-07-28 15:17:10 +08:00 committed by tigercl
parent 0817761aee
commit 43b49edd28
14 changed files with 425 additions and 333 deletions

View File

@ -112,19 +112,6 @@
node_id :: trie_node_id() node_id :: trie_node_id()
}). }).
%%--------------------------------------------------------------------
%% Alarm
%%--------------------------------------------------------------------
-record(alarm, {
id :: binary(),
severity :: notice | warning | error | critical,
title :: iolist(),
summary :: iolist(),
%% Timestamp (Unit: millisecond)
timestamp :: integer() | undefined
}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Plugin %% Plugin
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -2103,7 +2103,7 @@ end}.
{translation, "emqx.os_mon", fun(Conf) -> {translation, "emqx.os_mon", fun(Conf) ->
Configs = cuttlefish_variable:filter_by_prefix("os_mon", Conf), Configs = cuttlefish_variable:filter_by_prefix("os_mon", Conf),
[{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] [{list_to_atom(Name), Value * 100} || {[_, Name], Value} <- Configs]
end}. end}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -2126,5 +2126,5 @@ end}.
{translation, "emqx.vm_mon", fun(Conf) -> {translation, "emqx.vm_mon", fun(Conf) ->
Configs = cuttlefish_variable:filter_by_prefix("vm_mon", Conf), Configs = cuttlefish_variable:filter_by_prefix("vm_mon", Conf),
[{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] [{list_to_atom(Name), Value * 100} || {[_, Name], Value} <- Configs]
end}. end}.

278
src/emqx_alarm.erl Normal file
View File

@ -0,0 +1,278 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_alarm).
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-logger_header("[Alarm Handler]").
-export([start_link/0, stop/0]).
%% API
-export([ activate/1
, activate/2
, deactivate/1
, delete_all_deactivated_alarms/0
, get_alarms/0
, get_alarms/1
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
%% Row stored in the ?TAB Mnesia table (the table is created with
%% {record_name, alarm} and these fields as its attributes, so `name'
%% is the table key).
-record(alarm, {
%% Alarm identifier supplied by the caller of activate/1,2.
name :: binary() | atom(),
%% Caller-supplied context; activate/1 passes an empty map.
details :: map() | list(),
%% Human-readable text produced by normalize_message/2 at activation.
message :: binary(),
%% erlang:system_time(millisecond) taken when the alarm was activated.
activate_at :: integer(),
%% `infinity' while the alarm is active; otherwise the
%% erlang:system_time(millisecond) taken at deactivation.
deactivate_at :: integer() | infinity,
%% `true' from activation until the alarm is deactivated.
activated :: boolean()
}).
%% gen_server state: the list of actions run by do_actions/3 each time
%% an alarm is activated or deactivated.
-record(state, {
actions :: [action()]
}).
%% Kinds of action that may appear in #state.actions.
%% NOTE(review): do_actions/3 only has clauses for `log' and `publish';
%% `event' is declared here but has no implementation in this module.
-type action() :: log | publish | event.
-define(TAB, emqx_alarm).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%% @doc Start the alarm server, linked to the caller and registered
%% locally under the module name. init/1 receives an empty list.
-spec(start_link() -> emqx_types:startlink_ret()).
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%% @doc Stop the locally registered alarm server.
stop() ->
    Server = ?MODULE,
    gen_server:stop(Server).
%% @doc Activate the alarm `Name' with an empty details map.
activate(Name) ->
    NoDetails = #{},
    activate(Name, NoDetails).

%% @doc Ask the alarm server to activate `Name' with the given details.
%% The reply is whatever handle_call/3 returns for `activate_alarm':
%% `ok', or `{error, already_existed}' when the alarm is already active.
activate(Name, Details) ->
    Request = {activate_alarm, Name, Details},
    gen_server:call(?MODULE, Request).
%% @doc Ask the alarm server to deactivate `Name'. Replies `ok', or
%% `{error, not_found}' when no such alarm is currently active.
deactivate(Name) ->
    Request = {deactivate_alarm, Name},
    gen_server:call(?MODULE, Request).
%% @doc Ask the alarm server to remove every deactivated alarm from the
%% table. Active alarms are left untouched.
delete_all_deactivated_alarms() ->
    Request = delete_all_deactivated_alarms,
    gen_server:call(?MODULE, Request).
%% @doc Return every stored alarm, active or not.
get_alarms() ->
    get_alarms(all).

%% @doc Return the stored alarms selected by `Status' (`all',
%% `activated' or `deactivated'), each as the map built by normalize/1.
%% Any other argument fails with function_clause.
get_alarms(Status) when Status =:= all;
                        Status =:= activated;
                        Status =:= deactivated ->
    gen_server:call(?MODULE, {get_alarms, Status}).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
%% @private
%% Server initialisation. With no options the default action list
%% [log, publish] is used. The alarm table is created, and every alarm
%% still flagged active in the table is marked deactivated before the
%% server starts serving requests.
init([]) ->
    init([[{actions, [log, publish]}]]);
init([Opts]) ->
    TabDef = [{type, bag},
              {disc_copies, [node()]},
              {local_content, true},
              {record_name, alarm},
              {attributes, record_info(fields, alarm)}],
    ok = ekka_mnesia:create_table(?TAB, TabDef),
    Actions = proplists:get_value(actions, Opts, [log, publish]),
    deactivate_all_alarms(),
    InitialState = #state{actions = Actions},
    {ok, InitialState}.
%% @private
%% Synchronous requests served by the alarm server.
%%
%%   {activate_alarm, Name, Details} -> ok | {error, already_existed}
%%   {deactivate_alarm, Name}        -> ok | {error, not_found}
%%   delete_all_deactivated_alarms   -> ok
%%   {get_alarms, all | activated | deactivated} -> [map()]
%%   anything else                   -> ignored (logged as an error)
%%
%% Whether an alarm is currently active is decided from the table
%% itself rather than from a flag in the process dictionary. The table
%% is the only state that deactivate_all_alarms/0 updates, so a separate
%% flag can go stale; a flag keyed by the raw alarm name can also clash
%% with keys OTP keeps in the process dictionary.
handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Actions}) ->
    case find_activated(Name) of
        [_ | _] ->
            {reply, {error, already_existed}, State};
        [] ->
            Alarm = #alarm{name = Name,
                           details = Details,
                           message = normalize_message(Name, Details),
                           activate_at = erlang:system_time(millisecond),
                           deactivate_at = infinity,
                           activated = true},
            mnesia:dirty_write(?TAB, Alarm),
            do_actions(activate, Alarm, Actions),
            {reply, ok, State}
    end;

handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions}) ->
    case find_activated(Name) of
        [] ->
            {reply, {error, not_found}, State};
        [Alarm | _] ->
            NAlarm = Alarm#alarm{deactivate_at = erlang:system_time(millisecond),
                                 activated = false},
            %% The table is a bag: remove the exact active object before
            %% writing its deactivated replacement.
            mnesia:dirty_delete_object(?TAB, Alarm),
            mnesia:dirty_write(?TAB, NAlarm),
            do_actions(deactivate, NAlarm, Actions),
            {reply, ok, State}
    end;

handle_call(delete_all_deactivated_alarms, _From, State) ->
    lists:foreach(fun(Alarm) ->
                          mnesia:dirty_delete_object(?TAB, Alarm)
                  end, select_by_status(false)),
    {reply, ok, State};

handle_call({get_alarms, all}, _From, State) ->
    Alarms = ets:tab2list(?TAB),
    {reply, [normalize(Alarm) || Alarm <- Alarms], State};

handle_call({get_alarms, activated}, _From, State) ->
    {reply, [normalize(Alarm) || Alarm <- select_by_status(true)], State};

handle_call({get_alarms, deactivated}, _From, State) ->
    {reply, [normalize(Alarm) || Alarm <- select_by_status(false)], State};

handle_call(Req, _From, State) ->
    ?LOG(error, "Unexpected call: ~p", [Req]),
    {reply, ignored, State}.

%% Active alarm objects stored under `Name'. Uses a keyed read, so the
%% name is never interpreted as part of a match specification.
find_activated(Name) ->
    [Alarm || Alarm = #alarm{activated = true} <- mnesia:dirty_read(?TAB, Name)].

%% All alarm objects whose `activated' field equals the given boolean.
select_by_status(Activated) when is_boolean(Activated) ->
    MatchSpec = [{#alarm{activated = Activated, _ = '_'}, [], ['$_']}],
    mnesia:dirty_select(?TAB, MatchSpec).
%% @private
%% No casts are part of this server's protocol; log and ignore them.
handle_cast(Unexpected, State) ->
    ?LOG(error, "Unexpected msg: ~p", [Unexpected]),
    {noreply, State}.
%% @private
%% No raw messages are expected; log whatever arrives and carry on.
handle_info(Unexpected, State) ->
    ?LOG(error, "Unexpected info: ~p", [Unexpected]),
    {noreply, State}.
terminate(_Reason, _State) ->
ok.
%% @private
%% The state needs no conversion between code versions.
code_change(_FromVsn, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
%% Mark every alarm currently flagged active in the table as
%% deactivated, stamping each one with the time it is processed.
%% Runs no actions (nothing is logged or published). Returns `ok'.
deactivate_all_alarms() ->
    ActiveSpec = [{#alarm{activated = true, _ = '_'}, [], ['$_']}],
    Deactivate =
        fun(Active) ->
                Now = erlang:system_time(millisecond),
                Inactive = Active#alarm{deactivate_at = Now, activated = false},
                mnesia:dirty_delete_object(?TAB, Active),
                mnesia:dirty_write(?TAB, Inactive)
        end,
    %% lists:foreach/2 returns ok for an empty list as well, so the
    %% no-active-alarms case needs no special handling.
    lists:foreach(Deactivate, mnesia:dirty_select(?TAB, ActiveSpec)).
%% Run the configured actions, in order, for an alarm that has just been
%% activated or deactivated.
%%
%%   log     - write a warning to the log.
%%   publish - publish the JSON form of the alarm on the matching
%%             system topic (see topic/1).
%%
%% A failure to encode the alarm as JSON is logged and the remaining
%% actions still run; it must not crash the alarm server, because the
%% alarm has already been written to the table by the time actions run.
%%
%% NOTE(review): action() also declares `event', but there is no clause
%% for it here, so configuring it fails with function_clause.
do_actions(_Operation, _Alarm, []) ->
    ok;
do_actions(activate, Alarm = #alarm{name = Name, message = Message}, [log | More]) ->
    ?LOG(warning, "Alarm ~p is activated, ~s", [Name, Message]),
    do_actions(activate, Alarm, More);
do_actions(deactivate, Alarm = #alarm{name = Name}, [log | More]) ->
    ?LOG(warning, "Alarm ~p is deactivated", [Name]),
    do_actions(deactivate, Alarm, More);
do_actions(Operation, Alarm = #alarm{name = Name}, [publish | More]) ->
    case encode_to_json(Alarm) of
        {ok, Payload} ->
            Message = emqx_message:make(?MODULE, 0, topic(Operation), Payload,
                                        #{sys => true},
                                        #{properties => #{'Content-Type' => <<"application/json">>}}),
            emqx_broker:safe_publish(Message);
        {error, Reason} ->
            ?LOG(error, "Failed to encode alarm ~p: ~p", [Name, Reason])
    end,
    do_actions(Operation, Alarm, More).
%% Convert an alarm record to its map form and JSON-encode it.
%% Returns whatever emqx_json:safe_encode/1 returns.
encode_to_json(Alarm) ->
    AlarmMap = normalize(Alarm),
    emqx_json:safe_encode(AlarmMap).
%% System topic on which an alarm operation is published:
%% "alarms/activate" or "alarms/deactivate", passed through
%% emqx_topic:systop/1. Any other operation fails with function_clause.
topic(Operation) when Operation =:= activate; Operation =:= deactivate ->
    Suffix = atom_to_binary(Operation, utf8),
    emqx_topic:systop(<<"alarms/", Suffix/binary>>).
%% Convert an alarm record to a map with the same six fields, which is
%% the form returned by get_alarms/0,1 and encoded for publishing.
normalize(Alarm = #alarm{}) ->
    #{name          => Alarm#alarm.name,
      details       => Alarm#alarm.details,
      message       => Alarm#alarm.message,
      activate_at   => Alarm#alarm.activate_at,
      deactivate_at => Alarm#alarm.deactivate_at,
      activated     => Alarm#alarm.activated}.
%% Build the human-readable message stored with an alarm.
%% The two memory alarms read the current watermark from emqx_os_mon;
%% the CPU and process-count alarms take their figure from the details
%% map. Any other name, or a details term without the expected key,
%% yields <<"Unknown">>.
normalize_message(high_system_memory_usage, _Details) ->
    Watermark = emqx_os_mon:get_sysmem_high_watermark(),
    format_message("System memory usage is higher than ~p%", [Watermark]);
normalize_message(high_process_memory_usage, _Details) ->
    Watermark = emqx_os_mon:get_procmem_high_watermark(),
    format_message("Process memory usage is higher than ~p%", [Watermark]);
normalize_message(high_cpu_usage, #{usage := Usage}) ->
    format_message("~p% cpu usage", [Usage]);
normalize_message(too_many_processes, #{high_watermark := HighWatermark}) ->
    format_message("High_watermark: ~p%", [HighWatermark]);
normalize_message(_Name, _UnknownDetails) ->
    <<"Unknown">>.

%% Format `Args' according to `Format' and return the text as a binary.
format_message(Format, Args) ->
    list_to_binary(io_lib:format(Format, Args)).

View File

@ -23,12 +23,6 @@
-logger_header("[Alarm Handler]"). -logger_header("[Alarm Handler]").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% gen_event callbacks %% gen_event callbacks
-export([ init/1 -export([ init/1
, handle_event/2 , handle_event/2
@ -39,38 +33,8 @@
-export([ load/0 -export([ load/0
, unload/0 , unload/0
, get_alarms/0
, get_alarms/1
]). ]).
-record(common_alarm, {id, desc}).
-record(alarm_history, {id, desc, clear_at}).
-define(ALARM_TAB, emqx_alarm).
-define(ALARM_HISTORY_TAB, emqx_alarm_history).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = ekka_mnesia:create_table(?ALARM_TAB, [
{type, set},
{disc_copies, [node()]},
{local_content, true},
{record_name, common_alarm},
{attributes, record_info(fields, common_alarm)}]),
ok = ekka_mnesia:create_table(?ALARM_HISTORY_TAB, [
{type, set},
{disc_copies, [node()]},
{local_content, true},
{record_name, alarm_history},
{attributes, record_info(fields, alarm_history)}]);
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ALARM_TAB),
ok = ekka_mnesia:copy_table(?ALARM_HISTORY_TAB).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -82,50 +46,32 @@ load() ->
unload() -> unload() ->
gen_event:swap_handler(alarm_handler, {?MODULE, swap}, {alarm_handler, []}). gen_event:swap_handler(alarm_handler, {?MODULE, swap}, {alarm_handler, []}).
get_alarms() ->
get_alarms(present).
get_alarms(present) ->
Alarms = ets:tab2list(?ALARM_TAB),
[{Id, Desc} || #common_alarm{id = Id, desc = Desc} <- Alarms];
get_alarms(history) ->
Alarms = ets:tab2list(?ALARM_HISTORY_TAB),
[{Id, Desc, ClearAt} || #alarm_history{id = Id, desc = Desc, clear_at = ClearAt} <- Alarms].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_event callbacks %% gen_event callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init({_Args, {alarm_handler, ExistingAlarms}}) -> init({_Args, {alarm_handler, _ExistingAlarms}}) ->
init_tables(ExistingAlarms),
{ok, []}; {ok, []};
init(_) -> init(_) ->
init_tables([]),
{ok, []}. {ok, []}.
handle_event({set_alarm, {AlarmId, AlarmDesc = #alarm{timestamp = undefined}}}, State) -> handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
handle_event({set_alarm, {AlarmId, AlarmDesc#alarm{timestamp = erlang:system_time(millisecond)}}}, State); emqx_alarm:activate(high_system_memory_usage, #{}),
handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) ->
?LOG(warning, "New Alarm: ~p, Alarm Info: ~p", [AlarmId, AlarmDesc]),
case encode_alarm(Alarm) of
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(topic(alert), Json));
{error, Reason} ->
?LOG(error, "Failed to encode alarm: ~p", [Reason])
end,
set_alarm_(AlarmId, AlarmDesc),
{ok, State}; {ok, State};
handle_event({clear_alarm, AlarmId}, State) ->
?LOG(info, "Clear Alarm: ~p", [AlarmId]), handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
case encode_alarm({AlarmId, undefined}) of emqx_alarm:activate(high_process_memory_usage, #{pid => Pid}),
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(topic(clear), Json));
{error, Reason} ->
?LOG(error, "Failed to encode alarm: ~p", [Reason])
end,
clear_alarm_(AlarmId),
{ok, State}; {ok, State};
handle_event({clear_alarm, system_memory_high_watermark}, State) ->
emqx_alarm:deactivate(high_system_memory_usage),
{ok, State};
handle_event({clear_alarm, process_memory_high_watermark}, State) ->
emqx_alarm:deactivate(high_process_memory_usage),
{ok, State};
handle_event(_, State) -> handle_event(_, State) ->
{ok, State}. {ok, State}.
@ -136,69 +82,6 @@ handle_call(_Query, State) ->
{ok, {error, bad_query}, State}. {ok, {error, bad_query}, State}.
terminate(swap, _State) -> terminate(swap, _State) ->
{emqx_alarm_handler, get_alarms()}; {emqx_alarm_handler, []};
terminate(_, _) -> terminate(_, _) ->
ok. ok.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
init_tables(ExistingAlarms) ->
mnesia:clear_table(?ALARM_TAB),
lists:foreach(fun({Id, Desc}) ->
set_alarm_history(Id, Desc)
end, ExistingAlarms).
encode_alarm({AlarmId, #alarm{severity = Severity,
title = Title,
summary = Summary,
timestamp = Ts}}) ->
Descr = #{severity => Severity,
title => iolist_to_binary(Title),
summary => iolist_to_binary(Summary),
timestamp => Ts
},
emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId),
desc => Descr
});
encode_alarm({AlarmId, undefined}) ->
emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId)});
encode_alarm({AlarmId, AlarmDesc}) ->
emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId),
desc => maybe_to_binary(AlarmDesc)
}).
alarm_msg(Topic, Payload) ->
emqx_message:make(?MODULE, 0, Topic, Payload,
#{sys => true},
#{properties => #{'Content-Type' => <<"application/json">>}}
).
topic(alert) ->
emqx_topic:systop(<<"alarms/alert">>);
topic(clear) ->
emqx_topic:systop(<<"alarms/clear">>).
maybe_to_binary(Data) when is_binary(Data) ->
Data;
maybe_to_binary(Data) ->
iolist_to_binary(io_lib:format("~p", [Data])).
set_alarm_(Id, Desc) ->
mnesia:dirty_write(?ALARM_TAB, #common_alarm{id = Id, desc = Desc}).
clear_alarm_(Id) ->
case mnesia:dirty_read(?ALARM_TAB, Id) of
[#common_alarm{desc = Desc}] ->
set_alarm_history(Id, Desc),
mnesia:dirty_delete(?ALARM_TAB, Id);
[] -> ok
end.
set_alarm_history(Id, Desc) ->
His = #alarm_history{id = Id,
desc = Desc,
clear_at = erlang:system_time(millisecond)},
mnesia:dirty_write(?ALARM_HISTORY_TAB, His).

View File

@ -130,7 +130,6 @@ handle_cast({detected, #flapping{clientid = ClientId,
reason = <<"flapping is detected">>, reason = <<"flapping is detected">>,
at = Now, at = Now,
until = Now + Interval}, until = Now + Interval},
alarm_handler:set_alarm({{flapping_detected, ClientId}, Banned}),
emqx_banned:create(Banned); emqx_banned:create(Banned);
false -> false ->
?LOG(warning, "~s(~s) disconnected ~w times in ~wms", ?LOG(warning, "~s(~s) disconnected ~w times in ~wms",

View File

@ -83,13 +83,13 @@ set_mem_check_interval(Seconds) ->
memsup:set_check_interval(Seconds div 60). memsup:set_check_interval(Seconds div 60).
get_sysmem_high_watermark() -> get_sysmem_high_watermark() ->
memsup:get_sysmem_high_watermark() / 100. memsup:get_sysmem_high_watermark().
set_sysmem_high_watermark(Float) -> set_sysmem_high_watermark(Float) ->
memsup:set_sysmem_high_watermark(Float). memsup:set_sysmem_high_watermark(Float).
get_procmem_high_watermark() -> get_procmem_high_watermark() ->
memsup:get_procmem_high_watermark() / 100. memsup:get_procmem_high_watermark().
set_procmem_high_watermark(Float) -> set_procmem_high_watermark(Float) ->
memsup:set_procmem_high_watermark(Float). memsup:set_procmem_high_watermark(Float).
@ -102,14 +102,10 @@ call(Req) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([Opts]) -> init([Opts]) ->
set_mem_check_interval(proplists:get_value(mem_check_interval, Opts, 60)), {ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts),
set_sysmem_high_watermark(proplists:get_value(sysmem_high_watermark, Opts, 0.70)), cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts),
set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts, 0.05)), cpu_check_interval => proplists:get_value(cpu_check_interval, Opts),
{ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts, 0.80), timer => undefined})}.
cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts, 0.60),
cpu_check_interval => proplists:get_value(cpu_check_interval, Opts, 60),
timer => undefined,
is_cpu_alarm_set => false})}.
handle_call(get_cpu_check_interval, _From, State) -> handle_call(get_cpu_check_interval, _From, State) ->
{reply, maps:get(cpu_check_interval, State, undefined), State}; {reply, maps:get(cpu_check_interval, State, undefined), State};
@ -139,21 +135,21 @@ handle_cast(Msg, State) ->
handle_info({timeout, Timer, check}, State = #{timer := Timer, handle_info({timeout, Timer, check}, State = #{timer := Timer,
cpu_high_watermark := CPUHighWatermark, cpu_high_watermark := CPUHighWatermark,
cpu_low_watermark := CPULowWatermark, cpu_low_watermark := CPULowWatermark}) ->
is_cpu_alarm_set := IsCPUAlarmSet}) ->
NState = NState =
case emqx_vm:cpu_util() of %% TODO: should be improved? case emqx_vm:cpu_util() of %% TODO: should be improved?
0 -> State#{timer := undefined}; 0 ->
State#{timer := undefined};
Busy when Busy / 100 >= CPUHighWatermark -> Busy when Busy / 100 >= CPUHighWatermark ->
alarm_handler:set_alarm({cpu_high_watermark, Busy}), emqx_alarm:activate(high_cpu_usage, #{usage => Busy,
ensure_check_timer(State#{is_cpu_alarm_set := true}); high_watermark => CPUHighWatermark,
Busy when Busy / 100 < CPULowWatermark -> low_watermark => CPULowWatermark}),
case IsCPUAlarmSet of ensure_check_timer(State);
true -> alarm_handler:clear_alarm(cpu_high_watermark); Busy when Busy / 100 =< CPULowWatermark ->
false -> ok emqx_alarm:deactivate(high_cpu_usage),
end, ensure_check_timer(State);
ensure_check_timer(State#{is_cpu_alarm_set := false}); _Busy ->
_Busy -> ensure_check_timer(State) ensure_check_timer(State)
end, end,
{noreply, NState}; {noreply, NState};

View File

@ -165,9 +165,9 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_partition_event({partition, {occurred, Node}}) -> handle_partition_event({partition, {occurred, Node}}) ->
alarm_handler:set_alarm({partitioned, Node}); emqx_alarm:activate(partition, #{occurred => Node});
handle_partition_event({partition, {healed, _Node}}) -> handle_partition_event({partition, {healed, _Node}}) ->
alarm_handler:clear_alarm(partitioned). emqx_alarm:deactivate(partition).
suppress(Key, SuccFun, State = #{events := Events}) -> suppress(Key, SuccFun, State = #{events := Events}) ->
case lists:member(Key, Events) of case lists:member(Key, Events) of

View File

@ -27,6 +27,7 @@ start_link() ->
init([]) -> init([]) ->
Childs = [child_spec(emqx_sys), Childs = [child_spec(emqx_sys),
child_spec(emqx_alarm),
child_spec(emqx_sys_mon, [config(sysmon)]), child_spec(emqx_sys_mon, [config(sysmon)]),
child_spec(emqx_os_mon, [config(os_mon)]), child_spec(emqx_os_mon, [config(os_mon)]),
child_spec(emqx_vm_mon, [config(vm_mon)])], child_spec(emqx_vm_mon, [config(vm_mon)])],

View File

@ -78,8 +78,7 @@
, route_entry/0 , route_entry/0
]). ]).
-export_type([ alarm/0 -export_type([ plugin/0
, plugin/0
, banned/0 , banned/0
, command/0 , command/0
]). ]).
@ -194,7 +193,6 @@
-type(route() :: #route{}). -type(route() :: #route{}).
-type(sub_group() :: tuple() | binary()). -type(sub_group() :: tuple() | binary()).
-type(route_entry() :: {topic(), node()} | {topic, sub_group()}). -type(route_entry() :: {topic(), node()} | {topic, sub_group()}).
-type(alarm() :: #alarm{}).
-type(plugin() :: #plugin{}). -type(plugin() :: #plugin{}).
-type(command() :: #command{}). -type(command() :: #command{}).

View File

@ -75,11 +75,10 @@ call(Req) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([Opts]) -> init([Opts]) ->
{ok, ensure_check_timer(#{check_interval => proplists:get_value(check_interval, Opts, 30), {ok, ensure_check_timer(#{check_interval => proplists:get_value(check_interval, Opts),
process_high_watermark => proplists:get_value(process_high_watermark, Opts, 0.70), process_high_watermark => proplists:get_value(process_high_watermark, Opts),
process_low_watermark => proplists:get_value(process_low_watermark, Opts, 0.50), process_low_watermark => proplists:get_value(process_low_watermark, Opts),
timer => undefined, timer => undefined})}.
is_process_alarm_set => false})}.
handle_call(get_check_interval, _From, State) -> handle_call(get_check_interval, _From, State) ->
{reply, maps:get(check_interval, State, undefined), State}; {reply, maps:get(check_interval, State, undefined), State};
@ -110,22 +109,19 @@ handle_cast(Msg, State) ->
handle_info({timeout, Timer, check}, handle_info({timeout, Timer, check},
State = #{timer := Timer, State = #{timer := Timer,
process_high_watermark := ProcHighWatermark, process_high_watermark := ProcHighWatermark,
process_low_watermark := ProcLowWatermark, process_low_watermark := ProcLowWatermark}) ->
is_process_alarm_set := IsProcessAlarmSet}) ->
ProcessCount = erlang:system_info(process_count), ProcessCount = erlang:system_info(process_count),
NState = case ProcessCount / erlang:system_info(process_limit) of case ProcessCount / erlang:system_info(process_limit) * 100 of
Percent when Percent >= ProcHighWatermark -> Percent when Percent >= ProcHighWatermark ->
alarm_handler:set_alarm({too_many_processes, ProcessCount}), emqx_alarm:activate(too_many_processes, #{usage => Percent,
State#{is_process_alarm_set := true}; high_watermark => ProcHighWatermark,
Percent when Percent < ProcLowWatermark -> low_watermark => ProcLowWatermark});
case IsProcessAlarmSet of Percent when Percent < ProcLowWatermark ->
true -> alarm_handler:clear_alarm(too_many_processes); emqx_alarm:deactivate(too_many_processes);
false -> ok _Precent ->
end, ok
State#{is_process_alarm_set := false}; end,
_Precent -> State {noreply, ensure_check_timer(State)};
end,
{noreply, ensure_check_timer(NState)};
handle_info(Info, State) -> handle_info(Info, State) ->
?LOG(error, "[VM_MON] Unexpected info: ~p", [Info]), ?LOG(error, "[VM_MON] Unexpected info: ~p", [Info]),
@ -143,4 +139,3 @@ code_change(_OldVsn, State, _Extra) ->
ensure_check_timer(State = #{check_interval := Interval}) -> ensure_check_timer(State = #{check_interval := Interval}) ->
State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}.

68
test/emqx_alarm_SUITE.erl Normal file
View File

@ -0,0 +1,68 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_alarm_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
%% Common Test entry point: the test-case list is produced by
%% emqx_ct:all/1 for this suite module.
all() ->
    Suite = ?MODULE,
    emqx_ct:all(Suite).
%% Suite setup: boot all modules and start the applications with no
%% extra apps; the Common Test config is passed through unchanged.
init_per_suite(Config) ->
    ExtraApps = [],
    emqx_ct_helpers:boot_modules(all),
    emqx_ct_helpers:start_apps(ExtraApps),
    Config.
%% Suite teardown: stop what init_per_suite/1 started.
end_per_suite(_Config) ->
    ExtraApps = [],
    emqx_ct_helpers:stop_apps(ExtraApps).
%% Full life cycle of one alarm: activate (twice, the second attempt is
%% rejected), deactivate (twice, the second attempt is rejected), then
%% purge deactivated alarms. After each step the alarm must show up in
%% exactly the expected listings.
t_alarm(_) ->
    Name = unknown_alarm,
    NotFound = {error, not_found},
    Lookup = fun(Alarms) -> get_alarm(Name, Alarms) end,

    ok = emqx_alarm:activate(Name),
    {error, already_existed} = emqx_alarm:activate(Name),
    ?assertNotEqual(NotFound, Lookup(emqx_alarm:get_alarms())),
    ?assertNotEqual(NotFound, Lookup(emqx_alarm:get_alarms(activated))),
    ?assertEqual(NotFound, Lookup(emqx_alarm:get_alarms(deactivated))),

    ok = emqx_alarm:deactivate(Name),
    NotFound = emqx_alarm:deactivate(Name),
    ?assertEqual(NotFound, Lookup(emqx_alarm:get_alarms(activated))),
    ?assertNotEqual(NotFound, Lookup(emqx_alarm:get_alarms(deactivated))),

    emqx_alarm:delete_all_deactivated_alarms(),
    ?assertEqual(NotFound, Lookup(emqx_alarm:get_alarms(deactivated))).
%% deactivate_all_alarms/0 (exported only under TEST) must move every
%% active alarm to the deactivated listing.
%%
%% The function is called here from the test process, not from the alarm
%% server, so it only updates the table. The follow-up deactivate/1 call
%% goes through the server: it confirms the alarm is no longer active
%% and lets the server drop any bookkeeping it still holds for the name.
%% Without it, a later activate(unknown_alarm) in this suite could be
%% rejected with already_existed.
t_deactivate_all_alarms(_) ->
    ok = emqx_alarm:activate(unknown_alarm),
    {error, already_existed} = emqx_alarm:activate(unknown_alarm),
    ?assertNotEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(activated))),

    emqx_alarm:deactivate_all_alarms(),
    ?assertEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(activated))),
    ?assertNotEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(deactivated))),
    ?assertEqual({error, not_found}, emqx_alarm:deactivate(unknown_alarm)),

    emqx_alarm:delete_all_deactivated_alarms(),
    ?assertEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(deactivated))).
%% Return the first map in `Alarms' whose `name' key is exactly `Name',
%% or {error, not_found} when there is none. Elements that are not maps,
%% or that have no `name' key, are skipped.
get_alarm(Name, Alarms) ->
    IsOther = fun(#{name := Other}) -> Other =/= Name;
                 (_NotAnAlarm)      -> true
              end,
    case lists:dropwhile(IsOther, Alarms) of
        [Found | _Rest] -> Found;
        []              -> {error, not_found}
    end.

View File

@ -1,113 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_alarm_handler_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
%% Common Test entry point: the test-case list is produced by
%% emqx_ct:all/1 for this suite module.
all() ->
    Suite = ?MODULE,
    emqx_ct:all(Suite).
%% Suite setup: boot all modules and start the applications, letting
%% set_special_configs/1 adjust each application's environment. The
%% Common Test config is passed through unchanged.
init_per_suite(Config) ->
    Configure = fun set_special_configs/1,
    emqx_ct_helpers:boot_modules(all),
    emqx_ct_helpers:start_apps([], Configure),
    Config.
%% Suite teardown: stop what init_per_suite/1 started.
end_per_suite(_Config) ->
    ExtraApps = [],
    emqx_ct_helpers:stop_apps(ExtraApps).
%% Per-application environment tweaks applied while the apps start.
%% For `emqx' the ACL file is pointed at the suite's
%% acl_deny_action.conf; every other application is left as is.
set_special_configs(emqx) ->
    RelPath = "test/emqx_access_SUITE_data/acl_deny_action.conf",
    application:set_env(emqx, acl_file, emqx_ct_helpers:deps_path(emqx, RelPath));
set_special_configs(_OtherApp) ->
    ok.
%% End-to-end check of the alarm handler over a raw MQTT v5 connection:
%% connect, subscribe to the alert/clear system topics, set and clear an
%% alarm through alarm_handler, and verify both the published messages
%% and the contents returned by emqx_alarm_handler:get_alarms/0,1.
%% The steps depend on each other and must stay in this order.
t_alarm_handler(_) ->
with_connection(
fun(Sock) ->
%% CONNECT with protocol version 5 and expect a successful CONNACK.
emqtt_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5})
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS), <<>>, _} = raw_recv_parse(Data),
%% Subscribe to both alarm system topics and expect a SUBACK for each.
Topic1 = emqx_topic:systop(<<"alarms/alert">>),
Topic2 = emqx_topic:systop(<<"alarms/clear">>),
SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0},
emqtt_sock:send(Sock,
raw_send_serialize(
?SUBSCRIBE_PACKET(
1,
[{Topic1, SubOpts},
{Topic2, SubOpts}])
)),
{ok, Data2} = gen_tcp:recv(Sock, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2, 2]), <<>>, _} = raw_recv_parse(Data2),
%% Setting an alarm must publish on the alert topic and list the alarm.
alarm_handler:set_alarm({alarm_for_test, #alarm{id = alarm_for_test,
severity = error,
title="alarm title",
summary="alarm summary"
}}),
{ok, Data3} = gen_tcp:recv(Sock, 0),
{ok, ?PUBLISH_PACKET(?QOS_0, Topic1, _, _), <<>>, _} = raw_recv_parse(Data3),
?assertEqual(true, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())),
%% Clearing it must publish on the clear topic and remove it from the
%% present alarms.
alarm_handler:clear_alarm(alarm_for_test),
{ok, Data4} = gen_tcp:recv(Sock, 0),
{ok, ?PUBLISH_PACKET(?QOS_0, Topic2, _, _), <<>>, _} = raw_recv_parse(Data4),
?assertEqual(false, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())),
%% The cleared alarm must now be listed in the history.
emqx_alarm_handler:mnesia(copy),
?assertEqual(true, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms(history))),
%% Clearing an alarm that was never set must not fail.
alarm_handler:clear_alarm(not_exist),
%% Install the handler in a separate event manager to exercise its
%% replies to an unknown call and an unknown event.
gen_event:start({local, alarm_handler_2}, []),
gen_event:add_handler(alarm_handler_2, emqx_alarm_handler, []),
?assertEqual({error,bad_query}, gen_event:call(alarm_handler_2, emqx_alarm_handler, bad_query)),
?assertEqual(ok, gen_event:notify(alarm_handler_2, ignored)),
gen_event:stop(alarm_handler_2)
end).
%% Open a raw, passive, binary TCP connection to 127.0.0.1:1883 (3000 ms
%% connect timeout), run `DoFun' with the socket, and close the socket
%% afterwards whether or not `DoFun' raises.
with_connection(DoFun) ->
    Host = {127, 0, 0, 1},
    SockOpts = [binary, {packet, raw}, {active, false}],
    {ok, Sock} = emqtt_sock:connect(Host, 1883, SockOpts, 3000),
    try
        DoFun(Sock)
    after
        emqtt_sock:close(Sock)
    end.
%% Serialize an MQTT packet for the wire using protocol version 5.
raw_send_serialize(Packet) ->
    ProtoVer = ?MQTT_PROTO_V5,
    emqx_frame:serialize(Packet, ProtoVer).
%% Parse received bytes with a fresh MQTT v5 parser state.
raw_recv_parse(Bin) ->
    ParseState = emqx_frame:initial_parse_state(#{version => ?MQTT_PROTO_V5}),
    emqx_frame:parse(Bin, ParseState).

View File

@ -42,26 +42,26 @@ end_per_suite(_Config) ->
t_api(_) -> t_api(_) ->
gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}), gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
{ok, _} = emqx_os_mon:start_link([{cpu_check_interval, 1}, {ok, _} = emqx_os_mon:start_link([{cpu_check_interval, 1},
{cpu_high_watermark, 0.05}, {cpu_high_watermark, 5},
{cpu_low_watermark, 0.80}, {cpu_low_watermark, 80},
{mem_check_interval, 60}, {mem_check_interval, 60},
{sysmem_high_watermark, 0.70}, {sysmem_high_watermark, 70},
{procmem_high_watermark, 0.05}]), {procmem_high_watermark, 5}]),
?assertEqual(1, emqx_os_mon:get_cpu_check_interval()), ?assertEqual(1, emqx_os_mon:get_cpu_check_interval()),
?assertEqual(0.05, emqx_os_mon:get_cpu_high_watermark()), ?assertEqual(5, emqx_os_mon:get_cpu_high_watermark()),
?assertEqual(0.80, emqx_os_mon:get_cpu_low_watermark()), ?assertEqual(80, emqx_os_mon:get_cpu_low_watermark()),
?assertEqual(60, emqx_os_mon:get_mem_check_interval()), ?assertEqual(60, emqx_os_mon:get_mem_check_interval()),
?assertEqual(0.7, emqx_os_mon:get_sysmem_high_watermark()), ?assertEqual(70, emqx_os_mon:get_sysmem_high_watermark()),
?assertEqual(0.05, emqx_os_mon:get_procmem_high_watermark()), ?assertEqual(5, emqx_os_mon:get_procmem_high_watermark()),
% timer:sleep(2000), % timer:sleep(2000),
% ?assertEqual(true, lists:keymember(cpu_high_watermark, 1, alarm_handler:get_alarms())), % ?assertEqual(true, lists:keymember(cpu_high_watermark, 1, alarm_handler:get_alarms())),
emqx_os_mon:set_cpu_check_interval(0.05), emqx_os_mon:set_cpu_check_interval(0.05),
emqx_os_mon:set_cpu_high_watermark(0.8), emqx_os_mon:set_cpu_high_watermark(80),
emqx_os_mon:set_cpu_low_watermark(0.75), emqx_os_mon:set_cpu_low_watermark(75),
?assertEqual(0.05, emqx_os_mon:get_cpu_check_interval()), ?assertEqual(0.05, emqx_os_mon:get_cpu_check_interval()),
?assertEqual(0.8, emqx_os_mon:get_cpu_high_watermark()), ?assertEqual(80, emqx_os_mon:get_cpu_high_watermark()),
?assertEqual(0.75, emqx_os_mon:get_cpu_low_watermark()), ?assertEqual(75, emqx_os_mon:get_cpu_low_watermark()),
% timer:sleep(3000), % timer:sleep(3000),
% ?assertEqual(false, lists:keymember(cpu_high_watermark, 1, alarm_handler:get_alarms())), % ?assertEqual(false, lists:keymember(cpu_high_watermark, 1, alarm_handler:get_alarms())),
?assertEqual(ignored, gen_server:call(emqx_os_mon, ignored)), ?assertEqual(ignored, gen_server:call(emqx_os_mon, ignored)),

View File

@ -58,19 +58,19 @@ t_api(_) ->
end), end),
gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}), gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
{ok, _} = emqx_vm_mon:start_link([{check_interval, 1}, {ok, _} = emqx_vm_mon:start_link([{check_interval, 1},
{process_high_watermark, 0.8}, {process_high_watermark, 80},
{process_low_watermark, 0.75}]), {process_low_watermark, 75}]),
timer:sleep(emqx_vm_mon:get_check_interval() * 1000), timer:sleep(emqx_vm_mon:get_check_interval() * 1000),
emqx_vm_mon:set_process_high_watermark(0.0), emqx_vm_mon:set_process_high_watermark(0),
emqx_vm_mon:set_process_low_watermark(0.6), emqx_vm_mon:set_process_low_watermark(60),
?assertEqual(0.0, emqx_vm_mon:get_process_high_watermark()), ?assertEqual(0, emqx_vm_mon:get_process_high_watermark()),
?assertEqual(0.6, emqx_vm_mon:get_process_low_watermark()), ?assertEqual(60, emqx_vm_mon:get_process_low_watermark()),
?WAIT({Ref, set_alarm, {too_many_processes, _Count}}, 2000), ?WAIT({Ref, set_alarm, {too_many_processes, _Count}}, 2000),
?assertEqual(true, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())), ?assertEqual(true, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())),
emqx_vm_mon:set_process_high_watermark(0.8), emqx_vm_mon:set_process_high_watermark(80),
emqx_vm_mon:set_process_low_watermark(0.75), emqx_vm_mon:set_process_low_watermark(75),
?assertEqual(0.8, emqx_vm_mon:get_process_high_watermark()), ?assertEqual(80, emqx_vm_mon:get_process_high_watermark()),
?assertEqual(0.75, emqx_vm_mon:get_process_low_watermark()), ?assertEqual(75, emqx_vm_mon:get_process_low_watermark()),
?WAIT({Ref, clear_alarm, too_many_processes}, 3000), ?WAIT({Ref, clear_alarm, too_many_processes}, 3000),
?assertEqual(false, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())), ?assertEqual(false, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())),
emqx_vm_mon:set_check_interval(20), emqx_vm_mon:set_check_interval(20),