fix(rule): safe apply & test resource in cluster
This commit is contained in:
parent
63616c5e93
commit
46cfcf662e
|
@ -2,7 +2,8 @@
|
|||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.3.8",
|
||||
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.7",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
|
@ -81,7 +82,8 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.8",
|
||||
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.7",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
, create_resource/1
|
||||
, test_resource/1
|
||||
, start_resource/1
|
||||
, is_source_alive/1
|
||||
, get_resource_status/1
|
||||
, get_resource_params/1
|
||||
, delete_resource/1
|
||||
|
@ -318,45 +319,33 @@ test_resource(#{type := Type} = Params) ->
|
|||
case emqx_rule_registry:find_resource_type(Type) of
|
||||
{ok, #resource_type{}} ->
|
||||
ResId = maps:get(id, Params, resource_id()),
|
||||
CreateFun = fun() -> _ = create_resource(Params) end,
|
||||
StatusFun =
|
||||
fun() ->
|
||||
case get_resource_status(ResId) of
|
||||
{ok, #{is_alive := true}} ->
|
||||
ignore;
|
||||
{ok, #{is_alive := false}} ->
|
||||
error(not_alive);
|
||||
{error, R} ->
|
||||
error(R)
|
||||
end
|
||||
end,
|
||||
DeleteFun = fun() -> _ = ?CLUSTER_CALL(delete_resource, [ResId]) end,
|
||||
case
|
||||
%% create error or status failed
|
||||
(ok == safe_test_resource(CreateFun, create_resource))
|
||||
andalso
|
||||
safe_test_resource(StatusFun, get_resource_status)
|
||||
of
|
||||
ok ->
|
||||
_ = safe_test_resource(DeleteFun, delete_resource),
|
||||
ok;
|
||||
_ ->
|
||||
_ = safe_test_resource(DeleteFun, delete_resource),
|
||||
{error, {resource_error, not_available}}
|
||||
try
|
||||
_ = create_resource(maps:put(id, ResId, Params)),
|
||||
true = is_source_alive(ResId),
|
||||
ok
|
||||
catch E:R:S ->
|
||||
?LOG(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]),
|
||||
{error, R}
|
||||
after
|
||||
_ = ?CLUSTER_CALL(delete_resource, [ResId])
|
||||
end;
|
||||
not_found ->
|
||||
{error, {resource_type_not_found, Type}}
|
||||
end.
|
||||
|
||||
safe_test_resource(Fun, ErrorLogInfo) ->
|
||||
try
|
||||
_ = Fun(),
|
||||
ok
|
||||
catch E:R:S ->
|
||||
?LOG(warning, "safe exec fun error, ~0p, ~0p:~0p ~0p", [ErrorLogInfo, E, R, S]),
|
||||
{error, R}
|
||||
is_source_alive(ResId) ->
|
||||
case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, get_resource_status, [ResId], 5000) of
|
||||
{ResL, []} ->
|
||||
is_source_alive_(ResL);
|
||||
{_, _Errors} ->
|
||||
false
|
||||
end.
|
||||
|
||||
is_source_alive_([]) -> true;
|
||||
is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL);
|
||||
is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false;
|
||||
is_source_alive_([_Error | _ResL]) -> false.
|
||||
|
||||
-spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
|
||||
get_resource_status(ResId) ->
|
||||
case emqx_rule_registry:find_resource_params(ResId) of
|
||||
|
|
|
@ -296,7 +296,7 @@ do_create_resource(Create, ParsedParams) ->
|
|||
list_resources(#{}, _Params) ->
|
||||
Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
|
||||
Data = lists:map(fun(Res = #{id := ResId}) ->
|
||||
Status = get_aggregated_status(ResId),
|
||||
Status = emqx_rule_engine:is_source_alive(ResId),
|
||||
maps:put(status, Status, Res)
|
||||
end, Data0),
|
||||
return({ok, Data}).
|
||||
|
@ -304,14 +304,6 @@ list_resources(#{}, _Params) ->
|
|||
list_resources_by_type(#{type := Type}, _Params) ->
|
||||
return_all(emqx_rule_registry:get_resources_by_type(Type)).
|
||||
|
||||
get_aggregated_status(ResId) ->
|
||||
lists:all(fun(Node) ->
|
||||
case rpc:call(Node, emqx_rule_engine, get_resource_status, [ResId]) of
|
||||
{ok, #{is_alive := true}} -> true;
|
||||
_ -> false
|
||||
end
|
||||
end, ekka_mnesia:running_nodes()).
|
||||
|
||||
show_resource(#{id := Id}, _Params) ->
|
||||
case emqx_rule_registry:find_resource(Id) of
|
||||
{ok, R} ->
|
||||
|
|
Loading…
Reference in New Issue