diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 25438c828..24a4a5358 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -6,7 +6,9 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.8", [{add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -136,6 +138,8 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.8", [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 477c5ad9a..53bd4c1b6 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -38,6 +38,8 @@ , start_resource/1 , get_resource_status/1 , is_source_alive/1 + , is_source_alive/2 + , is_source_alive/3 , get_resource_params/1 , ensure_resource_deleted/1 , delete_resource/1 @@ -340,7 +342,7 @@ test_resource(#{type := Type} = Params) -> try case create_resource(maps:put(id, ResId, Params), no_retry) of {ok, _} -> - case is_source_alive(ResId) of + case is_source_alive(ResId, #{fetch => true}) of true -> ok; false -> @@ -363,15 +365,53 @@ test_resource(#{type := Type} = Params) -> end. is_source_alive(ResId) -> - case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, get_resource_status, [ResId], 5000) of - {ResL, []} -> - is_source_alive_(ResL); - {_, _Errors} -> - false + is_source_alive(ResId, #{fetch => false}). + +is_source_alive(ResId, Opts) -> + is_source_alive(ekka_mnesia:running_nodes(), ResId, Opts). + +-spec(is_source_alive(list(node()) | node(), resource_id(), #{fetch := boolean()}) -> boolean()). +is_source_alive(Node, ResId, Opts) when is_atom(Node) -> + is_source_alive([Node], ResId, Opts); +is_source_alive(Nodes, ResId, _Opts = #{fetch := true}) -> + try + case emqx_rule_registry:find_resource(ResId) of + {ok, #resource{type = ResType}} -> + {ok, #resource_type{on_status = {Mod, OnStatus}}} + = emqx_rule_registry:find_resource_type(ResType), + case rpc:multicall(Nodes, + ?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], 5000) of + {ResL, []} -> + is_source_alive_(ResL); + {_, _Error} -> + false + end; + not_found -> + false + end + catch E:R:S -> + ?LOG(warning, "is_source_alive failed, ~0p:~0p ~0p", [E, R, S]), + false + end; +is_source_alive(Nodes, ResId, _Opts = #{fetch := false}) -> + try + case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], 5000) of + {ResL, []} -> + is_source_alive_(ResL); + {_, _Errors} -> + false + end + catch E:R:S -> + ?LOG(warning, "is_source_alive failed, ~0p:~0p ~0p", [E, R, S]), + false end. +%% fetch_resource_status -> #{is_alive => boolean()} +%% get_resource_status -> {ok #{is_alive => boolean()}} is_source_alive_([]) -> true; +is_source_alive_([#{is_alive := true} | ResL]) -> is_source_alive_(ResL); is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL); +is_source_alive_([#{is_alive := false} | _ResL]) -> false; is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false; is_source_alive_([_Error | _ResL]) -> false. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 3430bc98d..c196e03f7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -332,14 +332,14 @@ list_resources_by_type(#{type := Type}, _Params) -> show_resource(#{id := Id}, _Params) -> case emqx_rule_registry:find_resource(Id) of {ok, R} -> - Status = - lists:concat( - [ case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of - {badrpc, _} -> []; - {ok, St} -> [maps:put(node, Node, St)]; - {error, _} -> [maps:put(node, Node, #{is_alive => false})] - end - || Node <- ekka_mnesia:running_nodes()]), + StatusFun = + fun(Node) -> + #{ + node => Node, + is_alive => emqx_rule_engine:is_source_alive(Node, Id, #{fetch => false}) + } + end, + Status = [StatusFun(Node) || Node <- ekka_mnesia:running_nodes()], return({ok, maps:put(status, Status, record_to_map(R))}); not_found -> return({error, 404, <<"Not Found">>})