diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index c832ed79c..7d4108908 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -7,14 +7,15 @@ {emqx_dashboard,1}. {emqx_exhook,1}. {emqx_gateway_cm,1}. -{emqx_license,1}. {emqx_management,1}. +{emqx_mgmt_api_plugins,1}. +{emqx_license,1}. {emqx_mgmt_trace,1}. {emqx_persistent_session,1}. {emqx_plugin_libs,1}. {emqx_prometheus,1}. {emqx_resource,1}. +{emqx_slow_subs,1}. {emqx_statsd,1}. {emqx_telemetry,1}. {emqx_topic_metrics,1}. -{emqx_slow_subs,1}. diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index 3af10b959..283eb21f0 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -4,7 +4,7 @@ {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_management_sup]}, - {applications, [kernel,stdlib,minirest,emqx]}, + {applications, [kernel,stdlib,emqx_plugins,minirest,emqx]}, {mod, {emqx_mgmt_app,[]}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl new file mode 100644 index 000000000..c7c963626 --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -0,0 +1,408 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_plugins). + +-behaviour(minirest_api). + +-include_lib("kernel/include/file.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +%%-include_lib("emqx_plugins/include/emqx_plugins.hrl"). + +-export([ api_spec/0 + , fields/1 + , paths/0 + , schema/1 + , namespace/0 + ]). + +-export([ list_plugins/2 + , upload_install/2 + , plugin/2 + , update_plugin/2 + , update_boot_order/2 + ]). + +-export([ validate_name/1 + , get_plugins/0 + , install_package/2 + , delete_package/1 + , describe_package/1 + , ensure_action/2 + ]). + +-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_.]*$"). + +namespace() -> "plugins". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + +%% Don't change the path's order +paths() -> + [ + "/plugins", + "/plugins/:name", + "/plugins/install", + "/plugins/:name/:action", + "/plugins/:name/move" + ]. + +schema("/plugins") -> + #{ + 'operationId' => list_plugins, + get => #{ + description => "List all install plugins.
" + "Plugins are launched in top-down order.
" + "Using `POST /plugins/{name}/move` to change the boot order.", + responses => #{ + 200 => hoconsc:array(hoconsc:ref(plugin)) + } + } + }; +schema("/plugins/install") -> + #{ + 'operationId' => upload_install, + post => #{ + description => "Install a plugin(plugin-vsn.tar.gz)." + "Follow [emqx-plugin-template](https://github.com/emqx/emqx-plugin-template) " + "to develop plugin.", + 'requestBody' => #{ + content => #{ + 'multipart/form-data' => #{ + schema => #{ + type => object, + properties => #{ + plugin => #{type => string, format => binary}}}, + encoding => #{plugin => #{'contentType' => 'application/gzip'}}}}}, + responses => #{200 => <<"OK">>} + } + }; +schema("/plugins/:name") -> + #{ + 'operationId' => plugin, + get => #{ + description => "Describe a plugin according `release.json` and `README.md`.", + parameters => [hoconsc:ref(name)], + responses => #{ + 200 => hoconsc:ref(plugin), + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) + } + }, + delete => #{ + description => "Uninstall a plugin package.", + parameters => [hoconsc:ref(name)], + responses => #{ + 204 => <<"Uninstall successfully">>, + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) + } + } + }; +schema("/plugins/:name/:action") -> + #{ + 'operationId' => update_plugin, + put => #{ + description => "start/stop a installed plugin.
" + "- **start**: start the plugin.
" + "- **stop**: stop the plugin.
", + parameters => [ + hoconsc:ref(name), + {action, hoconsc:mk(hoconsc:enum([start, stop]), #{desc => "Action", in => path})}], + responses => #{ + 200 => <<"OK">>, + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>) + } + } + }; +schema("/plugins/:name/move") -> + #{ + 'operationId' => update_boot_order, + post => #{ + description => "Setting the boot order of plugins.", + parameters => [hoconsc:ref(name)], + 'requestBody' => move_request_body(), + responses => #{200 => <<"OK">>} + } + }. + +fields(plugin) -> + [ + {name, hoconsc:mk(binary(), + #{ + desc => "Name-Vsn: without .tar.gz", + validator => fun ?MODULE:validate_name/1, + required => true, + example => "emqx_plugin_template-5.0-rc.1"}) + }, + {author, hoconsc:mk(list(string()), #{example => [<<"EMQ X Team">>]})}, + {builder, hoconsc:ref(?MODULE, builder)}, + {built_on_otp_release, hoconsc:mk(string(), #{example => "24"})}, + {compatibility, hoconsc:mk(map(), #{example => #{<<"emqx">> => <<"~>5.0">>}})}, + {git_commit_or_build_date, hoconsc:mk(string(), #{ + example => "2021-12-25", + desc => "Last git commit date by `git log -1 --pretty=format:'%cd' " + "--date=format:'%Y-%m-%d`.\n" + " If the last commit date is not available, the build date will be presented." + })}, + {functionality, hoconsc:mk(hoconsc:array(string()), #{example => [<<"Demo">>]})}, + {git_ref, hoconsc:mk(string(), #{example => "ddab50fafeed6b1faea70fc9ffd8c700d7e26ec1"})}, + {metadata_vsn, hoconsc:mk(string(), #{example => "0.1.0"})}, + {rel_vsn, hoconsc:mk(binary(), + #{desc => "Plugins release version", + required => true, + example => <<"5.0-rc.1">>}) + }, + {rel_apps, hoconsc:mk(hoconsc:array(binary()), + #{desc => "Aplications in plugin.", + required => true, + example => [<<"emqx_plugin_template-5.0.0">>, <<"map_sets-1.1.0">>]}) + }, + {repo, hoconsc:mk(string(), #{example => "https://github.com/emqx/emqx-plugin-template"})}, + {description, hoconsc:mk(binary(), + #{desc => "Plugin description.", + required => true, + example => "This is an demo plugin description"}) + }, + {running_status, hoconsc:mk(hoconsc:array(hoconsc:ref(running_status)), + #{required => true})}, + {readme, hoconsc:mk(binary(), #{ + example => "This is an demo plugin.", + desc => "only return when `GET /plugins/{name}`.", + required => false})} + ]; +fields(name) -> + [{name, hoconsc:mk(binary(), + #{ + desc => list_to_binary(?NAME_RE), + example => "emqx_plugin_template-5.0-rc.1", + in => path, + validator => fun ?MODULE:validate_name/1 + })} + ]; +fields(builder) -> + [ + {contact, hoconsc:mk(string(), #{example => "emqx-support@emqx.io"})}, + {name, hoconsc:mk(string(), #{example => "EMQ X Team"})}, + {website, hoconsc:mk(string(), #{example => "www.emqx.com"})} + ]; +fields(position) -> + [{position, hoconsc:mk(hoconsc:union([top, bottom, binary()]), + #{ + desc => """ + Enable auto-boot at position in the boot list, where Position could be + 'top', 'bottom', or 'before:other-vsn', 'after:other-vsn' + to specify a relative position. + """, + required => false + })}]; +fields(running_status) -> + [ + {node, hoconsc:mk(string(), #{example => "emqx@127.0.0.1"})}, + {status, hoconsc:mk(hoconsc:enum([running, stopped]), #{ + desc => "Install plugin status at runtime
" + "1. running: plugin is running.
" + "2. stopped: plugin is stopped.
" + })} + ]. + +move_request_body() -> + emqx_dashboard_swagger:schema_with_examples(hoconsc:ref(?MODULE, position), + #{ + move_to_top => #{ + summary => <<"move plugin on the top">>, + value => #{position => <<"top">>} + }, + move_to_bottom => #{ + summary => <<"move plugin on the bottom">>, + value => #{position => <<"bottom">>} + }, + move_to_before => #{ + summary => <<"move plugin before other plugins">>, + value => #{position => <<"before:emqx_plugin_demo-5.1-rc.2">>} + }, + move_to_after => #{ + summary => <<"move plugin after other plugins">>, + value => #{position => <<"after:emqx_plugin_demo-5.1-rc.2">>} + } + }). + +validate_name(Name) -> + NameLen = byte_size(Name), + case NameLen > 0 andalso NameLen =< 256 of + true -> + case re:run(Name, ?NAME_RE) of + nomatch -> {error, "Name should be " ?NAME_RE}; + _ -> ok + end; + false -> {error, "Name Length must =< 256"} + end. + +%% API CallBack Begin +list_plugins(get, _) -> + {Plugins, []} = emqx_mgmt_api_plugins_proto_v1:get_plugins(), + {200, format_plugins(Plugins)}. + +get_plugins() -> + {node(), emqx_plugins:list()}. + +upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) -> + [{FileName, Bin}] = maps:to_list(maps:without([type], Plugin)), + %% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall + %% TODO what happened when a new node join in? + %% emqx_plugins_monitor should copy plugins from other core node when boot-up. + {Res, _} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin), + case lists:filter(fun(R) -> R =/= ok end, Res) of + [] -> {200}; + [{error, Reason} | _] -> + {400, #{code => 'UNEXPECTED_ERROR', + message => iolist_to_binary(io_lib:format("~p", [Reason]))}} + end; +upload_install(post, #{} = Body) -> + io:format("~p~n", [Body]), + {400, #{code => 'BAD_FORM_DATA', + message => + <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>} + }. + +plugin(get, #{bindings := #{name := Name}}) -> + {Plugins, _} = emqx_mgmt_api_plugins_proto_v1:describe_package(Name), + case format_plugins(Plugins) of + [Plugin] -> {200, Plugin}; + [] -> {404, #{code => 'NOT_FOUND', message => Name}} + end; + +plugin(delete, #{bindings := #{name := Name}}) -> + {ok, _TnxId, Res} = emqx_mgmt_api_plugins_proto_v1:delete_package(Name), + return(204, Res). + +update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> + {ok, _TnxId, Res} = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action), + return(204, Res). + +update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> + case parse_position(Body, Name) of + {error, Reason} -> {400, #{code => 'BAD_POSITION', message => Reason}}; + Position -> + case emqx_plugins:ensure_enabled(Name, Position) of + ok -> {200}; + {error, Reason} -> + {400, #{code => 'MOVE_FAILED', + message => iolist_to_binary(io_lib:format("~p", [Reason]))}} + end + end. + +%% API CallBack End + +%% For RPC upload_install/2 +install_package(FileName, Bin) -> + File = filename:join(emqx_plugins:install_dir(), FileName), + io:format("xx:~p~n", [File]), + ok = file:write_file(File, Bin), + PackageName = string:trim(FileName, trailing, ".tar.gz"), + emqx_plugins:ensure_installed(PackageName). + +%% For RPC plugin get +describe_package(Name) -> + Node = node(), + case emqx_plugins:describe(Name) of + {ok, Plugin} -> {Node, [Plugin]}; + _ -> {Node, []} + end. + +%% For RPC plugin delete +delete_package(Name) -> + case emqx_plugins:ensure_stopped(Name) of + ok -> + _ = emqx_plugins:ensure_disabled(Name), + _ = emqx_plugins:purge(Name), + _ = emqx_plugins:delete_package(Name); + Error -> Error + end. + +%% for RPC plugin update +ensure_action(Name, start) -> + _ = emqx_plugins:ensure_enabled(Name), + _ = emqx_plugins:ensure_started(Name); +ensure_action(Name, stop) -> + _ = emqx_plugins:ensure_stopped(Name), + _ = emqx_plugins:ensure_disabled(Name); +ensure_action(Name, restart) -> + _ = emqx_plugins:ensure_enabled(Name), + _ = emqx_plugins:restart(Name). + +return(Code, ok) -> {Code}; +return(Code, {ok, Result}) -> {Code, Result}; +return(_, {error, #{error := "bad_info_file", return := {enoent, _}, path := Path}}) -> + {404, #{code => 'NOT_FOUND', message => Path}}; +return(_, {error, Reason}) -> + {400, #{code => 'PARAM_ERROR', message => iolist_to_binary(io_lib:format("~p", [Reason]))}}. + +parse_position(#{<<"position">> := <<"top">>}, _) -> front; +parse_position(#{<<"position">> := <<"bottom">>}, _) -> rear; +parse_position(#{<<"position">> := <<"before:", Name/binary>>}, Name) -> + {error, <<"Can't before:self">>}; +parse_position(#{<<"position">> := <<"after:", Name/binary>>}, Name) -> + {error, <<"Can't after:self">>}; +parse_position(#{<<"position">> := <<"before:", Before/binary>>}, _Name) -> + {before, binary_to_list(Before)}; +parse_position(#{<<"position">> := <<"after:", After/binary>>}, _Name) -> + {behind, binary_to_list(After)}; +parse_position(Position, _) -> {error, iolist_to_binary(io_lib:format("~p", [Position]))}. + +format_plugins(List) -> + StatusMap = aggregate_status(List), + SortFun = fun({_N1, P1}, {_N2, P2}) -> length(P1) > length(P2) end, + SortList = lists:sort(SortFun, List), + pack_status_in_order(SortList, StatusMap). + +pack_status_in_order(List, StatusMap) -> + {Plugins, _} = + lists:foldl(fun({_Node, PluginList}, {Acc, StatusAcc}) -> + pack_plugin_in_order(PluginList, Acc, StatusAcc) + end, {[], StatusMap}, List), + lists:reverse(Plugins). + +pack_plugin_in_order([], Acc, StatusAcc) -> {Acc, StatusAcc}; +pack_plugin_in_order(_, Acc, StatusAcc)when map_size(StatusAcc) =:= 0 -> {Acc, StatusAcc}; +pack_plugin_in_order([Plugin0 | Plugins], Acc, StatusAcc) -> + #{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin0, + case maps:find({Name, Vsn}, StatusAcc) of + {ok, Status} -> + Plugin1 = maps:without([running_status, config_status], Plugin0), + Plugins2 = Plugin1#{running_status => Status}, + NewStatusAcc = maps:remove({Name, Vsn}, StatusAcc), + pack_plugin_in_order(Plugins, [Plugins2 | Acc], NewStatusAcc); + error -> + pack_plugin_in_order(Plugins, Acc, StatusAcc) + end. + +aggregate_status(List) -> aggregate_status(List, #{}). + +aggregate_status([], Acc) -> Acc; +aggregate_status([{Node, Plugins} | List], Acc) -> + NewAcc = + lists:foldl(fun(Plugin, SubAcc) -> + #{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin, + Key = {Name, Vsn}, + Value = #{node => Node, status => plugin_status(Plugin)}, + SubAcc#{Key => [Value | maps:get(Key, Acc, [])]} + end, Acc, Plugins), + aggregate_status(List, NewAcc). + +% running_status: running loaded, stopped +%% config_status: not_configured disable enable +plugin_status(#{running_status := running}) -> running; +plugin_status(_) -> stopped. diff --git a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl new file mode 100644 index 000000000..4a2d9e65d --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl @@ -0,0 +1,52 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_plugins_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + , get_plugins/0 + , install_package/2 + , describe_package/1 + , delete_package/1 + , ensure_action/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec get_plugins() -> emqx_rpc:multicall_result(). +get_plugins() -> + rpc:multicall(emqx_mgmt_api_plugins, get_plugins, [], 15000). + +-spec install_package(binary() | string(), binary()) -> emqx_rpc:multicall_result(). +install_package(Filename, Bin) -> + rpc:multicall(emqx_mgmt_api_plugins, install_package, [Filename, Bin], 25000). + +-spec describe_package(binary() | string()) -> emqx_rpc:multicall_result(). +describe_package(Name) -> + rpc:multicall(emqx_mgmt_api_plugins, describe_package, [Name], 10000). + +-spec delete_package(binary() | string()) -> emqx_cluster_rpc:multicall_return(). +delete_package(Name) -> + emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000). + +-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> + emqx_cluster_rpc:multicall_return(). +ensure_action(Name, Action) -> + emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000). diff --git a/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl new file mode 100644 index 000000000..a6e255d69 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl @@ -0,0 +1,119 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_plugins_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-define(EMQX_PLUGIN_TEMPLATE_VSN, "5.0-rc.1"). +-define(PACKAGE_SUFFIX, ".tar.gz"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + WorkDir = proplists:get_value(data_dir, Config), + ok = filelib:ensure_dir(WorkDir), + DemoShDir1 = string:replace(WorkDir, "emqx_mgmt_api_plugins", "emqx_plugins"), + DemoShDir = string:replace(DemoShDir1, "emqx_management", "emqx_plugins"), + OrigInstallDir = emqx_plugins:get_config(install_dir, undefined), + ok = filelib:ensure_dir(DemoShDir), + emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_plugins]), + emqx_plugins:put_config(install_dir, DemoShDir), + + [{demo_sh_dir, DemoShDir}, {orig_install_dir, OrigInstallDir} | Config]. + +end_per_suite(Config) -> + emqx_common_test_helpers:boot_modules(all), + emqx_mgmt_api_test_util:end_suite([emqx_plugins, emqx_conf]), + %% restore config + case proplists:get_value(orig_install_dir, Config) of + undefined -> ok; + OrigInstallDir -> emqx_plugins:put_config(install_dir, OrigInstallDir) + end, + ok. + +t_plugins(Config) -> + DemoShDir = proplists:get_value(demo_sh_dir, Config), + PackagePath = build_demo_plugin_package(DemoShDir), + ct:pal("package_location:~p install dir:~p", [PackagePath, emqx_plugins:install_dir()]), + NameVsn = filename:basename(PackagePath, ?PACKAGE_SUFFIX), + ok = install_plugin(PackagePath), + {ok, StopRes} = describe_plugins(NameVsn), + ?assertMatch(#{<<"running_status">> := [ + #{<<"node">> := <<"test@127.0.0.1">>, <<"status">> := <<"stopped">>}]}, StopRes), + {ok, StopRes1} = update_plugin(NameVsn, "start"), + ?assertEqual([], StopRes1), + {ok, StartRes} = describe_plugins(NameVsn), + ?assertMatch(#{<<"running_status">> := [ + #{<<"node">> := <<"test@127.0.0.1">>, <<"status">> := <<"running">>}]}, StartRes), + {ok, []} = update_plugin(NameVsn, "stop"), + {ok, StopRes2} = describe_plugins(NameVsn), + ?assertMatch(#{<<"running_status">> := [ + #{<<"node">> := <<"test@127.0.0.1">>, <<"status">> := <<"stopped">>}]}, StopRes2), + {ok, []} = uninstall_plugin(NameVsn), + ok. + +list_plugins() -> + Path = emqx_mgmt_api_test_util:api_path(["plugins"]), + case emqx_mgmt_api_test_util:request_api(get, Path) of + {ok, Apps} -> {ok, emqx_json:decode(Apps, [return_maps])}; + Error -> Error + end. + +describe_plugins(Name) -> + Path = emqx_mgmt_api_test_util:api_path(["plugins", Name]), + case emqx_mgmt_api_test_util:request_api(get, Path) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + +install_plugin(FilePath) -> + {ok, Token} = emqx_dashboard_admin:sign_token(<<"admin">>, <<"public">>), + Path = emqx_mgmt_api_test_util:api_path(["plugins", "install"]), + case emqx_mgmt_api_test_util:upload_request(Path, FilePath, "plugin", + <<"application/gzip">>, [], Token) of + {ok, {{"HTTP/1.1", 200, "OK"}, _Headers, <<>>}} -> ok; + Error -> Error + end. + +update_plugin(Name, Action) -> + Path = emqx_mgmt_api_test_util:api_path(["plugins", Name, Action]), + emqx_mgmt_api_test_util:request_api(put, Path). + +update_boot_order(Name, MoveBody) -> + Auth = emqx_mgmt_api_test_util:auth_header_(), + Path = emqx_mgmt_api_test_util:api_path(["plugins", Name, "move"]), + case emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, MoveBody) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + +uninstall_plugin(Name) -> + DeletePath = emqx_mgmt_api_test_util:api_path(["plugins", Name]), + emqx_mgmt_api_test_util:request_api(delete, DeletePath). + + +build_demo_plugin_package(Dir) -> + #{package := Pkg} = emqx_plugins_SUITE:build_demo_plugin_package(), + FileName = "emqx_plugin_template-" ++ ?EMQX_PLUGIN_TEMPLATE_VSN ++ ?PACKAGE_SUFFIX, + PluginPath = "./" ++ FileName, + Pkg = filename:join([Dir, FileName]), + _ = os:cmd("cp " ++ Pkg ++ " " ++ PluginPath), + true = filelib:is_regular(PluginPath), + PluginPath. diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index 42481abcb..08606e50f 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -102,3 +102,77 @@ auth_header_() -> api_path(Parts)-> ?SERVER ++ filename:join([?BASE_PATH | Parts]). + +%% Usage: +%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>, +%% <<"upload">>, <<"image/png">>, [], <<"some-token">>) +%% +%% Usage with RequestData: +%% Payload = [{upload_type, <<"user_picture">>}], +%% PayloadContent = jsx:encode(Payload), +%% RequestData = [ +%% {<<"payload">>, PayloadContent} +%% ] +%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>, +%% <<"upload">>, <<"image/png">>, RequestData, <<"some-token">>) +-spec upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) -> + {ok, binary()} | {error, list()} when + URL:: binary(), + FilePath:: binary(), + Name:: binary(), + MimeType:: binary(), + RequestData:: list(), + AuthorizationToken:: binary(). +upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) -> + Method = post, + Filename = filename:basename(FilePath), + {ok, Data} = file:read_file(FilePath), + Boundary = emqx_guid:to_base62(emqx_guid:gen()), + RequestBody = format_multipart_formdata(Data, RequestData, Name, + [Filename], MimeType, Boundary), + ContentType = "multipart/form-data; boundary=" ++ binary_to_list(Boundary), + ContentLength = integer_to_list(length(binary_to_list(RequestBody))), + Headers = [ + {"Content-Length", ContentLength}, + case AuthorizationToken =/= undefined of + true -> {"Authorization", "Bearer " ++ binary_to_list(AuthorizationToken)}; + false -> {} + end + ], + HTTPOptions = [], + Options = [{body_format, binary}], + inets:start(), + httpc:request(Method, {URL, Headers, ContentType, RequestBody}, HTTPOptions, Options). + +-spec format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> + binary() when + Data:: binary(), + Params:: list(), + Name:: binary(), + FileNames:: list(), + MimeType:: binary(), + Boundary:: binary(). +format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> + StartBoundary = erlang:iolist_to_binary([<<"--">>, Boundary]), + LineSeparator = <<"\r\n">>, + WithParams = lists:foldl(fun({Key, Value}, Acc) -> + erlang:iolist_to_binary([ + Acc, + StartBoundary, LineSeparator, + <<"Content-Disposition: form-data; name=\"">>, Key, <<"\"">>, + LineSeparator, LineSeparator, + Value, LineSeparator + ]) + end, <<"">>, Params), + WithPaths = lists:foldl(fun(FileName, Acc) -> + erlang:iolist_to_binary([ + Acc, + StartBoundary, LineSeparator, + <<"Content-Disposition: form-data; name=\"">>, Name, <<"\"; filename=\"">>, + FileName, <<"\"">>, LineSeparator, + <<"Content-Type: ">>, MimeType, LineSeparator, LineSeparator, + Data, + LineSeparator + ]) + end, WithParams, FileNames), + erlang:iolist_to_binary([WithPaths, StartBoundary, <<"--">>, LineSeparator]). diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index bcfa1f0d9..0e54b8450 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -24,6 +24,7 @@ , ensure_enabled/1 , ensure_enabled/2 , ensure_disabled/1 + , purge/1 , delete_package/1 ]). @@ -43,6 +44,9 @@ %% internal -export([ do_ensure_started/1 ]). +-export([ + install_dir/0 + ]). -ifdef(TEST). -compile(export_all). @@ -55,7 +59,7 @@ -type name_vsn() :: binary() | string(). %% "my_plugin-0.1.0" -type plugin() :: map(). %% the parse result of the JSON info file --type position() :: no_move | front | rear | {before, name_vsn()}. +-type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}. %%-------------------------------------------------------------------- %% APIs @@ -63,12 +67,12 @@ %% @doc Describe a plugin. -spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}. -describe(NameVsn) -> read_plugin(NameVsn). +describe(NameVsn) -> read_plugin(NameVsn, #{fill_readme => true}). %% @doc Install a .tar.gz package placed in install_dir. -spec ensure_installed(name_vsn()) -> ok | {error, any()}. ensure_installed(NameVsn) -> - case read_plugin(NameVsn) of + case read_plugin(NameVsn, #{}) of {ok, _} -> ok; {error, _} -> @@ -80,7 +84,7 @@ do_ensure_installed(NameVsn) -> TarGz = pkg_file(NameVsn), case erl_tar:extract(TarGz, [{cwd, install_dir()}, compressed]) of ok -> - case read_plugin(NameVsn) of + case read_plugin(NameVsn, #{}) of {ok, _} -> ok; {error, Reason} -> ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}), @@ -103,7 +107,7 @@ do_ensure_installed(NameVsn) -> %% If a plugin is running, or enabled, error is returned. -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}. ensure_uninstalled(NameVsn) -> - case read_plugin(NameVsn) of + case read_plugin(NameVsn, #{}) of {ok, #{running_status := RunningSt}} when RunningSt =/= stopped -> {error, #{reason => "bad_plugin_running_status", hint => "stop_the_plugin_first" @@ -134,7 +138,7 @@ ensure_disabled(NameVsn) -> ensure_state(NameVsn, Position, State) when is_binary(NameVsn) -> ensure_state(binary_to_list(NameVsn), Position, State); ensure_state(NameVsn, Position, State) -> - case read_plugin(NameVsn) of + case read_plugin(NameVsn, #{}) of {ok, _} -> Item = #{ name_vsn => NameVsn , enable => State @@ -166,7 +170,7 @@ add_new_configured(Configured, front, Item) -> [Item | Configured]; add_new_configured(Configured, rear, Item) -> Configured ++ [Item]; -add_new_configured(Configured, {before, NameVsn}, Item) -> +add_new_configured(Configured, {Action, NameVsn}, Item) -> SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end, {Front, Rear} = lists:splitwith(SplitFun, Configured), Rear =:= [] andalso @@ -174,7 +178,13 @@ add_new_configured(Configured, {before, NameVsn}, Item) -> hint => "maybe_install_and_configure", name_vsn => NameVsn }), - Front ++ [Item | Rear]. + case Action of + before -> Front ++ [Item | Rear]; + behind -> + [Anchor | Rear0] = Rear, + Front ++ [Anchor, Item | Rear0] + end. + %% @doc Delete the package file. -spec delete_package(name_vsn()) -> ok. @@ -259,7 +269,7 @@ list() -> Pattern = filename:join([install_dir(), "*", "release.json"]), All = lists:filtermap( fun(JsonFile) -> - case read_plugin({file, JsonFile}) of + case read_plugin({file, JsonFile}, #{}) of {ok, Info} -> {true, Info}; {error, Reason} -> @@ -314,26 +324,36 @@ tryit(WhichOp, F) -> %% read plugin info from the JSON file %% returns {ok, Info} or {error, Reason} -read_plugin(NameVsn) -> +read_plugin(NameVsn, Options) -> tryit("read_plugin_info", - fun() -> {ok, do_read_plugin(NameVsn)} end). + fun() -> {ok, do_read_plugin(NameVsn, Options)} end). -do_read_plugin({file, InfoFile}) -> +do_read_plugin(Plugin) -> do_read_plugin(Plugin, #{}). + +do_read_plugin({file, InfoFile}, Options) -> [_, NameVsn | _] = lists:reverse(filename:split(InfoFile)), case hocon:load(InfoFile, #{format => richmap}) of {ok, RichMap} -> - Info = check_plugin(hocon_maps:ensure_plain(RichMap), NameVsn, InfoFile), - maps:merge(Info, plugin_status(NameVsn)); + Info0 = check_plugin(hocon_maps:ensure_plain(RichMap), NameVsn, InfoFile), + Info1 = plugins_readme(NameVsn, Options, Info0), + plugin_status(NameVsn, Info1); {error, Reason} -> throw(#{error => "bad_info_file", path => InfoFile, return => Reason }) end; -do_read_plugin(NameVsn) -> - do_read_plugin({file, info_file(NameVsn)}). +do_read_plugin(NameVsn, Options) -> + do_read_plugin({file, info_file(NameVsn)}, Options). -plugin_status(NameVsn) -> +plugins_readme(NameVsn, #{fill_readme := true}, Info) -> + case file:read_file(readme_file(NameVsn)) of + {ok, Bin} -> Info#{readme => Bin}; + _ -> Info#{readme => <<>>} + end; +plugins_readme(_NameVsn, _Options, Info) -> Info. + +plugin_status(NameVsn, Info) -> {AppName, _AppVsn} = parse_name_vsn(NameVsn), RunningSt = case application:get_key(AppName, vsn) of @@ -357,9 +377,9 @@ plugin_status(NameVsn) -> [true] -> enabled; [false] -> disabled end, - #{ running_status => RunningSt - , config_status => ConfSt - }. + Info#{ running_status => RunningSt + , config_status => ConfSt + }. bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8); @@ -592,6 +612,9 @@ dir(NameVsn) -> info_file(NameVsn) -> filename:join([dir(NameVsn), "release.json"]). +readme_file(NameVsn) -> + filename:join([dir(NameVsn), "README.md"]). + running_apps() -> lists:map(fun({N, _, V}) -> {N, V} diff --git a/apps/emqx_plugins/src/emqx_plugins_app.erl b/apps/emqx_plugins/src/emqx_plugins_app.erl index 6b53438bb..70fab549f 100644 --- a/apps/emqx_plugins/src/emqx_plugins_app.erl +++ b/apps/emqx_plugins/src/emqx_plugins_app.erl @@ -23,8 +23,8 @@ ]). start(_Type, _Args) -> - {ok, Sup} = emqx_plugins_sup:start_link(), ok = emqx_plugins:ensure_started(), %% load all pre-configured + {ok, Sup} = emqx_plugins_sup:start_link(), {ok, Sup}. stop(_State) -> diff --git a/apps/emqx_plugins/src/emqx_plugins_schema.erl b/apps/emqx_plugins/src/emqx_plugins_schema.erl index 2de8430a1..b7498d2ee 100644 --- a/apps/emqx_plugins/src/emqx_plugins_schema.erl +++ b/apps/emqx_plugins/src/emqx_plugins_schema.erl @@ -66,6 +66,7 @@ state_fields() -> root_fields() -> [ {states, fun states/1} , {install_dir, fun install_dir/1} + , {check_interval, fun check_interval/1} ]. states(type) -> hoconsc:array(hoconsc:ref(?MODULE, state)); @@ -87,3 +88,11 @@ the subdirectory named as emqx_foo_bar-0.1.0. NOTE: For security reasons, this directory should **NOT** be writable by anyone except emqx (or any user which runs EMQX). """. + +check_interval(type) -> emqx_schema:duration(); +check_interval(default) -> "5s"; +check_interval(T) when T =/= desc -> undefined; +check_interval(desc) -> """ +Check interval: check if the status of the plugins in the cluster is consistent,
+if the results of 3 consecutive checks are not consistent, then alarm. +""". diff --git a/apps/emqx_plugins/src/emqx_plugins_sup.erl b/apps/emqx_plugins/src/emqx_plugins_sup.erl index 45d9b8ca2..488372cc6 100644 --- a/apps/emqx_plugins/src/emqx_plugins_sup.erl +++ b/apps/emqx_plugins/src/emqx_plugins_sup.erl @@ -26,5 +26,21 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Children = [], - {ok, {{one_for_one, 10, 10}, Children}}. + %% TODO: Add monitor plugins change. + Monitor = emqx_plugins_monitor, + _Children = [ + #{id => Monitor, + start => {Monitor, start_link, []}, + restart => permanent, + shutdown => brutal_kill, + type => worker, + modules => [Monitor] + } + ], + SupFlags = + #{ + strategy => one_for_one, + intensity => 100, + period => 10 + }, + {ok, {SupFlags, []}}. diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl index 7fb778b2f..8d50ef7f7 100644 --- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl @@ -63,6 +63,7 @@ build_demo_plugin_package() -> , git_url => "https://github.com/emqx/emqx-plugin-template.git" , vsn => ?EMQX_PLUGIN_TEMPLATE_VSN , workdir => "demo_src" + , shdir => emqx_plugins:install_dir() }). build_demo_plugin_package(#{ target_path := TargetPath @@ -70,8 +71,8 @@ build_demo_plugin_package(#{ target_path := TargetPath , git_url := GitUrl , vsn := PluginVsn , workdir := DemoWorkDir + , shdir := WorkDir } = Opts) -> - WorkDir = emqx_plugins:install_dir(), BuildSh = filename:join([WorkDir, "build-demo-plugin.sh"]), Cmd = string:join([ BuildSh , PluginVsn @@ -115,8 +116,8 @@ t_demo_install_start_stop_uninstall(Config) -> ok = emqx_plugins:ensure_installed(NameVsn), %% idempotent ok = emqx_plugins:ensure_installed(NameVsn), - {ok, Info} = emqx_plugins:read_plugin(NameVsn), - ?assertEqual([Info], emqx_plugins:list()), + {ok, Info} = emqx_plugins:describe(NameVsn), + ?assertEqual([maps:without([readme], Info)], emqx_plugins:list()), %% start ok = emqx_plugins:ensure_started(NameVsn), ok = assert_app_running(emqx_plugin_template, true), @@ -158,6 +159,39 @@ write_info_file(Config, NameVsn, Content) -> ok = filelib:ensure_dir(InfoFile), ok = file:write_file(InfoFile, Content). +t_position({init, Config}) -> + #{package := Package} = build_demo_plugin_package(), + NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX), + [{name_vsn, NameVsn} | Config]; +t_position({'end', _Config}) -> ok; +t_position(Config) -> + NameVsn = proplists:get_value(name_vsn, Config), + ok = emqx_plugins:ensure_installed(NameVsn), + ok = emqx_plugins:ensure_enabled(NameVsn), + FakeInfo = "name=position, rel_vsn=\"2\", rel_apps=[\"position-9\"]," + "description=\"desc fake position app\"", + PosApp2 = <<"position-2">>, + ok = write_info_file(Config, PosApp2, FakeInfo), + %% fake a disabled plugin in config + ok = emqx_plugins:ensure_state(PosApp2, {before, NameVsn}, false), + ListFun = fun() -> + lists:map(fun( + #{<<"name">> := Name, <<"rel_vsn">> := Vsn}) -> + <> + end, emqx_plugins:list()) + end, + ?assertEqual([PosApp2, list_to_binary(NameVsn)], ListFun()), + emqx_plugins:ensure_enabled(PosApp2, {behind, NameVsn}), + ?assertEqual([list_to_binary(NameVsn), PosApp2], ListFun()), + + ok = emqx_plugins:ensure_stopped(), + ok = emqx_plugins:ensure_disabled(NameVsn), + ok = emqx_plugins:ensure_disabled(PosApp2), + ok = emqx_plugins:ensure_uninstalled(NameVsn), + ok = emqx_plugins:ensure_uninstalled(PosApp2), + ?assertEqual([], emqx_plugins:list()), + ok. + t_start_restart_and_stop({init, Config}) -> #{package := Package} = build_demo_plugin_package(), NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX), @@ -283,12 +317,12 @@ t_bad_info_json(Config) -> ?assertMatch({error, #{error := "bad_info_file", return := {parse_error, _} }}, - emqx_plugins:read_plugin(NameVsn)), + emqx_plugins:describe(NameVsn)), ok = write_info_file(Config, NameVsn, "{\"bad\": \"obj\"}"), ?assertMatch({error, #{error := "bad_info_file_content", mandatory_fields := _ }}, - emqx_plugins:read_plugin(NameVsn)), + emqx_plugins:describe(NameVsn)), ?assertEqual([], emqx_plugins:list()), emqx_plugins:purge(NameVsn), ok. @@ -300,6 +334,7 @@ t_elixir_plugin({init, Config}) -> , git_url => "https://github.com/emqx/emqx-elixir-plugin.git" , vsn => ?EMQX_ELIXIR_PLUGIN_TEMPLATE_VSN , workdir => "demo_src_elixir" + , shdir => emqx_plugins:install_dir() }, Opts = #{package := Package} = build_demo_plugin_package(Opts0), NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX), @@ -316,7 +351,7 @@ t_elixir_plugin(Config) -> ok = emqx_plugins:ensure_installed(NameVsn), %% idempotent ok = emqx_plugins:ensure_installed(NameVsn), - {ok, Info} = emqx_plugins:read_plugin(NameVsn), + {ok, Info} = emqx_plugins:read_plugin(NameVsn, #{}), ?assertEqual([Info], emqx_plugins:list()), %% start ok = emqx_plugins:ensure_started(NameVsn), diff --git a/apps/emqx_plugins/test/emqx_plugins_tests.erl b/apps/emqx_plugins/test/emqx_plugins_tests.erl index b5c9f9da8..2b7fb9fe4 100644 --- a/apps/emqx_plugins/test/emqx_plugins_tests.erl +++ b/apps/emqx_plugins/test/emqx_plugins_tests.erl @@ -45,7 +45,7 @@ read_plugin_test() -> try ok = write_file(InfoFile, FakeInfo), ?assertMatch({error, #{error := "bad_rel_apps"}}, - emqx_plugins:read_plugin(NameVsn)) + emqx_plugins:read_plugin(NameVsn, #{})) after emqx_plugins:purge(NameVsn) end @@ -100,4 +100,3 @@ purge_test() -> ok = file:write_file(Dir, "a"), ?assertEqual(ok, emqx_plugins:purge("a-1")) end). - diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index d59835123..bbafcf43d 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -41,7 +41,7 @@ ]). %% Sync resource instances and files -%% provisional solution: rpc:multical to all the nodes for creating/updating/removing +%% provisional solution: rpc:multicall to all the nodes for creating/updating/removing %% todo: replicate operations -export([ create/4 %% store the config and start the instance , create/5