diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index f3e1651d4..6e8f00329 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -698,6 +698,7 @@ init_resource(Module, OnCreate, ResId, Config) -> ResParams = #resource_params{id = ResId, params = Params, status = #{is_alive => true}}, + maybe_resource_down(ResId, clear), emqx_rule_registry:add_resource_params(ResParams). init_resource_with_retrier(Module, OnCreate, ResId, Config) -> @@ -706,6 +707,7 @@ init_resource_with_retrier(Module, OnCreate, ResId, Config) -> ResParams = #resource_params{id = ResId, params = Params, status = #{is_alive => true}}, + maybe_resource_down(ResId, clear), emqx_rule_registry:add_resource_params(ResParams). init_action(Module, OnCreate, ActionInstId, Params) -> @@ -726,12 +728,10 @@ init_action(Module, OnCreate, ActionInstId, Params) -> end. clear_resource(_Module, undefined, ResId, Type) -> - Name = alarm_name_of_resource_down(Type, ResId), - _ = emqx_alarm:deactivate(Name), + clear_resource_down(ResId, Type), ok = emqx_rule_registry:remove_resource_params(ResId); clear_resource(Module, Destroy, ResId, Type) -> - Name = alarm_name_of_resource_down(Type, ResId), - _ = emqx_alarm:deactivate(Name), + clear_resource_down(ResId, Type), case emqx_rule_registry:find_resource_params(ResId) of {ok, #resource_params{params = Params}} -> ?RAISE(Module:Destroy(ResId, Params), @@ -779,14 +779,10 @@ fetch_resource_status(Module, OnStatus, ResId) -> case Module:OnStatus(ResId, Params) of #{is_alive := LastIsAlive} = Status -> Status; #{is_alive := true} = Status -> - {ok, Type} = find_type(ResId), - Name = alarm_name_of_resource_down(Type, ResId), - emqx_alarm:deactivate(Name), + maybe_resource_down(ResId, clear), Status; #{is_alive := false} = Status -> - {ok, Type} = find_type(ResId), - Name = alarm_name_of_resource_down(Type, ResId), - emqx_alarm:activate(Name, #{id => ResId, type => Type}), + maybe_resource_down(ResId, alarm), Status end catch _Error:Reason:STrace -> @@ -824,9 +820,23 @@ refresh_actions(Actions, Pred) -> end end, Actions). -find_type(ResId) -> - {ok, #resource{type = Type}} = emqx_rule_registry:find_resource(ResId), - {ok, Type}. +maybe_resource_down(ResId, AlarmOrClear) -> + case emqx_rule_registry:find_resource(ResId) of + {ok, #resource{type = Type}} -> + _ = case AlarmOrClear of + alarm -> alarm_resource_down(ResId, Type); + clear -> clear_resource_down(ResId, Type) + end, + ok; + not_found -> + ok + end. + +alarm_resource_down(ResId, Type) -> + emqx_alarm:activate(alarm_name_of_resource_down(Type, ResId), + #{id => ResId, type => Type}). +clear_resource_down(ResId, Type) -> + emqx_alarm:deactivate(alarm_name_of_resource_down(Type, ResId)). alarm_name_of_resource_down(Type, ResId) -> unicode:characters_to_binary(io_lib:format("resource/~ts/~ts/down", [Type, ResId])). diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index fac76d235..b3a44ff34 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -89,13 +89,17 @@ end_per_testcase(_, Config) -> Config. common_init_per_testcase() -> + AlarmOpts = [{actions, [log, publish]}, {size_limit, 1000}, {validity_period, 86400}], + {ok, _} = emqx_alarm:start_link(AlarmOpts), {ok, _} = emqx_rule_monitor:start_link(). common_end_per_testcases() -> + ok = emqx_alarm:stop(), emqx_rule_monitor:erase_retry_interval(), emqx_rule_monitor:stop(). t_restart_resource(_) -> + ct:pal("emqx_alarm: ~p", [sys:get_state(whereis(emqx_alarm))]), ok = emqx_rule_registry:register_resource_types( [#resource_type{ name = test_res_1, diff --git a/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl index ae3f72254..c924b1d71 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl @@ -21,7 +21,7 @@ -include_lib("eunit/include/eunit.hrl"). --define(PORT, 9876). +-define(PORT, 29876). all() -> emqx_ct:all(?MODULE).