fix(rule_engine): better function name for resource

This commit is contained in:
DDDHuang 2022-05-18 11:14:12 +08:00
parent f3bef3c81c
commit a67dff4568
2 changed files with 26 additions and 26 deletions

View File

@ -37,9 +37,9 @@
, test_resource/1 , test_resource/1
, start_resource/1 , start_resource/1
, get_resource_status/1 , get_resource_status/1
, is_source_alive/1 , is_resource_alive/1
, is_source_alive/2 , is_resource_alive/2
, is_source_alive/3 , is_resource_alive/3
, get_resource_params/1 , get_resource_params/1
, ensure_resource_deleted/1 , ensure_resource_deleted/1
, delete_resource/1 , delete_resource/1
@ -349,11 +349,11 @@ test_resource(#{type := Type} = Params) ->
try try
case create_resource(maps:put(id, ResId, Params), no_retry) of case create_resource(maps:put(id, ResId, Params), no_retry) of
{ok, _} -> {ok, _} ->
case is_source_alive(ResId, #{fetch => true}) of case is_resource_alive(ResId, #{fetch => true}) of
true -> true ->
ok; ok;
false -> false ->
%% in is_source_alive, the cluster-call RPC logs errors %% in is_resource_alive, the cluster-call RPC logs errors
%% so we do not log anything here %% so we do not log anything here
{error, {resource_down, ResId}} {error, {resource_down, ResId}}
end; end;
@ -371,16 +371,16 @@ test_resource(#{type := Type} = Params) ->
{error, {resource_type_not_found, Type}} {error, {resource_type_not_found, Type}}
end. end.
is_source_alive(ResId) -> is_resource_alive(ResId) ->
is_source_alive(ResId, #{fetch => false}). is_resource_alive(ResId, #{fetch => false}).
is_source_alive(ResId, Opts) -> is_resource_alive(ResId, Opts) ->
is_source_alive(ekka_mnesia:running_nodes(), ResId, Opts). is_resource_alive(ekka_mnesia:running_nodes(), ResId, Opts).
-spec(is_source_alive(list(node()) | node(), resource_id(), #{fetch := boolean()}) -> boolean()). -spec(is_resource_alive(list(node()) | node(), resource_id(), #{fetch := boolean()}) -> boolean()).
is_source_alive(Node, ResId, Opts) when is_atom(Node) -> is_resource_alive(Node, ResId, Opts) when is_atom(Node) ->
is_source_alive([Node], ResId, Opts); is_resource_alive([Node], ResId, Opts);
is_source_alive(Nodes, ResId, _Opts = #{fetch := true}) -> is_resource_alive(Nodes, ResId, _Opts = #{fetch := true}) ->
try try
case emqx_rule_registry:find_resource(ResId) of case emqx_rule_registry:find_resource(ResId) of
{ok, #resource{type = ResType}} -> {ok, #resource{type = ResType}} ->
@ -389,7 +389,7 @@ is_source_alive(Nodes, ResId, _Opts = #{fetch := true}) ->
case rpc:multicall(Nodes, case rpc:multicall(Nodes,
?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], 5000) of ?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], 5000) of
{ResL, []} -> {ResL, []} ->
is_source_alive_(ResL); is_resource_alive_(ResL);
{_, _Error} -> {_, _Error} ->
false false
end; end;
@ -397,30 +397,30 @@ is_source_alive(Nodes, ResId, _Opts = #{fetch := true}) ->
false false
end end
catch E:R:S -> catch E:R:S ->
?LOG(warning, "is_source_alive failed, ~0p:~0p ~0p", [E, R, S]), ?LOG(warning, "is_resource_alive failed, ~0p:~0p ~0p", [E, R, S]),
false false
end; end;
is_source_alive(Nodes, ResId, _Opts = #{fetch := false}) -> is_resource_alive(Nodes, ResId, _Opts = #{fetch := false}) ->
try try
case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], 5000) of case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], 5000) of
{ResL, []} -> {ResL, []} ->
is_source_alive_(ResL); is_resource_alive_(ResL);
{_, _Errors} -> {_, _Errors} ->
false false
end end
catch E:R:S -> catch E:R:S ->
?LOG(warning, "is_source_alive failed, ~0p:~0p ~0p", [E, R, S]), ?LOG(warning, "is_resource_alive failed, ~0p:~0p ~0p", [E, R, S]),
false false
end. end.
%% fetch_resource_status -> #{is_alive => boolean()} %% fetch_resource_status -> #{is_alive => boolean()}
%% get_resource_status -> {ok, #{is_alive => boolean()}} %% get_resource_status -> {ok, #{is_alive => boolean()}}
is_source_alive_([]) -> true; is_resource_alive_([]) -> true;
is_source_alive_([#{is_alive := true} | ResL]) -> is_source_alive_(ResL); is_resource_alive_([#{is_alive := true} | ResL]) -> is_resource_alive_(ResL);
is_source_alive_([#{is_alive := false} | _ResL]) -> false; is_resource_alive_([#{is_alive := false} | _ResL]) -> false;
is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL); is_resource_alive_([{ok, #{is_alive := true}} | ResL]) -> is_resource_alive_(ResL);
is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false; is_resource_alive_([{ok, #{is_alive := false}} | _ResL]) -> false;
is_source_alive_([_Error | _ResL]) -> false. is_resource_alive_([_Error | _ResL]) -> false.
-spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). -spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
get_resource_status(ResId) -> get_resource_status(ResId) ->

View File

@ -321,7 +321,7 @@ do_create_resource(Create, ParsedParams) ->
list_resources(#{}, _Params) -> list_resources(#{}, _Params) ->
Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
Data = lists:map(fun(Res = #{id := ResId}) -> Data = lists:map(fun(Res = #{id := ResId}) ->
Status = emqx_rule_engine:is_source_alive(ResId), Status = emqx_rule_engine:is_resource_alive(ResId),
maps:put(status, Status, Res) maps:put(status, Status, Res)
end, Data0), end, Data0),
return({ok, Data}). return({ok, Data}).
@ -336,7 +336,7 @@ show_resource(#{id := Id}, _Params) ->
fun(Node) -> fun(Node) ->
#{ #{
node => Node, node => Node,
is_alive => emqx_rule_engine:is_source_alive(Node, Id, #{fetch => false}) is_alive => emqx_rule_engine:is_resource_alive(Node, Id, #{fetch => false})
} }
end, end,
Status = [StatusFun(Node) || Node <- ekka_mnesia:running_nodes()], Status = [StatusFun(Node) || Node <- ekka_mnesia:running_nodes()],