fix(rule): get the cached status when calling emqx_rule_engine:get_resource_status/1

This commit is contained in:
Shawn 2022-03-23 15:09:45 +08:00
parent ce2e4f51ac
commit 5275e6a30f
2 changed files with 19 additions and 16 deletions

View File

@ -334,14 +334,11 @@ test_resource(#{type := Type, config := Config0}) ->
-spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
get_resource_status(ResId) ->
case emqx_rule_registry:find_resource(ResId) of
{ok, #resource{type = ResType}} ->
{ok, #resource_type{on_status = {Mod, OnStatus}}}
= emqx_rule_registry:find_resource_type(ResType),
Status = fetch_resource_status(Mod, OnStatus, ResId),
case emqx_rule_registry:find_resource_params(ResId) of
{ok, #resource_params{status = Status}} ->
{ok, Status};
not_found ->
{error, {resource_not_found, ResId}}
{error, resource_not_initialized}
end.
-spec(get_resource_params(resource_id()) -> {ok, map()} | {error, Reason :: term()}).

View File

@ -295,13 +295,8 @@ do_create_resource(Create, ParsedParams) ->
list_resources(#{}, _Params) ->
Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
Data = lists:map(fun(Res = #{id := Id}) ->
Status = lists:all(fun(Node) ->
case rpc:call(Node, emqx_rule_registry, find_resource_params, [Id]) of
{ok, #resource_params{status = #{is_alive := true}}} -> true;
_ -> false
end
end, ekka_mnesia:running_nodes()),
Data = lists:map(fun(Res = #{id := ResId}) ->
Status = get_aggregated_status(ResId),
maps:put(status, Status, Res)
end, Data0),
return({ok, Data}).
@ -309,12 +304,23 @@ list_resources(#{}, _Params) ->
list_resources_by_type(#{type := Type}, _Params) ->
return_all(emqx_rule_registry:get_resources_by_type(Type)).
get_aggregated_status(ResId) ->
lists:all(fun(Node) ->
case rpc:call(Node, emqx_rule_engine, get_resource_status, [ResId]) of
{ok, #{is_alive := true}} -> true;
_ -> false
end
end, ekka_mnesia:running_nodes()).
show_resource(#{id := Id}, _Params) ->
case emqx_rule_registry:find_resource(Id) of
{ok, R} ->
Status =
[begin
{ok, St} = rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]),
St = case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of
{ok, St0} -> St0;
{error, _} -> #{is_alive => false}
end,
maps:put(node, Node, St)
end || Node <- ekka_mnesia:running_nodes()],
return({ok, maps:put(status, Status, record_to_map(R))});
@ -326,8 +332,8 @@ get_resource_status(#{id := Id}, _Params) ->
case emqx_rule_engine:get_resource_status(Id) of
{ok, Status} ->
return({ok, Status});
{error, {resource_not_found, ResId}} ->
return({error, 400, ?ERR_NO_RESOURCE(ResId)})
{error, resource_not_initialized} ->
return({error, 400, ?ERR_NO_RESOURCE(Id)})
end.
start_resource(#{id := Id}, _Params) ->