Merge pull request #7745 from DDDHuang/fetch_re_status
fix: test resource with fetch new status
This commit is contained in:
commit
f269260293
|
@ -6,7 +6,9 @@
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
{add_module,emqx_rule_date},
|
{add_module,emqx_rule_date},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.8",
|
{"4.3.8",
|
||||||
[{add_module,emqx_rule_date},
|
[{add_module,emqx_rule_date},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
|
@ -136,6 +138,8 @@
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{delete_module,emqx_rule_date}]},
|
{delete_module,emqx_rule_date}]},
|
||||||
{"4.3.8",
|
{"4.3.8",
|
||||||
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -37,7 +37,9 @@
|
||||||
, test_resource/1
|
, test_resource/1
|
||||||
, start_resource/1
|
, start_resource/1
|
||||||
, get_resource_status/1
|
, get_resource_status/1
|
||||||
, is_source_alive/1
|
, is_resource_alive/1
|
||||||
|
, is_resource_alive/2
|
||||||
|
, is_resource_alive/3
|
||||||
, get_resource_params/1
|
, get_resource_params/1
|
||||||
, ensure_resource_deleted/1
|
, ensure_resource_deleted/1
|
||||||
, delete_resource/1
|
, delete_resource/1
|
||||||
|
@ -46,7 +48,7 @@
|
||||||
|
|
||||||
-export([ init_resource/4
|
-export([ init_resource/4
|
||||||
, init_action/4
|
, init_action/4
|
||||||
, clear_resource/3
|
, clear_resource/4
|
||||||
, clear_rule/1
|
, clear_rule/1
|
||||||
, clear_actions/1
|
, clear_actions/1
|
||||||
, clear_action/3
|
, clear_action/3
|
||||||
|
@ -55,6 +57,13 @@
|
||||||
-export([ restore_action_metrics/2
|
-export([ restore_action_metrics/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([ fetch_resource_status/3
|
||||||
|
]).
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-export([alarm_name_of_resource_down/2]).
|
||||||
|
-endif.
|
||||||
|
|
||||||
-type(rule() :: #rule{}).
|
-type(rule() :: #rule{}).
|
||||||
-type(action() :: #action{}).
|
-type(action() :: #action{}).
|
||||||
-type(resource() :: #resource{}).
|
-type(resource() :: #resource{}).
|
||||||
|
@ -340,11 +349,11 @@ test_resource(#{type := Type} = Params) ->
|
||||||
try
|
try
|
||||||
case create_resource(maps:put(id, ResId, Params), no_retry) of
|
case create_resource(maps:put(id, ResId, Params), no_retry) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
case is_source_alive(ResId) of
|
case is_resource_alive(ResId, #{fetch => true}) of
|
||||||
true ->
|
true ->
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
%% in is_source_alive, the cluster-call RPC logs errors
|
%% in is_resource_alive, the cluster-call RPC logs errors
|
||||||
%% so we do not log anything here
|
%% so we do not log anything here
|
||||||
{error, {resource_down, ResId}}
|
{error, {resource_down, ResId}}
|
||||||
end;
|
end;
|
||||||
|
@ -362,18 +371,56 @@ test_resource(#{type := Type} = Params) ->
|
||||||
{error, {resource_type_not_found, Type}}
|
{error, {resource_type_not_found, Type}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
is_source_alive(ResId) ->
|
is_resource_alive(ResId) ->
|
||||||
case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, get_resource_status, [ResId], 5000) of
|
is_resource_alive(ResId, #{fetch => false}).
|
||||||
|
|
||||||
|
is_resource_alive(ResId, Opts) ->
|
||||||
|
is_resource_alive(ekka_mnesia:running_nodes(), ResId, Opts).
|
||||||
|
|
||||||
|
-spec(is_resource_alive(list(node()) | node(), resource_id(), #{fetch := boolean()}) -> boolean()).
|
||||||
|
is_resource_alive(Node, ResId, Opts) when is_atom(Node) ->
|
||||||
|
is_resource_alive([Node], ResId, Opts);
|
||||||
|
is_resource_alive(Nodes, ResId, _Opts = #{fetch := true}) ->
|
||||||
|
try
|
||||||
|
case emqx_rule_registry:find_resource(ResId) of
|
||||||
|
{ok, #resource{type = ResType}} ->
|
||||||
|
{ok, #resource_type{on_status = {Mod, OnStatus}}}
|
||||||
|
= emqx_rule_registry:find_resource_type(ResType),
|
||||||
|
case rpc:multicall(Nodes,
|
||||||
|
?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], 5000) of
|
||||||
{ResL, []} ->
|
{ResL, []} ->
|
||||||
is_source_alive_(ResL);
|
is_resource_alive_(ResL);
|
||||||
|
{_, _Error} ->
|
||||||
|
false
|
||||||
|
end;
|
||||||
|
not_found ->
|
||||||
|
false
|
||||||
|
end
|
||||||
|
catch E:R:S ->
|
||||||
|
?LOG(warning, "is_resource_alive failed, ~0p:~0p ~0p", [E, R, S]),
|
||||||
|
false
|
||||||
|
end;
|
||||||
|
is_resource_alive(Nodes, ResId, _Opts = #{fetch := false}) ->
|
||||||
|
try
|
||||||
|
case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], 5000) of
|
||||||
|
{ResL, []} ->
|
||||||
|
is_resource_alive_(ResL);
|
||||||
{_, _Errors} ->
|
{_, _Errors} ->
|
||||||
false
|
false
|
||||||
|
end
|
||||||
|
catch E:R:S ->
|
||||||
|
?LOG(warning, "is_resource_alive failed, ~0p:~0p ~0p", [E, R, S]),
|
||||||
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
is_source_alive_([]) -> true;
|
%% fetch_resource_status -> #{is_alive => boolean()}
|
||||||
is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL);
|
%% get_resource_status -> {ok, #{is_alive => boolean()}}
|
||||||
is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false;
|
is_resource_alive_([]) -> true;
|
||||||
is_source_alive_([_Error | _ResL]) -> false.
|
is_resource_alive_([#{is_alive := true} | ResL]) -> is_resource_alive_(ResL);
|
||||||
|
is_resource_alive_([#{is_alive := false} | _ResL]) -> false;
|
||||||
|
is_resource_alive_([{ok, #{is_alive := true}} | ResL]) -> is_resource_alive_(ResL);
|
||||||
|
is_resource_alive_([{ok, #{is_alive := false}} | _ResL]) -> false;
|
||||||
|
is_resource_alive_([_Error | _ResL]) -> false.
|
||||||
|
|
||||||
-spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
|
-spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
|
||||||
get_resource_status(ResId) ->
|
get_resource_status(ResId) ->
|
||||||
|
@ -407,7 +454,7 @@ delete_resource(ResId) ->
|
||||||
try
|
try
|
||||||
case emqx_rule_registry:remove_resource(ResId) of
|
case emqx_rule_registry:remove_resource(ResId) of
|
||||||
ok ->
|
ok ->
|
||||||
_ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]),
|
_ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId, ResType]),
|
||||||
ok;
|
ok;
|
||||||
{error, _} = R -> R
|
{error, _} = R -> R
|
||||||
end
|
end
|
||||||
|
@ -609,9 +656,13 @@ init_action(Module, OnCreate, ActionInstId, Params) ->
|
||||||
#action_instance_params{id = ActionInstId, params = Params, apply = Apply})
|
#action_instance_params{id = ActionInstId, params = Params, apply = Apply})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
clear_resource(_Module, undefined, ResId) ->
|
clear_resource(_Module, undefined, Type, ResId) ->
|
||||||
|
Name = alarm_name_of_resource_down(Type, ResId),
|
||||||
|
_ = emqx_alarm:deactivate(Name),
|
||||||
ok = emqx_rule_registry:remove_resource_params(ResId);
|
ok = emqx_rule_registry:remove_resource_params(ResId);
|
||||||
clear_resource(Module, Destroy, ResId) ->
|
clear_resource(Module, Destroy, Type, ResId) ->
|
||||||
|
Name = alarm_name_of_resource_down(Type, ResId),
|
||||||
|
_ = emqx_alarm:deactivate(Name),
|
||||||
case emqx_rule_registry:find_resource_params(ResId) of
|
case emqx_rule_registry:find_resource_params(ResId) of
|
||||||
{ok, #resource_params{params = Params}} ->
|
{ok, #resource_params{params = Params}} ->
|
||||||
?RAISE(Module:Destroy(ResId, Params),
|
?RAISE(Module:Destroy(ResId, Params),
|
||||||
|
|
|
@ -321,7 +321,7 @@ do_create_resource(Create, ParsedParams) ->
|
||||||
list_resources(#{}, _Params) ->
|
list_resources(#{}, _Params) ->
|
||||||
Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
|
Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
|
||||||
Data = lists:map(fun(Res = #{id := ResId}) ->
|
Data = lists:map(fun(Res = #{id := ResId}) ->
|
||||||
Status = emqx_rule_engine:is_source_alive(ResId),
|
Status = emqx_rule_engine:is_resource_alive(ResId),
|
||||||
maps:put(status, Status, Res)
|
maps:put(status, Status, Res)
|
||||||
end, Data0),
|
end, Data0),
|
||||||
return({ok, Data}).
|
return({ok, Data}).
|
||||||
|
@ -332,14 +332,14 @@ list_resources_by_type(#{type := Type}, _Params) ->
|
||||||
show_resource(#{id := Id}, _Params) ->
|
show_resource(#{id := Id}, _Params) ->
|
||||||
case emqx_rule_registry:find_resource(Id) of
|
case emqx_rule_registry:find_resource(Id) of
|
||||||
{ok, R} ->
|
{ok, R} ->
|
||||||
Status =
|
StatusFun =
|
||||||
lists:concat(
|
fun(Node) ->
|
||||||
[ case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of
|
#{
|
||||||
{badrpc, _} -> [];
|
node => Node,
|
||||||
{ok, St} -> [maps:put(node, Node, St)];
|
is_alive => emqx_rule_engine:is_resource_alive(Node, Id, #{fetch => false})
|
||||||
{error, _} -> [maps:put(node, Node, #{is_alive => false})]
|
}
|
||||||
end
|
end,
|
||||||
|| Node <- ekka_mnesia:running_nodes()]),
|
Status = [StatusFun(Node) || Node <- ekka_mnesia:running_nodes()],
|
||||||
return({ok, maps:put(status, Status, record_to_map(R))});
|
return({ok, maps:put(status, Status, record_to_map(R))});
|
||||||
not_found ->
|
not_found ->
|
||||||
return({error, 404, <<"Not Found">>})
|
return({error, 404, <<"Not Found">>})
|
||||||
|
|
|
@ -165,7 +165,7 @@ end_per_suite(_Config) ->
|
||||||
|
|
||||||
on_resource_create(_id, _) -> #{}.
|
on_resource_create(_id, _) -> #{}.
|
||||||
on_resource_destroy(_id, _) -> ok.
|
on_resource_destroy(_id, _) -> ok.
|
||||||
on_get_resource_status(_id, _) -> #{}.
|
on_get_resource_status(_id, _) -> #{is_alive => true}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Group specific setup/teardown
|
%% Group specific setup/teardown
|
||||||
|
@ -327,6 +327,24 @@ t_create_resource(_Config) ->
|
||||||
emqx_rule_registry:remove_resource(ResId),
|
emqx_rule_registry:remove_resource(ResId),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_clean_resource_alarms(_Config) ->
|
||||||
|
ok = emqx_rule_engine:load_providers(),
|
||||||
|
{ok, #resource{id = ResId}} = emqx_rule_engine:create_resource(
|
||||||
|
#{type => built_in,
|
||||||
|
config => #{},
|
||||||
|
description => <<"debug resource">>}),
|
||||||
|
?assert(true, is_binary(ResId)),
|
||||||
|
Name = emqx_rule_engine:alarm_name_of_resource_down(built_in, ResId),
|
||||||
|
_ = emqx_alarm:activate(Name, #{id => ResId, type => built_in}),
|
||||||
|
AlarmExist = fun(#{name := AName}) -> AName == Name end,
|
||||||
|
Len = length(lists:filter(AlarmExist, emqx_alarm:get_alarms())),
|
||||||
|
?assert(Len == 1),
|
||||||
|
ok = emqx_rule_engine:unload_providers(),
|
||||||
|
emqx_rule_registry:remove_resource(ResId),
|
||||||
|
LenAfterRemove = length(lists:filter(AlarmExist, emqx_alarm:get_alarms())),
|
||||||
|
?assert(LenAfterRemove == 0),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Test cases for rule actions
|
%% Test cases for rule actions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue