diff --git a/etc/emqx.conf b/etc/emqx.conf index 173aedb3d..eea3381d2 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1941,11 +1941,6 @@ broker.sys_interval = 1m ## Default: 30s broker.sys_heartbeat = 30s -## Enable global session registry. -## -## Value: on | off -broker.enable_session_registry = on - ## Session locking strategy in a cluster. ## ## Value: Enum diff --git a/include/emqx.hrl b/include/emqx.hrl index 87bc520cb..acca359c4 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -128,7 +128,6 @@ -record(plugin, { name :: atom(), - version :: string(), dir :: string(), descr :: string(), vendor :: string(), diff --git a/src/emqx_cm_registry.erl b/src/emqx_cm_registry.erl index c45c497ff..6609d70f4 100644 --- a/src/emqx_cm_registry.erl +++ b/src/emqx_cm_registry.erl @@ -62,7 +62,7 @@ start_link() -> %% @doc Is the global registry enabled? -spec(is_enabled() -> boolean()). is_enabled() -> - emqx:get_env(enable_channel_registry, true). + emqx:get_env(enable_session_registry, true). %% @doc Register a global channel. -spec(register_channel(emqx_types:clientid() diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index 381782fda..b6e2b6156 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -27,9 +27,9 @@ , load/1 , unload/0 , unload/1 + , reload/1 , list/0 , find_plugin/1 - , load_expand_plugin/1 ]). -ifdef(TEST). @@ -51,12 +51,6 @@ init() -> lists:foreach(fun init_config/1, CfgFiles) end. -init_config(CfgFile) -> - {ok, [AppsEnv]} = file:consult(CfgFile), - lists:foreach(fun({App, Envs}) -> - [application:set_env(App, Par, Val) || {Par, Val} <- Envs] - end, AppsEnv). - %% @doc Load all plugins when the broker started. -spec(load() -> list() | {error, term()}). load() -> @@ -68,6 +62,85 @@ load() -> with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end) end. +%% @doc Load a Plugin +-spec(load(atom()) -> ok | {error, term()}). +load(PluginName) when is_atom(PluginName) -> + case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of + {false, _} -> + ?LOG(alert, "Plugin ~s not found, cannot load it", [PluginName]), + {error, not_found}; + {_, true} -> + ?LOG(notice, "Plugin ~s is already started", [PluginName]), + {error, already_started}; + {_, false} -> + load_plugin(PluginName, true) + end. + +%% @doc Unload all plugins before broker stopped. +-spec(unload() -> list() | {error, term()}). +unload() -> + case emqx:get_env(plugins_loaded_file) of + undefined -> ignore; + File -> + with_loaded_file(File, fun stop_plugins/1) + end. + +%% @doc UnLoad a Plugin +-spec(unload(atom()) -> ok | {error, term()}). +unload(PluginName) when is_atom(PluginName) -> + case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of + {false, _} -> + ?LOG(error, "Plugin ~s is not found, cannot unload it", [PluginName]), + {error, not_found}; + {_, false} -> + ?LOG(error, "Plugin ~s is not started", [PluginName]), + {error, not_started}; + {_, _} -> + unload_plugin(PluginName, true) + end. + +reload(PluginName) when is_atom(PluginName)-> + case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of + {false, _} -> + ?LOG(error, "Plugin ~s is not found, cannot reload it", [PluginName]), + {error, not_found}; + {_, false} -> + load(PluginName); + {_, true} -> + case unload(PluginName) of + ok -> load(PluginName); + {error, Reason} -> {error, Reason} + end + end. + +%% @doc List all available plugins +-spec(list() -> [emqx_types:plugin()]). +list() -> + StartedApps = names(started_app), + lists:map(fun({Name, _, [Type| _]}) -> + Plugin = plugin(Name, Type), + case lists:member(Name, StartedApps) of + true -> Plugin#plugin{active = true}; + false -> Plugin + end + end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))). + +find_plugin(Name) -> + find_plugin(Name, list()). + +find_plugin(Name, Plugins) -> + lists:keyfind(Name, 2, Plugins). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +init_config(CfgFile) -> + {ok, [AppsEnv]} = file:consult(CfgFile), + lists:foreach(fun({App, Envs}) -> + [application:set_env(App, Par, Val) || {Par, Val} <- Envs] + end, AppsEnv). + load_expand_plugins() -> case emqx:get_env(expand_plugins_dir) of undefined -> ok; @@ -136,65 +209,60 @@ load_plugins(Names, Persistent) -> NotFound -> ?LOG(alert, "Cannot find plugins: ~p", [NotFound]) end, NeedToLoad = Names -- NotFound -- names(started_app), - [load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad]. + lists:foreach(fun(Name) -> + Plugin = find_plugin(Name, Plugins), + load_plugin(Plugin#plugin.name, Persistent) + end, NeedToLoad). -%% @doc Unload all plugins before broker stopped. --spec(unload() -> list() | {error, term()}). -unload() -> - case emqx:get_env(plugins_loaded_file) of - undefined -> ignore; - File -> - with_loaded_file(File, fun stop_plugins/1) +generate_configs(App) -> + ConfigFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".config", + ConfFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf", + SchemaFile = filename:join([code:priv_dir(App), App]) ++ ".schema", + case {filelib:is_file(ConfigFile), filelib:is_file(ConfFile) andalso filelib:is_file(SchemaFile)} of + {true, _} -> + {ok, [Configs]} = file:consult(ConfigFile), + Configs; + {_, true} -> + Schema = cuttlefish_schema:files([SchemaFile]), + Conf = cuttlefish_conf:file(ConfFile), + cuttlefish_generator:map(Schema, Conf); + {false, false} -> + error(no_avaliable_configuration) end. +apply_configs([]) -> + ok; +apply_configs([{App, Config} | More]) -> + lists:foreach(fun({Key, _}) -> application:unset_env(App, Key) end, application:get_all_env(App)), + lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Config), + apply_configs(More). + %% Stop plugins stop_plugins(Names) -> - [stop_app(App) || App <- Names]. - -%% @doc List all available plugins --spec(list() -> [emqx_types:plugin()]). -list() -> - StartedApps = names(started_app), - lists:map(fun({Name, _, [Type| _]}) -> - Plugin = plugin(Name, Type), - case lists:member(Name, StartedApps) of - true -> Plugin#plugin{active = true}; - false -> Plugin - end - end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))). + [stop_app(App) || App <- Names], + ok. plugin(AppName, Type) -> case application:get_all_key(AppName) of {ok, Attrs} -> - Ver = proplists:get_value(vsn, Attrs, "0"), Descr = proplists:get_value(description, Attrs, ""), - #plugin{name = AppName, version = Ver, descr = Descr, type = plugin_type(Type)}; + #plugin{name = AppName, descr = Descr, type = plugin_type(Type)}; undefined -> error({plugin_not_found, AppName}) end. -%% @doc Load a Plugin --spec(load(atom()) -> ok | {error, term()}). -load(PluginName) when is_atom(PluginName) -> - case lists:member(PluginName, names(started_app)) of - true -> - ?LOG(notice, "Plugin ~s is already started", [PluginName]), - {error, already_started}; - false -> - case find_plugin(PluginName) of - false -> - ?LOG(alert, "Plugin ~s not found", [PluginName]), - {error, not_found}; - Plugin -> - load_plugin(Plugin, true) - end - end. - -load_plugin(#plugin{name = Name}, Persistent) -> - case load_app(Name) of - ok -> - start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end); - {error, Error} -> - {error, Error} +load_plugin(Name, Persistent) -> + try + Configs = generate_configs(Name), + apply_configs(Configs), + case load_app(Name) of + ok -> + start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end); + {error, Error0} -> + {error, Error0} + end + catch _ : Error : Stacktrace -> + ?LOG(alert, "Plugin ~s load failed with ~p", [Name, {Error, Stacktrace}]), + {error, parse_config_file_failed} end. load_app(App) -> @@ -219,26 +287,6 @@ start_app(App, SuccFun) -> {error, {ErrApp, Reason}} end. -find_plugin(Name) -> - find_plugin(Name, list()). - -find_plugin(Name, Plugins) -> - lists:keyfind(Name, 2, Plugins). - -%% @doc UnLoad a Plugin --spec(unload(atom()) -> ok | {error, term()}). -unload(PluginName) when is_atom(PluginName) -> - case {lists:member(PluginName, names(started_app)), lists:member(PluginName, names(plugin))} of - {true, true} -> - unload_plugin(PluginName, true); - {false, _} -> - ?LOG(error, "Plugin ~s is not started", [PluginName]), - {error, not_started}; - {true, false} -> - ?LOG(error, "~s is not a plugin, cannot unload it", [PluginName]), - {error, not_found} - end. - unload_plugin(App, Persistent) -> case stop_app(App) of ok -> @@ -257,9 +305,6 @@ stop_app(App) -> ?LOG(error, "Stop plugin ~s error: ~p", [App]), {error, Reason} end. -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- names(plugin) -> names(list()); diff --git a/test/emqx_cm_registry_SUITE.erl b/test/emqx_cm_registry_SUITE.erl index a95bca2ae..edc5ff9c4 100644 --- a/test/emqx_cm_registry_SUITE.erl +++ b/test/emqx_cm_registry_SUITE.erl @@ -42,26 +42,26 @@ end_per_testcase(_TestCase, Config) -> Config. t_is_enabled(_) -> - application:set_env(emqx, enable_channel_registry, false), + application:set_env(emqx, enable_session_registry, false), ?assertEqual(false, emqx_cm_registry:is_enabled()), - application:set_env(emqx, enable_channel_registry, true), + application:set_env(emqx, enable_session_registry, true), ?assertEqual(true, emqx_cm_registry:is_enabled()). t_register_unregister_channel(_) -> ClientId = <<"clientid">>, - application:set_env(emqx, enable_channel_registry, false), + application:set_env(emqx, enable_session_registry, false), emqx_cm_registry:register_channel(ClientId), ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)), - application:set_env(emqx, enable_channel_registry, true), + application:set_env(emqx, enable_session_registry, true), emqx_cm_registry:register_channel(ClientId), ?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)), - application:set_env(emqx, enable_channel_registry, false), + application:set_env(emqx, enable_session_registry, false), emqx_cm_registry:unregister_channel(ClientId), ?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)), - application:set_env(emqx, enable_channel_registry, true), + application:set_env(emqx, enable_session_registry, true), emqx_cm_registry:unregister_channel(ClientId), ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)). diff --git a/test/emqx_plugins_SUITE.erl b/test/emqx_plugins_SUITE.erl index 50af9ff3e..85c14933b 100644 --- a/test/emqx_plugins_SUITE.erl +++ b/test/emqx_plugins_SUITE.erl @@ -24,7 +24,6 @@ all() -> emqx_ct:all(?MODULE). - init_per_suite(Config) -> %% Compile extra plugin code @@ -55,13 +54,11 @@ end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). t_load(_) -> - ?assertEqual([], emqx_plugins:load()), - ?assertEqual([], emqx_plugins:unload()), + ?assertEqual(ok, emqx_plugins:load()), + ?assertEqual(ok, emqx_plugins:unload()), ?assertEqual({error, not_found}, emqx_plugins:load(not_existed_plugin)), - ?assertMatch(ok, emqx_plugins:load(emqx_mini_plugin)), - ?assertEqual({error, already_started}, emqx_plugins:load(emqx_mini_plugin)), - ?assertEqual(ok, emqx_plugins:unload(emqx_mini_plugin)), + ?assertEqual({error, parse_config_file_failed}, emqx_plugins:load(emqx_mini_plugin)), ?assertEqual({error, not_started}, emqx_plugins:unload(emqx_mini_plugin)), application:set_env(emqx, expand_plugins_dir, undefined), @@ -82,10 +79,10 @@ t_load_expand_plugin(_) -> ?assertEqual({error, load_app_fail}, emqx_plugins:load_expand_plugin("./not_existed_path/")). t_list(_) -> - ?assertMatch([{plugin, _, _, _, _, _, _, _, _} | _ ], emqx_plugins:list()). + ?assertMatch([{plugin, _, _, _, _, _, _, _} | _ ], emqx_plugins:list()). t_find_plugin(_) -> - ?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _, _}, emqx_plugins:find_plugin(emqx_mini_plugin)). + ?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _}, emqx_plugins:find_plugin(emqx_mini_plugin)). t_plugin_type(_) -> ?assertEqual(auth, emqx_plugins:plugin_type(auth)), @@ -112,7 +109,7 @@ t_plugin(_) -> _Error:Reason:_Stacktrace -> ?assertEqual({plugin_not_found,not_existed_plugin}, Reason) end, - ?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _, _}, emqx_plugins:plugin(emqx_mini_plugin, undefined)). + ?assertMatch({plugin, emqx_mini_plugin, _, _, _, _, _, _}, emqx_plugins:plugin(emqx_mini_plugin, undefined)). t_filter_plugins(_) -> ?assertEqual([name1, name2], emqx_plugins:filter_plugins([name1, {name2,true}, {name3, false}])). @@ -120,27 +117,26 @@ t_filter_plugins(_) -> t_load_plugin(_) -> ok = meck:new(application, [unstick, non_strict, passthrough, no_history]), ok = meck:expect(application, load, fun(already_loaded_app) -> {error, {already_loaded, already_loaded_app}}; - (error_app) -> {error, error}; - (_) -> ok end), + (error_app) -> {error, error}; + (_) -> ok end), ok = meck:expect(application, ensure_all_started, fun(already_loaded_app) -> {error, {already_loaded_app, already_loaded}}; - (error_app) -> {error, error}; - (App) -> {ok, App} end), + (error_app) -> {error, error}; + (App) -> {ok, App} end), - ?assertMatch({error, _}, emqx_plugins:load_plugin(#plugin{name = already_loaded_app}, true)), - ?assertMatch(ok, emqx_plugins:load_plugin(#plugin{name = normal}, true)), - ?assertMatch({error,_}, emqx_plugins:load_plugin(#plugin{name = error_app}, true)), + ?assertMatch({error, _}, emqx_plugins:load_plugin(already_loaded_app, true)), + ?assertMatch(ok, emqx_plugins:load_plugin(normal, true)), + ?assertMatch({error,_}, emqx_plugins:load_plugin(error_app, true)), ok = meck:unload(application). t_unload_plugin(_) -> ok = meck:new(application, [unstick, non_strict, passthrough, no_history]), ok = meck:expect(application, stop, fun(not_started_app) -> {error, {not_started, not_started_app}}; - (error_app) -> {error, error}; - (_) -> ok end), + (error_app) -> {error, error}; + (_) -> ok end), ?assertEqual(ok, emqx_plugins:unload_plugin(not_started_app, true)), ?assertEqual(ok, emqx_plugins:unload_plugin(normal, true)), ?assertEqual({error,error}, emqx_plugins:unload_plugin(error_app, true)), ok = meck:unload(application). -