Merge pull request #6460 from zmstone/feat-add-plugin-mgmt

refactor(plugins): refactor plugins configuration interface
This commit is contained in:
zhongwencool 2021-12-20 09:24:38 +08:00 committed by GitHub
commit 96ee51fe57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1140 additions and 622 deletions

View File

@ -160,18 +160,28 @@ format(Format, Args) ->
-spec(format_usage([cmd_usage()]) -> [string()]). -spec(format_usage([cmd_usage()]) -> [string()]).
format_usage(UsageList) -> format_usage(UsageList) ->
Width = lists:foldl(fun({CmdStr, _}, W) ->
max(iolist_size(CmdStr), W)
end, 0, UsageList),
lists:map( lists:map(
fun({CmdParams, Desc}) -> fun({CmdParams, Desc}) ->
format_usage(CmdParams, Desc) format_usage(CmdParams, Desc, Width)
end, UsageList). end, UsageList).
-spec(format_usage(cmd_params(), cmd_descr()) -> string()). -spec(format_usage(cmd_params(), cmd_descr()) -> string()).
format_usage(CmdParams, Desc) -> format_usage(CmdParams, Desc) ->
format_usage(CmdParams, Desc, 0).
format_usage(CmdParams, Desc, 0) ->
format_usage(CmdParams, Desc, iolist_size(CmdParams));
format_usage(CmdParams, Desc, Width) ->
CmdLines = split_cmd(CmdParams), CmdLines = split_cmd(CmdParams),
DescLines = split_cmd(Desc), DescLines = split_cmd(Desc),
Zipped = zip_cmd(CmdLines, DescLines),
Fmt = "~-" ++ integer_to_list(Width + 1) ++ "s# ~ts~n",
lists:foldl(fun({CmdStr, DescStr}, Usage) -> lists:foldl(fun({CmdStr, DescStr}, Usage) ->
Usage ++ format("~-70s# ~ts~n", [CmdStr, DescStr]) Usage ++ format(Fmt, [CmdStr, DescStr])
end, "", zip_cmd(CmdLines, DescLines)). end, "", Zipped).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks

View File

@ -31,6 +31,9 @@
-export([format/2]). -export([format/2]).
%% For CLI outputs
-export([best_effort_json/1]).
-ifdef(TEST). -ifdef(TEST).
-include_lib("proper/include/proper.hrl"). -include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -51,6 +54,16 @@
-define(IS_STRING(String), (is_list(String) orelse is_binary(String))). -define(IS_STRING(String), (is_list(String) orelse is_binary(String))).
%% @doc Format a list() or map() to JSON object.
%% This is used for CLI result prints,
%% or HTTP API result formatting.
%% The JSON object is pretty-printed.
%% NOTE: do not use this function for logging.
best_effort_json(Input) ->
Config = #{depth => unlimited, single_line => true},
JsonReady = best_effort_json_obj(Input, Config),
jsx:encode(JsonReady, [space, {indent, 4}]).
-spec format(logger:log_event(), config()) -> iodata(). -spec format(logger:log_event(), config()) -> iodata().
format(#{level := Level, msg := Msg, meta := Meta}, Config0) when is_map(Config0) -> format(#{level := Level, msg := Msg, meta := Meta}, Config0) when is_map(Config0) ->
Config = add_default_config(Config0), Config = add_default_config(Config0),

View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_run_sh).
-export([do/2]).
do(Command, Options0) ->
Options = Options0 ++ [use_stdio, stderr_to_stdout,
exit_status, {line, 906}, hide, eof],
Port = erlang:open_port({spawn, Command}, Options),
try
collect_output(Port, [])
after
erlang:port_close(Port)
end.
collect_output(Port, Lines) ->
receive
{Port, {data, {eol, Line}}} ->
collect_output(Port, [Line ++ "\n" | Lines]);
{Port, {data, {noeol, Line}}} ->
collect_output(Port, [Line | Lines]);
{Port, eof} ->
Result = lists:flatten(lists:reverse(Lines)),
receive
{Port, {exit_status, 0}} ->
{ok, Result};
{Port, {exit_status, ExitCode}} ->
{error, {ExitCode, Result}}
end
end.

View File

@ -28,7 +28,6 @@
post_boot() -> post_boot() ->
ok = ensure_apps_started(), ok = ensure_apps_started(),
_ = emqx_plugins:load(),
ok = print_vsn(), ok = print_vsn(),
ok = start_autocluster(), ok = start_autocluster(),
ignore. ignore.
@ -79,7 +78,7 @@ start_one_app(App) ->
end. end.
%% list of app names which should be rebooted when: %% list of app names which should be rebooted when:
%% 1. due to static static config change %% 1. due to static config change
%% 2. after join a cluster %% 2. after join a cluster
reboot_apps() -> reboot_apps() ->
[ gproc [ gproc
@ -104,6 +103,7 @@ reboot_apps() ->
, emqx_exhook , emqx_exhook
, emqx_authn , emqx_authn
, emqx_authz , emqx_authz
, emqx_plugins
]. ].
sorted_reboot_apps() -> sorted_reboot_apps() ->

View File

@ -79,14 +79,6 @@
, do_unsubscribe/2 , do_unsubscribe/2
]). ]).
%% Plugins
-export([ list_plugins/0
, list_plugins/1
, load_plugin/2
, unload_plugin/2
, reload_plugin/2
]).
%% Listeners %% Listeners
-export([ list_listeners/0 -export([ list_listeners/0
, list_listeners/1 , list_listeners/1
@ -457,33 +449,6 @@ do_unsubscribe(ClientId, Topic) ->
Pid ! {unsubscribe, [emqx_topic:parse(Topic)]} Pid ! {unsubscribe, [emqx_topic:parse(Topic)]}
end. end.
%%--------------------------------------------------------------------
%% Plugins
%%--------------------------------------------------------------------
list_plugins() ->
[{Node, list_plugins(Node)} || Node <- mria_mnesia:running_nodes()].
list_plugins(Node) when Node =:= node() ->
emqx_plugins:list();
list_plugins(Node) ->
rpc_call(Node, list_plugins, [Node]).
load_plugin(Node, Plugin) when Node =:= node() ->
emqx_plugins:load(Plugin);
load_plugin(Node, Plugin) ->
rpc_call(Node, load_plugin, [Node, Plugin]).
unload_plugin(Node, Plugin) when Node =:= node() ->
emqx_plugins:unload(Plugin);
unload_plugin(Node, Plugin) ->
rpc_call(Node, unload_plugin, [Node, Plugin]).
reload_plugin(Node, Plugin) when Node =:= node() ->
emqx_plugins:reload(Plugin);
reload_plugin(Node, Plugin) ->
rpc_call(Node, reload_plugin, [Node, Plugin]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Listeners %% Listeners
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -225,47 +225,51 @@ if_valid_qos(QoS, Fun) ->
end. end.
plugins(["list"]) -> plugins(["list"]) ->
lists:foreach(fun print/1, emqx_plugins:list()); emqx_plugins_cli:list(fun emqx_ctl:print/2);
plugins(["describe", NameVsn]) ->
plugins(["load", Name]) -> emqx_plugins_cli:describe(NameVsn, fun emqx_ctl:print/2);
case emqx_plugins:load(list_to_atom(Name)) of plugins(["install", NameVsn]) ->
ok -> emqx_plugins_cli:ensure_installed(NameVsn, fun emqx_ctl:print/2);
emqx_ctl:print("Plugin ~ts loaded successfully.~n", [Name]); plugins(["uninstall", NameVsn])->
{error, Reason} -> emqx_plugins_cli:ensure_uninstalled(NameVsn, fun emqx_ctl:print/2);
emqx_ctl:print("Load plugin ~ts error: ~p.~n", [Name, Reason]) plugins(["start", NameVsn]) ->
end; emqx_plugins_cli:ensure_started(NameVsn, fun emqx_ctl:print/2);
plugins(["stop", NameVsn]) ->
plugins(["unload", "emqx_management"])-> emqx_plugins_cli:ensure_stopped(NameVsn, fun emqx_ctl:print/2);
emqx_ctl:print("Plugin emqx_management can not be unloaded.~n"); plugins(["restart", NameVsn]) ->
emqx_plugins_cli:restart(NameVsn, fun emqx_ctl:print/2);
plugins(["unload", Name]) -> plugins(["disable", NameVsn]) ->
case emqx_plugins:unload(list_to_atom(Name)) of emqx_plugins_cli:ensure_disabled(NameVsn, fun emqx_ctl:print/2);
ok -> plugins(["enable", NameVsn]) ->
emqx_ctl:print("Plugin ~ts unloaded successfully.~n", [Name]); emqx_plugins_cli:ensure_enabled(NameVsn, no_move, fun emqx_ctl:print/2);
{error, Reason} -> plugins(["enable", NameVsn, "front"]) ->
emqx_ctl:print("Unload plugin ~ts error: ~p.~n", [Name, Reason]) emqx_plugins_cli:ensure_enabled(NameVsn, front, fun emqx_ctl:print/2);
end; plugins(["enable", NameVsn, "rear"]) ->
emqx_plugins_cli:ensure_enabled(NameVsn, rear, fun emqx_ctl:print/2);
plugins(["reload", Name]) -> plugins(["enable", NameVsn, "before", Other]) ->
try list_to_existing_atom(Name) of emqx_plugins_cli:ensure_enabled(NameVsn, {before, Other}, fun emqx_ctl:print/2);
PluginName ->
case emqx_mgmt:reload_plugin(node(), PluginName) of
ok ->
emqx_ctl:print("Plugin ~ts reloaded successfully.~n", [Name]);
{error, Reason} ->
emqx_ctl:print("Reload plugin ~ts error: ~p.~n", [Name, Reason])
end
catch
error:badarg ->
emqx_ctl:print("Reload plugin ~ts error: The plugin doesn't exist.~n", [Name])
end;
plugins(_) -> plugins(_) ->
emqx_ctl:usage([{"plugins list", "Show loaded plugins"}, emqx_ctl:usage(
{"plugins load <Plugin>", "Load plugin"}, [{"plugins <command> [Name-Vsn]", "e.g. 'start emqx_plugin_template-5.0-rc.1'"},
{"plugins unload <Plugin>", "Unload plugin"}, {"plugins list", "List all installed plugins"},
{"plugins reload <Plugin>", "Reload plugin"} {"plugins describe Name-Vsn", "Describe an installed plugins"},
]). {"plugins install Name-Vsn", "Install a plugin package placed\n"
"in plugin'sinstall_dir"},
{"plugins uninstall Name-Vsn", "Uninstall a plugin. NOTE: it deletes\n"
"all files in install_dir/Name-Vsn"},
{"plugins start Name-Vsn", "Start a plugin"},
{"plugins stop Name-Vsn", "Stop a plugin"},
{"plugins restart Name-Vsn", "Stop then start a plugin"},
{"plugins disable Name-Vsn", "Disable auto-boot"},
{"plugins enable Name-Vsn [Position]",
"Enable auto-boot at Position in the boot list, where Position could be\n"
"'front', 'rear', or 'before Other-Vsn' to specify a relative position.\n"
"The Position parameter can be used to adjust the boot order.\n"
"If no Position is given, an already configured plugin\n"
"will stary at is old position; a newly plugin is appended to the rear\n"
"e.g. plugins disable foo-0.1.0 front\n"
" plugins enable bar-0.2.0 before foo-0.1.0"}
]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc vm command %% @doc vm command

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_PLUGINS_HRL).
-define(EMQX_PLUGINS_HRL, true).
-define(CONF_ROOT, plugins).
-endif.

View File

@ -19,13 +19,29 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-export([ load/0 -export([ ensure_installed/1
, load/1 , ensure_uninstalled/1
, unload/0 , ensure_enabled/1
, unload/1 , ensure_enabled/2
, reload/1 , ensure_disabled/1
, delete_package/1
]).
-export([ ensure_started/0
, ensure_started/1
, ensure_stopped/0
, ensure_stopped/1
, restart/1
, list/0 , list/0
, find_plugin/1 , describe/1
]).
-export([ get_config/2
, put_config/2
]).
%% internal
-export([ do_ensure_started/1
]). ]).
-ifdef(TEST). -ifdef(TEST).
@ -33,128 +49,389 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-endif. -endif.
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include("emqx_plugins.hrl").
-type name_vsn() :: binary() | string(). %% "my_plugin-0.1.0"
-type plugin() :: map(). %% the parse result of the JSON info file
-type position() :: no_move | front | rear | {before, name_vsn()}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Load all plugins when the broker started. %% @doc Describe a plugin.
-spec(load() -> ok | ignore | {error, term()}). -spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
load() -> describe(NameVsn) -> read_plugin(NameVsn).
ok = load_ext_plugins(emqx:get_config([plugins, install_dir], undefined)).
%% @doc Load a Plugin %% @doc Install a .tar.gz package placed in install_dir.
-spec(load(atom()) -> ok | {error, term()}). -spec ensure_installed(name_vsn()) -> ok | {error, any()}.
load(PluginName) when is_atom(PluginName) -> ensure_installed(NameVsn) ->
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of case read_plugin(NameVsn) of
{false, _} -> {ok, _} ->
?SLOG(alert, #{msg => "failed_to_load_plugin", ok;
plugin_name => PluginName, {error, _} ->
reason => not_found}), ok = purge(NameVsn),
{error, not_found}; do_ensure_installed(NameVsn)
{_, true} ->
?SLOG(notice, #{msg => "plugin_already_loaded",
plugin_name => PluginName,
reason => already_loaded}),
{error, already_started};
{_, false} ->
load_plugin(PluginName)
end. end.
%% @doc Unload all plugins before broker stopped. do_ensure_installed(NameVsn) ->
-spec(unload() -> ok). TarGz = pkg_file(NameVsn),
unload() -> case erl_tar:extract(TarGz, [{cwd, install_dir()}, compressed]) of
stop_plugins(list()). ok ->
case read_plugin(NameVsn) of
%% @doc UnLoad a Plugin {ok, _} -> ok;
-spec(unload(atom()) -> ok | {error, term()}). {error, Reason} ->
unload(PluginName) when is_atom(PluginName) -> ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of _ = ensure_uninstalled(NameVsn),
{false, _} -> {error, Reason}
?SLOG(error, #{msg => "fialed_to_unload_plugin", end;
plugin_name => PluginName, {error, {_, enoent}} ->
reason => not_found}), {error, #{ reason => "failed_to_extract_plugin_package"
{error, not_found}; , path => TarGz
{_, false} -> , return => not_found
?SLOG(error, #{msg => "failed_to_unload_plugin", }};
plugin_name => PluginName, {error, Reason} ->
reason => not_loaded}), {error, #{ reason => "bad_plugin_package"
{error, not_started}; , path => TarGz
{_, _} -> , return => Reason
unload_plugin(PluginName) }}
end. end.
reload(PluginName) when is_atom(PluginName)-> %% @doc Ensure files and directories for the given plugin are delete.
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of %% If a plugin is running, or enabled, error is returned.
{false, _} -> -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
?SLOG(error, #{msg => "failed_to_reload_plugin", ensure_uninstalled(NameVsn) ->
plugin_name => PluginName, case read_plugin(NameVsn) of
reason => not_found}), {ok, #{running_status := RunningSt}} when RunningSt =/= stopped ->
{error, not_found}; {error, #{reason => "bad_plugin_running_status",
{_, false} -> hint => "stop_the_plugin_first"
load(PluginName); }};
{_, true} -> {ok, #{config_status := enabled}} ->
case unload(PluginName) of {error, #{reason => "bad_plugin_config_status",
ok -> load(PluginName); hint => "disable_the_plugin_first"
{error, Reason} -> {error, Reason} }};
_ ->
purge(NameVsn)
end.
%% @doc Ensure a plugin is enabled to the end of the plugins list.
-spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
ensure_enabled(NameVsn) ->
ensure_enabled(NameVsn, no_move).
%% @doc Ensure a plugin is enabled at the given position of the plugin list.
-spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
ensure_enabled(NameVsn, Position) ->
ensure_state(NameVsn, Position, true).
%% @doc Ensure a plugin is disabled.
-spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
ensure_disabled(NameVsn) ->
ensure_state(NameVsn, no_move, false).
ensure_state(NameVsn, Position, State) when is_binary(NameVsn) ->
ensure_state(binary_to_list(NameVsn), Position, State);
ensure_state(NameVsn, Position, State) ->
case read_plugin(NameVsn) of
{ok, _} ->
Item = #{ name_vsn => NameVsn
, enable => State
},
tryit("ensure_state", fun() -> ensure_configured(Item, Position) end);
{error, Reason} ->
{error, Reason}
end.
ensure_configured(#{name_vsn := NameVsn} = Item, Position) ->
Configured = configured(),
SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
{Front, Rear} = lists:splitwith(SplitFun, Configured),
NewConfigured =
case Rear of
[_ | More] when Position =:= no_move ->
Front ++ [Item | More];
[_ | More] ->
add_new_configured(Front ++ More, Position, Item);
[] ->
add_new_configured(Configured, Position, Item)
end,
ok = put_configured(NewConfigured).
add_new_configured(Configured, no_move, Item) ->
%% default to rear
add_new_configured(Configured, rear, Item);
add_new_configured(Configured, front, Item) ->
[Item | Configured];
add_new_configured(Configured, rear, Item) ->
Configured ++ [Item];
add_new_configured(Configured, {before, NameVsn}, Item) ->
SplitFun = fun(#{name_vsn := Nv}) -> bin(Nv) =/= bin(NameVsn) end,
{Front, Rear} = lists:splitwith(SplitFun, Configured),
Rear =:= [] andalso
throw(#{error => "position_anchor_plugin_not_configured",
hint => "maybe_install_and_configure",
name_vsn => NameVsn
}),
Front ++ [Item | Rear].
%% @doc Delete the package file.
-spec delete_package(name_vsn()) -> ok.
delete_package(NameVsn) ->
File = pkg_file(NameVsn),
case file:delete(File) of
ok ->
?SLOG(info, #{msg => "purged_plugin_dir", path => File}),
ok;
{error, enoent} ->
ok;
{error, Reason} ->
?SLOG(error, #{msg => "failed_to_delete_package_file",
path => File,
reason => Reason}),
{error, Reason}
end.
%% @doc Delete extracted dir
%% In case one lib is shared by multiple plugins.
%% it might be the case that purging one plugin's install dir
%% will cause deletion of loaded beams.
%% It should not be a problem, because shared lib should
%% reside in all the plugin install dirs.
-spec purge(name_vsn()) -> ok.
purge(NameVsn) ->
Dir = dir(NameVsn),
case file:del_dir_r(Dir) of
ok ->
?SLOG(info, #{msg => "purged_plugin_dir", dir => Dir});
{error, enoent} ->
ok;
{error, Reason} ->
?SLOG(error, #{msg => "failed_to_purge_plugin_dir",
dir => Dir,
reason => Reason}),
{error, Reason}
end.
%% @doc Start all configured plugins are started.
-spec ensure_started() -> ok.
ensure_started() ->
ok = for_plugins(fun ?MODULE:do_ensure_started/1).
%% @doc Start a plugin from Management API or CLI.
%% the input is a <name>-<vsn> string.
-spec ensure_started(name_vsn()) -> ok | {error, term()}.
ensure_started(NameVsn) ->
case do_ensure_started(NameVsn) of
ok -> ok;
{error, Reason} ->
?SLOG(alert, #{msg => "failed_to_start_plugin",
reason => Reason}),
{error, Reason}
end.
%% @doc Stop all plugins before broker stops.
-spec ensure_stopped() -> ok.
ensure_stopped() ->
for_plugins(fun ?MODULE:ensure_stopped/1).
%% @doc Stop a plugin from Management API or CLI.
-spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
ensure_stopped(NameVsn) ->
tryit("stop_plugin",
fun() ->
Plugin = do_read_plugin(NameVsn),
ensure_apps_stopped(Plugin)
end).
%% @doc Stop and then start the plugin.
restart(NameVsn) ->
case ensure_stopped(NameVsn) of
ok -> ensure_started(NameVsn);
{error, Reason} -> {error, Reason}
end.
%% @doc List all installed plugins.
%% Including the ones that are installed, but not enabled in config.
-spec list() -> [plugin()].
list() ->
Pattern = filename:join([install_dir(), "*", "release.json"]),
All = lists:filtermap(
fun(JsonFile) ->
case read_plugin({file, JsonFile}) of
{ok, Info} ->
{true, Info};
{error, Reason} ->
?SLOG(warning, Reason),
false
end
end, filelib:wildcard(Pattern)),
list(configured(), All).
%% Make sure configured ones are ordered in front.
list([], All) -> All;
list([#{name_vsn := NameVsn} | Rest], All) ->
SplitF = fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
bin([Name, "-", Vsn]) =/= bin(NameVsn)
end,
case lists:splitwith(SplitF, All) of
{_, []} ->
?SLOG(warning, #{msg => "configured_plugin_not_installed",
name_vsn => NameVsn
}),
list(Rest, All);
{Front, [I | Rear]} ->
[I | list(Rest, Front ++ Rear)]
end.
do_ensure_started(NameVsn) ->
tryit("start_plugins",
fun() ->
Plugin = do_read_plugin(NameVsn),
ok = load_code_start_apps(NameVsn, Plugin)
end).
%% try the function, catch 'throw' exceptions as normal 'error' return
%% other exceptions with stacktrace returned.
tryit(WhichOp, F) ->
try
F()
catch
throw : Reason ->
%% thrown exceptions are known errors
%% translate to a return value without stacktrace
{error, Reason};
error : Reason : Stacktrace ->
%% unexpected errors, log stacktrace
?SLOG(warning, #{ msg => "plugin_op_failed"
, which_op => WhichOp
, exception => Reason
, stacktrace => Stacktrace
}),
{error, {failed, WhichOp}}
end.
%% read plugin info from the JSON file
%% returns {ok, Info} or {error, Reason}
read_plugin(NameVsn) ->
tryit("read_plugin_info",
fun() -> {ok, do_read_plugin(NameVsn)} end).
do_read_plugin({file, InfoFile}) ->
[_, NameVsn | _] = lists:reverse(filename:split(InfoFile)),
case hocon:load(InfoFile, #{format => richmap}) of
{ok, RichMap} ->
Info = check_plugin(hocon_util:richmap_to_map(RichMap), NameVsn, InfoFile),
maps:merge(Info, plugin_status(NameVsn));
{error, Reason} ->
throw(#{error => "bad_info_file",
path => InfoFile,
return => Reason
})
end;
do_read_plugin(NameVsn) ->
do_read_plugin({file, info_file(NameVsn)}).
plugin_status(NameVsn) ->
{AppName, _AppVsn} = parse_name_vsn(NameVsn),
RunningSt =
case application:get_key(AppName, vsn) of
{ok, _} ->
case lists:keyfind(AppName, 1, running_apps()) of
{AppName, _} -> running;
_ -> loaded
end;
undefined ->
stopped
end,
Configured = lists:filtermap(
fun(#{name_vsn := Nv, enable := St}) ->
case bin(Nv) =:= bin(NameVsn) of
true -> {true, St};
false -> false
end
end, configured()),
ConfSt = case Configured of
[] -> not_configured;
[true] -> enabled;
[false] -> disabled
end,
#{ running_status => RunningSt
, config_status => ConfSt
}.
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
bin(B) when is_binary(B) -> B.
check_plugin(#{ <<"name">> := Name
, <<"rel_vsn">> := Vsn
, <<"rel_apps">> := Apps
, <<"description">> := _
} = Info, NameVsn, File) ->
case bin(NameVsn) =:= bin([Name, "-", Vsn]) of
true ->
try
[_ | _ ] = Apps, %% assert
%% validate if the list is all <app>-<vsn> strings
lists:foreach(fun parse_name_vsn/1, Apps)
catch
_ : _ ->
throw(#{ error => "bad_rel_apps"
, rel_apps => Apps
, hint => "A non-empty string list of app_name-app_vsn format"
})
end,
Info;
false ->
throw(#{ error => "name_vsn_mismatch"
, name_vsn => NameVsn
, path => File
, name => Name
, rel_vsn => Vsn
})
end;
check_plugin(_What, NameVsn, File) ->
throw(#{ error => "bad_info_file_content"
, mandatory_fields => [rel_vsn, name, rel_apps, description]
, name_vsn => NameVsn
, path => File
}).
load_code_start_apps(RelNameVsn, #{<<"rel_apps">> := Apps}) ->
LibDir = filename:join([install_dir(), RelNameVsn]),
RunningApps = running_apps(),
%% load plugin apps and beam code
AppNames =
lists:map(fun(AppNameVsn) ->
{AppName, AppVsn} = parse_name_vsn(AppNameVsn),
EbinDir = filename:join([LibDir, AppNameVsn, "ebin"]),
ok = load_plugin_app(AppName, AppVsn, EbinDir, RunningApps),
AppName
end, Apps),
lists:foreach(fun start_app/1, AppNames).
load_plugin_app(AppName, AppVsn, Ebin, RunningApps) ->
case lists:keyfind(AppName, 1, RunningApps) of
false -> do_load_plugin_app(AppName, Ebin);
{_, Vsn} ->
case bin(Vsn) =:= bin(AppVsn) of
true ->
%% already started on the exact versio
ok;
false ->
%% running but a different version
?SLOG(warning, #{msg => "plugin_app_already_running", name => AppName,
running_vsn => Vsn,
loading_vsn => AppVsn
})
end end
end. end.
%% @doc List all available plugins do_load_plugin_app(AppName, Ebin) when is_binary(Ebin) ->
-spec(list() -> [emqx_types:plugin()]). do_load_plugin_app(AppName, binary_to_list(Ebin));
list() -> do_load_plugin_app(AppName, Ebin) ->
StartedApps = names(started_app),
lists:map(fun({Name, _, _}) ->
Plugin = plugin(Name),
case lists:member(Name, StartedApps) of
true -> Plugin#plugin{active = true};
false -> Plugin
end
end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))).
find_plugin(Name) ->
find_plugin(Name, list()).
find_plugin(Name, Plugins) ->
lists:keyfind(Name, 2, Plugins).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%% load external plugins which are placed in etc/plugins dir
load_ext_plugins(undefined) -> ok;
load_ext_plugins(Dir) ->
lists:foreach(
fun(Plugin) ->
PluginDir = filename:join(Dir, Plugin),
case filelib:is_dir(PluginDir) of
true -> load_ext_plugin(PluginDir);
false -> ok
end
end, filelib:wildcard("*", Dir)).
load_ext_plugin(PluginDir) ->
?SLOG(debug, #{msg => "loading_extra_plugin", plugin_dir => PluginDir}),
Ebin = filename:join([PluginDir, "ebin"]),
AppFile = filename:join([Ebin, "*.app"]),
AppName = case filelib:wildcard(AppFile) of
[App] ->
list_to_atom(filename:basename(App, ".app"));
[] ->
?SLOG(alert, #{msg => "plugin_app_file_not_found", app_file => AppFile}),
error({plugin_app_file_not_found, AppFile})
end,
ok = load_plugin_app(AppName, Ebin).
% try
% ok = generate_configs(AppName, PluginDir)
% catch
% throw : {conf_file_not_found, ConfFile} ->
% %% this is maybe a dependency of an external plugin
% ?LOG(debug, "config_load_error_ignored for app=~p, path=~ts", [AppName, ConfFile]),
% ok
% end.
load_plugin_app(AppName, Ebin) ->
_ = code:add_patha(Ebin), _ = code:add_patha(Ebin),
Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])), Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])),
lists:foreach( lists:foreach(
@ -162,103 +439,160 @@ load_plugin_app(AppName, Ebin) ->
Module = list_to_atom(filename:basename(BeamFile, ".beam")), Module = list_to_atom(filename:basename(BeamFile, ".beam")),
case code:load_file(Module) of case code:load_file(Module) of
{module, _} -> ok; {module, _} -> ok;
{error, Reason} -> error({failed_to_load_plugin_beam, BeamFile, Reason}) {error, Reason} -> throw(#{error => "failed_to_load_plugin_beam",
path => BeamFile,
reason => Reason
})
end end
end, Modules), end, Modules),
case application:load(AppName) of case application:load(AppName) of
ok -> ok; ok -> ok;
{error, {already_loaded, _}} -> ok {error, {already_loaded, _}} -> ok;
end. {error, Reason} -> throw(#{error => "failed_to_load_plugin_app",
name => AppName,
%% Stop plugins reason => Reason})
stop_plugins(Plugins) ->
_ = [stop_app(Plugin#plugin.name) || Plugin <- Plugins],
ok.
plugin(AppName) ->
case application:get_all_key(AppName) of
{ok, Attrs} ->
Descr = proplists:get_value(description, Attrs, ""),
#plugin{name = AppName, descr = Descr};
undefined -> error({plugin_not_found, AppName})
end.
load_plugin(Name) ->
try
case load_app(Name) of
ok ->
start_app(Name);
{error, Error0} ->
{error, Error0}
end
catch Error : Reason : Stacktrace ->
?SLOG(alert, #{
msg => "plugin_load_failed",
name => Name,
exception => Error,
reason => Reason,
stacktrace => Stacktrace
}),
{error, parse_config_file_failed}
end.
load_app(App) ->
case application:load(App) of
ok ->
ok;
{error, {already_loaded, App}} ->
ok;
{error, Error} ->
{error, Error}
end. end.
start_app(App) -> start_app(App) ->
case application:ensure_all_started(App) of case application:ensure_all_started(App) of
{ok, Started} -> {ok, Started} ->
case Started =/= [] of case Started =/= [] of
true -> ?SLOG(info, #{msg => "started_plugin_dependency_apps", apps => Started}); true -> ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started});
false -> ok false -> ok
end, end,
?SLOG(info, #{msg => "started_plugin_app", app => App}), ?SLOG(debug, #{msg => "started_plugin_app", app => App}),
ok; ok;
{error, {ErrApp, Reason}} -> {error, {ErrApp, Reason}} ->
?SLOG(error, #{msg => failed_to_start_plugin_app, throw(#{error => "failed_to_start_plugin_app",
app => App, app => App,
err_app => ErrApp, err_app => ErrApp,
reason => Reason reason => Reason
}), })
{error, failed_to_start_plugin_app}
end. end.
unload_plugin(App) -> %% Stop all apps installed by the plugin package,
case stop_app(App) of %% but not the ones shared with others.
ok -> ensure_apps_stopped(#{<<"rel_apps">> := Apps}) ->
%% load plugin apps and beam code
AppsToStop =
lists:map(fun(NameVsn) ->
{AppName, _AppVsn} = parse_name_vsn(NameVsn),
AppName
end, Apps),
case tryit("stop_apps", fun() -> stop_apps(AppsToStop) end) of
{ok, []} ->
%% all apps stopped
ok;
{ok, Left} ->
?SLOG(warning, #{msg => "unabled_to_stop_plugin_apps",
apps => Left
}),
ok; ok;
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
end. end.
stop_apps(Apps) ->
RunningApps = running_apps(),
case do_stop_apps(Apps, [], RunningApps) of
{ok, []} -> {ok, []}; %% all stopped
{ok, Remain} when Remain =:= Apps -> {ok, Apps}; %% no progress
{ok, Remain} -> stop_apps(Remain) %% try again
end.
do_stop_apps([], Remain, _AllApps) ->
{ok, lists:reverse(Remain)};
do_stop_apps([App | Apps], Remain, RunningApps) ->
case is_needed_by_any(App, RunningApps) of
true ->
do_stop_apps(Apps, [App | Remain], RunningApps);
false ->
ok = stop_app(App),
do_stop_apps(Apps, Remain, RunningApps)
end.
stop_app(App) -> stop_app(App) ->
case application:stop(App) of case application:stop(App) of
ok -> ok ->
?SLOG(info, #{msg => "stop_plugin_successfully", app => App}), ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
ok; ok = unload_moudle_and_app(App);
{error, {not_started, App}} -> {error, {not_started, App}} ->
?SLOG(info, #{msg => "plugin_not_started", app => App}), ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
ok; ok = unload_moudle_and_app(App);
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{msg => "failed_to_stop_plugin_app", throw(#{error => "failed_to_stop_app", app => App, reason => Reason})
app => App,
error => Reason
}),
{error, Reason}
end. end.
names(plugin) -> unload_moudle_and_app(App) ->
names(list()); case application:get_key(App, modules) of
{ok, Modules} -> lists:foreach(fun code:soft_purge/1, Modules);
_ -> ok
end,
_ = application:unload(App),
ok.
names(started_app) -> is_needed_by_any(AppToStop, RunningApps) ->
[Name || {Name, _Descr, _Ver} <- application:which_applications()]; lists:any(fun({RunningApp, _RunningAppVsn}) ->
is_needed_by(AppToStop, RunningApp)
end, RunningApps).
names(Plugins) -> is_needed_by(AppToStop, AppToStop) -> false;
[Name || #plugin{name = Name} <- Plugins]. is_needed_by(AppToStop, RunningApp) ->
case application:get_key(RunningApp, applications) of
{ok, Deps} -> lists:member(AppToStop, Deps);
undefined -> false
end.
put_config(Key, Value) when is_atom(Key) ->
put_config([Key], Value);
put_config(Path, Value) when is_list(Path) ->
emqx_config:put([?CONF_ROOT | Path], Value).
get_config(Key, Default) when is_atom(Key) ->
get_config([Key], Default);
get_config(Path, Default) ->
emqx:get_config([?CONF_ROOT | Path], Default).
install_dir() -> get_config(install_dir, "").
put_configured(Configured) ->
ok = put_config(states, Configured).
configured() ->
get_config(states, []).
for_plugins(ActionFun) ->
case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of
[] -> ok;
Errors -> erlang:error(#{function => ActionFun, errors => Errors})
end.
for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) ->
case Fun(NameVsn) of
ok -> [];
{error, Reason} -> [{NameVsn, Reason}]
end;
for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) ->
?SLOG(debug, #{msg => "plugin_disabled",
name_vsn => NameVsn}),
[].
parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
parse_name_vsn(binary_to_list(NameVsn));
parse_name_vsn(NameVsn) when is_list(NameVsn) ->
{AppName, [$- | Vsn]} = lists:splitwith(fun(X) -> X =/= $- end, NameVsn),
{list_to_atom(AppName), Vsn}.
pkg_file(NameVsn) ->
filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]).
dir(NameVsn) ->
filename:join([install_dir(), NameVsn]).
info_file(NameVsn) ->
filename:join([dir(NameVsn), "release.json"]).
running_apps() ->
lists:map(fun({N, _, V}) ->
{N, V}
end, application:which_applications(infinity)).

View File

@ -24,6 +24,7 @@
start(_Type, _Args) -> start(_Type, _Args) ->
{ok, Sup} = emqx_plugins_sup:start_link(), {ok, Sup} = emqx_plugins_sup:start_link(),
ok = emqx_plugins:ensure_started(), %% load all pre-configured
{ok, Sup}. {ok, Sup}.
stop(_State) -> stop(_State) ->

View File

@ -0,0 +1,88 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_plugins_cli).
-export([ list/1
, describe/2
, ensure_installed/2
, ensure_uninstalled/2
, ensure_started/2
, ensure_stopped/2
, restart/2
, ensure_disabled/2
, ensure_enabled/3
]).
-include_lib("emqx/include/logger.hrl").
-define(PRINT(EXPR, LOG_FUN),
print(NameVsn, fun()-> EXPR end(), LOG_FUN, ?FUNCTION_NAME)).
list(LogFun) ->
LogFun("~ts~n", [to_json(emqx_plugins:list())]).
describe(NameVsn, LogFun) ->
case emqx_plugins:describe(NameVsn) of
{ok, Plugin} ->
LogFun("~ts~n", [to_json(Plugin)]);
{error, Reason} ->
%% this should not happend unless the package is manually installed
%% corrupted packages installed from emqx_plugins:ensure_installed
%% should not leave behind corrupted files
?SLOG(error, #{msg => "failed_to_describe_plugin",
name_vsn => NameVsn,
cause => Reason}),
%% do nothing to the CLI console
ok
end.
ensure_installed(NameVsn, LogFun) ->
?PRINT(emqx_plugins:ensure_installed(NameVsn), LogFun).
ensure_uninstalled(NameVsn, LogFun) ->
?PRINT(emqx_plugins:ensure_uninstalled(NameVsn), LogFun).
ensure_started(NameVsn, LogFun) ->
?PRINT(emqx_plugins:ensure_started(NameVsn), LogFun).
ensure_stopped(NameVsn, LogFun) ->
?PRINT(emqx_plugins:ensure_stopped(NameVsn), LogFun).
restart(NameVsn, LogFun) ->
?PRINT(emqx_plugins:restart(NameVsn), LogFun).
ensure_enabled(NameVsn, Position, LogFun) ->
?PRINT(emqx_plugins:ensure_enabled(NameVsn, Position), LogFun).
ensure_disabled(NameVsn, LogFun) ->
?PRINT(emqx_plugins:ensure_disabled(NameVsn), LogFun).
to_json(Input) ->
emqx_logger_jsonfmt:best_effort_json(Input).
print(NameVsn, Res, LogFun, Action) ->
Obj = #{action => Action,
name_vsn => NameVsn},
JsonReady =
case Res of
ok ->
Obj#{result => ok};
{error, Reason} ->
Obj#{result => not_ok,
cause => Reason}
end,
LogFun("~ts~n", [to_json(JsonReady)]).

View File

@ -20,14 +20,18 @@
-export([ roots/0 -export([ roots/0
, fields/1 , fields/1
, namespace/0
]). ]).
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include("emqx_plugins.hrl").
roots() -> ["plugins"]. namespace() -> "plugin".
fields("plugins") -> roots() -> [?CONF_ROOT].
#{fields => fields(),
fields(?CONF_ROOT) ->
#{fields => root_fields(),
desc => """ desc => """
Manage EMQ X plugins. Manage EMQ X plugins.
<br> <br>
@ -37,44 +41,39 @@ or installed as a standalone package in a location specified by
<br> <br>
The standalone-installed plugins are referred to as 'external' plugins. The standalone-installed plugins are referred to as 'external' plugins.
""" """
};
fields(state) ->
#{ fields => state_fields(),
desc => "A per-plugin config to describe the desired state of the plugin."
}. }.
fields() -> state_fields() ->
[ {prebuilt, fun prebuilt/1} [ {name_vsn,
, {external, fun external/1} hoconsc:mk(string(),
#{ desc => "The {name}-{version} of the plugin.<br>"
"It should match the plugin application name-vsn as the "
"for the plugin release package name<br>"
"For example: my_plugin-0.1.0."
, nullable => false
})}
, {enable,
hoconsc:mk(boolean(),
#{ desc => "Set to 'true' to enable this plugin"
, nullable => false
})}
].
root_fields() ->
[ {states, fun states/1}
, {install_dir, fun install_dir/1} , {install_dir, fun install_dir/1}
]. ].
prebuilt(type) -> hoconsc:map("name", boolean()); states(type) -> hoconsc:array(hoconsc:ref(state));
prebuilt(nullable) -> true; states(nullable) -> true;
prebuilt(T) when T=/= desc -> undefined; states(default) -> [];
prebuilt(desc) -> """ states(desc) -> "An array of plugins in the desired states.<br>"
A map() from plugin name to a boolean (true | false) flag to indicate "The plugins are started in the defined order";
whether or not to enable the prebuilt plugin. states(_) -> undefined.
<br>
Most of the prebuilt plugins from 4.x are converted into features since 5.0.
""" ++ prebuilt_plugins() ++
"""
<br>
Enabled plugins are loaded (started) as a part of EMQ X node's boot sequence.
Plugins can be loaded on the fly, and enabled from dashbaord UI and/or CLI.
<br>
Example config: <code>{emqx_foo_bar: true, emqx_bazz: false}</code>
""".
external(type) -> hoconsc:map("name", string());
external(nullable) -> true;
external(T) when T =/= desc -> undefined;
external(desc) ->
"""
A map from plugin name to a version number string for enabled ones.
To disable an external plugin, set the value to 'false'.
<br>
Enabled plugins are loaded (started) as a part of EMQ X node's boot sequence.
Plugins can be loaded on the fly, and enabled from dashbaord UI and/or CLI.
<br>
Example config: <code>{emqx_extplug1: \"0.1.0\", emqx_extplug2: false}</code>
""".
install_dir(type) -> string(); install_dir(type) -> string();
install_dir(nullable) -> true; install_dir(nullable) -> true;
@ -88,12 +87,3 @@ the sub-directory named as <code>emqx_foo_bar-0.1.0</code>.
NOTE: For security reasons, this directory should **NOT** be writable NOTE: For security reasons, this directory should **NOT** be writable
by anyone expect for <code>emqx</code> (or any user which runs EMQ X) by anyone expect for <code>emqx</code> (or any user which runs EMQ X)
""". """.
%% TODO: when we have some prebuilt plugins, change this function to:
%% """
%% The names should be one of
%% - name1
%% - name2
%% """
prebuilt_plugins() ->
"So far, we do not have any prebuilt plugins".

View File

@ -22,92 +22,254 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(EMQX_PLUGIN_TEMPLATE_VSN, "5.0-rc.1").
-define(PACKAGE_SUFFIX, ".tar.gz").
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
WorkDir = proplists:get_value(data_dir, Config),
%% Compile extra plugin code OrigInstallDir = emqx_plugins:get_config(install_dir, undefined),
emqx_plugins:put_config(install_dir, WorkDir),
DataPath = proplists:get_value(data_dir, Config),
AppPath = filename:join([DataPath, "emqx_mini_plugin"]),
HoconPath = filename:join([DataPath, "emqx_hocon_plugin"]),
Cmd = lists:flatten(io_lib:format("cd ~ts && make", [AppPath])),
CmdPath = lists:flatten(io_lib:format("cd ~ts && make", [HoconPath])),
ct:pal("Executing ~ts~n", [Cmd]),
ct:pal("~n ~ts~n", [os:cmd(Cmd)]),
ct:pal("Executing ~ts~n", [CmdPath]),
ct:pal("~n ~ts~n", [os:cmd(CmdPath)]),
emqx_common_test_helpers:boot_modules([]),
emqx_common_test_helpers:start_apps([]), emqx_common_test_helpers:start_apps([]),
emqx_config:put([plugins, install_dir], DataPath), [{orig_install_dir, OrigInstallDir} | Config].
?assertEqual(ok, emqx_plugins:load()),
Config.
end_per_suite(_Config) -> end_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:stop_apps([]), emqx_common_test_helpers:stop_apps([]),
emqx_config:erase(plugins). emqx_config:erase(plugins),
%% restore config
case proplists:get_value(orig_install_dir, Config) of
undefined -> ok;
OrigInstallDir -> emqx_plugins:put_config(install_dir, OrigInstallDir)
end.
t_load(_) -> init_per_testcase(TestCase, Config) ->
?assertEqual(ok, emqx_plugins:load()), emqx_plugins:put_configured([]),
?assertEqual(ok, emqx_plugins:unload()), lists:foreach(fun(#{<<"name">> := Name, <<"rel_vsn">> := Vsn}) ->
emqx_plugins:purge(bin([Name, "-", Vsn]))
end, emqx_plugins:list()),
?MODULE:TestCase({init, Config}).
?assertEqual({error, not_found}, emqx_plugins:load(not_existed_plugin)), end_per_testcase(TestCase, Config) ->
?assertEqual({error, not_started}, emqx_plugins:unload(emqx_mini_plugin)), emqx_plugins:put_configured([]),
?assertEqual({error, not_started}, emqx_plugins:unload(emqx_hocon_plugin)), ?MODULE:TestCase({'end', Config}).
emqx_config:erase(plugins). build_demo_plugin_package() ->
WorkDir = emqx_plugins:install_dir(),
BuildSh = filename:join([WorkDir, "build-demo-plugin.sh"]),
case emqx_run_sh:do(BuildSh ++ " " ++ ?EMQX_PLUGIN_TEMPLATE_VSN,
[{cd, WorkDir}]) of
{ok, _} ->
Pkg = filename:join([WorkDir, "emqx_plugin_template-" ++
?EMQX_PLUGIN_TEMPLATE_VSN ++
?PACKAGE_SUFFIX]),
case filelib:is_regular(Pkg) of
true -> Pkg;
false -> error(#{reason => unexpected_build_result, not_found => Pkg})
end;
{error, {Rc, Output}} ->
io:format(user, "failed_to_build_demo_plugin, Exit = ~p, Output:~n~ts\n", [Rc, Output]),
error(failed_to_build_demo_plugin)
end.
t_load_ext_plugin(_) -> bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
?assertError({plugin_app_file_not_found, _}, bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
emqx_plugins:load_ext_plugin("./not_existed_path/")). bin(B) when is_binary(B) -> B.
t_list(_) -> t_demo_install_start_stop_uninstall({init, Config}) ->
?assertMatch([{plugin, _, _, _, _, _, _} | _ ], emqx_plugins:list()). Package = build_demo_plugin_package(),
NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
[{name_vsn, NameVsn} | Config];
t_demo_install_start_stop_uninstall({'end', _Config}) -> ok;
t_demo_install_start_stop_uninstall(Config) ->
NameVsn = proplists:get_value(name_vsn, Config),
ok = emqx_plugins:ensure_installed(NameVsn),
%% idempotent
ok = emqx_plugins:ensure_installed(NameVsn),
{ok, Info} = emqx_plugins:read_plugin(NameVsn),
?assertEqual([Info], emqx_plugins:list()),
%% start
ok = emqx_plugins:ensure_started(NameVsn),
ok = assert_app_running(emqx_plugin_template, true),
ok = assert_app_running(map_sets, true),
%% start (idempotent)
ok = emqx_plugins:ensure_started(bin(NameVsn)),
ok = assert_app_running(emqx_plugin_template, true),
ok = assert_app_running(map_sets, true),
t_find_plugin(_) -> %% running app can not be un-installed
?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _}, emqx_plugins:find_plugin(emqx_mini_plugin)), ?assertMatch({error, _},
?assertMatch({plugin, emqx_hocon_plugin, _, _, _, _, _}, emqx_plugins:find_plugin(emqx_hocon_plugin)). emqx_plugins:ensure_uninstalled(NameVsn)),
t_plugin(_) -> %% stop
ok = emqx_plugins:ensure_stopped(NameVsn),
ok = assert_app_running(emqx_plugin_template, false),
ok = assert_app_running(map_sets, false),
%% stop (idempotent)
ok = emqx_plugins:ensure_stopped(bin(NameVsn)),
ok = assert_app_running(emqx_plugin_template, false),
ok = assert_app_running(map_sets, false),
%% still listed after stopped
?assertMatch([#{<<"name">> := <<"emqx_plugin_template">>,
<<"rel_vsn">> := <<?EMQX_PLUGIN_TEMPLATE_VSN>>
}], emqx_plugins:list()),
ok = emqx_plugins:ensure_uninstalled(NameVsn),
?assertEqual([], emqx_plugins:list()),
ok.
%% help funtion to create a info file.
%% The file is in JSON format when built
%% but since we are using hocon:load to load it
%% ad-hoc test files can be in hocon format
write_info_file(Config, NameVsn, Content) ->
WorkDir = proplists:get_value(data_dir, Config),
InfoFile = filename:join([WorkDir, NameVsn, "release.json"]),
ok = filelib:ensure_dir(InfoFile),
ok = file:write_file(InfoFile, Content).
t_start_restart_and_stop({init, Config}) ->
Package = build_demo_plugin_package(),
NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
[{name_vsn, NameVsn} | Config];
t_start_restart_and_stop({'end', _Config}) -> ok;
t_start_restart_and_stop(Config) ->
NameVsn = proplists:get_value(name_vsn, Config),
ok = emqx_plugins:ensure_installed(NameVsn),
ok = emqx_plugins:ensure_enabled(NameVsn),
FakeInfo = "name=bar, rel_vsn=\"2\", rel_apps=[\"bar-9\"],"
"description=\"desc bar\"",
Bar2 = <<"bar-2">>,
ok = write_info_file(Config, Bar2, FakeInfo),
%% fake a disabled plugin in config
ok = emqx_plugins:ensure_state(Bar2, front, false),
assert_app_running(emqx_plugin_template, false),
ok = emqx_plugins:ensure_started(),
assert_app_running(emqx_plugin_template, true),
%% fake enable bar-2
ok = emqx_plugins:ensure_state(Bar2, rear, true),
%% should cause an error
?assertError(#{function := _, errors := [_ | _]},
emqx_plugins:ensure_started()),
%% but demo plugin should still be running
assert_app_running(emqx_plugin_template, true),
%% stop all
ok = emqx_plugins:ensure_stopped(),
assert_app_running(emqx_plugin_template, false),
ok = emqx_plugins:ensure_state(Bar2, rear, false),
ok = emqx_plugins:restart(NameVsn),
assert_app_running(emqx_plugin_template, true),
%% repeat
ok = emqx_plugins:restart(NameVsn),
assert_app_running(emqx_plugin_template, true),
ok = emqx_plugins:ensure_stopped(),
ok = emqx_plugins:ensure_disabled(NameVsn),
ok = emqx_plugins:ensure_uninstalled(NameVsn),
ok = emqx_plugins:ensure_uninstalled(Bar2),
?assertEqual([], emqx_plugins:list()),
ok.
t_enable_disable({init, Config}) ->
Package = build_demo_plugin_package(),
NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
[{name_vsn, NameVsn} | Config];
t_enable_disable({'end', Config}) ->
ok = emqx_plugins:ensure_uninstalled(proplists:get_value(name_vsn, Config));
t_enable_disable(Config) ->
NameVsn = proplists:get_value(name_vsn, Config),
ok = emqx_plugins:ensure_installed(NameVsn),
?assertEqual([], emqx_plugins:configured()),
ok = emqx_plugins:ensure_enabled(NameVsn),
?assertEqual([#{name_vsn => NameVsn, enable => true}], emqx_plugins:configured()),
ok = emqx_plugins:ensure_disabled(NameVsn),
?assertEqual([#{name_vsn => NameVsn, enable => false}], emqx_plugins:configured()),
ok = emqx_plugins:ensure_enabled(bin(NameVsn)),
?assertEqual([#{name_vsn => NameVsn, enable => true}], emqx_plugins:configured()),
?assertMatch({error, #{reason := "bad_plugin_config_status",
hint := "disable_the_plugin_first"
}}, emqx_plugins:ensure_uninstalled(NameVsn)),
ok = emqx_plugins:ensure_disabled(bin(NameVsn)),
ok = emqx_plugins:ensure_uninstalled(NameVsn),
?assertMatch({error, _}, emqx_plugins:ensure_enabled(NameVsn)),
?assertMatch({error, _}, emqx_plugins:ensure_disabled(NameVsn)),
ok.
assert_app_running(Name, true) ->
AllApps = application:which_applications(),
?assertMatch({Name, _, _}, lists:keyfind(Name, 1, AllApps));
assert_app_running(Name, false) ->
AllApps = application:which_applications(),
?assertEqual(false, lists:keyfind(Name, 1, AllApps)).
t_bad_tar_gz({init, Config}) -> Config;
t_bad_tar_gz({'end', _Config}) -> ok;
t_bad_tar_gz(Config) ->
WorkDir = proplists:get_value(data_dir, Config),
FakeTarTz = filename:join([WorkDir, "fake-vsn.tar.gz"]),
ok = file:write_file(FakeTarTz, "a\n"),
?assertMatch({error, #{reason := "bad_plugin_package",
return := eof
}},
emqx_plugins:ensure_installed("fake-vsn")),
?assertMatch({error, #{reason := "failed_to_extract_plugin_package",
return := not_found
}},
emqx_plugins:ensure_installed("nonexisting")),
?assertEqual([], emqx_plugins:list()),
ok = emqx_plugins:delete_package("fake-vsn"),
%% idempotent
ok = emqx_plugins:delete_package("fake-vsn").
%% create a corrupted .tar.gz
%% failed install attempts should not leave behind extracted dir
t_bad_tar_gz2({init, Config}) -> Config;
t_bad_tar_gz2({'end', _Config}) -> ok;
t_bad_tar_gz2(Config) ->
WorkDir = proplists:get_value(data_dir, Config),
NameVsn = "foo-0.2",
%% this an invalid info file content
BadInfo = "name=foo, rel_vsn=\"0.2\", rel_apps=[foo]",
ok = write_info_file(Config, NameVsn, BadInfo),
TarGz = filename:join([WorkDir, NameVsn ++ ".tar.gz"]),
ok = make_tar(WorkDir, NameVsn),
?assert(filelib:is_regular(TarGz)),
%% failed to install, it also cleans up the bad .tar.gz file
?assertMatch({error, _}, emqx_plugins:ensure_installed(NameVsn)),
%% the tar.gz file is still around
?assert(filelib:is_regular(TarGz)),
?assertEqual({error, enoent}, file:read_file_info(emqx_plugins:dir(NameVsn))),
ok = emqx_plugins:delete_package(NameVsn).
t_bad_info_json({init, Config}) -> Config;
t_bad_info_json({'end', _}) -> ok;
t_bad_info_json(Config) ->
NameVsn = "test-2",
ok = write_info_file(Config, NameVsn, "bad-syntax"),
?assertMatch({error, #{error := "bad_info_file",
return := {parse_error, _}
}},
emqx_plugins:read_plugin(NameVsn)),
ok = write_info_file(Config, NameVsn, "{\"bad\": \"obj\"}"),
?assertMatch({error, #{error := "bad_info_file_content",
mandatory_fields := _
}},
emqx_plugins:read_plugin(NameVsn)),
?assertEqual([], emqx_plugins:list()),
emqx_plugins:purge(NameVsn),
ok.
make_tar(Cwd, NameWithVsn) ->
{ok, OriginalCwd} = file:get_cwd(),
ok = file:set_cwd(Cwd),
try try
emqx_plugins:plugin(not_existed_plugin) Files = filelib:wildcard(NameWithVsn ++ "/**"),
catch TarFile = NameWithVsn ++ ".tar.gz",
_Error:Reason:_Stacktrace -> ok = erl_tar:create(TarFile, Files, [compressed])
?assertEqual({plugin_not_found,not_existed_plugin}, Reason) after
end, file:set_cwd(OriginalCwd)
?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _}, emqx_plugins:plugin(emqx_mini_plugin)), end.
?assertMatch({plugin, emqx_hocon_plugin, _, _, _, _, _}, emqx_plugins:plugin(emqx_hocon_plugin)).
t_load_plugin(_) ->
ok = meck:new(application, [unstick, non_strict, passthrough, no_history]),
ok = meck:expect(application, load, fun(already_loaded_app) -> {error, {already_loaded, already_loaded_app}};
(error_app) -> {error, error};
(_) -> ok end),
ok = meck:expect(application, ensure_all_started, fun(already_loaded_app) -> {error, {already_loaded_app, already_loaded}};
(error_app) -> {error, error};
(App) -> {ok, App} end),
ok = meck:new(emqx_plugins, [unstick, non_strict, passthrough, no_history]),
ok = meck:expect(emqx_plugins, generate_configs, fun(_) -> ok end),
ok = meck:expect(emqx_plugins, apply_configs, fun(_) -> ok end),
?assertMatch({error, _}, emqx_plugins:load_plugin(already_loaded_app)),
?assertMatch(ok, emqx_plugins:load_plugin(normal)),
?assertMatch({error,_}, emqx_plugins:load_plugin(error_app)),
ok = meck:unload(emqx_plugins),
ok = meck:unload(application).
t_unload_plugin(_) ->
ok = meck:new(application, [unstick, non_strict, passthrough, no_history]),
ok = meck:expect(application, stop, fun(not_started_app) -> {error, {not_started, not_started_app}};
(error_app) -> {error, error};
(_) -> ok end),
?assertEqual(ok, emqx_plugins:unload_plugin(not_started_app)),
?assertEqual(ok, emqx_plugins:unload_plugin(normal)),
?assertEqual({error,error}, emqx_plugins:unload_plugin(error_app)),
ok = meck:unload(application).

View File

@ -0,0 +1,20 @@
#!/bin/bash
set -euo pipefail
vsn="${1}"
workdir="demo_src"
target_name="emqx_plugin_template-${vsn}.tar.gz"
target="$workdir/_build/default/emqx_plugrel/${target_name}"
if [ -f "${target}" ]; then
cp "$target" ./
exit 0
fi
# cleanup
rm -rf "${workdir}"
git clone https://github.com/emqx/emqx-plugin-template.git -b "${vsn}" ${workdir}
make -C "$workdir" rel
cp "$target" ./

View File

@ -1,26 +0,0 @@
## shallow clone for speed
REBAR_GIT_CLONE_OPTIONS += --depth 1
export REBAR_GIT_CLONE_OPTIONS
REBAR = rebar3
all: compile
compile:
$(REBAR) compile
cp -r _build/default/lib/emqx_hocon_plugin/ebin ./
clean: distclean
ct: compile
$(REBAR) as test ct -v
eunit: compile
$(REBAR) as test eunit
xref:
$(REBAR) xref
distclean:
@rm -rf _build
@rm -f ebin/ data/app.*.config data/vm.*.args rebar.lock

View File

@ -1,3 +0,0 @@
emqx_hocon_plugin {
name = test
}

View File

@ -1,23 +0,0 @@
{deps, [{hocon, {git, "https://github.com/emqx/hocon", {tag, "0.6.0"}}}]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{profiles,
[{test, [
{deps, [
]}
]}
]}.

View File

@ -1,16 +0,0 @@
%% -*- mode: erlang -*-
{application, emqx_hocon_plugin,
[{description, "An EMQ X plugin for hocon testcase"},
{vsn, "0.1"},
{modules, []},
{registered, []},
{mod, {emqx_hocon_plugin_app, []}},
{applications,
[kernel,
stdlib,
typerefl
]},
{env,[]},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

View File

@ -1,42 +0,0 @@
%%%-------------------------------------------------------------------
%% @doc emqx_mini_plugin public API
%% @end
%%%-------------------------------------------------------------------
-module(emqx_hocon_plugin_app).
-behaviour(application).
-behaviour(supervisor).
-emqx_plugin(?MODULE).
%% Application APIs
-export([ start/2
, stop/1
]).
%% Supervisor callback
-export([init/1]).
%% -- Application
start(_StartType, _StartArgs) ->
{ok, Sup} = start_link(),
{ok, Sup}.
stop(_State) ->
ok.
%% --- Supervisor
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
SupFlags = #{strategy => one_for_all,
intensity => 0,
period => 1},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.

View File

@ -1,15 +0,0 @@
-module(emqx_hocon_plugin_schema).
-include_lib("typerefl/include/types.hrl").
-export([roots/0, fields/1]).
-behaviour(hocon_schema).
roots() -> ["emqx_hocon_plugin"].
fields("emqx_hocon_plugin") ->
[{name, fun name/1}].
name(type) -> binary();
name(_) -> undefined.

View File

@ -1,26 +0,0 @@
## shallow clone for speed
REBAR_GIT_CLONE_OPTIONS += --depth 1
export REBAR_GIT_CLONE_OPTIONS
REBAR = rebar3
all: compile
compile:
$(REBAR) compile
cp -r _build/default/lib/emqx_mini_plugin/ebin ./
clean: distclean
ct: compile
$(REBAR) as test ct -v
eunit: compile
$(REBAR) as test eunit
xref:
$(REBAR) xref
distclean:
@rm -rf _build
@rm -f ebin/ data/app.*.config data/vm.*.args rebar.lock

View File

@ -1,5 +0,0 @@
%%-*- mode: erlang -*-
{mapping, "mini.name", "emqx_mini_plugin.name", [
{datatype, string}
]}.

View File

@ -1,23 +0,0 @@
{deps, []}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{profiles,
[{test, [
{deps, [
]}
]}
]}.

View File

@ -1,15 +0,0 @@
%% -*- mode: erlang -*-
{application, emqx_mini_plugin,
[{description, "An EMQ X plugin for testcase"},
{vsn, "0.1"},
{modules, []},
{registered, []},
{mod, {emqx_mini_plugin_app, []}},
{applications,
[kernel,
stdlib
]},
{env,[]},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

View File

@ -1,42 +0,0 @@
%%%-------------------------------------------------------------------
%% @doc emqx_mini_plugin public API
%% @end
%%%-------------------------------------------------------------------
-module(emqx_mini_plugin_app).
-behaviour(application).
-behaviour(supervisor).
-emqx_plugin(?MODULE).
%% Application APIs
-export([ start/2
, stop/1
]).
%% Supervisor callback
-export([init/1]).
%% -- Application
start(_StartType, _StartArgs) ->
{ok, Sup} = start_link(),
{ok, Sup}.
stop(_State) ->
ok.
%% --- Supervisor
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
SupFlags = #{strategy => one_for_all,
intensity => 0,
period => 1},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.

View File

@ -0,0 +1,103 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_plugins_tests).
-include_lib("eunit/include/eunit.hrl").
ensure_configured_test() ->
try test_ensure_configured()
after emqx_plugins:put_configured([])
end.
test_ensure_configured() ->
ok = emqx_plugins:put_configured([]),
P1 =#{name_vsn => "p-1", enable => true},
P2 =#{name_vsn => "p-2", enable => true},
P3 =#{name_vsn => "p-3", enable => false},
emqx_plugins:ensure_configured(P1, front),
emqx_plugins:ensure_configured(P2, {before, <<"p-1">>}),
emqx_plugins:ensure_configured(P3, {before, <<"p-1">>}),
?assertEqual([P2, P3, P1], emqx_plugins:configured()),
?assertThrow(#{error := "position_anchor_plugin_not_configured"},
emqx_plugins:ensure_configured(P3, {before, <<"unknown-x">>})).
read_plugin_test() ->
with_rand_install_dir(
fun(_Dir) ->
NameVsn = "bar-5",
InfoFile = emqx_plugins:info_file(NameVsn),
FakeInfo = "name=bar, rel_vsn=\"5\", rel_apps=[justname_no_vsn],"
"description=\"desc bar\"",
try
ok = write_file(InfoFile, FakeInfo),
?assertMatch({error, #{error := "bad_rel_apps"}},
emqx_plugins:read_plugin(NameVsn))
after
emqx_plugins:purge(NameVsn)
end
end).
with_rand_install_dir(F) ->
N = rand:uniform(10000000),
TmpDir = integer_to_list(N),
OriginalInstallDir = emqx_plugins:install_dir(),
ok = filelib:ensure_dir(filename:join([TmpDir, "foo"])),
ok = emqx_plugins:put_config(install_dir, TmpDir),
try
F(TmpDir)
after
file:del_dir_r(TmpDir),
ok = emqx_plugins:put_config(install_dir, OriginalInstallDir)
end.
write_file(Path, Content) ->
ok = filelib:ensure_dir(Path),
file:write_file(Path, Content).
%% delete package should mostly work and return ok
%% but it may fail in case the path is a directory
%% or if the file is read-only
delete_package_test() ->
with_rand_install_dir(
fun(_Dir) ->
File = emqx_plugins:pkg_file("a-1"),
ok = write_file(File, "a"),
ok = emqx_plugins:delete_package("a-1"),
%% delete again should be ok
ok = emqx_plugins:delete_package("a-1"),
Dir = File,
ok = filelib:ensure_dir(filename:join([Dir, "foo"])),
?assertMatch({error, _}, emqx_plugins:delete_package("a-1"))
end).
%% purge plugin's install dir should mostly work and return ok
%% but it may fail in case the dir is read-only
purge_test() ->
with_rand_install_dir(
fun(_Dir) ->
File = emqx_plugins:info_file("a-1"),
Dir = emqx_plugins:dir("a-1"),
ok = filelib:ensure_dir(File),
?assertMatch({ok, _}, file:read_file_info(Dir)),
?assertEqual(ok, emqx_plugins:purge("a-1")),
%% assert the dir is gone
?assertMatch({error, enoent}, file:read_file_info(Dir)),
%% wite a file for the dir path
ok = file:write_file(Dir, "a"),
?assertEqual(ok, emqx_plugins:purge("a-1"))
end).