diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 24c0bed56..312fc9e7d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -37,9 +37,9 @@ , test_resource/1 , start_resource/1 , get_resource_status/1 - , is_source_alive/1 - , is_source_alive/2 - , is_source_alive/3 + , is_resource_alive/1 + , is_resource_alive/2 + , is_resource_alive/3 , get_resource_params/1 , ensure_resource_deleted/1 , delete_resource/1 @@ -349,11 +349,11 @@ test_resource(#{type := Type} = Params) -> try case create_resource(maps:put(id, ResId, Params), no_retry) of {ok, _} -> - case is_source_alive(ResId, #{fetch => true}) of + case is_resource_alive(ResId, #{fetch => true}) of true -> ok; false -> - %% in is_source_alive, the cluster-call RPC logs errors + %% in is_resource_alive, the cluster-call RPC logs errors %% so we do not log anything here {error, {resource_down, ResId}} end; @@ -371,16 +371,16 @@ test_resource(#{type := Type} = Params) -> {error, {resource_type_not_found, Type}} end. -is_source_alive(ResId) -> - is_source_alive(ResId, #{fetch => false}). +is_resource_alive(ResId) -> + is_resource_alive(ResId, #{fetch => false}). -is_source_alive(ResId, Opts) -> - is_source_alive(ekka_mnesia:running_nodes(), ResId, Opts). +is_resource_alive(ResId, Opts) -> + is_resource_alive(ekka_mnesia:running_nodes(), ResId, Opts). --spec(is_source_alive(list(node()) | node(), resource_id(), #{fetch := boolean()}) -> boolean()). -is_source_alive(Node, ResId, Opts) when is_atom(Node) -> - is_source_alive([Node], ResId, Opts); -is_source_alive(Nodes, ResId, _Opts = #{fetch := true}) -> +-spec(is_resource_alive(list(node()) | node(), resource_id(), #{fetch := boolean()}) -> boolean()). +is_resource_alive(Node, ResId, Opts) when is_atom(Node) -> + is_resource_alive([Node], ResId, Opts); +is_resource_alive(Nodes, ResId, _Opts = #{fetch := true}) -> try case emqx_rule_registry:find_resource(ResId) of {ok, #resource{type = ResType}} -> @@ -389,7 +389,7 @@ is_source_alive(Nodes, ResId, _Opts = #{fetch := true}) -> case rpc:multicall(Nodes, ?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], 5000) of {ResL, []} -> - is_source_alive_(ResL); + is_resource_alive_(ResL); {_, _Error} -> false end; @@ -397,30 +397,30 @@ is_source_alive(Nodes, ResId, _Opts = #{fetch := true}) -> false end catch E:R:S -> - ?LOG(warning, "is_source_alive failed, ~0p:~0p ~0p", [E, R, S]), + ?LOG(warning, "is_resource_alive failed, ~0p:~0p ~0p", [E, R, S]), false end; -is_source_alive(Nodes, ResId, _Opts = #{fetch := false}) -> +is_resource_alive(Nodes, ResId, _Opts = #{fetch := false}) -> try case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], 5000) of {ResL, []} -> - is_source_alive_(ResL); + is_resource_alive_(ResL); {_, _Errors} -> false end catch E:R:S -> - ?LOG(warning, "is_source_alive failed, ~0p:~0p ~0p", [E, R, S]), + ?LOG(warning, "is_resource_alive failed, ~0p:~0p ~0p", [E, R, S]), false end. %% fetch_resource_status -> #{is_alive => boolean()} %% get_resource_status -> {ok, #{is_alive => boolean()}} -is_source_alive_([]) -> true; -is_source_alive_([#{is_alive := true} | ResL]) -> is_source_alive_(ResL); -is_source_alive_([#{is_alive := false} | _ResL]) -> false; -is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL); -is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false; -is_source_alive_([_Error | _ResL]) -> false. +is_resource_alive_([]) -> true; +is_resource_alive_([#{is_alive := true} | ResL]) -> is_resource_alive_(ResL); +is_resource_alive_([#{is_alive := false} | _ResL]) -> false; +is_resource_alive_([{ok, #{is_alive := true}} | ResL]) -> is_resource_alive_(ResL); +is_resource_alive_([{ok, #{is_alive := false}} | _ResL]) -> false; +is_resource_alive_([_Error | _ResL]) -> false. -spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). get_resource_status(ResId) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index c196e03f7..6587f9c5f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -321,7 +321,7 @@ do_create_resource(Create, ParsedParams) -> list_resources(#{}, _Params) -> Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), Data = lists:map(fun(Res = #{id := ResId}) -> - Status = emqx_rule_engine:is_source_alive(ResId), + Status = emqx_rule_engine:is_resource_alive(ResId), maps:put(status, Status, Res) end, Data0), return({ok, Data}). @@ -336,7 +336,7 @@ show_resource(#{id := Id}, _Params) -> fun(Node) -> #{ node => Node, - is_alive => emqx_rule_engine:is_source_alive(Node, Id, #{fetch => false}) + is_alive => emqx_rule_engine:is_resource_alive(Node, Id, #{fetch => false}) } end, Status = [StatusFun(Node) || Node <- ekka_mnesia:running_nodes()],