From 1ba4743213f94d2ccdcd6bc5279a0ec7cf59b29e Mon Sep 17 00:00:00 2001 From: zhouzb Date: Wed, 29 Jul 2020 15:07:29 +0800 Subject: [PATCH] refactor(alarm): new data structure and support regular cleaning of deactivated alarms --- etc/emqx.conf | 27 +++++ priv/emqx.schema | 37 ++++++- src/emqx_alarm.erl | 222 +++++++++++++++++++++++--------------- src/emqx_sys_sup.erl | 2 +- test/emqx_alarm_SUITE.erl | 55 ++++++++++ 5 files changed, 253 insertions(+), 90 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 36b50c211..6b67f34b9 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2158,4 +2158,31 @@ vm_mon.process_high_watermark = 80% ## Default: 60% vm_mon.process_low_watermark = 60% +## Specifies the actions to take when an alarm is activated +## +## Value: String +## - log +## - publish +## +## Default: log,publish +alarm.actions = log,publish + +## The maximum number of deactivated alarms +## +## Value: Integer +## +## Default: 1000 +alarm.size_limit = 1000 + +## Validity Period of deactivated alarms +## +## Value: Duration +## - h: hour +## - m: minute +## - s: second +## - ms: milliseconds +## +## Default: 24h +alarm.validity_period = 24h + {{ additional_configs }} diff --git a/priv/emqx.schema b/priv/emqx.schema index 050d9282b..335f84aa8 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2102,8 +2102,12 @@ end}. ]}. {translation, "emqx.os_mon", fun(Conf) -> - Configs = cuttlefish_variable:filter_by_prefix("os_mon", Conf), - [{list_to_atom(Name), Value * 100} || {[_, Name], Value} <- Configs] + [{cpu_check_interval, cuttlefish:conf_get("os_mon.cpu_check_interval", Conf)}, + {cpu_high_watermark, cuttlefish:conf_get("os_mon.cpu_high_watermark", Conf) * 100}, + {cpu_low_watermark, cuttlefish:conf_get("os_mon.cpu_low_watermark", Conf) * 100}, + {mem_check_interval, cuttlefish:conf_get("os_mon.mem_check_interval", Conf)}, + {sysmem_high_watermark, cuttlefish:conf_get("os_mon.sysmem_high_watermark", Conf) * 100}, + {procmem_high_watermark, cuttlefish:conf_get("os_mon.procmem_high_watermark", Conf) * 100}] end}. %%-------------------------------------------------------------------- @@ -2125,6 +2129,31 @@ end}. ]}. {translation, "emqx.vm_mon", fun(Conf) -> - Configs = cuttlefish_variable:filter_by_prefix("vm_mon", Conf), - [{list_to_atom(Name), Value * 100} || {[_, Name], Value} <- Configs] + [{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)}, + {process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf) * 100}, + {process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf) * 100}] +end}. + +%%-------------------------------------------------------------------- +%% Alarm +%%-------------------------------------------------------------------- +{mapping, "alarm.actions", "emqx.alarm", [ + {default, "log,publish"}, + {datatype, string} +]}. + +{mapping, "alarm.size_limit", "emqx.alarm", [ + {default, 1000}, + {datatype, integer} +]}. + +{mapping, "alarm.validity_period", "emqx.alarm", [ + {default, "24h"}, + {datatype, {duration, s}} +]}. + +{translation, "emqx.alarm", fun(Conf) -> + [{actions, [list_to_atom(Action) || Action <- string:tokens(cuttlefish:conf_get("alarm.actions", Conf), ",")]}, + {size_limit, cuttlefish:conf_get("alarm.size_limit", Conf)}, + {validity_period, cuttlefish:conf_get("alarm.validity_period", Conf)}] end}. diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index b6d01d908..bbdfa09b7 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -23,7 +23,9 @@ -logger_header("[Alarm Handler]"). --export([start_link/0, stop/0]). +-export([ start_link/1 + , stop/0 + ]). %% API -export([ activate/1 @@ -43,27 +45,43 @@ , code_change/3 ]). --record(alarm, { +-record(activated_alarm, { name :: binary() | atom(), details :: map() | list(), message :: binary(), + activate_at :: integer() + }). + +-record(deactivated_alarm, { activate_at :: integer(), + + name :: binary() | atom(), + + details :: map() | list(), - deactivate_at :: integer() | infinity, + message :: binary(), - activated :: boolean() + deactivate_at :: integer() | infinity }). -record(state, { - actions :: [action()] + actions :: [action()], + + size_limit :: non_neg_integer(), + + validity_period :: non_neg_integer(), + + timer = undefined :: undefined | reference() }). -type action() :: log | publish | event. --define(TAB, emqx_alarm). +-define(ACTIVATED_ALARM, emqx_activated_alarm). + +-define(DEACTIVATED_ALARM, emqx_deactivated_alarm). -ifdef(TEST). -compile(export_all). @@ -74,9 +92,8 @@ %% API %%-------------------------------------------------------------------- --spec(start_link() -> emqx_types:startlink_ret()). -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +start_link(Opts) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). stop() -> gen_server:stop(?MODULE). @@ -113,81 +130,85 @@ init([]) -> Opts = [{actions, [log, publish]}], init([Opts]); init([Opts]) -> - ok = ekka_mnesia:create_table(?TAB, - [{type, bag}, + ok = ekka_mnesia:create_table(?ACTIVATED_ALARM, + [{type, set}, {disc_copies, [node()]}, {local_content, true}, - {record_name, alarm}, - {attributes, record_info(fields, alarm)}]), - Actions = proplists:get_value(actions, Opts, [log, publish]), + {record_name, activated_alarm}, + {attributes, record_info(fields, activated_alarm)}]), + ok = ekka_mnesia:create_table(?DEACTIVATED_ALARM, + [{type, ordered_set}, + {disc_copies, [node()]}, + {local_content, true}, + {record_name, deactivated_alarm}, + {attributes, record_info(fields, deactivated_alarm)}]), deactivate_all_alarms(), - {ok, #state{actions = Actions}}. + Actions = proplists:get_value(actions, Opts), + SizeLimit = proplists:get_value(size_limit, Opts), + ValidityPeriod = timer:seconds(proplists:get_value(validity_period, Opts)), + {ok, ensure_delete_timer(#state{actions = Actions, + size_limit = SizeLimit, + validity_period = ValidityPeriod})}. handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Actions}) -> - case get(Name) of - set -> + case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of + [#activated_alarm{name = Name}] -> {reply, {error, already_existed}, State}; - undefined -> - Alarm = #alarm{name = Name, - details = Details, - message = normalize_message(Name, Details), - activate_at = erlang:system_time(millisecond), - deactivate_at = infinity, - activated = true}, - mnesia:dirty_write(?TAB, Alarm), - put(Name, set), + [] -> + Alarm = #activated_alarm{name = Name, + details = Details, + message = normalize_message(Name, Details), + activate_at = erlang:system_time(microsecond)}, + mnesia:dirty_write(?ACTIVATED_ALARM, Alarm), do_actions(activate, Alarm, Actions), {reply, ok, State} end; -handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions}) -> - case get(Name) of - set -> - MatchSpec = [{#alarm{name = '$1', activated = '$2', _ = '_'}, - [{'==', '$1', Name}, {'==', '$2', true}], - ['$_']}], - case mnesia:dirty_select(?TAB, MatchSpec) of - [] -> - erase(Name), - {reply, {error, not_found}, State}; - [Alarm | _] -> - NAlarm = Alarm#alarm{deactivate_at = erlang:system_time(millisecond), - activated = false}, - mnesia:dirty_delete_object(?TAB, Alarm), - mnesia:dirty_write(?TAB, NAlarm), - erase(Name), - do_actions(deactivate, NAlarm, Actions), - {reply, ok, State} - end; - undefined -> - {reply, {error, not_found}, State} +handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions, + size_limit = SizeLimit}) -> + case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of + [] -> + {reply, {error, not_found}, State}; + [#activated_alarm{name = Name, + details = Details, + message = Message, + activate_at = ActivateAt}] -> + case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of + true -> + case mnesia:dirty_first(?DEACTIVATED_ALARM) of + '$end_of_table' -> + ok; + ActivateAt2 -> + mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) + end; + false -> + ok + end, + Alarm = #deactivated_alarm{activate_at = ActivateAt, + name = Name, + details = Details, + message = Message, + deactivate_at = erlang:system_time(microsecond)}, + mnesia:dirty_delete(?ACTIVATED_ALARM, Name), + mnesia:dirty_write(?DEACTIVATED_ALARM, Alarm), + do_actions(deactivate, Alarm, Actions), + {reply, ok, State} end; handle_call(delete_all_deactivated_alarms, _From, State) -> - MatchSpec = [{#alarm{activated = '$1', _ = '_'}, - [{'==', '$1', false}], - ['$_']}], - lists:foreach(fun(Alarm) -> - mnesia:dirty_delete_object(?TAB, Alarm) - end, mnesia:dirty_select(?TAB, MatchSpec)), + mnesia:clear_table(?DEACTIVATED_ALARM), {reply, ok, State}; handle_call({get_alarms, all}, _From, State) -> - Alarms = ets:tab2list(?TAB), - {reply, [normalize(Alarm) || Alarm <- Alarms], State}; + Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?ACTIVATED_ALARM) ++ ets:tab2list(?DEACTIVATED_ALARM)], + {reply, Alarms, State}; handle_call({get_alarms, activated}, _From, State) -> - MatchSpec = [{#alarm{activated = '$1', _ = '_'}, - [{'==', '$1', true}], - ['$_']}], - Alarms = [normalize(Alarm) || Alarm <- mnesia:dirty_select(?TAB, MatchSpec)], + Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?ACTIVATED_ALARM)], {reply, Alarms, State}; handle_call({get_alarms, deactivated}, _From, State) -> - MatchSpec = [{#alarm{activated = '$1', _ = '_'}, - [{'==', '$1', false}], - ['$_']}], - Alarms = [normalize(Alarm) || Alarm <- mnesia:dirty_select(?TAB, MatchSpec)], + Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?DEACTIVATED_ALARM)], {reply, Alarms, State}; handle_call(Req, _From, State) -> @@ -198,6 +219,12 @@ handle_cast(Msg, State) -> ?LOG(error, "Unexpected msg: ~p", [Msg]), {noreply, State}. +handle_info({timeout, TRef, delete_expired_deactivated_alarm}, + State = #state{timer = TRef, + validity_period = ValidityPeriod}) -> + delete_expired_deactivated_alarms(erlang:system_time(microsecond) - ValidityPeriod * 1000), + {noreply, ensure_delete_timer(State)}; + handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. @@ -213,27 +240,43 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ deactivate_all_alarms() -> - MatchSpec = [{#alarm{activated = '$1', _ = '_'}, - [{'==', '$1', true}], - ['$_']}], - case mnesia:dirty_select(?TAB, MatchSpec) of - [] -> - ok; - Alarms -> - lists:foreach(fun(Alarm) -> - NAlarm = Alarm#alarm{deactivate_at = erlang:system_time(millisecond), - activated = false}, - mnesia:dirty_delete_object(?TAB, Alarm), - mnesia:dirty_write(?TAB, NAlarm) - end, Alarms) + lists:foreach(fun(#activated_alarm{name = Name, + details = Details, + message = Message, + activate_at = ActivateAt}) -> + mnesia:dirty_write(?DEACTIVATED_ALARM, + #deactivated_alarm{activate_at = ActivateAt, + name = Name, + details = Details, + message = Message, + deactivate_at = erlang:system_time(microsecond)}) + end, ets:tab2list(?ACTIVATED_ALARM)), + mnesia:clear_table(?ACTIVATED_ALARM). + +ensure_delete_timer(State = #state{validity_period = ValidityPeriod}) -> + State#state{timer = emqx_misc:start_timer(ValidityPeriod div 1, delete_expired_deactivated_alarm)}. + +delete_expired_deactivated_alarms(Checkpoint) -> + delete_expired_deactivated_alarms(mnesia:dirty_first(?DEACTIVATED_ALARM), Checkpoint). + +delete_expired_deactivated_alarms('$end_of_table', _Checkpoint) -> + ok; +delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) -> + case ActivatedAt =< Checkpoint of + true -> + mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt), + NActivatedAt = mnesia:dirty_next(?DEACTIVATED_ALARM, ActivatedAt), + delete_expired_deactivated_alarms(NActivatedAt, Checkpoint); + false -> + ok end. do_actions(_, _, []) -> ok; -do_actions(activate, Alarm = #alarm{name = Name, message = Message}, [log | More]) -> +do_actions(activate, Alarm = #activated_alarm{name = Name, message = Message}, [log | More]) -> ?LOG(warning, "Alarm ~p is activated, ~s", [Name, Message]), do_actions(activate, Alarm, More); -do_actions(deactivate, Alarm = #alarm{name = Name}, [log | More]) -> +do_actions(deactivate, Alarm = #deactivated_alarm{name = Name}, [log | More]) -> ?LOG(warning, "Alarm ~p is deactivated", [Name]), do_actions(deactivate, Alarm, More); do_actions(Operation, Alarm, [publish | More]) -> @@ -252,18 +295,27 @@ topic(activate) -> topic(deactivate) -> emqx_topic:systop(<<"alarms/deactivate">>). -normalize(#alarm{name = Name, - details = Details, - message = Message, - activate_at = ActivateAt, - deactivate_at = DeactivateAt, - activated = Activated}) -> +normalize(#activated_alarm{name = Name, + details = Details, + message = Message, + activate_at = ActivateAt}) -> + #{name => Name, + details => Details, + message => Message, + activate_at => ActivateAt, + deactivate_at => infinity, + activated => true}; +normalize(#deactivated_alarm{activate_at = ActivateAt, + name = Name, + details = Details, + message = Message, + deactivate_at = DeactivateAt}) -> #{name => Name, details => Details, message => Message, activate_at => ActivateAt, deactivate_at => DeactivateAt, - activated => Activated}. + activated => false}. normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) -> list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark])); diff --git a/src/emqx_sys_sup.erl b/src/emqx_sys_sup.erl index 1c4bbfa53..0a246e22a 100644 --- a/src/emqx_sys_sup.erl +++ b/src/emqx_sys_sup.erl @@ -27,7 +27,7 @@ start_link() -> init([]) -> Childs = [child_spec(emqx_sys), - child_spec(emqx_alarm), + child_spec(emqx_alarm, [config(alarm)]), child_spec(emqx_sys_mon, [config(sysmon)]), child_spec(emqx_os_mon, [config(os_mon)]), child_spec(emqx_vm_mon, [config(vm_mon)])], diff --git a/test/emqx_alarm_SUITE.erl b/test/emqx_alarm_SUITE.erl index e4f2c68e5..b9cac2ab6 100644 --- a/test/emqx_alarm_SUITE.erl +++ b/test/emqx_alarm_SUITE.erl @@ -33,6 +33,38 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). +init_per_testcase(t_size_limit, Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([], + fun(emqx) -> + application:set_env(emqx, alarm, [{actions, [log,publish]}, + {size_limit, 2}, + {validity_period, 3600}]), + ok; + (_) -> + ok + end), + Config; +init_per_testcase(t_validity_period, Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([], + fun(emqx) -> + application:set_env(emqx, alarm, [{actions, [log,publish]}, + {size_limit, 1000}, + {validity_period, 1}]), + ok; + (_) -> + ok + end), + Config; +init_per_testcase(_, Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + Config. + +end_per_testcase(_, _Config) -> + emqx_ct_helpers:stop_apps([]). + t_alarm(_) -> ok = emqx_alarm:activate(unknown_alarm), {error, already_existed} = emqx_alarm:activate(unknown_alarm), @@ -59,6 +91,29 @@ t_deactivate_all_alarms(_) -> emqx_alarm:delete_all_deactivated_alarms(), ?assertEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(deactivated))). +t_size_limit(_) -> + ok = emqx_alarm:activate(a), + ok = emqx_alarm:deactivate(a), + ok = emqx_alarm:activate(b), + ok = emqx_alarm:deactivate(b), + ?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))), + ?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))), + ok = emqx_alarm:activate(c), + ok = emqx_alarm:deactivate(c), + ?assertNotEqual({error, not_found}, get_alarm(c, emqx_alarm:get_alarms(deactivated))), + ?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))), + emqx_alarm:delete_all_deactivated_alarms(). + +t_validity_period(_) -> + ok = emqx_alarm:activate(a), + ok = emqx_alarm:deactivate(a), + dbg:tracer(), + dbg:p(all, c), + dbg:tpl(emqx_alarm, delete_expired_deactivated_alarms, cx), + ?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))), + ct:sleep(2000), + ?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))). + get_alarm(Name, [Alarm = #{name := Name} | _More]) -> Alarm; get_alarm(Name, [_Alarm | More]) ->