diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 37cdbbf17..0cad0624e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -77,7 +77,8 @@ schema("/plugins/install") -> 'operationId' => upload_install, post => #{ description => "Install a plugin(plugin-vsn.tar.gz)." - "Follow [emqx-plugin-template](https://github.com/emqx/emqx-plugin-template) to develop plugin.", + "Follow [emqx-plugin-template](https://github.com/emqx/emqx-plugin-template) " + "to develop plugin.", 'requestBody' => #{ content => #{ 'multipart/form-data' => #{ @@ -97,7 +98,7 @@ schema("/plugins/:name") -> parameters => [hoconsc:ref(name)], responses => #{ 200 => hoconsc:ref(plugin), - 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], "Plugin Not Found") + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) } }, delete => #{ @@ -105,7 +106,7 @@ schema("/plugins/:name") -> parameters => [hoconsc:ref(name)], responses => #{ 204 => <<"Uninstall successfully">>, - 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], "Plugin Not Found") + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) } } }; @@ -121,7 +122,7 @@ schema("/plugins/:name/:action") -> {action, hoconsc:mk(hoconsc:enum([start, stop]), #{desc => "Action", in => path})}], responses => #{ 200 => <<"OK">>, - 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], "Plugin Not Found") + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) } } }; @@ -142,7 +143,7 @@ fields(plugin) -> #{ desc => "Name-Vsn: without .tar.gz", validator => fun ?MODULE:validate_name/1, - nullable => false, + required => true, example => "emqx_plugin_template-5.0-rc.1"}) }, {author, hoconsc:mk(list(string()), #{example => [<<"EMQ X Team">>]})}, @@ -151,7 +152,8 @@ fields(plugin) -> {compatibility, hoconsc:mk(map(), #{example => #{<<"emqx">> => <<"~>5.0">>}})}, {git_commit_or_build_date, hoconsc:mk(string(), #{ example => "2021-12-25", - desc => "Last git commit date by `git log -1 --pretty=format:'%cd' --date=format:'%Y-%m-%d`." + desc => "Last git commit date by `git log -1 --pretty=format:'%cd' " + "--date=format:'%Y-%m-%d`.\n" " If the last commit date is not available, the build date will be presented." })}, {functionality, hoconsc:mk(hoconsc:array(string()), #{example => [<<"Demo">>]})}, @@ -159,25 +161,26 @@ fields(plugin) -> {metadata_vsn, hoconsc:mk(string(), #{example => "0.1.0"})}, {rel_vsn, hoconsc:mk(binary(), #{desc => "Plugins release version", - nullable => false, + required => true, example => <<"5.0-rc.1">>}) }, {rel_apps, hoconsc:mk(hoconsc:array(binary()), #{desc => "Aplications in plugin.", - nullable => false, + required => true, example => [<<"emqx_plugin_template-5.0.0">>, <<"map_sets-1.1.0">>]}) }, {repo, hoconsc:mk(string(), #{example => "https://github.com/emqx/emqx-plugin-template"})}, {description, hoconsc:mk(binary(), #{desc => "Plugin description.", - nullable => false, + required => true, example => "This is an demo plugin description"}) }, - {running_status, hoconsc:mk(hoconsc:array(hoconsc:ref(running_status)), #{nullable => false})}, + {running_status, hoconsc:mk(hoconsc:array(hoconsc:ref(running_status)), + #{required => true})}, {readme, hoconsc:mk(binary(), #{ example => "This is an demo plugin.", desc => "only return when `GET /plugins/{name}`.", - nullable => true})} + required => false})} ]; fields(name) -> [{name, hoconsc:mk(binary(), @@ -199,9 +202,10 @@ fields(position) -> #{ desc => """ Enable auto-boot at position in the boot list, where Position could be - 'top', 'bottom', or 'before:other-vsn', 'after:other-vsn' to specify a relative position. + 'top', 'bottom', or 'before:other-vsn', 'after:other-vsn' + to specify a relative position. """, - nullable => true + required => false })}]; fields(running_status) -> [ @@ -265,7 +269,8 @@ upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) - upload_install(post, #{} = Body) -> io:format("~p~n", [Body]), {400, #{code => 'BAD_FORM_DATA', - message => <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>} + message => + <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>} }. plugin(get, #{bindings := #{name := Name}}) -> @@ -315,28 +320,29 @@ describe_package(Name) -> delete_package(Name) -> case emqx_plugins:ensure_stopped(Name) of ok -> - emqx_plugins:ensure_disabled(Name), - emqx_plugins:purge(Name), - emqx_plugins:delete_package(Name); + _ = emqx_plugins:ensure_disabled(Name), + _ = emqx_plugins:purge(Name), + _ = emqx_plugins:delete_package(Name); Error -> Error end. %% for RPC plugin update ensure_action(Name, start) -> - emqx_plugins:ensure_enabled(Name), - emqx_plugins:ensure_started(Name); + _ = emqx_plugins:ensure_enabled(Name), + _ = emqx_plugins:ensure_started(Name); ensure_action(Name, stop) -> - emqx_plugins:ensure_stopped(Name), - emqx_plugins:ensure_disabled(Name); + _ = emqx_plugins:ensure_stopped(Name), + _ = emqx_plugins:ensure_disabled(Name); ensure_action(Name, restart) -> - emqx_plugins:ensure_enabled(Name), - emqx_plugins:restart(Name). + _ = emqx_plugins:ensure_enabled(Name), + _ = emqx_plugins:restart(Name). cluster_call(Mod, Fun, Args, Timeout) -> Nodes = mria_mnesia:running_nodes(), {GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout), BadNodes =/= [] andalso - ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes, mfa => {Mod, Fun, length(Args)}}), + ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes, + mfa => {Mod, Fun, length(Args)}}), GoodRes. cluster_rpc(Mod, Fun, Args) -> @@ -358,14 +364,18 @@ return(_, {error, Reason}) -> parse_position(#{<<"position">> := <<"top">>}, _) -> front; parse_position(#{<<"position">> := <<"bottom">>}, _) -> rear; -parse_position(#{<<"position">> := <<"before:", Name/binary>>}, Name) -> {error, <<"Can't before:self">>}; -parse_position(#{<<"position">> := <<"after:", Name/binary>>}, Name) -> {error, <<"Can't after:self">>}; -parse_position(#{<<"position">> := <<"before:", Before/binary>>}, _Name) -> {before, binary_to_list(Before)}; -parse_position(#{<<"position">> := <<"after:", After/binary>>}, _Name) -> {behind, binary_to_list(After)}; +parse_position(#{<<"position">> := <<"before:", Name/binary>>}, Name) -> + {error, <<"Can't before:self">>}; +parse_position(#{<<"position">> := <<"after:", Name/binary>>}, Name) -> + {error, <<"Can't after:self">>}; +parse_position(#{<<"position">> := <<"before:", Before/binary>>}, _Name) -> + {before, binary_to_list(Before)}; +parse_position(#{<<"position">> := <<"after:", After/binary>>}, _Name) -> + {behind, binary_to_list(After)}; parse_position(Position, _) -> {error, iolist_to_binary(io_lib:format("~p", [Position]))}. format_plugins(List) -> - StatusMap = emqx_plugins_monitor:aggregate_status(List), + StatusMap = aggregate_status(List), SortFun = fun({_N1, P1}, {_N2, P2}) -> length(P1) > length(P2) end, SortList = lists:sort(SortFun, List), pack_status_in_order(SortList, StatusMap). @@ -390,3 +400,21 @@ pack_plugin_in_order([Plugin0 | Plugins], Acc, StatusAcc) -> error -> pack_plugin_in_order(Plugins, Acc, StatusAcc) end. + +aggregate_status(List) -> aggregate_status(List, #{}). + +aggregate_status([], Acc) -> Acc; +aggregate_status([{Node, Plugins} | List], Acc) -> + NewAcc = + lists:foldl(fun(Plugin, SubAcc) -> + #{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin, + Key = {Name, Vsn}, + Value = #{node => Node, status => plugin_status(Plugin)}, + SubAcc#{Key => [Value | maps:get(Key, Acc, [])]} + end, Acc, Plugins), + aggregate_status(List, NewAcc). + +% running_status: running loaded, stopped +%% config_status: not_configured disable enable +plugin_status(#{running_status := running}) -> running; +plugin_status(_) -> stopped. diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index 932cca697..08606e50f 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -104,7 +104,8 @@ api_path(Parts)-> ?SERVER ++ filename:join([?BASE_PATH | Parts]). %% Usage: -%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>, <<"upload">>, <<"image/png">>, [], <<"some-token">>) +%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>, +%% <<"upload">>, <<"image/png">>, [], <<"some-token">>) %% %% Usage with RequestData: %% Payload = [{upload_type, <<"user_picture">>}], @@ -112,8 +113,10 @@ api_path(Parts)-> %% RequestData = [ %% {<<"payload">>, PayloadContent} %% ] -%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>, <<"upload">>, <<"image/png">>, RequestData, <<"some-token">>) --spec upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) -> {ok, binary()} | {error, list()} when +%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>, +%% <<"upload">>, <<"image/png">>, RequestData, <<"some-token">>) +-spec upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) -> + {ok, binary()} | {error, list()} when URL:: binary(), FilePath:: binary(), Name:: binary(), @@ -125,7 +128,8 @@ upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) - Filename = filename:basename(FilePath), {ok, Data} = file:read_file(FilePath), Boundary = emqx_guid:to_base62(emqx_guid:gen()), - RequestBody = format_multipart_formdata(Data, RequestData, Name, [Filename], MimeType, Boundary), + RequestBody = format_multipart_formdata(Data, RequestData, Name, + [Filename], MimeType, Boundary), ContentType = "multipart/form-data; boundary=" ++ binary_to_list(Boundary), ContentLength = integer_to_list(length(binary_to_list(RequestBody))), Headers = [ @@ -140,7 +144,8 @@ upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) - inets:start(), httpc:request(Method, {URL, Headers, ContentType, RequestBody}, HTTPOptions, Options). --spec format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> binary() when +-spec format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> + binary() when Data:: binary(), Params:: list(), Name:: binary(), @@ -154,7 +159,8 @@ format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> erlang:iolist_to_binary([ Acc, StartBoundary, LineSeparator, - <<"Content-Disposition: form-data; name=\"">>, Key, <<"\"">>, LineSeparator, LineSeparator, + <<"Content-Disposition: form-data; name=\"">>, Key, <<"\"">>, + LineSeparator, LineSeparator, Value, LineSeparator ]) end, <<"">>, Params), @@ -162,7 +168,8 @@ format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> erlang:iolist_to_binary([ Acc, StartBoundary, LineSeparator, - <<"Content-Disposition: form-data; name=\"">>, Name, <<"\"; filename=\"">>, FileName, <<"\"">>, LineSeparator, + <<"Content-Disposition: form-data; name=\"">>, Name, <<"\"; filename=\"">>, + FileName, <<"\"">>, LineSeparator, <<"Content-Type: ">>, MimeType, LineSeparator, LineSeparator, Data, LineSeparator diff --git a/apps/emqx_plugins/src/emqx_plugins_monitor.erl b/apps/emqx_plugins/src/emqx_plugins_monitor.erl deleted file mode 100644 index 08c5ae282..000000000 --- a/apps/emqx_plugins/src/emqx_plugins_monitor.erl +++ /dev/null @@ -1,158 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_plugins_monitor). --behaviour(gen_server). - --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/logger.hrl"). - --export([ get_plugins/0 - , start_link/0 - , aggregate_status/1 - ]). - --export([lock/1, unlock/1, lock/0, unlock/0]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --spec aggregate_status(list(tuple())) -> - [#{{Name :: binary(), Vsn :: binary()} => [#{node => node(), status => running | stopped}]}]. -aggregate_status(List) -> aggregate_status(List, #{}). - -aggregate_status([], Acc) -> Acc; -aggregate_status([{Node, Plugins} | List], Acc) -> - NewAcc = - lists:foldl(fun(Plugin, SubAcc) -> - #{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin, - Key = {Name, Vsn}, - Value = #{node => Node, status => plugin_status(Plugin)}, - SubAcc#{Key => [Value | maps:get(Key, Acc, [])]} - end, Acc, Plugins), - aggregate_status(List, NewAcc). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -lock(Owner) -> - gen_server:call(?MODULE, {lock, Owner}). - -unlock(Owner) -> - gen_server:call(?MODULE, {unlock, Owner}). - -init([]) -> - {ok, #{ref => schedule_next_check(), failed => 0, lock => undefined}}. - -handle_call({lock, Owner}, _From, #{lock := undefined} = State) -> - {reply, true, State#{lock => Owner}}; -handle_call({lock, Owner}, _From, #{lock := Locker} = State) -> - case is_locker_alive(Locker) of - true -> {reply, false, State}; - false -> {reply, true, State#{lock => Owner}} - end; -handle_call({unlock, Owner}, _From, #{lock := Owner} = State) -> - {reply, true, State#{lock => undefined}}; -handle_call({unlock, _Owner}, _From, #{lock := Locker} = State) -> - case is_locker_alive(Locker) of - true -> {reply, false, State}; - false -> {reply, true, State#{lock => undefined}} - end; -handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", call => Req}), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), - {noreply, State}. - -handle_info({timeout, _Ref, check}, State = #{failed := Failed}) -> - NewFailed = maybe_alarm(check(), Failed), - {noreply, State#{ref => schedule_next_check(), failed => NewFailed}}; -handle_info(Info, State) -> - ?SLOG(error, #{msg => "unexpected_info", info => Info}), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -schedule_next_check() -> - Check = emqx_plugins:get_config(check_interval, 5000), - emqx_misc:start_timer(Check, check). - -is_locker_alive({Node, Pid}) -> - case rpc:call(Node, erlang, is_process_alive, [Pid]) of - {badrpc, _} -> false; - Boolean -> Boolean - end. - -check() -> - Nodes = mria_mnesia:running_nodes(), - case rpc:multicall(Nodes, ?MODULE, get_plugins, [], 15000) of - {Plugins, []} -> check_plugins(Plugins); - {_ , BadNodes} -> {error, lists:flatten(io_lib:format("~p rpc to ~p failed", [node(), BadNodes]))} - end. - -get_plugins() -> - {node(), emqx_plugins:list()}. - -check_plugins(Plugins) -> - StatusMap = aggregate_status(Plugins), - case ensure_install(StatusMap) of - {error, Bad} -> - ?SLOG(warning, #{msg => "plugin_not_install_on_some_node", broken_plugins => Bad}); - {ok, Good} -> - check_status(Good) - end. - -ensure_install(Status) -> - Size = length(mria_mnesia:running_nodes()), - {Good, Bad} = - maps:fold(fun({Key, Value}, {GoodAcc, BadAcc}) -> - case length(Value) =:= Size of - true -> {GoodAcc#{Key => Value}, BadAcc}; - false -> {GoodAcc, BadAcc#{Key => Value}} - end - end, {#{}, #{}}, Status), - case map_size(Bad) =:= 0 of - true -> {ok, Good}; - false -> {error, Bad} - end. - -check_status(_Plugins) -> - ok. - -%% alarm when failed 3 time. -maybe_alarm({error, _Reason}, Failed) when Failed >= 2 -> - %alarm(Reason), - 0; -maybe_alarm({error, _Reason}, Failed) -> Failed + 1; -maybe_alarm(ok, _Failed) -> 0. - -%% running_status: running loaded, stopped -%% config_status: not_configured disable enable -plugin_status(#{running_status := running}) -> running; -plugin_status(_) -> stopped. - --define(RESOURCE, plugins_lock). - -lock() -> - ekka_locker:acquire(?MODULE, ?RESOURCE, all, undefined). - -unlock() -> - ekka_locker:release(?MODULE, ?RESOURCE, all). diff --git a/apps/emqx_plugins/src/emqx_plugins_sup.erl b/apps/emqx_plugins/src/emqx_plugins_sup.erl index 5326e525d..488372cc6 100644 --- a/apps/emqx_plugins/src/emqx_plugins_sup.erl +++ b/apps/emqx_plugins/src/emqx_plugins_sup.erl @@ -26,8 +26,9 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> + %% TODO: Add monitor plugins change. Monitor = emqx_plugins_monitor, - Children = [ + _Children = [ #{id => Monitor, start => {Monitor, start_link, []}, restart => permanent, @@ -42,4 +43,4 @@ init([]) -> intensity => 100, period => 10 }, - {ok, {SupFlags, Children}}. + {ok, {SupFlags, []}}.