diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index 3af10b959..283eb21f0 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -4,7 +4,7 @@ {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_management_sup]}, - {applications, [kernel,stdlib,minirest,emqx]}, + {applications, [kernel,stdlib,emqx_plugins,minirest,emqx]}, {mod, {emqx_mgmt_app,[]}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 8646d286b..37cdbbf17 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -246,7 +246,6 @@ validate_name(Name) -> end. %% API CallBack Begin - list_plugins(get, _) -> Plugins = cluster_call(emqx_plugins_monitor, get_plugins, [], 15000), {200, format_plugins(Plugins)}. @@ -263,7 +262,8 @@ upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) - {400, #{code => 'UNEXPECTED_ERROR', message => iolist_to_binary(io_lib:format("~p", [Reason]))}} end; -upload_install(post, #{}) -> +upload_install(post, #{} = Body) -> + io:format("~p~n", [Body]), {400, #{code => 'BAD_FORM_DATA', message => <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>} }. @@ -279,7 +279,7 @@ plugin(delete, #{bindings := #{name := Name}}) -> return(204, cluster_rpc(?MODULE, delete_package, [Name])). update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> - return(200, cluster_rpc(?MODULE, ensure_action, [Name, Action])). + return(204, cluster_rpc(?MODULE, ensure_action, [Name, Action])). update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> case parse_position(Body, Name) of @@ -298,6 +298,7 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> %% For RPC upload_install/2 install_package(FileName, Bin) -> File = filename:join(emqx_plugins:install_dir(), FileName), + io:format("xx:~p~n", [File]), ok = file:write_file(File, Bin), PackageName = string:trim(FileName, trailing, ".tar.gz"), emqx_plugins:ensure_installed(PackageName). @@ -364,40 +365,28 @@ parse_position(#{<<"position">> := <<"after:", After/binary>>}, _Name) -> {behin parse_position(Position, _) -> {error, iolist_to_binary(io_lib:format("~p", [Position]))}. format_plugins(List) -> - StatusList = merge_running_status(List, #{}), + StatusMap = emqx_plugins_monitor:aggregate_status(List), + SortFun = fun({_N1, P1}, {_N2, P2}) -> length(P1) > length(P2) end, + SortList = lists:sort(SortFun, List), + pack_status_in_order(SortList, StatusMap). + +pack_status_in_order(List, StatusMap) -> {Plugins, _} = - lists:foldr(fun({_Node, Plugins}, {Acc, StatusAcc}) -> - format_plugins_in_order(Plugins, Acc, StatusAcc) - end, {[], StatusList}, List), - Plugins. + lists:foldl(fun({_Node, PluginList}, {Acc, StatusAcc}) -> + pack_plugin_in_order(PluginList, Acc, StatusAcc) + end, {[], StatusMap}, List), + lists:reverse(Plugins). -format_plugins_in_order(Plugins, Acc0, StatusAcc0) -> - lists:foldr(fun(Plugin0, {Acc, StatusAcc}) -> - #{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin0, - case maps:find({Name, Vsn}, StatusAcc) of - {ok, Status} -> - Plugin1 = maps:without([running_status, config_status], Plugin0), - Plugins2 = Plugin1#{running_status => Status}, - { - [Plugins2 | Acc], - maps:remove({Name, Vsn}, StatusAcc) - }; - error -> {Acc, StatusAcc} - end - end, {Acc0, StatusAcc0}, Plugins). - -merge_running_status([], Acc) -> Acc; -merge_running_status([{Node, Plugins} | List], Acc) -> - NewAcc = - lists:foldl(fun(Plugin, SubAcc) -> - #{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin, - Key = {Name, Vsn}, - Value = #{node => Node, status => plugin_status(Plugin)}, - SubAcc#{Key => [Value | maps:get(Key, Acc, [])]} - end, Acc, Plugins), - merge_running_status(List, NewAcc). - -%% running_status: running loaded, stopped -%% config_status: not_configured disable enable -plugin_status(#{running_status := running}) -> running; -plugin_status(_) -> stopped. +pack_plugin_in_order([], Acc, StatusAcc) -> {Acc, StatusAcc}; +pack_plugin_in_order(_, Acc, StatusAcc)when map_size(StatusAcc) =:= 0 -> {Acc, StatusAcc}; +pack_plugin_in_order([Plugin0 | Plugins], Acc, StatusAcc) -> + #{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin0, + case maps:find({Name, Vsn}, StatusAcc) of + {ok, Status} -> + Plugin1 = maps:without([running_status, config_status], Plugin0), + Plugins2 = Plugin1#{running_status => Status}, + NewStatusAcc = maps:remove({Name, Vsn}, StatusAcc), + pack_plugin_in_order(Plugins, [Plugins2 | Acc], NewStatusAcc); + error -> + pack_plugin_in_order(Plugins, Acc, StatusAcc) + end. diff --git a/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl new file mode 100644 index 000000000..53de59a06 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl @@ -0,0 +1,128 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_plugins_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-define(EMQX_PLUGIN_TEMPLATE_VSN, "5.0-rc.1"). +-define(PACKAGE_SUFFIX, ".tar.gz"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + WorkDir = proplists:get_value(data_dir, Config), + ok = filelib:ensure_dir(WorkDir), + DemoShDir1 = string:replace(WorkDir, "emqx_mgmt_api_plugins", "emqx_plugins"), + DemoShDir = string:replace(DemoShDir1, "emqx_management", "emqx_plugins"), + OrigInstallDir = emqx_plugins:get_config(install_dir, undefined), + emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_plugins]), + emqx_plugins:put_config(install_dir, WorkDir), + + [{demo_sh_dir, DemoShDir}, {orig_install_dir, OrigInstallDir} | Config]. + +end_per_suite(Config) -> + emqx_common_test_helpers:boot_modules(all), + emqx_mgmt_api_test_util:end_suite([emqx_plugins, emqx_conf]), + %% restore config + case proplists:get_value(orig_install_dir, Config) of + undefined -> ok; + OrigInstallDir -> emqx_plugins:put_config(install_dir, OrigInstallDir) + end, + ok. + +t_plugins(Config) -> + DemoShDir = proplists:get_value(demo_sh_dir, Config), + PackagePath = build_demo_plugin_package(DemoShDir), + NameVsn = filename:basename(PackagePath, ?PACKAGE_SUFFIX), + ok = install_plugin(PackagePath), + {ok, StopRes} = describe_plugins(NameVsn), + ?assertMatch(#{<<"running_status">> := [ + #{<<"node">> := <<"test@127.0.0.1">>, <<"status">> := <<"stopped">>}]}, StopRes), + {ok, StopRes1} = update_plugin(NameVsn, "start"), + ?assertEqual([], StopRes1), + {ok, StartRes} = describe_plugins(NameVsn), + ?assertMatch(#{<<"running_status">> := [ + #{<<"node">> := <<"test@127.0.0.1">>, <<"status">> := <<"running">>}]}, StartRes), + {ok, []} = update_plugin(NameVsn, "stop"), + {ok, StopRes2} = describe_plugins(NameVsn), + ?assertMatch(#{<<"running_status">> := [ + #{<<"node">> := <<"test@127.0.0.1">>, <<"status">> := <<"stopped">>}]}, StopRes2), + {ok, []} = uninstall_plugin(NameVsn), + ok. + +list_plugins() -> + Path = emqx_mgmt_api_test_util:api_path(["plugins"]), + case emqx_mgmt_api_test_util:request_api(get, Path) of + {ok, Apps} -> {ok, emqx_json:decode(Apps, [return_maps])}; + Error -> Error + end. + +describe_plugins(Name) -> + Path = emqx_mgmt_api_test_util:api_path(["plugins", Name]), + case emqx_mgmt_api_test_util:request_api(get, Path) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + +install_plugin(FilePath) -> + {ok, Token} = emqx_dashboard_admin:sign_token(<<"admin">>, <<"public">>), + Path = emqx_mgmt_api_test_util:api_path(["plugins", "install"]), + case emqx_mgmt_api_test_util:upload_request(Path, FilePath, "plugin", + <<"application/gzip">>, [], Token) of + {ok, {{"HTTP/1.1", 200, "OK"}, _Headers, <<>>}} -> ok; + Error -> Error + end. + +update_plugin(Name, Action) -> + Path = emqx_mgmt_api_test_util:api_path(["plugins", Name, Action]), + emqx_mgmt_api_test_util:request_api(put, Path). + +update_boot_order(Name, MoveBody) -> + Auth = emqx_mgmt_api_test_util:auth_header_(), + Path = emqx_mgmt_api_test_util:api_path(["plugins", Name, "move"]), + case emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, MoveBody) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + +uninstall_plugin(Name) -> + DeletePath = emqx_mgmt_api_test_util:api_path(["plugins", Name]), + emqx_mgmt_api_test_util:request_api(delete, DeletePath). + + +build_demo_plugin_package(Dir) -> + BuildSh = filename:join([Dir, "build-demo-plugin.sh"]), + case emqx_run_sh:do(BuildSh ++ " " ++ ?EMQX_PLUGIN_TEMPLATE_VSN, + [{cd, Dir}]) of + {ok, _} -> + FileName = "emqx_plugin_template-" ++ ?EMQX_PLUGIN_TEMPLATE_VSN ++ ?PACKAGE_SUFFIX, + Pkg = filename:join([Dir, FileName]), + case filelib:is_regular(Pkg) of + true -> + PluginPath = "./" ++ FileName, + _ = os:cmd("mv " ++ Pkg ++ " " ++ PluginPath), + true = filelib:is_regular(PluginPath), + PluginPath; + false -> error(#{reason => unexpected_build_result, not_found => Pkg}) + end; + {error, {Rc, Output}} -> + io:format(user, "failed_to_build_demo_plugin, Exit = ~p, Output:~n~ts\n", [Rc, Output]), + error(failed_to_build_demo_plugin) + end. diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index 42481abcb..932cca697 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -102,3 +102,70 @@ auth_header_() -> api_path(Parts)-> ?SERVER ++ filename:join([?BASE_PATH | Parts]). + +%% Usage: +%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>, <<"upload">>, <<"image/png">>, [], <<"some-token">>) +%% +%% Usage with RequestData: +%% Payload = [{upload_type, <<"user_picture">>}], +%% PayloadContent = jsx:encode(Payload), +%% RequestData = [ +%% {<<"payload">>, PayloadContent} +%% ] +%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>, <<"upload">>, <<"image/png">>, RequestData, <<"some-token">>) +-spec upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) -> {ok, binary()} | {error, list()} when + URL:: binary(), + FilePath:: binary(), + Name:: binary(), + MimeType:: binary(), + RequestData:: list(), + AuthorizationToken:: binary(). +upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) -> + Method = post, + Filename = filename:basename(FilePath), + {ok, Data} = file:read_file(FilePath), + Boundary = emqx_guid:to_base62(emqx_guid:gen()), + RequestBody = format_multipart_formdata(Data, RequestData, Name, [Filename], MimeType, Boundary), + ContentType = "multipart/form-data; boundary=" ++ binary_to_list(Boundary), + ContentLength = integer_to_list(length(binary_to_list(RequestBody))), + Headers = [ + {"Content-Length", ContentLength}, + case AuthorizationToken =/= undefined of + true -> {"Authorization", "Bearer " ++ binary_to_list(AuthorizationToken)}; + false -> {} + end + ], + HTTPOptions = [], + Options = [{body_format, binary}], + inets:start(), + httpc:request(Method, {URL, Headers, ContentType, RequestBody}, HTTPOptions, Options). + +-spec format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> binary() when + Data:: binary(), + Params:: list(), + Name:: binary(), + FileNames:: list(), + MimeType:: binary(), + Boundary:: binary(). +format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> + StartBoundary = erlang:iolist_to_binary([<<"--">>, Boundary]), + LineSeparator = <<"\r\n">>, + WithParams = lists:foldl(fun({Key, Value}, Acc) -> + erlang:iolist_to_binary([ + Acc, + StartBoundary, LineSeparator, + <<"Content-Disposition: form-data; name=\"">>, Key, <<"\"">>, LineSeparator, LineSeparator, + Value, LineSeparator + ]) + end, <<"">>, Params), + WithPaths = lists:foldl(fun(FileName, Acc) -> + erlang:iolist_to_binary([ + Acc, + StartBoundary, LineSeparator, + <<"Content-Disposition: form-data; name=\"">>, Name, <<"\"; filename=\"">>, FileName, <<"\"">>, LineSeparator, + <<"Content-Type: ">>, MimeType, LineSeparator, LineSeparator, + Data, + LineSeparator + ]) + end, WithParams, FileNames), + erlang:iolist_to_binary([WithPaths, StartBoundary, <<"--">>, LineSeparator]). diff --git a/apps/emqx_plugins/src/emqx_plugins_monitor.erl b/apps/emqx_plugins/src/emqx_plugins_monitor.erl index ee06f1bcf..08c5ae282 100644 --- a/apps/emqx_plugins/src/emqx_plugins_monitor.erl +++ b/apps/emqx_plugins/src/emqx_plugins_monitor.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -21,17 +21,55 @@ -export([ get_plugins/0 , start_link/0 + , aggregate_status/1 ]). +-export([lock/1, unlock/1, lock/0, unlock/0]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-spec aggregate_status(list(tuple())) -> + [#{{Name :: binary(), Vsn :: binary()} => [#{node => node(), status => running | stopped}]}]. +aggregate_status(List) -> aggregate_status(List, #{}). + +aggregate_status([], Acc) -> Acc; +aggregate_status([{Node, Plugins} | List], Acc) -> + NewAcc = + lists:foldl(fun(Plugin, SubAcc) -> + #{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin, + Key = {Name, Vsn}, + Value = #{node => Node, status => plugin_status(Plugin)}, + SubAcc#{Key => [Value | maps:get(Key, Acc, [])]} + end, Acc, Plugins), + aggregate_status(List, NewAcc). + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -init([]) -> - {ok, #{ref => next_check_time(), failed => 0}}. +lock(Owner) -> + gen_server:call(?MODULE, {lock, Owner}). +unlock(Owner) -> + gen_server:call(?MODULE, {unlock, Owner}). + +init([]) -> + {ok, #{ref => schedule_next_check(), failed => 0, lock => undefined}}. + +handle_call({lock, Owner}, _From, #{lock := undefined} = State) -> + {reply, true, State#{lock => Owner}}; +handle_call({lock, Owner}, _From, #{lock := Locker} = State) -> + case is_locker_alive(Locker) of + true -> {reply, false, State}; + false -> {reply, true, State#{lock => Owner}} + end; +handle_call({unlock, Owner}, _From, #{lock := Owner} = State) -> + {reply, true, State#{lock => undefined}}; +handle_call({unlock, _Owner}, _From, #{lock := Locker} = State) -> + case is_locker_alive(Locker) of + true -> {reply, false, State}; + false -> {reply, true, State#{lock => undefined}} + end; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -40,10 +78,9 @@ handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. -handle_info({timeout, Ref, check}, State = #{failed := Failed}) -> - erlang:cancel_timer(Ref), +handle_info({timeout, _Ref, check}, State = #{failed := Failed}) -> NewFailed = maybe_alarm(check(), Failed), - {noreply, State#{ref => next_check_time(), failed => NewFailed}}; + {noreply, State#{ref => schedule_next_check(), failed => NewFailed}}; handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. @@ -54,23 +91,48 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -next_check_time() -> +schedule_next_check() -> Check = emqx_plugins:get_config(check_interval, 5000), emqx_misc:start_timer(Check, check). +is_locker_alive({Node, Pid}) -> + case rpc:call(Node, erlang, is_process_alive, [Pid]) of + {badrpc, _} -> false; + Boolean -> Boolean + end. + check() -> Nodes = mria_mnesia:running_nodes(), - case rpc:multicall(Nodes, ?MODULE, get_plugins_list, [], 15000) of + case rpc:multicall(Nodes, ?MODULE, get_plugins, [], 15000) of {Plugins, []} -> check_plugins(Plugins); - {_ , BadNodes} -> {error, io_lib:format("~p rpc to ~p failed", [node(), BadNodes])} + {_ , BadNodes} -> {error, lists:flatten(io_lib:format("~p rpc to ~p failed", [node(), BadNodes]))} end. get_plugins() -> {node(), emqx_plugins:list()}. check_plugins(Plugins) -> - check_status(Plugins), - ok. + StatusMap = aggregate_status(Plugins), + case ensure_install(StatusMap) of + {error, Bad} -> + ?SLOG(warning, #{msg => "plugin_not_install_on_some_node", broken_plugins => Bad}); + {ok, Good} -> + check_status(Good) + end. + +ensure_install(Status) -> + Size = length(mria_mnesia:running_nodes()), + {Good, Bad} = + maps:fold(fun({Key, Value}, {GoodAcc, BadAcc}) -> + case length(Value) =:= Size of + true -> {GoodAcc#{Key => Value}, BadAcc}; + false -> {GoodAcc, BadAcc#{Key => Value}} + end + end, {#{}, #{}}, Status), + case map_size(Bad) =:= 0 of + true -> {ok, Good}; + false -> {error, Bad} + end. check_status(_Plugins) -> ok. @@ -81,3 +143,16 @@ maybe_alarm({error, _Reason}, Failed) when Failed >= 2 -> 0; maybe_alarm({error, _Reason}, Failed) -> Failed + 1; maybe_alarm(ok, _Failed) -> 0. + +%% running_status: running loaded, stopped +%% config_status: not_configured disable enable +plugin_status(#{running_status := running}) -> running; +plugin_status(_) -> stopped. + +-define(RESOURCE, plugins_lock). + +lock() -> + ekka_locker:acquire(?MODULE, ?RESOURCE, all, undefined). + +unlock() -> + ekka_locker:release(?MODULE, ?RESOURCE, all).