feat(alarm): update the validity_period timer
This commit is contained in:
parent
14af90d0c3
commit
9cda6ab3c8
|
@ -17,6 +17,7 @@
|
|||
-module(emqx_alarm).
|
||||
|
||||
-behaviour(gen_server).
|
||||
-behaviour(emqx_config_handler).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
@ -29,6 +30,8 @@
|
|||
-boot_mnesia({mnesia, [boot]}).
|
||||
-copy_mnesia({mnesia, [copy]}).
|
||||
|
||||
-export([handle_update_config/2]).
|
||||
|
||||
-export([ start_link/0
|
||||
, stop/0
|
||||
]).
|
||||
|
@ -75,7 +78,7 @@
|
|||
}).
|
||||
|
||||
-record(state, {
|
||||
timer = undefined :: undefined | reference()
|
||||
timer :: reference()
|
||||
}).
|
||||
|
||||
-define(ACTIVATED_ALARM, emqx_activated_alarm).
|
||||
|
@ -148,14 +151,20 @@ get_alarms(activated) ->
|
|||
get_alarms(deactivated) ->
|
||||
gen_server:call(?MODULE, {get_alarms, deactivated}).
|
||||
|
||||
handle_update_config(#{<<"validity_period">> := Period0} = NewConf, OldConf) ->
|
||||
?MODULE ! {update_timer, hocon_postprocess:duration(Period0)},
|
||||
maps:merge(OldConf, NewConf);
|
||||
handle_update_config(NewConf, OldConf) ->
|
||||
maps:merge(OldConf, NewConf).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
deactivate_all_alarms(),
|
||||
ensure_delete_timer(),
|
||||
{ok, #state{}}.
|
||||
emqx_config_handler:add_handler([alarm], ?MODULE),
|
||||
{ok, #state{timer = ensure_timer(undefined, get_validity_period())}}.
|
||||
|
||||
%% suppress dialyzer warning due to dirty read/write race condition.
|
||||
%% TODO: change from dirty_read/write to transactional.
|
||||
|
@ -215,11 +224,15 @@ handle_cast(Msg, State) ->
|
|||
?LOG(error, "Unexpected msg: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({timeout, _TRef, delete_expired_deactivated_alarm}, State) ->
|
||||
ValidityPeriod = emqx_config:get([alarm, validity_period]),
|
||||
delete_expired_deactivated_alarms(erlang:system_time(microsecond) - ValidityPeriod * 1000),
|
||||
ensure_delete_timer(),
|
||||
{noreply, State};
|
||||
handle_info({timeout, _TRef, delete_expired_deactivated_alarm},
|
||||
#state{timer = TRef} = State) ->
|
||||
Period = get_validity_period(),
|
||||
delete_expired_deactivated_alarms(erlang:system_time(microsecond) - Period * 1000),
|
||||
{noreply, State#state{timer = ensure_timer(TRef, Period)}};
|
||||
|
||||
handle_info({update_timer, Period}, #state{timer = TRef} = State) ->
|
||||
?LOG(warning, "update the 'validity_period' timer to ~p", [Period]),
|
||||
{noreply, State#state{timer = ensure_timer(TRef, Period)}};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||
|
@ -235,6 +248,9 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
get_validity_period() ->
|
||||
timer:seconds(emqx_config:get([alarm, validity_period])).
|
||||
|
||||
deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name,
|
||||
details = Details0, message = Msg0}) ->
|
||||
SizeLimit = emqx_config:get([alarm, size_limit]),
|
||||
|
@ -290,9 +306,12 @@ clear_table(TableName) ->
|
|||
ok
|
||||
end.
|
||||
|
||||
ensure_delete_timer() ->
|
||||
emqx_misc:start_timer(emqx_config:get([alarm, validity_period]),
|
||||
delete_expired_deactivated_alarm).
|
||||
ensure_timer(OldTRef, Period) ->
|
||||
case is_reference(OldTRef) of
|
||||
true -> _ = erlang:cancel_timer(OldTRef);
|
||||
false -> ok
|
||||
end,
|
||||
emqx_misc:start_timer(Period, delete_expired_deactivated_alarm).
|
||||
|
||||
delete_expired_deactivated_alarms(Checkpoint) ->
|
||||
delete_expired_deactivated_alarms(mnesia:dirty_first(?DEACTIVATED_ALARM), Checkpoint).
|
||||
|
|
|
@ -27,27 +27,17 @@ all() -> emqx_ct:all(?MODULE).
|
|||
|
||||
init_per_testcase(t_size_limit, Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([],
|
||||
fun(emqx) ->
|
||||
application:set_env(emqx, alarm, [{actions, [log,publish]},
|
||||
{size_limit, 2},
|
||||
{validity_period, 3600}]),
|
||||
ok;
|
||||
(_) ->
|
||||
ok
|
||||
end),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_config:update_config([alarm], #{
|
||||
<<"size_limit">> => 2
|
||||
}),
|
||||
Config;
|
||||
init_per_testcase(t_validity_period, Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([],
|
||||
fun(emqx) ->
|
||||
application:set_env(emqx, alarm, [{actions, [log,publish]},
|
||||
{size_limit, 1000},
|
||||
{validity_period, 1}]),
|
||||
ok;
|
||||
(_) ->
|
||||
ok
|
||||
end),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_config:update_config([alarm], #{
|
||||
<<"validity_period">> => <<"1s">>
|
||||
}),
|
||||
Config;
|
||||
init_per_testcase(_, Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
|
@ -89,7 +79,7 @@ t_size_limit(_) ->
|
|||
ok = emqx_alarm:activate(b),
|
||||
ok = emqx_alarm:deactivate(b),
|
||||
?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))),
|
||||
?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))),
|
||||
?assertNotEqual({error, not_found}, get_alarm(b, emqx_alarm:get_alarms(deactivated))),
|
||||
ok = emqx_alarm:activate(c),
|
||||
ok = emqx_alarm:deactivate(c),
|
||||
?assertNotEqual({error, not_found}, get_alarm(c, emqx_alarm:get_alarms(deactivated))),
|
||||
|
|
|
@ -34,7 +34,10 @@ stop(_State) ->
|
|||
handle_update_config({update, Bridge = #{<<"name">> := Name}}, OldConf) ->
|
||||
[Bridge | remove_bridge(Name, OldConf)];
|
||||
handle_update_config({delete, Name}, OldConf) ->
|
||||
remove_bridge(Name, OldConf).
|
||||
remove_bridge(Name, OldConf);
|
||||
handle_update_config(NewConf, _OldConf) when is_list(NewConf) ->
|
||||
%% overwrite the entire config!
|
||||
NewConf.
|
||||
|
||||
remove_bridge(_Name, undefined) ->
|
||||
[];
|
||||
|
|
Loading…
Reference in New Issue