feat(plugin): add plugin http api test case
This commit is contained in:
parent
7160bc06b3
commit
0f681f6a08
|
@ -4,7 +4,7 @@
|
||||||
{vsn, "5.0.0"}, % strict semver, bump manually!
|
{vsn, "5.0.0"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_management_sup]},
|
{registered, [emqx_management_sup]},
|
||||||
{applications, [kernel,stdlib,minirest,emqx]},
|
{applications, [kernel,stdlib,emqx_plugins,minirest,emqx]},
|
||||||
{mod, {emqx_mgmt_app,[]}},
|
{mod, {emqx_mgmt_app,[]}},
|
||||||
{env, []},
|
{env, []},
|
||||||
{licenses, ["Apache-2.0"]},
|
{licenses, ["Apache-2.0"]},
|
||||||
|
|
|
@ -246,7 +246,6 @@ validate_name(Name) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% API CallBack Begin
|
%% API CallBack Begin
|
||||||
|
|
||||||
list_plugins(get, _) ->
|
list_plugins(get, _) ->
|
||||||
Plugins = cluster_call(emqx_plugins_monitor, get_plugins, [], 15000),
|
Plugins = cluster_call(emqx_plugins_monitor, get_plugins, [], 15000),
|
||||||
{200, format_plugins(Plugins)}.
|
{200, format_plugins(Plugins)}.
|
||||||
|
@ -263,7 +262,8 @@ upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) -
|
||||||
{400, #{code => 'UNEXPECTED_ERROR',
|
{400, #{code => 'UNEXPECTED_ERROR',
|
||||||
message => iolist_to_binary(io_lib:format("~p", [Reason]))}}
|
message => iolist_to_binary(io_lib:format("~p", [Reason]))}}
|
||||||
end;
|
end;
|
||||||
upload_install(post, #{}) ->
|
upload_install(post, #{} = Body) ->
|
||||||
|
io:format("~p~n", [Body]),
|
||||||
{400, #{code => 'BAD_FORM_DATA',
|
{400, #{code => 'BAD_FORM_DATA',
|
||||||
message => <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>}
|
message => <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>}
|
||||||
}.
|
}.
|
||||||
|
@ -279,7 +279,7 @@ plugin(delete, #{bindings := #{name := Name}}) ->
|
||||||
return(204, cluster_rpc(?MODULE, delete_package, [Name])).
|
return(204, cluster_rpc(?MODULE, delete_package, [Name])).
|
||||||
|
|
||||||
update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
|
update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
|
||||||
return(200, cluster_rpc(?MODULE, ensure_action, [Name, Action])).
|
return(204, cluster_rpc(?MODULE, ensure_action, [Name, Action])).
|
||||||
|
|
||||||
update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
|
update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
|
||||||
case parse_position(Body, Name) of
|
case parse_position(Body, Name) of
|
||||||
|
@ -298,6 +298,7 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
|
||||||
%% For RPC upload_install/2
|
%% For RPC upload_install/2
|
||||||
install_package(FileName, Bin) ->
|
install_package(FileName, Bin) ->
|
||||||
File = filename:join(emqx_plugins:install_dir(), FileName),
|
File = filename:join(emqx_plugins:install_dir(), FileName),
|
||||||
|
io:format("xx:~p~n", [File]),
|
||||||
ok = file:write_file(File, Bin),
|
ok = file:write_file(File, Bin),
|
||||||
PackageName = string:trim(FileName, trailing, ".tar.gz"),
|
PackageName = string:trim(FileName, trailing, ".tar.gz"),
|
||||||
emqx_plugins:ensure_installed(PackageName).
|
emqx_plugins:ensure_installed(PackageName).
|
||||||
|
@ -364,40 +365,28 @@ parse_position(#{<<"position">> := <<"after:", After/binary>>}, _Name) -> {behin
|
||||||
parse_position(Position, _) -> {error, iolist_to_binary(io_lib:format("~p", [Position]))}.
|
parse_position(Position, _) -> {error, iolist_to_binary(io_lib:format("~p", [Position]))}.
|
||||||
|
|
||||||
format_plugins(List) ->
|
format_plugins(List) ->
|
||||||
StatusList = merge_running_status(List, #{}),
|
StatusMap = emqx_plugins_monitor:aggregate_status(List),
|
||||||
|
SortFun = fun({_N1, P1}, {_N2, P2}) -> length(P1) > length(P2) end,
|
||||||
|
SortList = lists:sort(SortFun, List),
|
||||||
|
pack_status_in_order(SortList, StatusMap).
|
||||||
|
|
||||||
|
pack_status_in_order(List, StatusMap) ->
|
||||||
{Plugins, _} =
|
{Plugins, _} =
|
||||||
lists:foldr(fun({_Node, Plugins}, {Acc, StatusAcc}) ->
|
lists:foldl(fun({_Node, PluginList}, {Acc, StatusAcc}) ->
|
||||||
format_plugins_in_order(Plugins, Acc, StatusAcc)
|
pack_plugin_in_order(PluginList, Acc, StatusAcc)
|
||||||
end, {[], StatusList}, List),
|
end, {[], StatusMap}, List),
|
||||||
Plugins.
|
lists:reverse(Plugins).
|
||||||
|
|
||||||
format_plugins_in_order(Plugins, Acc0, StatusAcc0) ->
|
pack_plugin_in_order([], Acc, StatusAcc) -> {Acc, StatusAcc};
|
||||||
lists:foldr(fun(Plugin0, {Acc, StatusAcc}) ->
|
pack_plugin_in_order(_, Acc, StatusAcc)when map_size(StatusAcc) =:= 0 -> {Acc, StatusAcc};
|
||||||
#{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin0,
|
pack_plugin_in_order([Plugin0 | Plugins], Acc, StatusAcc) ->
|
||||||
case maps:find({Name, Vsn}, StatusAcc) of
|
#{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin0,
|
||||||
{ok, Status} ->
|
case maps:find({Name, Vsn}, StatusAcc) of
|
||||||
Plugin1 = maps:without([running_status, config_status], Plugin0),
|
{ok, Status} ->
|
||||||
Plugins2 = Plugin1#{running_status => Status},
|
Plugin1 = maps:without([running_status, config_status], Plugin0),
|
||||||
{
|
Plugins2 = Plugin1#{running_status => Status},
|
||||||
[Plugins2 | Acc],
|
NewStatusAcc = maps:remove({Name, Vsn}, StatusAcc),
|
||||||
maps:remove({Name, Vsn}, StatusAcc)
|
pack_plugin_in_order(Plugins, [Plugins2 | Acc], NewStatusAcc);
|
||||||
};
|
error ->
|
||||||
error -> {Acc, StatusAcc}
|
pack_plugin_in_order(Plugins, Acc, StatusAcc)
|
||||||
end
|
end.
|
||||||
end, {Acc0, StatusAcc0}, Plugins).
|
|
||||||
|
|
||||||
merge_running_status([], Acc) -> Acc;
|
|
||||||
merge_running_status([{Node, Plugins} | List], Acc) ->
|
|
||||||
NewAcc =
|
|
||||||
lists:foldl(fun(Plugin, SubAcc) ->
|
|
||||||
#{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin,
|
|
||||||
Key = {Name, Vsn},
|
|
||||||
Value = #{node => Node, status => plugin_status(Plugin)},
|
|
||||||
SubAcc#{Key => [Value | maps:get(Key, Acc, [])]}
|
|
||||||
end, Acc, Plugins),
|
|
||||||
merge_running_status(List, NewAcc).
|
|
||||||
|
|
||||||
%% running_status: running loaded, stopped
|
|
||||||
%% config_status: not_configured disable enable
|
|
||||||
plugin_status(#{running_status := running}) -> running;
|
|
||||||
plugin_status(_) -> stopped.
|
|
||||||
|
|
|
@ -0,0 +1,128 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_mgmt_api_plugins_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-define(EMQX_PLUGIN_TEMPLATE_VSN, "5.0-rc.1").
|
||||||
|
-define(PACKAGE_SUFFIX, ".tar.gz").
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
WorkDir = proplists:get_value(data_dir, Config),
|
||||||
|
ok = filelib:ensure_dir(WorkDir),
|
||||||
|
DemoShDir1 = string:replace(WorkDir, "emqx_mgmt_api_plugins", "emqx_plugins"),
|
||||||
|
DemoShDir = string:replace(DemoShDir1, "emqx_management", "emqx_plugins"),
|
||||||
|
OrigInstallDir = emqx_plugins:get_config(install_dir, undefined),
|
||||||
|
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_plugins]),
|
||||||
|
emqx_plugins:put_config(install_dir, WorkDir),
|
||||||
|
|
||||||
|
[{demo_sh_dir, DemoShDir}, {orig_install_dir, OrigInstallDir} | Config].
|
||||||
|
|
||||||
|
end_per_suite(Config) ->
|
||||||
|
emqx_common_test_helpers:boot_modules(all),
|
||||||
|
emqx_mgmt_api_test_util:end_suite([emqx_plugins, emqx_conf]),
|
||||||
|
%% restore config
|
||||||
|
case proplists:get_value(orig_install_dir, Config) of
|
||||||
|
undefined -> ok;
|
||||||
|
OrigInstallDir -> emqx_plugins:put_config(install_dir, OrigInstallDir)
|
||||||
|
end,
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_plugins(Config) ->
|
||||||
|
DemoShDir = proplists:get_value(demo_sh_dir, Config),
|
||||||
|
PackagePath = build_demo_plugin_package(DemoShDir),
|
||||||
|
NameVsn = filename:basename(PackagePath, ?PACKAGE_SUFFIX),
|
||||||
|
ok = install_plugin(PackagePath),
|
||||||
|
{ok, StopRes} = describe_plugins(NameVsn),
|
||||||
|
?assertMatch(#{<<"running_status">> := [
|
||||||
|
#{<<"node">> := <<"test@127.0.0.1">>, <<"status">> := <<"stopped">>}]}, StopRes),
|
||||||
|
{ok, StopRes1} = update_plugin(NameVsn, "start"),
|
||||||
|
?assertEqual([], StopRes1),
|
||||||
|
{ok, StartRes} = describe_plugins(NameVsn),
|
||||||
|
?assertMatch(#{<<"running_status">> := [
|
||||||
|
#{<<"node">> := <<"test@127.0.0.1">>, <<"status">> := <<"running">>}]}, StartRes),
|
||||||
|
{ok, []} = update_plugin(NameVsn, "stop"),
|
||||||
|
{ok, StopRes2} = describe_plugins(NameVsn),
|
||||||
|
?assertMatch(#{<<"running_status">> := [
|
||||||
|
#{<<"node">> := <<"test@127.0.0.1">>, <<"status">> := <<"stopped">>}]}, StopRes2),
|
||||||
|
{ok, []} = uninstall_plugin(NameVsn),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
list_plugins() ->
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["plugins"]),
|
||||||
|
case emqx_mgmt_api_test_util:request_api(get, Path) of
|
||||||
|
{ok, Apps} -> {ok, emqx_json:decode(Apps, [return_maps])};
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
describe_plugins(Name) ->
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["plugins", Name]),
|
||||||
|
case emqx_mgmt_api_test_util:request_api(get, Path) of
|
||||||
|
{ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])};
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
install_plugin(FilePath) ->
|
||||||
|
{ok, Token} = emqx_dashboard_admin:sign_token(<<"admin">>, <<"public">>),
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["plugins", "install"]),
|
||||||
|
case emqx_mgmt_api_test_util:upload_request(Path, FilePath, "plugin",
|
||||||
|
<<"application/gzip">>, [], Token) of
|
||||||
|
{ok, {{"HTTP/1.1", 200, "OK"}, _Headers, <<>>}} -> ok;
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
update_plugin(Name, Action) ->
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["plugins", Name, Action]),
|
||||||
|
emqx_mgmt_api_test_util:request_api(put, Path).
|
||||||
|
|
||||||
|
update_boot_order(Name, MoveBody) ->
|
||||||
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["plugins", Name, "move"]),
|
||||||
|
case emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, MoveBody) of
|
||||||
|
{ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])};
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
uninstall_plugin(Name) ->
|
||||||
|
DeletePath = emqx_mgmt_api_test_util:api_path(["plugins", Name]),
|
||||||
|
emqx_mgmt_api_test_util:request_api(delete, DeletePath).
|
||||||
|
|
||||||
|
|
||||||
|
build_demo_plugin_package(Dir) ->
|
||||||
|
BuildSh = filename:join([Dir, "build-demo-plugin.sh"]),
|
||||||
|
case emqx_run_sh:do(BuildSh ++ " " ++ ?EMQX_PLUGIN_TEMPLATE_VSN,
|
||||||
|
[{cd, Dir}]) of
|
||||||
|
{ok, _} ->
|
||||||
|
FileName = "emqx_plugin_template-" ++ ?EMQX_PLUGIN_TEMPLATE_VSN ++ ?PACKAGE_SUFFIX,
|
||||||
|
Pkg = filename:join([Dir, FileName]),
|
||||||
|
case filelib:is_regular(Pkg) of
|
||||||
|
true ->
|
||||||
|
PluginPath = "./" ++ FileName,
|
||||||
|
_ = os:cmd("mv " ++ Pkg ++ " " ++ PluginPath),
|
||||||
|
true = filelib:is_regular(PluginPath),
|
||||||
|
PluginPath;
|
||||||
|
false -> error(#{reason => unexpected_build_result, not_found => Pkg})
|
||||||
|
end;
|
||||||
|
{error, {Rc, Output}} ->
|
||||||
|
io:format(user, "failed_to_build_demo_plugin, Exit = ~p, Output:~n~ts\n", [Rc, Output]),
|
||||||
|
error(failed_to_build_demo_plugin)
|
||||||
|
end.
|
|
@ -102,3 +102,70 @@ auth_header_() ->
|
||||||
|
|
||||||
api_path(Parts)->
|
api_path(Parts)->
|
||||||
?SERVER ++ filename:join([?BASE_PATH | Parts]).
|
?SERVER ++ filename:join([?BASE_PATH | Parts]).
|
||||||
|
|
||||||
|
%% Usage:
|
||||||
|
%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>, <<"upload">>, <<"image/png">>, [], <<"some-token">>)
|
||||||
|
%%
|
||||||
|
%% Usage with RequestData:
|
||||||
|
%% Payload = [{upload_type, <<"user_picture">>}],
|
||||||
|
%% PayloadContent = jsx:encode(Payload),
|
||||||
|
%% RequestData = [
|
||||||
|
%% {<<"payload">>, PayloadContent}
|
||||||
|
%% ]
|
||||||
|
%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>, <<"upload">>, <<"image/png">>, RequestData, <<"some-token">>)
|
||||||
|
-spec upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) -> {ok, binary()} | {error, list()} when
|
||||||
|
URL:: binary(),
|
||||||
|
FilePath:: binary(),
|
||||||
|
Name:: binary(),
|
||||||
|
MimeType:: binary(),
|
||||||
|
RequestData:: list(),
|
||||||
|
AuthorizationToken:: binary().
|
||||||
|
upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) ->
|
||||||
|
Method = post,
|
||||||
|
Filename = filename:basename(FilePath),
|
||||||
|
{ok, Data} = file:read_file(FilePath),
|
||||||
|
Boundary = emqx_guid:to_base62(emqx_guid:gen()),
|
||||||
|
RequestBody = format_multipart_formdata(Data, RequestData, Name, [Filename], MimeType, Boundary),
|
||||||
|
ContentType = "multipart/form-data; boundary=" ++ binary_to_list(Boundary),
|
||||||
|
ContentLength = integer_to_list(length(binary_to_list(RequestBody))),
|
||||||
|
Headers = [
|
||||||
|
{"Content-Length", ContentLength},
|
||||||
|
case AuthorizationToken =/= undefined of
|
||||||
|
true -> {"Authorization", "Bearer " ++ binary_to_list(AuthorizationToken)};
|
||||||
|
false -> {}
|
||||||
|
end
|
||||||
|
],
|
||||||
|
HTTPOptions = [],
|
||||||
|
Options = [{body_format, binary}],
|
||||||
|
inets:start(),
|
||||||
|
httpc:request(Method, {URL, Headers, ContentType, RequestBody}, HTTPOptions, Options).
|
||||||
|
|
||||||
|
-spec format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> binary() when
|
||||||
|
Data:: binary(),
|
||||||
|
Params:: list(),
|
||||||
|
Name:: binary(),
|
||||||
|
FileNames:: list(),
|
||||||
|
MimeType:: binary(),
|
||||||
|
Boundary:: binary().
|
||||||
|
format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) ->
|
||||||
|
StartBoundary = erlang:iolist_to_binary([<<"--">>, Boundary]),
|
||||||
|
LineSeparator = <<"\r\n">>,
|
||||||
|
WithParams = lists:foldl(fun({Key, Value}, Acc) ->
|
||||||
|
erlang:iolist_to_binary([
|
||||||
|
Acc,
|
||||||
|
StartBoundary, LineSeparator,
|
||||||
|
<<"Content-Disposition: form-data; name=\"">>, Key, <<"\"">>, LineSeparator, LineSeparator,
|
||||||
|
Value, LineSeparator
|
||||||
|
])
|
||||||
|
end, <<"">>, Params),
|
||||||
|
WithPaths = lists:foldl(fun(FileName, Acc) ->
|
||||||
|
erlang:iolist_to_binary([
|
||||||
|
Acc,
|
||||||
|
StartBoundary, LineSeparator,
|
||||||
|
<<"Content-Disposition: form-data; name=\"">>, Name, <<"\"; filename=\"">>, FileName, <<"\"">>, LineSeparator,
|
||||||
|
<<"Content-Type: ">>, MimeType, LineSeparator, LineSeparator,
|
||||||
|
Data,
|
||||||
|
LineSeparator
|
||||||
|
])
|
||||||
|
end, WithParams, FileNames),
|
||||||
|
erlang:iolist_to_binary([WithPaths, StartBoundary, <<"--">>, LineSeparator]).
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -21,17 +21,55 @@
|
||||||
|
|
||||||
-export([ get_plugins/0
|
-export([ get_plugins/0
|
||||||
, start_link/0
|
, start_link/0
|
||||||
|
, aggregate_status/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([lock/1, unlock/1, lock/0, unlock/0]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-spec aggregate_status(list(tuple())) ->
|
||||||
|
[#{{Name :: binary(), Vsn :: binary()} => [#{node => node(), status => running | stopped}]}].
|
||||||
|
aggregate_status(List) -> aggregate_status(List, #{}).
|
||||||
|
|
||||||
|
aggregate_status([], Acc) -> Acc;
|
||||||
|
aggregate_status([{Node, Plugins} | List], Acc) ->
|
||||||
|
NewAcc =
|
||||||
|
lists:foldl(fun(Plugin, SubAcc) ->
|
||||||
|
#{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin,
|
||||||
|
Key = {Name, Vsn},
|
||||||
|
Value = #{node => Node, status => plugin_status(Plugin)},
|
||||||
|
SubAcc#{Key => [Value | maps:get(Key, Acc, [])]}
|
||||||
|
end, Acc, Plugins),
|
||||||
|
aggregate_status(List, NewAcc).
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
init([]) ->
|
lock(Owner) ->
|
||||||
{ok, #{ref => next_check_time(), failed => 0}}.
|
gen_server:call(?MODULE, {lock, Owner}).
|
||||||
|
|
||||||
|
unlock(Owner) ->
|
||||||
|
gen_server:call(?MODULE, {unlock, Owner}).
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
{ok, #{ref => schedule_next_check(), failed => 0, lock => undefined}}.
|
||||||
|
|
||||||
|
handle_call({lock, Owner}, _From, #{lock := undefined} = State) ->
|
||||||
|
{reply, true, State#{lock => Owner}};
|
||||||
|
handle_call({lock, Owner}, _From, #{lock := Locker} = State) ->
|
||||||
|
case is_locker_alive(Locker) of
|
||||||
|
true -> {reply, false, State};
|
||||||
|
false -> {reply, true, State#{lock => Owner}}
|
||||||
|
end;
|
||||||
|
handle_call({unlock, Owner}, _From, #{lock := Owner} = State) ->
|
||||||
|
{reply, true, State#{lock => undefined}};
|
||||||
|
handle_call({unlock, _Owner}, _From, #{lock := Locker} = State) ->
|
||||||
|
case is_locker_alive(Locker) of
|
||||||
|
true -> {reply, false, State};
|
||||||
|
false -> {reply, true, State#{lock => undefined}}
|
||||||
|
end;
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
@ -40,10 +78,9 @@ handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, Ref, check}, State = #{failed := Failed}) ->
|
handle_info({timeout, _Ref, check}, State = #{failed := Failed}) ->
|
||||||
erlang:cancel_timer(Ref),
|
|
||||||
NewFailed = maybe_alarm(check(), Failed),
|
NewFailed = maybe_alarm(check(), Failed),
|
||||||
{noreply, State#{ref => next_check_time(), failed => NewFailed}};
|
{noreply, State#{ref => schedule_next_check(), failed => NewFailed}};
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -54,23 +91,48 @@ terminate(_Reason, _State) ->
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
next_check_time() ->
|
schedule_next_check() ->
|
||||||
Check = emqx_plugins:get_config(check_interval, 5000),
|
Check = emqx_plugins:get_config(check_interval, 5000),
|
||||||
emqx_misc:start_timer(Check, check).
|
emqx_misc:start_timer(Check, check).
|
||||||
|
|
||||||
|
is_locker_alive({Node, Pid}) ->
|
||||||
|
case rpc:call(Node, erlang, is_process_alive, [Pid]) of
|
||||||
|
{badrpc, _} -> false;
|
||||||
|
Boolean -> Boolean
|
||||||
|
end.
|
||||||
|
|
||||||
check() ->
|
check() ->
|
||||||
Nodes = mria_mnesia:running_nodes(),
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
case rpc:multicall(Nodes, ?MODULE, get_plugins_list, [], 15000) of
|
case rpc:multicall(Nodes, ?MODULE, get_plugins, [], 15000) of
|
||||||
{Plugins, []} -> check_plugins(Plugins);
|
{Plugins, []} -> check_plugins(Plugins);
|
||||||
{_ , BadNodes} -> {error, io_lib:format("~p rpc to ~p failed", [node(), BadNodes])}
|
{_ , BadNodes} -> {error, lists:flatten(io_lib:format("~p rpc to ~p failed", [node(), BadNodes]))}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_plugins() ->
|
get_plugins() ->
|
||||||
{node(), emqx_plugins:list()}.
|
{node(), emqx_plugins:list()}.
|
||||||
|
|
||||||
check_plugins(Plugins) ->
|
check_plugins(Plugins) ->
|
||||||
check_status(Plugins),
|
StatusMap = aggregate_status(Plugins),
|
||||||
ok.
|
case ensure_install(StatusMap) of
|
||||||
|
{error, Bad} ->
|
||||||
|
?SLOG(warning, #{msg => "plugin_not_install_on_some_node", broken_plugins => Bad});
|
||||||
|
{ok, Good} ->
|
||||||
|
check_status(Good)
|
||||||
|
end.
|
||||||
|
|
||||||
|
ensure_install(Status) ->
|
||||||
|
Size = length(mria_mnesia:running_nodes()),
|
||||||
|
{Good, Bad} =
|
||||||
|
maps:fold(fun({Key, Value}, {GoodAcc, BadAcc}) ->
|
||||||
|
case length(Value) =:= Size of
|
||||||
|
true -> {GoodAcc#{Key => Value}, BadAcc};
|
||||||
|
false -> {GoodAcc, BadAcc#{Key => Value}}
|
||||||
|
end
|
||||||
|
end, {#{}, #{}}, Status),
|
||||||
|
case map_size(Bad) =:= 0 of
|
||||||
|
true -> {ok, Good};
|
||||||
|
false -> {error, Bad}
|
||||||
|
end.
|
||||||
|
|
||||||
check_status(_Plugins) ->
|
check_status(_Plugins) ->
|
||||||
ok.
|
ok.
|
||||||
|
@ -81,3 +143,16 @@ maybe_alarm({error, _Reason}, Failed) when Failed >= 2 ->
|
||||||
0;
|
0;
|
||||||
maybe_alarm({error, _Reason}, Failed) -> Failed + 1;
|
maybe_alarm({error, _Reason}, Failed) -> Failed + 1;
|
||||||
maybe_alarm(ok, _Failed) -> 0.
|
maybe_alarm(ok, _Failed) -> 0.
|
||||||
|
|
||||||
|
%% running_status: running loaded, stopped
|
||||||
|
%% config_status: not_configured disable enable
|
||||||
|
plugin_status(#{running_status := running}) -> running;
|
||||||
|
plugin_status(_) -> stopped.
|
||||||
|
|
||||||
|
-define(RESOURCE, plugins_lock).
|
||||||
|
|
||||||
|
lock() ->
|
||||||
|
ekka_locker:acquire(?MODULE, ?RESOURCE, all, undefined).
|
||||||
|
|
||||||
|
unlock() ->
|
||||||
|
ekka_locker:release(?MODULE, ?RESOURCE, all).
|
||||||
|
|
Loading…
Reference in New Issue