Revert "fix(rule): connection test when creating a resource"
This commit is contained in:
parent
0684b4f716
commit
47e807b2ac
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_rule_engine,
|
{application, emqx_rule_engine,
|
||||||
[{description, "EMQ X Rule Engine"},
|
[{description, "EMQ X Rule Engine"},
|
||||||
{vsn, "4.3.9"}, % strict semver, bump manually!
|
{vsn, "4.3.8"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
|
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
|
||||||
{applications, [kernel,stdlib,rulesql,getopt]},
|
{applications, [kernel,stdlib,rulesql,getopt]},
|
||||||
|
|
|
@ -1,13 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.8",
|
[{"4.3.7",
|
||||||
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
|
||||||
{"4.3.7",
|
|
||||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
|
@ -84,13 +78,7 @@
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.8",
|
[{"4.3.7",
|
||||||
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
|
||||||
{"4.3.7",
|
|
||||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -36,7 +36,6 @@
|
||||||
, create_resource/1
|
, create_resource/1
|
||||||
, test_resource/1
|
, test_resource/1
|
||||||
, start_resource/1
|
, start_resource/1
|
||||||
, is_source_alive/1
|
|
||||||
, get_resource_status/1
|
, get_resource_status/1
|
||||||
, get_resource_params/1
|
, get_resource_params/1
|
||||||
, delete_resource/1
|
, delete_resource/1
|
||||||
|
@ -315,37 +314,24 @@ start_resource(ResId) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}).
|
-spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}).
|
||||||
test_resource(#{type := Type} = Params) ->
|
test_resource(#{type := Type, config := Config0}) ->
|
||||||
case emqx_rule_registry:find_resource_type(Type) of
|
case emqx_rule_registry:find_resource_type(Type) of
|
||||||
{ok, #resource_type{}} ->
|
{ok, #resource_type{on_create = {ModC, Create},
|
||||||
ResId = maps:get(id, Params, resource_id()),
|
on_destroy = {ModD, Destroy},
|
||||||
|
params_spec = ParamSpec}} ->
|
||||||
|
Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
|
||||||
|
ResId = resource_id(),
|
||||||
try
|
try
|
||||||
_ = create_resource(maps:put(id, ResId, Params)),
|
_ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]),
|
||||||
true = is_source_alive(ResId),
|
_ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]),
|
||||||
ok
|
ok
|
||||||
catch E:R:S ->
|
catch
|
||||||
?LOG(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]),
|
throw:Reason -> {error, Reason}
|
||||||
{error, R}
|
|
||||||
after
|
|
||||||
_ = ?CLUSTER_CALL(delete_resource, [ResId])
|
|
||||||
end;
|
end;
|
||||||
not_found ->
|
not_found ->
|
||||||
{error, {resource_type_not_found, Type}}
|
{error, {resource_type_not_found, Type}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
is_source_alive(ResId) ->
|
|
||||||
case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, get_resource_status, [ResId], 5000) of
|
|
||||||
{ResL, []} ->
|
|
||||||
is_source_alive_(ResL);
|
|
||||||
{_, _Errors} ->
|
|
||||||
false
|
|
||||||
end.
|
|
||||||
|
|
||||||
is_source_alive_([]) -> true;
|
|
||||||
is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL);
|
|
||||||
is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false;
|
|
||||||
is_source_alive_([_Error | _ResL]) -> false.
|
|
||||||
|
|
||||||
-spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
|
-spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
|
||||||
get_resource_status(ResId) ->
|
get_resource_status(ResId) ->
|
||||||
case emqx_rule_registry:find_resource_params(ResId) of
|
case emqx_rule_registry:find_resource_params(ResId) of
|
||||||
|
|
|
@ -296,7 +296,7 @@ do_create_resource(Create, ParsedParams) ->
|
||||||
list_resources(#{}, _Params) ->
|
list_resources(#{}, _Params) ->
|
||||||
Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
|
Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
|
||||||
Data = lists:map(fun(Res = #{id := ResId}) ->
|
Data = lists:map(fun(Res = #{id := ResId}) ->
|
||||||
Status = emqx_rule_engine:is_source_alive(ResId),
|
Status = get_aggregated_status(ResId),
|
||||||
maps:put(status, Status, Res)
|
maps:put(status, Status, Res)
|
||||||
end, Data0),
|
end, Data0),
|
||||||
return({ok, Data}).
|
return({ok, Data}).
|
||||||
|
@ -304,6 +304,14 @@ list_resources(#{}, _Params) ->
|
||||||
list_resources_by_type(#{type := Type}, _Params) ->
|
list_resources_by_type(#{type := Type}, _Params) ->
|
||||||
return_all(emqx_rule_registry:get_resources_by_type(Type)).
|
return_all(emqx_rule_registry:get_resources_by_type(Type)).
|
||||||
|
|
||||||
|
get_aggregated_status(ResId) ->
|
||||||
|
lists:all(fun(Node) ->
|
||||||
|
case rpc:call(Node, emqx_rule_engine, get_resource_status, [ResId]) of
|
||||||
|
{ok, #{is_alive := true}} -> true;
|
||||||
|
_ -> false
|
||||||
|
end
|
||||||
|
end, ekka_mnesia:running_nodes()).
|
||||||
|
|
||||||
show_resource(#{id := Id}, _Params) ->
|
show_resource(#{id := Id}, _Params) ->
|
||||||
case emqx_rule_registry:find_resource(Id) of
|
case emqx_rule_registry:find_resource(Id) of
|
||||||
{ok, R} ->
|
{ok, R} ->
|
||||||
|
|
Loading…
Reference in New Issue