Read new configuration items when plugins is loaded

This commit is contained in:
zhouzb 2020-03-23 15:23:38 +08:00
parent e492260fe8
commit 3b537760d5
6 changed files with 128 additions and 104 deletions

View File

@ -1909,11 +1909,6 @@ broker.sys_interval = 1m
## Default: 30s ## Default: 30s
broker.sys_heartbeat = 30s broker.sys_heartbeat = 30s
## Enable global session registry.
##
## Value: on | off
broker.enable_session_registry = on
## Session locking strategy in a cluster. ## Session locking strategy in a cluster.
## ##
## Value: Enum ## Value: Enum

View File

@ -128,7 +128,6 @@
-record(plugin, { -record(plugin, {
name :: atom(), name :: atom(),
version :: string(),
dir :: string(), dir :: string(),
descr :: string(), descr :: string(),
vendor :: string(), vendor :: string(),

View File

@ -62,7 +62,7 @@ start_link() ->
%% @doc Is the global registry enabled? %% @doc Is the global registry enabled?
-spec(is_enabled() -> boolean()). -spec(is_enabled() -> boolean()).
is_enabled() -> is_enabled() ->
emqx:get_env(enable_channel_registry, true). emqx:get_env(enable_session_registry, true).
%% @doc Register a global channel. %% @doc Register a global channel.
-spec(register_channel(emqx_types:clientid() -spec(register_channel(emqx_types:clientid()

View File

@ -27,9 +27,9 @@
, load/1 , load/1
, unload/0 , unload/0
, unload/1 , unload/1
, reload/1
, list/0 , list/0
, find_plugin/1 , find_plugin/1
, load_expand_plugin/1
]). ]).
-ifdef(TEST). -ifdef(TEST).
@ -51,12 +51,6 @@ init() ->
lists:foreach(fun init_config/1, CfgFiles) lists:foreach(fun init_config/1, CfgFiles)
end. end.
init_config(CfgFile) ->
{ok, [AppsEnv]} = file:consult(CfgFile),
lists:foreach(fun({App, Envs}) ->
[application:set_env(App, Par, Val) || {Par, Val} <- Envs]
end, AppsEnv).
%% @doc Load all plugins when the broker started. %% @doc Load all plugins when the broker started.
-spec(load() -> list() | {error, term()}). -spec(load() -> list() | {error, term()}).
load() -> load() ->
@ -68,6 +62,92 @@ load() ->
with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end) with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end)
end. end.
%% @doc Load a Plugin
-spec(load(atom()) -> ok | {error, term()}).
load(PluginName) when is_atom(PluginName) ->
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
{false, _} ->
?LOG(alert, "Plugin ~s not found, cannot load it", [PluginName]),
{error, not_found};
{_, true} ->
?LOG(notice, "Plugin ~s is already started", [PluginName]),
{error, already_started};
{_, false} ->
try
Configs = generate_configs(PluginName),
apply_configs(Configs),
load_plugin(PluginName, true)
catch _ : Error : Stacktrace ->
?LOG(alert, "Plugin ~s load failed with ~p", [PluginName, {Error, Stacktrace}]),
{error, parse_config_file_failed}
end
end.
%% @doc Unload all plugins before broker stopped.
-spec(unload() -> list() | {error, term()}).
unload() ->
case emqx:get_env(plugins_loaded_file) of
undefined -> ignore;
File ->
with_loaded_file(File, fun stop_plugins/1)
end.
%% @doc UnLoad a Plugin
-spec(unload(atom()) -> ok | {error, term()}).
unload(PluginName) when is_atom(PluginName) ->
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
{false, _} ->
?LOG(error, "Plugin ~s is not found, cannot unload it", [PluginName]),
{error, not_found};
{_, false} ->
?LOG(error, "Plugin ~s is not started", [PluginName]),
{error, not_started};
{_, _} ->
unload_plugin(PluginName, true)
end.
reload(PluginName) when is_atom(PluginName)->
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
{false, _} ->
?LOG(error, "Plugin ~s is not found, cannot reload it", [PluginName]),
{error, not_found};
{_, false} ->
load(PluginName);
{_, true} ->
case unload(PluginName) of
ok -> load(PluginName);
{error, Reason} -> {error, Reason}
end
end.
%% @doc List all available plugins
-spec(list() -> [emqx_types:plugin()]).
list() ->
StartedApps = names(started_app),
lists:map(fun({Name, _, [Type| _]}) ->
Plugin = plugin(Name, Type),
case lists:member(Name, StartedApps) of
true -> Plugin#plugin{active = true};
false -> Plugin
end
end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))).
find_plugin(Name) ->
find_plugin(Name, list()).
find_plugin(Name, Plugins) ->
lists:keyfind(Name, 2, Plugins).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
init_config(CfgFile) ->
{ok, [AppsEnv]} = file:consult(CfgFile),
lists:foreach(fun({App, Envs}) ->
[application:set_env(App, Par, Val) || {Par, Val} <- Envs]
end, AppsEnv).
load_expand_plugins() -> load_expand_plugins() ->
case emqx:get_env(expand_plugins_dir) of case emqx:get_env(expand_plugins_dir) of
undefined -> ok; undefined -> ok;
@ -136,60 +216,37 @@ load_plugins(Names, Persistent) ->
NotFound -> ?LOG(alert, "Cannot find plugins: ~p", [NotFound]) NotFound -> ?LOG(alert, "Cannot find plugins: ~p", [NotFound])
end, end,
NeedToLoad = Names -- NotFound -- names(started_app), NeedToLoad = Names -- NotFound -- names(started_app),
[load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad]. lists:foreach(fun(Name) ->
Plugin = find_plugin(Name, Plugins),
load_plugin(Plugin#plugin.name, Persistent)
end, NeedToLoad).
%% @doc Unload all plugins before broker stopped. generate_configs(App) ->
-spec(unload() -> list() | {error, term()}). Schema = cuttlefish_schema:files([filename:join([code:priv_dir(App), App]) ++ ".schema"]),
unload() -> Conf = cuttlefish_conf:file(filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf"),
case emqx:get_env(plugins_loaded_file) of cuttlefish_generator:map(Schema, Conf).
undefined -> ignore;
File -> apply_configs([]) ->
with_loaded_file(File, fun stop_plugins/1) ok;
end. apply_configs([{App, Config} | More]) ->
lists:foreach(fun({Key, _}) -> application:unset_env(App, Key) end, application:get_all_env(App)),
lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Config),
apply_configs(More).
%% Stop plugins %% Stop plugins
stop_plugins(Names) -> stop_plugins(Names) ->
[stop_app(App) || App <- Names]. [stop_app(App) || App <- Names],
ok.
%% @doc List all available plugins
-spec(list() -> [emqx_types:plugin()]).
list() ->
StartedApps = names(started_app),
lists:map(fun({Name, _, [Type| _]}) ->
Plugin = plugin(Name, Type),
case lists:member(Name, StartedApps) of
true -> Plugin#plugin{active = true};
false -> Plugin
end
end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))).
plugin(AppName, Type) -> plugin(AppName, Type) ->
case application:get_all_key(AppName) of case application:get_all_key(AppName) of
{ok, Attrs} -> {ok, Attrs} ->
Ver = proplists:get_value(vsn, Attrs, "0"),
Descr = proplists:get_value(description, Attrs, ""), Descr = proplists:get_value(description, Attrs, ""),
#plugin{name = AppName, version = Ver, descr = Descr, type = plugin_type(Type)}; #plugin{name = AppName, descr = Descr, type = plugin_type(Type)};
undefined -> error({plugin_not_found, AppName}) undefined -> error({plugin_not_found, AppName})
end. end.
%% @doc Load a Plugin load_plugin(Name, Persistent) ->
-spec(load(atom()) -> ok | {error, term()}).
load(PluginName) when is_atom(PluginName) ->
case lists:member(PluginName, names(started_app)) of
true ->
?LOG(notice, "Plugin ~s is already started", [PluginName]),
{error, already_started};
false ->
case find_plugin(PluginName) of
false ->
?LOG(alert, "Plugin ~s not found", [PluginName]),
{error, not_found};
Plugin ->
load_plugin(Plugin, true)
end
end.
load_plugin(#plugin{name = Name}, Persistent) ->
case load_app(Name) of case load_app(Name) of
ok -> ok ->
start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end); start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end);
@ -219,26 +276,6 @@ start_app(App, SuccFun) ->
{error, {ErrApp, Reason}} {error, {ErrApp, Reason}}
end. end.
find_plugin(Name) ->
find_plugin(Name, list()).
find_plugin(Name, Plugins) ->
lists:keyfind(Name, 2, Plugins).
%% @doc UnLoad a Plugin
-spec(unload(atom()) -> ok | {error, term()}).
unload(PluginName) when is_atom(PluginName) ->
case {lists:member(PluginName, names(started_app)), lists:member(PluginName, names(plugin))} of
{true, true} ->
unload_plugin(PluginName, true);
{false, _} ->
?LOG(error, "Plugin ~s is not started", [PluginName]),
{error, not_started};
{true, false} ->
?LOG(error, "~s is not a plugin, cannot unload it", [PluginName]),
{error, not_found}
end.
unload_plugin(App, Persistent) -> unload_plugin(App, Persistent) ->
case stop_app(App) of case stop_app(App) of
ok -> ok ->
@ -257,9 +294,6 @@ stop_app(App) ->
?LOG(error, "Stop plugin ~s error: ~p", [App]), {error, Reason} ?LOG(error, "Stop plugin ~s error: ~p", [App]), {error, Reason}
end. end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
names(plugin) -> names(plugin) ->
names(list()); names(list());

View File

@ -42,26 +42,26 @@ end_per_testcase(_TestCase, Config) ->
Config. Config.
t_is_enabled(_) -> t_is_enabled(_) ->
application:set_env(emqx, enable_channel_registry, false), application:set_env(emqx, enable_session_registry, false),
?assertEqual(false, emqx_cm_registry:is_enabled()), ?assertEqual(false, emqx_cm_registry:is_enabled()),
application:set_env(emqx, enable_channel_registry, true), application:set_env(emqx, enable_session_registry, true),
?assertEqual(true, emqx_cm_registry:is_enabled()). ?assertEqual(true, emqx_cm_registry:is_enabled()).
t_register_unregister_channel(_) -> t_register_unregister_channel(_) ->
ClientId = <<"clientid">>, ClientId = <<"clientid">>,
application:set_env(emqx, enable_channel_registry, false), application:set_env(emqx, enable_session_registry, false),
emqx_cm_registry:register_channel(ClientId), emqx_cm_registry:register_channel(ClientId),
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)), ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)),
application:set_env(emqx, enable_channel_registry, true), application:set_env(emqx, enable_session_registry, true),
emqx_cm_registry:register_channel(ClientId), emqx_cm_registry:register_channel(ClientId),
?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)), ?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)),
application:set_env(emqx, enable_channel_registry, false), application:set_env(emqx, enable_session_registry, false),
emqx_cm_registry:unregister_channel(ClientId), emqx_cm_registry:unregister_channel(ClientId),
?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)), ?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)),
application:set_env(emqx, enable_channel_registry, true), application:set_env(emqx, enable_session_registry, true),
emqx_cm_registry:unregister_channel(ClientId), emqx_cm_registry:unregister_channel(ClientId),
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)). ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)).

View File

@ -24,7 +24,6 @@
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
%% Compile extra plugin code %% Compile extra plugin code
@ -55,13 +54,11 @@ end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]). emqx_ct_helpers:stop_apps([]).
t_load(_) -> t_load(_) ->
?assertEqual([], emqx_plugins:load()), ?assertEqual(ok, emqx_plugins:load()),
?assertEqual([], emqx_plugins:unload()), ?assertEqual(ok, emqx_plugins:unload()),
?assertEqual({error, not_found}, emqx_plugins:load(not_existed_plugin)), ?assertEqual({error, not_found}, emqx_plugins:load(not_existed_plugin)),
?assertMatch(ok, emqx_plugins:load(emqx_mini_plugin)), ?assertEqual({error, parse_config_file_failed}, emqx_plugins:load(emqx_mini_plugin)),
?assertEqual({error, already_started}, emqx_plugins:load(emqx_mini_plugin)),
?assertEqual(ok, emqx_plugins:unload(emqx_mini_plugin)),
?assertEqual({error, not_started}, emqx_plugins:unload(emqx_mini_plugin)), ?assertEqual({error, not_started}, emqx_plugins:unload(emqx_mini_plugin)),
application:set_env(emqx, expand_plugins_dir, undefined), application:set_env(emqx, expand_plugins_dir, undefined),
@ -82,10 +79,10 @@ t_load_expand_plugin(_) ->
?assertEqual({error, load_app_fail}, emqx_plugins:load_expand_plugin("./not_existed_path/")). ?assertEqual({error, load_app_fail}, emqx_plugins:load_expand_plugin("./not_existed_path/")).
t_list(_) -> t_list(_) ->
?assertMatch([{plugin, _, _, _, _, _, _, _, _} | _ ], emqx_plugins:list()). ?assertMatch([{plugin, _, _, _, _, _, _, _} | _ ], emqx_plugins:list()).
t_find_plugin(_) -> t_find_plugin(_) ->
?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _, _}, emqx_plugins:find_plugin(emqx_mini_plugin)). ?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _}, emqx_plugins:find_plugin(emqx_mini_plugin)).
t_plugin_type(_) -> t_plugin_type(_) ->
?assertEqual(auth, emqx_plugins:plugin_type(auth)), ?assertEqual(auth, emqx_plugins:plugin_type(auth)),
@ -112,7 +109,7 @@ t_plugin(_) ->
_Error:Reason:_Stacktrace -> _Error:Reason:_Stacktrace ->
?assertEqual({plugin_not_found,not_existed_plugin}, Reason) ?assertEqual({plugin_not_found,not_existed_plugin}, Reason)
end, end,
?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _, _}, emqx_plugins:plugin(emqx_mini_plugin, undefined)). ?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _}, emqx_plugins:plugin(emqx_mini_plugin, undefined)).
t_filter_plugins(_) -> t_filter_plugins(_) ->
?assertEqual([name1, name2], emqx_plugins:filter_plugins([name1, {name2,true}, {name3, false}])). ?assertEqual([name1, name2], emqx_plugins:filter_plugins([name1, {name2,true}, {name3, false}])).
@ -120,27 +117,26 @@ t_filter_plugins(_) ->
t_load_plugin(_) -> t_load_plugin(_) ->
ok = meck:new(application, [unstick, non_strict, passthrough, no_history]), ok = meck:new(application, [unstick, non_strict, passthrough, no_history]),
ok = meck:expect(application, load, fun(already_loaded_app) -> {error, {already_loaded, already_loaded_app}}; ok = meck:expect(application, load, fun(already_loaded_app) -> {error, {already_loaded, already_loaded_app}};
(error_app) -> {error, error}; (error_app) -> {error, error};
(_) -> ok end), (_) -> ok end),
ok = meck:expect(application, ensure_all_started, fun(already_loaded_app) -> {error, {already_loaded_app, already_loaded}}; ok = meck:expect(application, ensure_all_started, fun(already_loaded_app) -> {error, {already_loaded_app, already_loaded}};
(error_app) -> {error, error}; (error_app) -> {error, error};
(App) -> {ok, App} end), (App) -> {ok, App} end),
?assertMatch({error, _}, emqx_plugins:load_plugin(#plugin{name = already_loaded_app}, true)), ?assertMatch({error, _}, emqx_plugins:load_plugin(already_loaded_app, true)),
?assertMatch(ok, emqx_plugins:load_plugin(#plugin{name = normal}, true)), ?assertMatch(ok, emqx_plugins:load_plugin(normal, true)),
?assertMatch({error,_}, emqx_plugins:load_plugin(#plugin{name = error_app}, true)), ?assertMatch({error,_}, emqx_plugins:load_plugin(error_app, true)),
ok = meck:unload(application). ok = meck:unload(application).
t_unload_plugin(_) -> t_unload_plugin(_) ->
ok = meck:new(application, [unstick, non_strict, passthrough, no_history]), ok = meck:new(application, [unstick, non_strict, passthrough, no_history]),
ok = meck:expect(application, stop, fun(not_started_app) -> {error, {not_started, not_started_app}}; ok = meck:expect(application, stop, fun(not_started_app) -> {error, {not_started, not_started_app}};
(error_app) -> {error, error}; (error_app) -> {error, error};
(_) -> ok end), (_) -> ok end),
?assertEqual(ok, emqx_plugins:unload_plugin(not_started_app, true)), ?assertEqual(ok, emqx_plugins:unload_plugin(not_started_app, true)),
?assertEqual(ok, emqx_plugins:unload_plugin(normal, true)), ?assertEqual(ok, emqx_plugins:unload_plugin(normal, true)),
?assertEqual({error,error}, emqx_plugins:unload_plugin(error_app, true)), ?assertEqual({error,error}, emqx_plugins:unload_plugin(error_app, true)),
ok = meck:unload(application). ok = meck:unload(application).