diff --git a/include/emqx.hrl b/include/emqx.hrl index 5a10b84e6..e13ab8215 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -112,19 +112,6 @@ node_id :: trie_node_id() }). -%%-------------------------------------------------------------------- -%% Alarm -%%-------------------------------------------------------------------- - --record(alarm, { - id :: binary(), - severity :: notice | warning | error | critical, - title :: iolist(), - summary :: iolist(), - %% Timestamp (Unit: millisecond) - timestamp :: integer() | undefined - }). - %%-------------------------------------------------------------------- %% Plugin %%-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index fee6e196d..050d9282b 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2103,7 +2103,7 @@ end}. {translation, "emqx.os_mon", fun(Conf) -> Configs = cuttlefish_variable:filter_by_prefix("os_mon", Conf), - [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] + [{list_to_atom(Name), Value * 100} || {[_, Name], Value} <- Configs] end}. %%-------------------------------------------------------------------- @@ -2126,5 +2126,5 @@ end}. {translation, "emqx.vm_mon", fun(Conf) -> Configs = cuttlefish_variable:filter_by_prefix("vm_mon", Conf), - [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] + [{list_to_atom(Name), Value * 100} || {[_, Name], Value} <- Configs] end}. diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl new file mode 100644 index 000000000..fe98ec21d --- /dev/null +++ b/src/emqx_alarm.erl @@ -0,0 +1,278 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_alarm). + +-behaviour(gen_server). + +-include("emqx.hrl"). +-include("logger.hrl"). + +-logger_header("[Alarm Handler]"). + +-export([start_link/0, stop/0]). + +%% API +-export([ activate/1 + , activate/2 + , deactivate/1 + , delete_all_deactivated_alarms/0 + , get_alarms/0 + , get_alarms/1 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-record(alarm, { + name :: binary() | atom(), + + details :: map() | list(), + + message :: binary(), + + activate_at :: integer(), + + deactivate_at :: integer() | infinity, + + activated :: boolean() + }). + +-record(state, { + actions :: [action()] + }). + +-type action() :: log | publish | event. + +-define(TAB, emqx_alarm). + +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec(start_link() -> emqx_types:startlink_ret()). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +stop() -> + gen_server:stop(?MODULE). + +activate(Name) -> + activate(Name, #{}). + +activate(Name, Details) -> + gen_server:call(?MODULE, {activate_alarm, Name, Details}). + +deactivate(Name) -> + gen_server:call(?MODULE, {deactivate_alarm, Name}). + +delete_all_deactivated_alarms() -> + gen_server:call(?MODULE, delete_all_deactivated_alarms). + +get_alarms() -> + get_alarms(all). + +get_alarms(all) -> + gen_server:call(?MODULE, {get_alarms, all}); + +get_alarms(activated) -> + gen_server:call(?MODULE, {get_alarms, activated}); + +get_alarms(deactivated) -> + gen_server:call(?MODULE, {get_alarms, deactivated}). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([]) -> + Opts = [{actions, [log, publish]}], + init([Opts]); +init([Opts]) -> + ok = ekka_mnesia:create_table(?TAB, + [{type, bag}, + {disc_copies, [node()]}, + {local_content, true}, + {record_name, alarm}, + {attributes, record_info(fields, alarm)}]), + Actions = proplists:get_value(actions, Opts, [log, publish]), + deactivate_all_alarms(), + {ok, #state{actions = Actions}}. + +handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Actions}) -> + case get(Name) of + set -> + {reply, {error, already_existed}, State}; + undefined -> + Alarm = #alarm{name = Name, + details = Details, + message = normalize_message(Name, Details), + activate_at = erlang:system_time(millisecond), + deactivate_at = infinity, + activated = true}, + mnesia:dirty_write(?TAB, Alarm), + put(Name, set), + do_actions(activate, Alarm, Actions), + {reply, ok, State} + end; + +handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions}) -> + case get(Name) of + set -> + MatchSpec = [{#alarm{name = '$1', activated = '$2', _ = '_'}, + [{'==', '$1', Name}, {'==', '$2', true}], + ['$_']}], + case mnesia:dirty_select(?TAB, MatchSpec) of + [] -> + erase(Name), + {reply, {error, not_found}, State}; + [Alarm | _] -> + NAlarm = Alarm#alarm{deactivate_at = erlang:system_time(millisecond), + activated = false}, + mnesia:dirty_delete_object(?TAB, Alarm), + mnesia:dirty_write(?TAB, NAlarm), + erase(Name), + do_actions(deactivate, NAlarm, Actions), + {reply, ok, State} + end; + undefined -> + {reply, {error, not_found}, State} + end; + +handle_call(delete_all_deactivated_alarms, _From, State) -> + MatchSpec = [{#alarm{activated = '$1', _ = '_'}, + [{'==', '$1', false}], + ['$_']}], + lists:foreach(fun(Alarm) -> + mnesia:dirty_delete_object(?TAB, Alarm) + end, mnesia:dirty_select(?TAB, MatchSpec)), + {reply, ok, State}; + +handle_call({get_alarms, all}, _From, State) -> + Alarms = ets:tab2list(?TAB), + {reply, [normalize(Alarm) || Alarm <- Alarms], State}; + +handle_call({get_alarms, activated}, _From, State) -> + MatchSpec = [{#alarm{activated = '$1', _ = '_'}, + [{'==', '$1', true}], + ['$_']}], + Alarms = [normalize(Alarm) || Alarm <- mnesia:dirty_select(?TAB, MatchSpec)], + {reply, Alarms, State}; + +handle_call({get_alarms, deactivated}, _From, State) -> + MatchSpec = [{#alarm{activated = '$1', _ = '_'}, + [{'==', '$1', false}], + ['$_']}], + Alarms = [normalize(Alarm) || Alarm <- mnesia:dirty_select(?TAB, MatchSpec)], + {reply, Alarms, State}; + +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?LOG(error, "Unexpected msg: ~p", [Msg]), + {noreply, State}. + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +deactivate_all_alarms() -> + MatchSpec = [{#alarm{activated = '$1', _ = '_'}, + [{'==', '$1', true}], + ['$_']}], + case mnesia:dirty_select(?TAB, MatchSpec) of + [] -> + ok; + Alarms -> + lists:foreach(fun(Alarm) -> + NAlarm = Alarm#alarm{deactivate_at = erlang:system_time(millisecond), + activated = false}, + mnesia:dirty_delete_object(?TAB, Alarm), + mnesia:dirty_write(?TAB, NAlarm) + end, Alarms) + end. + +do_actions(_, _, []) -> + ok; +do_actions(activate, Alarm = #alarm{name = Name, message = Message}, [log | More]) -> + ?LOG(warning, "Alarm ~p is activated, ~s", [Name, Message]), + do_actions(activate, Alarm, More); +do_actions(deactivate, Alarm = #alarm{name = Name}, [log | More]) -> + ?LOG(warning, "Alarm ~p is deactivated", [Name]), + do_actions(deactivate, Alarm, More); +do_actions(Operation, Alarm, [publish | More]) -> + Topic = topic(Operation), + {ok, Payload} = encode_to_json(Alarm), + Message = emqx_message:make(?MODULE, 0, Topic, Payload, #{sys => true}, + #{properties => #{'Content-Type' => <<"application/json">>}}), + emqx_broker:safe_publish(Message), + do_actions(Operation, Alarm, More). + +encode_to_json(Alarm) -> + emqx_json:safe_encode(normalize(Alarm)). + +topic(activate) -> + emqx_topic:systop(<<"alarms/activate">>); +topic(deactivate) -> + emqx_topic:systop(<<"alarms/deactivate">>). + +normalize(#alarm{name = Name, + details = Details, + message = Message, + activate_at = ActivateAt, + deactivate_at = DeactivateAt, + activated = Activated}) -> + #{name => Name, + details => Details, + message => Message, + activate_at => ActivateAt, + deactivate_at => DeactivateAt, + activated => Activated}. + +normalize_message(high_system_memory_usage, _Details) -> + list_to_binary(io_lib:format("System memory usage is higher than ~p%", [emqx_os_mon:get_sysmem_high_watermark()])); +normalize_message(high_process_memory_usage, _Details) -> + list_to_binary(io_lib:format("Process memory usage is higher than ~p%", [emqx_os_mon:get_procmem_high_watermark()])); +normalize_message(high_cpu_usage, #{usage := Usage}) -> + list_to_binary(io_lib:format("~p% cpu usage", [Usage])); +normalize_message(too_many_processes, #{high_watermark := HightWatermark}) -> + list_to_binary(io_lib:format("High_watermark: ~p%", [HightWatermark])); +normalize_message(_Name, _UnknownDetails) -> + <<"Unknown">>. + diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index 77d2b60a0..55f103342 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -23,12 +23,6 @@ -logger_header("[Alarm Handler]"). -%% Mnesia bootstrap --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - %% gen_event callbacks -export([ init/1 , handle_event/2 @@ -39,38 +33,8 @@ -export([ load/0 , unload/0 - , get_alarms/0 - , get_alarms/1 ]). --record(common_alarm, {id, desc}). --record(alarm_history, {id, desc, clear_at}). - --define(ALARM_TAB, emqx_alarm). --define(ALARM_HISTORY_TAB, emqx_alarm_history). - -%%-------------------------------------------------------------------- -%% Mnesia bootstrap -%%-------------------------------------------------------------------- - -mnesia(boot) -> - ok = ekka_mnesia:create_table(?ALARM_TAB, [ - {type, set}, - {disc_copies, [node()]}, - {local_content, true}, - {record_name, common_alarm}, - {attributes, record_info(fields, common_alarm)}]), - ok = ekka_mnesia:create_table(?ALARM_HISTORY_TAB, [ - {type, set}, - {disc_copies, [node()]}, - {local_content, true}, - {record_name, alarm_history}, - {attributes, record_info(fields, alarm_history)}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?ALARM_TAB), - ok = ekka_mnesia:copy_table(?ALARM_HISTORY_TAB). - %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -82,50 +46,32 @@ load() -> unload() -> gen_event:swap_handler(alarm_handler, {?MODULE, swap}, {alarm_handler, []}). -get_alarms() -> - get_alarms(present). - -get_alarms(present) -> - Alarms = ets:tab2list(?ALARM_TAB), - [{Id, Desc} || #common_alarm{id = Id, desc = Desc} <- Alarms]; -get_alarms(history) -> - Alarms = ets:tab2list(?ALARM_HISTORY_TAB), - [{Id, Desc, ClearAt} || #alarm_history{id = Id, desc = Desc, clear_at = ClearAt} <- Alarms]. - %%-------------------------------------------------------------------- %% gen_event callbacks %%-------------------------------------------------------------------- -init({_Args, {alarm_handler, ExistingAlarms}}) -> - init_tables(ExistingAlarms), +init({_Args, {alarm_handler, _ExistingAlarms}}) -> {ok, []}; init(_) -> - init_tables([]), {ok, []}. -handle_event({set_alarm, {AlarmId, AlarmDesc = #alarm{timestamp = undefined}}}, State) -> - handle_event({set_alarm, {AlarmId, AlarmDesc#alarm{timestamp = erlang:system_time(millisecond)}}}, State); -handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) -> - ?LOG(warning, "New Alarm: ~p, Alarm Info: ~p", [AlarmId, AlarmDesc]), - case encode_alarm(Alarm) of - {ok, Json} -> - emqx_broker:safe_publish(alarm_msg(topic(alert), Json)); - {error, Reason} -> - ?LOG(error, "Failed to encode alarm: ~p", [Reason]) - end, - set_alarm_(AlarmId, AlarmDesc), +handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> + emqx_alarm:activate(high_system_memory_usage, #{}), {ok, State}; -handle_event({clear_alarm, AlarmId}, State) -> - ?LOG(info, "Clear Alarm: ~p", [AlarmId]), - case encode_alarm({AlarmId, undefined}) of - {ok, Json} -> - emqx_broker:safe_publish(alarm_msg(topic(clear), Json)); - {error, Reason} -> - ?LOG(error, "Failed to encode alarm: ~p", [Reason]) - end, - clear_alarm_(AlarmId), + +handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) -> + emqx_alarm:activate(high_process_memory_usage, #{pid => Pid}), {ok, State}; + +handle_event({clear_alarm, system_memory_high_watermark}, State) -> + emqx_alarm:deactivate(high_system_memory_usage), + {ok, State}; + +handle_event({clear_alarm, process_memory_high_watermark}, State) -> + emqx_alarm:deactivate(high_process_memory_usage), + {ok, State}; + handle_event(_, State) -> {ok, State}. @@ -136,69 +82,6 @@ handle_call(_Query, State) -> {ok, {error, bad_query}, State}. terminate(swap, _State) -> - {emqx_alarm_handler, get_alarms()}; + {emqx_alarm_handler, []}; terminate(_, _) -> - ok. - -%%------------------------------------------------------------------------------ -%% Internal functions -%%------------------------------------------------------------------------------ - -init_tables(ExistingAlarms) -> - mnesia:clear_table(?ALARM_TAB), - lists:foreach(fun({Id, Desc}) -> - set_alarm_history(Id, Desc) - end, ExistingAlarms). - -encode_alarm({AlarmId, #alarm{severity = Severity, - title = Title, - summary = Summary, - timestamp = Ts}}) -> - Descr = #{severity => Severity, - title => iolist_to_binary(Title), - summary => iolist_to_binary(Summary), - timestamp => Ts - }, - emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId), - desc => Descr - }); - -encode_alarm({AlarmId, undefined}) -> - emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId)}); -encode_alarm({AlarmId, AlarmDesc}) -> - emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId), - desc => maybe_to_binary(AlarmDesc) - }). - -alarm_msg(Topic, Payload) -> - emqx_message:make(?MODULE, 0, Topic, Payload, - #{sys => true}, - #{properties => #{'Content-Type' => <<"application/json">>}} - ). - -topic(alert) -> - emqx_topic:systop(<<"alarms/alert">>); -topic(clear) -> - emqx_topic:systop(<<"alarms/clear">>). - -maybe_to_binary(Data) when is_binary(Data) -> - Data; -maybe_to_binary(Data) -> - iolist_to_binary(io_lib:format("~p", [Data])). - -set_alarm_(Id, Desc) -> - mnesia:dirty_write(?ALARM_TAB, #common_alarm{id = Id, desc = Desc}). - -clear_alarm_(Id) -> - case mnesia:dirty_read(?ALARM_TAB, Id) of - [#common_alarm{desc = Desc}] -> - set_alarm_history(Id, Desc), - mnesia:dirty_delete(?ALARM_TAB, Id); - [] -> ok - end. - -set_alarm_history(Id, Desc) -> - His = #alarm_history{id = Id, - desc = Desc, - clear_at = erlang:system_time(millisecond)}, - mnesia:dirty_write(?ALARM_HISTORY_TAB, His). + ok. \ No newline at end of file diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index 36323d90f..0c1b60ddd 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -130,7 +130,6 @@ handle_cast({detected, #flapping{clientid = ClientId, reason = <<"flapping is detected">>, at = Now, until = Now + Interval}, - alarm_handler:set_alarm({{flapping_detected, ClientId}, Banned}), emqx_banned:create(Banned); false -> ?LOG(warning, "~s(~s) disconnected ~w times in ~wms", diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index 6f10a7cfe..33a6038f7 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -83,13 +83,13 @@ set_mem_check_interval(Seconds) -> memsup:set_check_interval(Seconds div 60). get_sysmem_high_watermark() -> - memsup:get_sysmem_high_watermark() / 100. + memsup:get_sysmem_high_watermark(). set_sysmem_high_watermark(Float) -> memsup:set_sysmem_high_watermark(Float). get_procmem_high_watermark() -> - memsup:get_procmem_high_watermark() / 100. + memsup:get_procmem_high_watermark(). set_procmem_high_watermark(Float) -> memsup:set_procmem_high_watermark(Float). @@ -102,14 +102,10 @@ call(Req) -> %%-------------------------------------------------------------------- init([Opts]) -> - set_mem_check_interval(proplists:get_value(mem_check_interval, Opts, 60)), - set_sysmem_high_watermark(proplists:get_value(sysmem_high_watermark, Opts, 0.70)), - set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts, 0.05)), - {ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts, 0.80), - cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts, 0.60), - cpu_check_interval => proplists:get_value(cpu_check_interval, Opts, 60), - timer => undefined, - is_cpu_alarm_set => false})}. + {ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts), + cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts), + cpu_check_interval => proplists:get_value(cpu_check_interval, Opts), + timer => undefined})}. handle_call(get_cpu_check_interval, _From, State) -> {reply, maps:get(cpu_check_interval, State, undefined), State}; @@ -139,21 +135,21 @@ handle_cast(Msg, State) -> handle_info({timeout, Timer, check}, State = #{timer := Timer, cpu_high_watermark := CPUHighWatermark, - cpu_low_watermark := CPULowWatermark, - is_cpu_alarm_set := IsCPUAlarmSet}) -> + cpu_low_watermark := CPULowWatermark}) -> NState = case emqx_vm:cpu_util() of %% TODO: should be improved? - 0 -> State#{timer := undefined}; + 0 -> + State#{timer := undefined}; Busy when Busy / 100 >= CPUHighWatermark -> - alarm_handler:set_alarm({cpu_high_watermark, Busy}), - ensure_check_timer(State#{is_cpu_alarm_set := true}); - Busy when Busy / 100 < CPULowWatermark -> - case IsCPUAlarmSet of - true -> alarm_handler:clear_alarm(cpu_high_watermark); - false -> ok - end, - ensure_check_timer(State#{is_cpu_alarm_set := false}); - _Busy -> ensure_check_timer(State) + emqx_alarm:activate(high_cpu_usage, #{usage => Busy, + high_watermark => CPUHighWatermark, + low_watermark => CPULowWatermark}), + ensure_check_timer(State); + Busy when Busy / 100 =< CPULowWatermark -> + emqx_alarm:deactivate(high_cpu_usage), + ensure_check_timer(State); + _Busy -> + ensure_check_timer(State) end, {noreply, NState}; diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index 0eabb1253..5ee22c563 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -165,9 +165,9 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- handle_partition_event({partition, {occurred, Node}}) -> - alarm_handler:set_alarm({partitioned, Node}); + emqx_alarm:activate(partition, #{occurred => Node}); handle_partition_event({partition, {healed, _Node}}) -> - alarm_handler:clear_alarm(partitioned). + emqx_alarm:deactivate(partition). suppress(Key, SuccFun, State = #{events := Events}) -> case lists:member(Key, Events) of diff --git a/src/emqx_sys_sup.erl b/src/emqx_sys_sup.erl index 9b6e46db6..1c4bbfa53 100644 --- a/src/emqx_sys_sup.erl +++ b/src/emqx_sys_sup.erl @@ -27,6 +27,7 @@ start_link() -> init([]) -> Childs = [child_spec(emqx_sys), + child_spec(emqx_alarm), child_spec(emqx_sys_mon, [config(sysmon)]), child_spec(emqx_os_mon, [config(os_mon)]), child_spec(emqx_vm_mon, [config(vm_mon)])], diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 718b757a5..739858729 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -78,8 +78,7 @@ , route_entry/0 ]). --export_type([ alarm/0 - , plugin/0 +-export_type([ plugin/0 , banned/0 , command/0 ]). @@ -194,7 +193,6 @@ -type(route() :: #route{}). -type(sub_group() :: tuple() | binary()). -type(route_entry() :: {topic(), node()} | {topic, sub_group()}). --type(alarm() :: #alarm{}). -type(plugin() :: #plugin{}). -type(command() :: #command{}). diff --git a/src/emqx_vm_mon.erl b/src/emqx_vm_mon.erl index b4d1c6008..791e700d1 100644 --- a/src/emqx_vm_mon.erl +++ b/src/emqx_vm_mon.erl @@ -75,11 +75,10 @@ call(Req) -> %%-------------------------------------------------------------------- init([Opts]) -> - {ok, ensure_check_timer(#{check_interval => proplists:get_value(check_interval, Opts, 30), - process_high_watermark => proplists:get_value(process_high_watermark, Opts, 0.70), - process_low_watermark => proplists:get_value(process_low_watermark, Opts, 0.50), - timer => undefined, - is_process_alarm_set => false})}. + {ok, ensure_check_timer(#{check_interval => proplists:get_value(check_interval, Opts), + process_high_watermark => proplists:get_value(process_high_watermark, Opts), + process_low_watermark => proplists:get_value(process_low_watermark, Opts), + timer => undefined})}. handle_call(get_check_interval, _From, State) -> {reply, maps:get(check_interval, State, undefined), State}; @@ -110,22 +109,19 @@ handle_cast(Msg, State) -> handle_info({timeout, Timer, check}, State = #{timer := Timer, process_high_watermark := ProcHighWatermark, - process_low_watermark := ProcLowWatermark, - is_process_alarm_set := IsProcessAlarmSet}) -> + process_low_watermark := ProcLowWatermark}) -> ProcessCount = erlang:system_info(process_count), - NState = case ProcessCount / erlang:system_info(process_limit) of - Percent when Percent >= ProcHighWatermark -> - alarm_handler:set_alarm({too_many_processes, ProcessCount}), - State#{is_process_alarm_set := true}; - Percent when Percent < ProcLowWatermark -> - case IsProcessAlarmSet of - true -> alarm_handler:clear_alarm(too_many_processes); - false -> ok - end, - State#{is_process_alarm_set := false}; - _Precent -> State - end, - {noreply, ensure_check_timer(NState)}; + case ProcessCount / erlang:system_info(process_limit) * 100 of + Percent when Percent >= ProcHighWatermark -> + emqx_alarm:activate(too_many_processes, #{usage => Percent, + high_watermark => ProcHighWatermark, + low_watermark => ProcLowWatermark}); + Percent when Percent < ProcLowWatermark -> + emqx_alarm:deactivate(too_many_processes); + _Precent -> + ok + end, + {noreply, ensure_check_timer(State)}; handle_info(Info, State) -> ?LOG(error, "[VM_MON] Unexpected info: ~p", [Info]), @@ -143,4 +139,3 @@ code_change(_OldVsn, State, _Extra) -> ensure_check_timer(State = #{check_interval := Interval}) -> State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. - diff --git a/test/emqx_alarm_SUITE.erl b/test/emqx_alarm_SUITE.erl new file mode 100644 index 000000000..e4f2c68e5 --- /dev/null +++ b/test/emqx_alarm_SUITE.erl @@ -0,0 +1,68 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_alarm_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx.hrl"). +-include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_alarm(_) -> + ok = emqx_alarm:activate(unknown_alarm), + {error, already_existed} = emqx_alarm:activate(unknown_alarm), + ?assertNotEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms())), + ?assertNotEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(activated))), + ?assertEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(deactivated))), + + ok = emqx_alarm:deactivate(unknown_alarm), + {error, not_found} = emqx_alarm:deactivate(unknown_alarm), + ?assertEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(activated))), + ?assertNotEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(deactivated))), + + emqx_alarm:delete_all_deactivated_alarms(), + ?assertEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(deactivated))). + +t_deactivate_all_alarms(_) -> + ok = emqx_alarm:activate(unknown_alarm), + {error, already_existed} = emqx_alarm:activate(unknown_alarm), + ?assertNotEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(activated))), + + emqx_alarm:deactivate_all_alarms(), + ?assertNotEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(deactivated))), + + emqx_alarm:delete_all_deactivated_alarms(), + ?assertEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(deactivated))). + +get_alarm(Name, [Alarm = #{name := Name} | _More]) -> + Alarm; +get_alarm(Name, [_Alarm | More]) -> + get_alarm(Name, More); +get_alarm(_Name, []) -> + {error, not_found}. + diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl deleted file mode 100644 index 46fd9472f..000000000 --- a/test/emqx_alarm_handler_SUITE.erl +++ /dev/null @@ -1,113 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_alarm_handler_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include("emqx.hrl"). --include("emqx_mqtt.hrl"). --include_lib("eunit/include/eunit.hrl"). - -all() -> emqx_ct:all(?MODULE). - -init_per_suite(Config) -> - emqx_ct_helpers:boot_modules(all), - emqx_ct_helpers:start_apps([], fun set_special_configs/1), - Config. - -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([]). - -set_special_configs(emqx) -> - AclFile = emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"), - application:set_env(emqx, acl_file, AclFile); -set_special_configs(_App) -> ok. - -t_alarm_handler(_) -> - with_connection( - fun(Sock) -> - emqtt_sock:send(Sock, - raw_send_serialize( - ?CONNECT_PACKET( - #mqtt_packet_connect{ - proto_ver = ?MQTT_PROTO_V5}) - )), - {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS), <<>>, _} = raw_recv_parse(Data), - - Topic1 = emqx_topic:systop(<<"alarms/alert">>), - Topic2 = emqx_topic:systop(<<"alarms/clear">>), - SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}, - emqtt_sock:send(Sock, - raw_send_serialize( - ?SUBSCRIBE_PACKET( - 1, - [{Topic1, SubOpts}, - {Topic2, SubOpts}]) - )), - - {ok, Data2} = gen_tcp:recv(Sock, 0), - {ok, ?SUBACK_PACKET(1, #{}, [2, 2]), <<>>, _} = raw_recv_parse(Data2), - - alarm_handler:set_alarm({alarm_for_test, #alarm{id = alarm_for_test, - severity = error, - title="alarm title", - summary="alarm summary" - }}), - - {ok, Data3} = gen_tcp:recv(Sock, 0), - - {ok, ?PUBLISH_PACKET(?QOS_0, Topic1, _, _), <<>>, _} = raw_recv_parse(Data3), - - ?assertEqual(true, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())), - - alarm_handler:clear_alarm(alarm_for_test), - - {ok, Data4} = gen_tcp:recv(Sock, 0), - - {ok, ?PUBLISH_PACKET(?QOS_0, Topic2, _, _), <<>>, _} = raw_recv_parse(Data4), - - ?assertEqual(false, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())), - - emqx_alarm_handler:mnesia(copy), - ?assertEqual(true, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms(history))), - - alarm_handler:clear_alarm(not_exist), - - gen_event:start({local, alarm_handler_2}, []), - gen_event:add_handler(alarm_handler_2, emqx_alarm_handler, []), - ?assertEqual({error,bad_query}, gen_event:call(alarm_handler_2, emqx_alarm_handler, bad_query)), - ?assertEqual(ok, gen_event:notify(alarm_handler_2, ignored)), - gen_event:stop(alarm_handler_2) - end). - -with_connection(DoFun) -> - {ok, Sock} = emqtt_sock:connect({127, 0, 0, 1}, 1883, - [binary, {packet, raw}, {active, false}], - 3000), - try - DoFun(Sock) - after - emqtt_sock:close(Sock) - end. - -raw_send_serialize(Packet) -> - emqx_frame:serialize(Packet, ?MQTT_PROTO_V5). - -raw_recv_parse(Bin) -> - emqx_frame:parse(Bin, emqx_frame:initial_parse_state(#{version => ?MQTT_PROTO_V5})). diff --git a/test/emqx_os_mon_SUITE.erl b/test/emqx_os_mon_SUITE.erl index eb85ba58b..8ce2a8839 100644 --- a/test/emqx_os_mon_SUITE.erl +++ b/test/emqx_os_mon_SUITE.erl @@ -42,26 +42,26 @@ end_per_suite(_Config) -> t_api(_) -> gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}), {ok, _} = emqx_os_mon:start_link([{cpu_check_interval, 1}, - {cpu_high_watermark, 0.05}, - {cpu_low_watermark, 0.80}, + {cpu_high_watermark, 5}, + {cpu_low_watermark, 80}, {mem_check_interval, 60}, - {sysmem_high_watermark, 0.70}, - {procmem_high_watermark, 0.05}]), + {sysmem_high_watermark, 70}, + {procmem_high_watermark, 5}]), ?assertEqual(1, emqx_os_mon:get_cpu_check_interval()), - ?assertEqual(0.05, emqx_os_mon:get_cpu_high_watermark()), - ?assertEqual(0.80, emqx_os_mon:get_cpu_low_watermark()), + ?assertEqual(5, emqx_os_mon:get_cpu_high_watermark()), + ?assertEqual(80, emqx_os_mon:get_cpu_low_watermark()), ?assertEqual(60, emqx_os_mon:get_mem_check_interval()), - ?assertEqual(0.7, emqx_os_mon:get_sysmem_high_watermark()), - ?assertEqual(0.05, emqx_os_mon:get_procmem_high_watermark()), + ?assertEqual(70, emqx_os_mon:get_sysmem_high_watermark()), + ?assertEqual(5, emqx_os_mon:get_procmem_high_watermark()), % timer:sleep(2000), % ?assertEqual(true, lists:keymember(cpu_high_watermark, 1, alarm_handler:get_alarms())), emqx_os_mon:set_cpu_check_interval(0.05), - emqx_os_mon:set_cpu_high_watermark(0.8), - emqx_os_mon:set_cpu_low_watermark(0.75), + emqx_os_mon:set_cpu_high_watermark(80), + emqx_os_mon:set_cpu_low_watermark(75), ?assertEqual(0.05, emqx_os_mon:get_cpu_check_interval()), - ?assertEqual(0.8, emqx_os_mon:get_cpu_high_watermark()), - ?assertEqual(0.75, emqx_os_mon:get_cpu_low_watermark()), + ?assertEqual(80, emqx_os_mon:get_cpu_high_watermark()), + ?assertEqual(75, emqx_os_mon:get_cpu_low_watermark()), % timer:sleep(3000), % ?assertEqual(false, lists:keymember(cpu_high_watermark, 1, alarm_handler:get_alarms())), ?assertEqual(ignored, gen_server:call(emqx_os_mon, ignored)), diff --git a/test/emqx_vm_mon_SUITE.erl b/test/emqx_vm_mon_SUITE.erl index 9dda1bf33..a93dfd5cd 100644 --- a/test/emqx_vm_mon_SUITE.erl +++ b/test/emqx_vm_mon_SUITE.erl @@ -58,19 +58,19 @@ t_api(_) -> end), gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}), {ok, _} = emqx_vm_mon:start_link([{check_interval, 1}, - {process_high_watermark, 0.8}, - {process_low_watermark, 0.75}]), + {process_high_watermark, 80}, + {process_low_watermark, 75}]), timer:sleep(emqx_vm_mon:get_check_interval() * 1000), - emqx_vm_mon:set_process_high_watermark(0.0), - emqx_vm_mon:set_process_low_watermark(0.6), - ?assertEqual(0.0, emqx_vm_mon:get_process_high_watermark()), - ?assertEqual(0.6, emqx_vm_mon:get_process_low_watermark()), + emqx_vm_mon:set_process_high_watermark(0), + emqx_vm_mon:set_process_low_watermark(60), + ?assertEqual(0, emqx_vm_mon:get_process_high_watermark()), + ?assertEqual(60, emqx_vm_mon:get_process_low_watermark()), ?WAIT({Ref, set_alarm, {too_many_processes, _Count}}, 2000), ?assertEqual(true, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())), - emqx_vm_mon:set_process_high_watermark(0.8), - emqx_vm_mon:set_process_low_watermark(0.75), - ?assertEqual(0.8, emqx_vm_mon:get_process_high_watermark()), - ?assertEqual(0.75, emqx_vm_mon:get_process_low_watermark()), + emqx_vm_mon:set_process_high_watermark(80), + emqx_vm_mon:set_process_low_watermark(75), + ?assertEqual(80, emqx_vm_mon:get_process_high_watermark()), + ?assertEqual(75, emqx_vm_mon:get_process_low_watermark()), ?WAIT({Ref, clear_alarm, too_many_processes}, 3000), ?assertEqual(false, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())), emqx_vm_mon:set_check_interval(20),