Merge pull request #7453 from DDDHuang/re_source_test_4314

fix(rule): connection test when creating a resource
This commit is contained in:
DDDHuang 2022-03-30 15:21:48 +08:00 committed by GitHub
commit e801f5b3a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 21 deletions

View File

@ -2,7 +2,8 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.8",
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]},
@ -84,7 +85,8 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.8",
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]},

View File

@ -37,6 +37,7 @@
, test_resource/1
, start_resource/1
, get_resource_status/1
, is_source_alive/1
, get_resource_params/1
, delete_resource/1
, update_resource/2
@ -314,24 +315,37 @@ start_resource(ResId) ->
end.
-spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}).
test_resource(#{type := Type, config := Config0}) ->
test_resource(#{type := Type} = Params) ->
case emqx_rule_registry:find_resource_type(Type) of
{ok, #resource_type{on_create = {ModC, Create},
on_destroy = {ModD, Destroy},
params_spec = ParamSpec}} ->
Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
ResId = resource_id(),
{ok, #resource_type{}} ->
ResId = maps:get(id, Params, resource_id()),
try
_ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]),
_ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]),
_ = create_resource(maps:put(id, ResId, Params)),
true = is_source_alive(ResId),
ok
catch
throw:Reason -> {error, Reason}
catch E:R:S ->
?LOG(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]),
{error, R}
after
_ = ?CLUSTER_CALL(delete_resource, [ResId])
end;
not_found ->
{error, {resource_type_not_found, Type}}
end.
is_source_alive(ResId) ->
case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, get_resource_status, [ResId], 5000) of
{ResL, []} ->
is_source_alive_(ResL);
{_, _Errors} ->
false
end.
is_source_alive_([]) -> true;
is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL);
is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false;
is_source_alive_([_Error | _ResL]) -> false.
-spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
get_resource_status(ResId) ->
case emqx_rule_registry:find_resource_params(ResId) of

View File

@ -296,7 +296,7 @@ do_create_resource(Create, ParsedParams) ->
list_resources(#{}, _Params) ->
Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
Data = lists:map(fun(Res = #{id := ResId}) ->
Status = get_aggregated_status(ResId),
Status = emqx_rule_engine:is_source_alive(ResId),
maps:put(status, Status, Res)
end, Data0),
return({ok, Data}).
@ -304,14 +304,6 @@ list_resources(#{}, _Params) ->
list_resources_by_type(#{type := Type}, _Params) ->
return_all(emqx_rule_registry:get_resources_by_type(Type)).
get_aggregated_status(ResId) ->
lists:all(fun(Node) ->
case rpc:call(Node, emqx_rule_engine, get_resource_status, [ResId]) of
{ok, #{is_alive := true}} -> true;
_ -> false
end
end, ekka_mnesia:running_nodes()).
show_resource(#{id := Id}, _Params) ->
case emqx_rule_registry:find_resource(Id) of
{ok, R} ->