From 9cda6ab3c8e2d3bd22830d184f6f80a3f8a821d9 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 9 Jul 2021 19:09:44 +0800 Subject: [PATCH] feat(alarm): update the validity_period timer --- apps/emqx/src/emqx_alarm.erl | 41 ++++++++++++++----- apps/emqx/test/emqx_alarm_SUITE.erl | 28 ++++--------- .../src/emqx_data_bridge_app.erl | 5 ++- 3 files changed, 43 insertions(+), 31 deletions(-) diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 5b2fa6f40..0eaa16507 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -17,6 +17,7 @@ -module(emqx_alarm). -behaviour(gen_server). +-behaviour(emqx_config_handler). -include("emqx.hrl"). -include("logger.hrl"). @@ -29,6 +30,8 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). +-export([handle_update_config/2]). + -export([ start_link/0 , stop/0 ]). @@ -75,7 +78,7 @@ }). -record(state, { - timer = undefined :: undefined | reference() + timer :: reference() }). -define(ACTIVATED_ALARM, emqx_activated_alarm). @@ -148,14 +151,20 @@ get_alarms(activated) -> get_alarms(deactivated) -> gen_server:call(?MODULE, {get_alarms, deactivated}). +handle_update_config(#{<<"validity_period">> := Period0} = NewConf, OldConf) -> + ?MODULE ! {update_timer, hocon_postprocess:duration(Period0)}, + maps:merge(OldConf, NewConf); +handle_update_config(NewConf, OldConf) -> + maps:merge(OldConf, NewConf). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> deactivate_all_alarms(), - ensure_delete_timer(), - {ok, #state{}}. + emqx_config_handler:add_handler([alarm], ?MODULE), + {ok, #state{timer = ensure_timer(undefined, get_validity_period())}}. %% suppress dialyzer warning due to dirty read/write race condition. %% TODO: change from dirty_read/write to transactional. @@ -215,11 +224,15 @@ handle_cast(Msg, State) -> ?LOG(error, "Unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info({timeout, _TRef, delete_expired_deactivated_alarm}, State) -> - ValidityPeriod = emqx_config:get([alarm, validity_period]), - delete_expired_deactivated_alarms(erlang:system_time(microsecond) - ValidityPeriod * 1000), - ensure_delete_timer(), - {noreply, State}; +handle_info({timeout, _TRef, delete_expired_deactivated_alarm}, + #state{timer = TRef} = State) -> + Period = get_validity_period(), + delete_expired_deactivated_alarms(erlang:system_time(microsecond) - Period * 1000), + {noreply, State#state{timer = ensure_timer(TRef, Period)}}; + +handle_info({update_timer, Period}, #state{timer = TRef} = State) -> + ?LOG(warning, "update the 'validity_period' timer to ~p", [Period]), + {noreply, State#state{timer = ensure_timer(TRef, Period)}}; handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), @@ -235,6 +248,9 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ +get_validity_period() -> + timer:seconds(emqx_config:get([alarm, validity_period])). + deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name, details = Details0, message = Msg0}) -> SizeLimit = emqx_config:get([alarm, size_limit]), @@ -290,9 +306,12 @@ clear_table(TableName) -> ok end. -ensure_delete_timer() -> - emqx_misc:start_timer(emqx_config:get([alarm, validity_period]), - delete_expired_deactivated_alarm). +ensure_timer(OldTRef, Period) -> + case is_reference(OldTRef) of + true -> _ = erlang:cancel_timer(OldTRef); + false -> ok + end, + emqx_misc:start_timer(Period, delete_expired_deactivated_alarm). delete_expired_deactivated_alarms(Checkpoint) -> delete_expired_deactivated_alarms(mnesia:dirty_first(?DEACTIVATED_ALARM), Checkpoint). diff --git a/apps/emqx/test/emqx_alarm_SUITE.erl b/apps/emqx/test/emqx_alarm_SUITE.erl index db6cdfe7f..e21dff30a 100644 --- a/apps/emqx/test/emqx_alarm_SUITE.erl +++ b/apps/emqx/test/emqx_alarm_SUITE.erl @@ -27,27 +27,17 @@ all() -> emqx_ct:all(?MODULE). init_per_testcase(t_size_limit, Config) -> emqx_ct_helpers:boot_modules(all), - emqx_ct_helpers:start_apps([], - fun(emqx) -> - application:set_env(emqx, alarm, [{actions, [log,publish]}, - {size_limit, 2}, - {validity_period, 3600}]), - ok; - (_) -> - ok - end), + emqx_ct_helpers:start_apps([]), + emqx_config:update_config([alarm], #{ + <<"size_limit">> => 2 + }), Config; init_per_testcase(t_validity_period, Config) -> emqx_ct_helpers:boot_modules(all), - emqx_ct_helpers:start_apps([], - fun(emqx) -> - application:set_env(emqx, alarm, [{actions, [log,publish]}, - {size_limit, 1000}, - {validity_period, 1}]), - ok; - (_) -> - ok - end), + emqx_ct_helpers:start_apps([]), + emqx_config:update_config([alarm], #{ + <<"validity_period">> => <<"1s">> + }), Config; init_per_testcase(_, Config) -> emqx_ct_helpers:boot_modules(all), @@ -89,7 +79,7 @@ t_size_limit(_) -> ok = emqx_alarm:activate(b), ok = emqx_alarm:deactivate(b), ?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))), - ?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))), + ?assertNotEqual({error, not_found}, get_alarm(b, emqx_alarm:get_alarms(deactivated))), ok = emqx_alarm:activate(c), ok = emqx_alarm:deactivate(c), ?assertNotEqual({error, not_found}, get_alarm(c, emqx_alarm:get_alarms(deactivated))), diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl index 967791643..ccd98f010 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl @@ -34,7 +34,10 @@ stop(_State) -> handle_update_config({update, Bridge = #{<<"name">> := Name}}, OldConf) -> [Bridge | remove_bridge(Name, OldConf)]; handle_update_config({delete, Name}, OldConf) -> - remove_bridge(Name, OldConf). + remove_bridge(Name, OldConf); +handle_update_config(NewConf, _OldConf) when is_list(NewConf) -> + %% overwrite the entire config! + NewConf. remove_bridge(_Name, undefined) -> [];