fix: merge main-4.3

This commit is contained in:
DDDHuang 2022-05-17 09:49:19 +08:00
parent 810ca65011
commit 86e8de4737
3 changed files with 59 additions and 15 deletions

View File

@ -6,7 +6,9 @@
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date}, {add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.8", {"4.3.8",
[{add_module,emqx_rule_date}, [{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
@ -136,6 +138,8 @@
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]}, {delete_module,emqx_rule_date}]},
{"4.3.8", {"4.3.8",
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},

View File

@ -38,6 +38,8 @@
, start_resource/1 , start_resource/1
, get_resource_status/1 , get_resource_status/1
, is_source_alive/1 , is_source_alive/1
, is_source_alive/2
, is_source_alive/3
, get_resource_params/1 , get_resource_params/1
, ensure_resource_deleted/1 , ensure_resource_deleted/1
, delete_resource/1 , delete_resource/1
@ -340,7 +342,7 @@ test_resource(#{type := Type} = Params) ->
try try
case create_resource(maps:put(id, ResId, Params), no_retry) of case create_resource(maps:put(id, ResId, Params), no_retry) of
{ok, _} -> {ok, _} ->
case is_source_alive(ResId) of case is_source_alive(ResId, #{fetch => true}) of
true -> true ->
ok; ok;
false -> false ->
@ -363,15 +365,53 @@ test_resource(#{type := Type} = Params) ->
end. end.
is_source_alive(ResId) -> is_source_alive(ResId) ->
case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, get_resource_status, [ResId], 5000) of is_source_alive(ResId, #{fetch => false}).
{ResL, []} ->
is_source_alive_(ResL); is_source_alive(ResId, Opts) ->
{_, _Errors} -> is_source_alive(ekka_mnesia:running_nodes(), ResId, Opts).
false
-spec(is_source_alive(list(node()) | node(), resource_id(), #{fetch := boolean()}) -> boolean()).
is_source_alive(Node, ResId, Opts) when is_atom(Node) ->
is_source_alive([Node], ResId, Opts);
is_source_alive(Nodes, ResId, _Opts = #{fetch := true}) ->
try
case emqx_rule_registry:find_resource(ResId) of
{ok, #resource{type = ResType}} ->
{ok, #resource_type{on_status = {Mod, OnStatus}}}
= emqx_rule_registry:find_resource_type(ResType),
case rpc:multicall(Nodes,
?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], 5000) of
{ResL, []} ->
is_source_alive_(ResL);
{_, _Error} ->
false
end;
not_found ->
false
end
catch E:R:S ->
?LOG(warning, "is_source_alive failed, ~0p:~0p ~0p", [E, R, S]),
false
end;
is_source_alive(Nodes, ResId, _Opts = #{fetch := false}) ->
try
case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], 5000) of
{ResL, []} ->
is_source_alive_(ResL);
{_, _Errors} ->
false
end
catch E:R:S ->
?LOG(warning, "is_source_alive failed, ~0p:~0p ~0p", [E, R, S]),
false
end. end.
%% fetch_resource_status -> #{is_alive => boolean()}
%% get_resource_status -> {ok #{is_alive => boolean()}}
is_source_alive_([]) -> true; is_source_alive_([]) -> true;
is_source_alive_([#{is_alive := true} | ResL]) -> is_source_alive_(ResL);
is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL); is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL);
is_source_alive_([#{is_alive := false} | _ResL]) -> false;
is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false; is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false;
is_source_alive_([_Error | _ResL]) -> false. is_source_alive_([_Error | _ResL]) -> false.

View File

@ -332,14 +332,14 @@ list_resources_by_type(#{type := Type}, _Params) ->
show_resource(#{id := Id}, _Params) -> show_resource(#{id := Id}, _Params) ->
case emqx_rule_registry:find_resource(Id) of case emqx_rule_registry:find_resource(Id) of
{ok, R} -> {ok, R} ->
Status = StatusFun =
lists:concat( fun(Node) ->
[ case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of #{
{badrpc, _} -> []; node => Node,
{ok, St} -> [maps:put(node, Node, St)]; is_alive => emqx_rule_engine:is_source_alive(Node, Id, #{fetch => false})
{error, _} -> [maps:put(node, Node, #{is_alive => false})] }
end end,
|| Node <- ekka_mnesia:running_nodes()]), Status = [StatusFun(Node) || Node <- ekka_mnesia:running_nodes()],
return({ok, maps:put(status, Status, record_to_map(R))}); return({ok, maps:put(status, Status, record_to_map(R))});
not_found -> not_found ->
return({error, 404, <<"Not Found">>}) return({error, 404, <<"Not Found">>})