diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 0f62a2ac7..c9f32b962 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -95,6 +95,9 @@ end end()). +-define(GET_RES_ALIVE_TIMEOUT, 60000). +-define(PROBE_RES_PREFIX, "__probe__:"). + %%------------------------------------------------------------------------------ %% Load resource/action providers from all available applications %%------------------------------------------------------------------------------ @@ -363,7 +366,7 @@ test_resource(#{type := Type} = Params) -> {ok, #resource_type{}} -> %% Resource will be deleted after test. %% Use random resource id, ensure test func will not delete the resource in used. - ResId = resource_id(), + ResId = probe_resource_id(), try case create_resource(maps:put(id, ResId, Params), no_retry) of {ok, _} -> @@ -405,7 +408,7 @@ is_resource_alive(Nodes, ResId, _Opts = #{fetch := true}) -> {ok, #resource_type{on_status = {Mod, OnStatus}}} = emqx_rule_registry:find_resource_type(ResType), case rpc:multicall(Nodes, - ?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], 5000) of + ?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], ?GET_RES_ALIVE_TIMEOUT) of {ResL, []} -> is_resource_alive_(ResL); {_, _Error} -> @@ -420,7 +423,7 @@ is_resource_alive(Nodes, ResId, _Opts = #{fetch := true}) -> end; is_resource_alive(Nodes, ResId, _Opts = #{fetch := false}) -> try - case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], 5000) of + case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], ?GET_RES_ALIVE_TIMEOUT) of {ResL, []} -> is_resource_alive_(ResL); {_, _Errors} -> @@ -532,10 +535,15 @@ refresh_rule(#rule{id = RuleId, for = Topics, actions = Actions}) -> refresh_resource_status() -> lists:foreach( fun(#resource{id = ResId, type = ResType}) -> - case emqx_rule_registry:find_resource_type(ResType) of - {ok, #resource_type{on_status = {Mod, OnStatus}}} -> - fetch_resource_status(Mod, OnStatus, ResId); - _ -> ok + case is_prober(ResId) of + false -> + case emqx_rule_registry:find_resource_type(ResType) of + {ok, #resource_type{on_status = {Mod, OnStatus}}} -> + fetch_resource_status(Mod, OnStatus, ResId); + _ -> ok + end; + true -> + ok end end, emqx_rule_registry:get_resources()). @@ -662,6 +670,9 @@ ignore_lib_apps(Apps) -> resource_id() -> gen_id("resource:", fun emqx_rule_registry:find_resource/1). +probe_resource_id() -> + gen_id(?PROBE_RES_PREFIX, fun emqx_rule_registry:find_resource/1). + rule_id() -> gen_id("rule:", fun emqx_rule_registry:get_rule/1). @@ -811,4 +822,9 @@ find_type(ResId) -> {ok, Type}. alarm_name_of_resource_down(Type, ResId) -> - list_to_binary(io_lib:format("resource/~s/~s/down", [Type, ResId])). + unicode:characters_to_binary(io_lib:format("resource/~ts/~ts/down", [Type, ResId])). + +is_prober(<>) -> + true; +is_prober(_ResId) -> + false. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 105dcc334..2e0015998 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -62,7 +62,8 @@ groups() -> t_create_rule, t_reset_metrics, t_reset_metrics_fallbacks, - t_create_resource + t_create_resource, + t_clean_resource_alarms ]}, {actions, [], [t_inspect_action @@ -309,21 +310,29 @@ t_create_resource(_Config) -> ok. t_clean_resource_alarms(_Config) -> + lists:foreach(fun(ResId) -> + clean_resource_alarms(ResId) + end, [<<"abc">>, <<"哈喽"/utf8>>]). + +clean_resource_alarms(ResId) -> + emqx_rule_registry:register_resource_types( + [make_simple_debug_resource_type()]), ok = emqx_rule_engine:load_providers(), {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( - #{type => built_in, + #{id => ResId, + type => built_in, config => #{}, description => <<"debug resource">>}), - ?assert(true, is_binary(ResId)), Name = emqx_rule_engine:alarm_name_of_resource_down(ResId, built_in), _ = emqx_alarm:activate(Name, #{id => ResId, type => built_in}), AlarmExist = fun(#{name := AName}) -> AName == Name end, - Len = length(lists:filter(AlarmExist, emqx_alarm:get_alarms())), - ?assert(Len == 1), + Len = length(lists:filter(AlarmExist, emqx_alarm:get_alarms(activated))), + ?assertEqual(1, Len), + emqx_rule_engine:ensure_resource_deleted(ResId), + emqx_alarm:deactivate(Name), + LenAfterRemove = length(lists:filter(AlarmExist, emqx_alarm:get_alarms(activated))), + ?assertEqual(0, LenAfterRemove), ok = emqx_rule_engine:unload_providers(), - emqx_rule_registry:remove_resource(ResId), - LenAfterRemove = length(lists:filter(AlarmExist, emqx_alarm:get_alarms())), - ?assert(LenAfterRemove == 0), ok. %%------------------------------------------------------------------------------ diff --git a/rebar.config b/rebar.config index ae93dbc04..5462565ac 100644 --- a/rebar.config +++ b/rebar.config @@ -61,7 +61,7 @@ , {getopt, "1.0.1"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.1"}}} , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}} - , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.14"}}} + , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.15"}}} , {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} ]}.