Merge pull request #3335 from emqx/reload_plugin

Read new configuration items when plugins is loaded
This commit is contained in:
tigercl 2020-03-26 18:35:49 +08:00 committed by GitHub
commit 5eadca1782
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 143 additions and 108 deletions

View File

@ -1941,11 +1941,6 @@ broker.sys_interval = 1m
## Default: 30s
broker.sys_heartbeat = 30s
## Enable global session registry.
##
## Value: on | off
broker.enable_session_registry = on
## Session locking strategy in a cluster.
##
## Value: Enum

View File

@ -128,7 +128,6 @@
-record(plugin, {
name :: atom(),
version :: string(),
dir :: string(),
descr :: string(),
vendor :: string(),

View File

@ -62,7 +62,7 @@ start_link() ->
%% @doc Is the global registry enabled?
-spec(is_enabled() -> boolean()).
is_enabled() ->
emqx:get_env(enable_channel_registry, true).
emqx:get_env(enable_session_registry, true).
%% @doc Register a global channel.
-spec(register_channel(emqx_types:clientid()

View File

@ -27,9 +27,9 @@
, load/1
, unload/0
, unload/1
, reload/1
, list/0
, find_plugin/1
, load_expand_plugin/1
]).
-ifdef(TEST).
@ -51,12 +51,6 @@ init() ->
lists:foreach(fun init_config/1, CfgFiles)
end.
init_config(CfgFile) ->
{ok, [AppsEnv]} = file:consult(CfgFile),
lists:foreach(fun({App, Envs}) ->
[application:set_env(App, Par, Val) || {Par, Val} <- Envs]
end, AppsEnv).
%% @doc Load all plugins when the broker started.
-spec(load() -> list() | {error, term()}).
load() ->
@ -68,6 +62,85 @@ load() ->
with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end)
end.
%% @doc Load a Plugin
-spec(load(atom()) -> ok | {error, term()}).
load(PluginName) when is_atom(PluginName) ->
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
{false, _} ->
?LOG(alert, "Plugin ~s not found, cannot load it", [PluginName]),
{error, not_found};
{_, true} ->
?LOG(notice, "Plugin ~s is already started", [PluginName]),
{error, already_started};
{_, false} ->
load_plugin(PluginName, true)
end.
%% @doc Unload all plugins before broker stopped.
-spec(unload() -> list() | {error, term()}).
unload() ->
case emqx:get_env(plugins_loaded_file) of
undefined -> ignore;
File ->
with_loaded_file(File, fun stop_plugins/1)
end.
%% @doc UnLoad a Plugin
-spec(unload(atom()) -> ok | {error, term()}).
unload(PluginName) when is_atom(PluginName) ->
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
{false, _} ->
?LOG(error, "Plugin ~s is not found, cannot unload it", [PluginName]),
{error, not_found};
{_, false} ->
?LOG(error, "Plugin ~s is not started", [PluginName]),
{error, not_started};
{_, _} ->
unload_plugin(PluginName, true)
end.
reload(PluginName) when is_atom(PluginName)->
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
{false, _} ->
?LOG(error, "Plugin ~s is not found, cannot reload it", [PluginName]),
{error, not_found};
{_, false} ->
load(PluginName);
{_, true} ->
case unload(PluginName) of
ok -> load(PluginName);
{error, Reason} -> {error, Reason}
end
end.
%% @doc List all available plugins
-spec(list() -> [emqx_types:plugin()]).
list() ->
StartedApps = names(started_app),
lists:map(fun({Name, _, [Type| _]}) ->
Plugin = plugin(Name, Type),
case lists:member(Name, StartedApps) of
true -> Plugin#plugin{active = true};
false -> Plugin
end
end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))).
find_plugin(Name) ->
find_plugin(Name, list()).
find_plugin(Name, Plugins) ->
lists:keyfind(Name, 2, Plugins).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
init_config(CfgFile) ->
{ok, [AppsEnv]} = file:consult(CfgFile),
lists:foreach(fun({App, Envs}) ->
[application:set_env(App, Par, Val) || {Par, Val} <- Envs]
end, AppsEnv).
load_expand_plugins() ->
case emqx:get_env(expand_plugins_dir) of
undefined -> ok;
@ -136,65 +209,60 @@ load_plugins(Names, Persistent) ->
NotFound -> ?LOG(alert, "Cannot find plugins: ~p", [NotFound])
end,
NeedToLoad = Names -- NotFound -- names(started_app),
[load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad].
lists:foreach(fun(Name) ->
Plugin = find_plugin(Name, Plugins),
load_plugin(Plugin#plugin.name, Persistent)
end, NeedToLoad).
%% @doc Unload all plugins before broker stopped.
-spec(unload() -> list() | {error, term()}).
unload() ->
case emqx:get_env(plugins_loaded_file) of
undefined -> ignore;
File ->
with_loaded_file(File, fun stop_plugins/1)
generate_configs(App) ->
ConfigFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".config",
ConfFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf",
SchemaFile = filename:join([code:priv_dir(App), App]) ++ ".schema",
case {filelib:is_file(ConfigFile), filelib:is_file(ConfFile) andalso filelib:is_file(SchemaFile)} of
{true, _} ->
{ok, [Configs]} = file:consult(ConfigFile),
Configs;
{_, true} ->
Schema = cuttlefish_schema:files([SchemaFile]),
Conf = cuttlefish_conf:file(ConfFile),
cuttlefish_generator:map(Schema, Conf);
{false, false} ->
error(no_avaliable_configuration)
end.
apply_configs([]) ->
ok;
apply_configs([{App, Config} | More]) ->
lists:foreach(fun({Key, _}) -> application:unset_env(App, Key) end, application:get_all_env(App)),
lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Config),
apply_configs(More).
%% Stop plugins
stop_plugins(Names) ->
[stop_app(App) || App <- Names].
%% @doc List all available plugins
-spec(list() -> [emqx_types:plugin()]).
list() ->
StartedApps = names(started_app),
lists:map(fun({Name, _, [Type| _]}) ->
Plugin = plugin(Name, Type),
case lists:member(Name, StartedApps) of
true -> Plugin#plugin{active = true};
false -> Plugin
end
end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))).
[stop_app(App) || App <- Names],
ok.
plugin(AppName, Type) ->
case application:get_all_key(AppName) of
{ok, Attrs} ->
Ver = proplists:get_value(vsn, Attrs, "0"),
Descr = proplists:get_value(description, Attrs, ""),
#plugin{name = AppName, version = Ver, descr = Descr, type = plugin_type(Type)};
#plugin{name = AppName, descr = Descr, type = plugin_type(Type)};
undefined -> error({plugin_not_found, AppName})
end.
%% @doc Load a Plugin
-spec(load(atom()) -> ok | {error, term()}).
load(PluginName) when is_atom(PluginName) ->
case lists:member(PluginName, names(started_app)) of
true ->
?LOG(notice, "Plugin ~s is already started", [PluginName]),
{error, already_started};
false ->
case find_plugin(PluginName) of
false ->
?LOG(alert, "Plugin ~s not found", [PluginName]),
{error, not_found};
Plugin ->
load_plugin(Plugin, true)
end
end.
load_plugin(#plugin{name = Name}, Persistent) ->
case load_app(Name) of
ok ->
start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end);
{error, Error} ->
{error, Error}
load_plugin(Name, Persistent) ->
try
Configs = generate_configs(Name),
apply_configs(Configs),
case load_app(Name) of
ok ->
start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end);
{error, Error0} ->
{error, Error0}
end
catch _ : Error : Stacktrace ->
?LOG(alert, "Plugin ~s load failed with ~p", [Name, {Error, Stacktrace}]),
{error, parse_config_file_failed}
end.
load_app(App) ->
@ -219,26 +287,6 @@ start_app(App, SuccFun) ->
{error, {ErrApp, Reason}}
end.
find_plugin(Name) ->
find_plugin(Name, list()).
find_plugin(Name, Plugins) ->
lists:keyfind(Name, 2, Plugins).
%% @doc UnLoad a Plugin
-spec(unload(atom()) -> ok | {error, term()}).
unload(PluginName) when is_atom(PluginName) ->
case {lists:member(PluginName, names(started_app)), lists:member(PluginName, names(plugin))} of
{true, true} ->
unload_plugin(PluginName, true);
{false, _} ->
?LOG(error, "Plugin ~s is not started", [PluginName]),
{error, not_started};
{true, false} ->
?LOG(error, "~s is not a plugin, cannot unload it", [PluginName]),
{error, not_found}
end.
unload_plugin(App, Persistent) ->
case stop_app(App) of
ok ->
@ -257,9 +305,6 @@ stop_app(App) ->
?LOG(error, "Stop plugin ~s error: ~p", [App]), {error, Reason}
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
names(plugin) ->
names(list());

View File

@ -42,26 +42,26 @@ end_per_testcase(_TestCase, Config) ->
Config.
t_is_enabled(_) ->
application:set_env(emqx, enable_channel_registry, false),
application:set_env(emqx, enable_session_registry, false),
?assertEqual(false, emqx_cm_registry:is_enabled()),
application:set_env(emqx, enable_channel_registry, true),
application:set_env(emqx, enable_session_registry, true),
?assertEqual(true, emqx_cm_registry:is_enabled()).
t_register_unregister_channel(_) ->
ClientId = <<"clientid">>,
application:set_env(emqx, enable_channel_registry, false),
application:set_env(emqx, enable_session_registry, false),
emqx_cm_registry:register_channel(ClientId),
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)),
application:set_env(emqx, enable_channel_registry, true),
application:set_env(emqx, enable_session_registry, true),
emqx_cm_registry:register_channel(ClientId),
?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)),
application:set_env(emqx, enable_channel_registry, false),
application:set_env(emqx, enable_session_registry, false),
emqx_cm_registry:unregister_channel(ClientId),
?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)),
application:set_env(emqx, enable_channel_registry, true),
application:set_env(emqx, enable_session_registry, true),
emqx_cm_registry:unregister_channel(ClientId),
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)).

View File

@ -24,7 +24,6 @@
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
%% Compile extra plugin code
@ -55,13 +54,11 @@ end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_load(_) ->
?assertEqual([], emqx_plugins:load()),
?assertEqual([], emqx_plugins:unload()),
?assertEqual(ok, emqx_plugins:load()),
?assertEqual(ok, emqx_plugins:unload()),
?assertEqual({error, not_found}, emqx_plugins:load(not_existed_plugin)),
?assertMatch(ok, emqx_plugins:load(emqx_mini_plugin)),
?assertEqual({error, already_started}, emqx_plugins:load(emqx_mini_plugin)),
?assertEqual(ok, emqx_plugins:unload(emqx_mini_plugin)),
?assertEqual({error, parse_config_file_failed}, emqx_plugins:load(emqx_mini_plugin)),
?assertEqual({error, not_started}, emqx_plugins:unload(emqx_mini_plugin)),
application:set_env(emqx, expand_plugins_dir, undefined),
@ -82,10 +79,10 @@ t_load_expand_plugin(_) ->
?assertEqual({error, load_app_fail}, emqx_plugins:load_expand_plugin("./not_existed_path/")).
t_list(_) ->
?assertMatch([{plugin, _, _, _, _, _, _, _, _} | _ ], emqx_plugins:list()).
?assertMatch([{plugin, _, _, _, _, _, _, _} | _ ], emqx_plugins:list()).
t_find_plugin(_) ->
?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _, _}, emqx_plugins:find_plugin(emqx_mini_plugin)).
?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _}, emqx_plugins:find_plugin(emqx_mini_plugin)).
t_plugin_type(_) ->
?assertEqual(auth, emqx_plugins:plugin_type(auth)),
@ -112,7 +109,7 @@ t_plugin(_) ->
_Error:Reason:_Stacktrace ->
?assertEqual({plugin_not_found,not_existed_plugin}, Reason)
end,
?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _, _}, emqx_plugins:plugin(emqx_mini_plugin, undefined)).
?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _}, emqx_plugins:plugin(emqx_mini_plugin, undefined)).
t_filter_plugins(_) ->
?assertEqual([name1, name2], emqx_plugins:filter_plugins([name1, {name2,true}, {name3, false}])).
@ -120,27 +117,26 @@ t_filter_plugins(_) ->
t_load_plugin(_) ->
ok = meck:new(application, [unstick, non_strict, passthrough, no_history]),
ok = meck:expect(application, load, fun(already_loaded_app) -> {error, {already_loaded, already_loaded_app}};
(error_app) -> {error, error};
(_) -> ok end),
(error_app) -> {error, error};
(_) -> ok end),
ok = meck:expect(application, ensure_all_started, fun(already_loaded_app) -> {error, {already_loaded_app, already_loaded}};
(error_app) -> {error, error};
(App) -> {ok, App} end),
(error_app) -> {error, error};
(App) -> {ok, App} end),
?assertMatch({error, _}, emqx_plugins:load_plugin(#plugin{name = already_loaded_app}, true)),
?assertMatch(ok, emqx_plugins:load_plugin(#plugin{name = normal}, true)),
?assertMatch({error,_}, emqx_plugins:load_plugin(#plugin{name = error_app}, true)),
?assertMatch({error, _}, emqx_plugins:load_plugin(already_loaded_app, true)),
?assertMatch(ok, emqx_plugins:load_plugin(normal, true)),
?assertMatch({error,_}, emqx_plugins:load_plugin(error_app, true)),
ok = meck:unload(application).
t_unload_plugin(_) ->
ok = meck:new(application, [unstick, non_strict, passthrough, no_history]),
ok = meck:expect(application, stop, fun(not_started_app) -> {error, {not_started, not_started_app}};
(error_app) -> {error, error};
(_) -> ok end),
(error_app) -> {error, error};
(_) -> ok end),
?assertEqual(ok, emqx_plugins:unload_plugin(not_started_app, true)),
?assertEqual(ok, emqx_plugins:unload_plugin(normal, true)),
?assertEqual({error,error}, emqx_plugins:unload_plugin(error_app, true)),
ok = meck:unload(application).