diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 0f62a2ac7..72514a320 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -95,6 +95,9 @@ end end()). +-define(GET_RES_ALIVE_TIMEOUT, 60000). +-define(PROBE_RES_PREFIX, "__probe__:"). + %%------------------------------------------------------------------------------ %% Load resource/action providers from all available applications %%------------------------------------------------------------------------------ @@ -363,7 +366,7 @@ test_resource(#{type := Type} = Params) -> {ok, #resource_type{}} -> %% Resource will be deleted after test. %% Use random resource id, ensure test func will not delete the resource in used. - ResId = resource_id(), + ResId = probe_resource_id(), try case create_resource(maps:put(id, ResId, Params), no_retry) of {ok, _} -> @@ -405,7 +408,7 @@ is_resource_alive(Nodes, ResId, _Opts = #{fetch := true}) -> {ok, #resource_type{on_status = {Mod, OnStatus}}} = emqx_rule_registry:find_resource_type(ResType), case rpc:multicall(Nodes, - ?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], 5000) of + ?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], ?GET_RES_ALIVE_TIMEOUT) of {ResL, []} -> is_resource_alive_(ResL); {_, _Error} -> @@ -420,7 +423,7 @@ is_resource_alive(Nodes, ResId, _Opts = #{fetch := true}) -> end; is_resource_alive(Nodes, ResId, _Opts = #{fetch := false}) -> try - case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], 5000) of + case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], ?GET_RES_ALIVE_TIMEOUT) of {ResL, []} -> is_resource_alive_(ResL); {_, _Errors} -> @@ -532,10 +535,15 @@ refresh_rule(#rule{id = RuleId, for = Topics, actions = Actions}) -> refresh_resource_status() -> lists:foreach( fun(#resource{id = ResId, type = ResType}) -> - case emqx_rule_registry:find_resource_type(ResType) of - {ok, #resource_type{on_status = {Mod, OnStatus}}} -> - fetch_resource_status(Mod, OnStatus, ResId); - _ -> ok + case is_prober(ResId) of + false -> + case emqx_rule_registry:find_resource_type(ResType) of + {ok, #resource_type{on_status = {Mod, OnStatus}}} -> + fetch_resource_status(Mod, OnStatus, ResId); + _ -> ok + end; + true -> + ok end end, emqx_rule_registry:get_resources()). @@ -662,6 +670,9 @@ ignore_lib_apps(Apps) -> resource_id() -> gen_id("resource:", fun emqx_rule_registry:find_resource/1). +probe_resource_id() -> + gen_id(?PROBE_RES_PREFIX, fun emqx_rule_registry:find_resource/1). + rule_id() -> gen_id("rule:", fun emqx_rule_registry:get_rule/1). @@ -812,3 +823,8 @@ find_type(ResId) -> alarm_name_of_resource_down(Type, ResId) -> list_to_binary(io_lib:format("resource/~s/~s/down", [Type, ResId])). + +is_prober(<>) -> + true; +is_prober(_ResId) -> + false. diff --git a/rebar.config b/rebar.config index 26fbf7f8e..e109fe242 100644 --- a/rebar.config +++ b/rebar.config @@ -61,7 +61,7 @@ , {getopt, "1.0.1"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.1"}}} , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}} - , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.14"}}} + , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.15"}}} , {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} ]}.