diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src new file mode 100644 index 000000000..1fd8ce62d --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src @@ -0,0 +1,12 @@ +{application, emqttd_plugin_demo, + [ + {description, ""}, + {vsn, "1"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {mod, { emqttd_plugin_demo_app, []}}, + {env, []} + ]}. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl new file mode 100644 index 000000000..9042a449a --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl @@ -0,0 +1,16 @@ +-module(emqttd_plugin_demo_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + emqttd_plugin_demo_sup:start_link(). + +stop(_State) -> + ok. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl new file mode 100644 index 000000000..65dad7a60 --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl @@ -0,0 +1,27 @@ +-module(emqttd_plugin_demo_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + {ok, { {one_for_one, 5, 10}, []} }. + diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index e07a157c2..159062b16 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -52,9 +52,8 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd:load_all_mods(), - %% emqttd:load_all_plugins(), - {ok, Listeners} = application:get_env(listeners), - emqttd:open_listeners(Listeners), + emqttd_plugins:load(), + start_listeners(), register(emqttd, self()), print_vsn(), {ok, Sup}. @@ -67,6 +66,10 @@ print_vsn() -> {ok, Desc} = application:get_key(description), ?PRINT("~s ~s is running now~n", [Desc, Vsn]). +start_listeners() -> + {ok, Listeners} = application:get_env(listeners), + emqttd:open_listeners(Listeners). + start_servers(Sup) -> Servers = [{"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, @@ -131,14 +134,23 @@ worker_spec(Name, Opts) -> %% close all listeners first... prep_stop(State) -> - %%TODO: esockd app should be running... - {ok, Listeners} = application:get_env(listeners), - emqttd:close_listeners(Listeners), + stop_listeners(), + timer:sleep(2), + emqttd_plugins:unload(), timer:sleep(2), State. +stop_listeners() -> + %% ensure that esockd applications is started? + case lists:keyfind(esockd, 1, application:which_applications()) of + false -> + ignore; + _Tuple -> + {ok, Listeners} = application:get_env(listeners), + emqttd:close_listeners(Listeners) + end. + -spec stop(State :: term()) -> term(). stop(_State) -> ok. - diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index d7273e8a6..048af3431 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -77,8 +77,6 @@ cluster([SNode]) -> pong -> case emqttd:is_running(Node) of true -> - %%TODO: should not unload here. - %% emqttd:unload_all_plugins(), application:stop(emqttd), application:stop(esockd), application:stop(gproc), @@ -180,19 +178,20 @@ bridges(["stop", SNode, Topic]) -> end. plugins(["list"]) -> - Plugins = emqttd:loaded_plugins(), - lists:foreach(fun(Plugin) -> ?PRINT("~p~n", [Plugin]) end, Plugins); + lists:foreach(fun(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> + ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", [Name, Ver, Descr, Active]) + end, emqttd_plugins:list()); plugins(["load", Name]) -> - case emqttd:load_plugin(list_to_atom(Name)) of - ok -> ?PRINT("plugin ~s is loaded successfully.~n", [Name]); - {error, Reason} -> ?PRINT("error: ~s~n", [Reason]) + case emqttd_plugins:load(list_to_atom(Name)) of + {ok, StartedApps} -> ?PRINT("start apps: ~p, plugin ~s is loaded successfully.~n", [StartedApps, Name]); + {error, Reason} -> ?PRINT("load plugin error: ~s~n", [Reason]) end; plugins(["unload", Name]) -> - case emqttd:unload_plugin(list_to_atom(Name)) of + case emqttd_plugins:unload(list_to_atom(Name)) of ok -> ?PRINT("plugin ~s is unloaded successfully.~n", [Name]); - {error, Reason} -> ?PRINT("error: ~s~n", [Reason]) + {error, Reason} -> ?PRINT("unload plugin error: ~s~n", [Reason]) end. trace(["list"]) -> diff --git a/src/emqttd_plugin_manager.erl b/src/emqttd_plugin_manager.erl deleted file mode 100644 index de92029b5..000000000 --- a/src/emqttd_plugin_manager.erl +++ /dev/null @@ -1,172 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd plugin manager. -%%% -%%% @end -%%%----------------------------------------------------------------------------- - --module(emqttd_plugin_manager). - --author("Feng Lee "). - --include("emqttd.hrl"). - --export([start/0, list/0, load/1, unload/1, stop/0]). - -start() -> - %% start all plugins - %% - ok. - -%%------------------------------------------------------------------------------ -%% @doc Load all plugins -%% @end -%%------------------------------------------------------------------------------ --spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -load_all_plugins() -> - %% save first - case file:consult("etc/plugins.config") of - {ok, [PluginApps]} -> - ok; - %% application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]), - %% [{App, load_plugin(App)} || {App, _Env} <- PluginApps]; - {error, enoent} -> - lager:error("etc/plugins.config not found!"); - {error, Error} -> - lager:error("Load etc/plugins.config error: ~p", [Error]) - end. - -%%------------------------------------------------------------------------------ -%% List all available plugins -%%------------------------------------------------------------------------------ -list() -> - {ok, PluginEnv} = application:get_env(emqttd, plugins), - PluginsDir = proplists:get_value(dir, PluginEnv, "./plugins"), - AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir), - Plugins = [plugin(filename:join(PluginsDir, AppFile)) || AppFile <- AppFiles], - StartedApps = [Name || {Name, _Descr, _Ver} <- application:which_applications()], - lists:map(fun(Plugin = #mqtt_plugin{name = Name}) -> - case lists:member(Name, StartedApps) of - true -> Plugin#mqtt_plugin{active = true}; - false -> Plugin - end - end, Plugins). - -plugin(AppFile) -> - {ok, [{application, Name, Attrs}]} = file:consult(AppFile), - Ver = proplists:get_value(vsn, Attrs), - Descr = proplists:get_value(description, Attrs, ""), - #mqtt_plugin{name = Name, version = Ver, descr = Descr}. - -%%------------------------------------------------------------------------------ -%% @doc Load Plugin -%% @end -%%------------------------------------------------------------------------------ --spec load(atom()) -> ok | {error, any()}. -load(PluginName) when is_atom(PluginName) -> - %% start plugin - %% write file if plugin is loaded - ok. - --spec load_plugin(App :: atom()) -> ok | {error, any()}. -load_plugin(App) -> - case load_app(App) of - ok -> - start_app(App); - {error, Reason} -> - {error, Reason} - end. - -load_app(App) -> - case application:load(App) of - ok -> - lager:info("load plugin ~p successfully", [App]), ok; - {error, {already_loaded, App}} -> - lager:info("load plugin ~p is already loaded", [App]), ok; - {error, Reason} -> - lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -start_app(App) -> - case application:start(App) of - ok -> - lager:info("start plugin ~p successfully", [App]), ok; - {error, {already_started, App}} -> - lager:error("plugin ~p is already started", [App]), ok; - {error, Reason} -> - lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - - -%%------------------------------------------------------------------------------ -%% @doc UnLoad Plugin -%% @end -%%------------------------------------------------------------------------------ --spec unload(atom()) -> ok | {error, any()}. -unload(PluginName) when is_atom(PluginName) -> - %% stop plugin - %% write file if plugin is loaded - ok. - --spec unload_plugin(App :: atom()) -> ok | {error, any()}. -unload_plugin(App) -> - case stop_app(App) of - ok -> - unload_app(App); - {error, Reason} -> - {error, Reason} - end. - -stop_app(App) -> - case application:stop(App) of - ok -> - lager:info("stop plugin ~p successfully~n", [App]), ok; - {error, {not_started, App}} -> - lager:error("plugin ~p is not started~n", [App]), ok; - {error, Reason} -> - lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} - end. - -unload_app(App) -> - case application:unload(App) of - ok -> - lager:info("unload plugin ~p successfully~n", [App]), ok; - {error, {not_loaded, App}} -> - lager:info("load plugin ~p is not loaded~n", [App]), ok; - {error, Reason} -> - lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -stop() -> - %% stop all plugins - ok. - -%%------------------------------------------------------------------------------ -%% @doc Unload all plugins -%% @end -%%------------------------------------------------------------------------------ --spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -unload_all_plugins() -> - PluginApps = application:get_env(emqttd, plugins, []). - %%[{App, unload_plugin(App)} || App <- PluginApps]. - diff --git a/src/emqttd_plugin_mgr.erl b/src/emqttd_plugins.erl similarity index 67% rename from src/emqttd_plugin_mgr.erl rename to src/emqttd_plugins.erl index 55216f7c2..4372ca3c7 100644 --- a/src/emqttd_plugin_mgr.erl +++ b/src/emqttd_plugins.erl @@ -20,53 +20,75 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd plugin manager. +%%% emqttd plugin admin. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_plugin_mgr). +-module(emqttd_plugins). -author("Feng Lee "). -include("emqttd.hrl"). --export([start/0, list/0, load/1, unload/1, stop/0]). +-export([load/0, unload/0]). + +-export([list/0, load/1, unload/1]). %%------------------------------------------------------------------------------ -%% @doc Load all plugins +%% @doc Load all plugins when the broker started. %% @end %%------------------------------------------------------------------------------ --spec start() -> ok | {error, any()}. -start() -> +-spec load() -> list() | {error, any()}. +load() -> case read_loaded() of - {ok, AppNames} -> - NotFound = AppNames -- apps(plugin), + {ok, LoadNames} -> + NotFound = LoadNames -- apps(plugin), case NotFound of [] -> ok; NotFound -> lager:error("Cannot find plugins: ~p", [NotFound]) end, - {ok, start_apps(AppNames -- NotFound -- apps(started))}; + start_apps(LoadNames -- NotFound -- apps(started)); {error, Error} -> lager:error("Read loaded_plugins file error: ~p", [Error]), {error, Error} end. +start_apps(Apps) -> + [start_app(App) || App <- Apps]. + +%%------------------------------------------------------------------------------ +%% @doc Unload all plugins before broker stopped. +%% @end +%%------------------------------------------------------------------------------ +-spec unload() -> list() | {error, any()}. +unload() -> + case read_loaded() of + {ok, LoadNames} -> + stop_apps(LoadNames); + {error, Error} -> + lager:error("Read loaded_plugins file error: ~p", [Error]), + {error, Error} + end. + +stop_apps(Apps) -> + [stop_app(App) || App <- Apps]. + %%------------------------------------------------------------------------------ %% @doc List all available plugins %% @end %%------------------------------------------------------------------------------ +-spec list() -> [mqtt_plugin()]. list() -> - {ok, PluginEnv} = application:get_env(emqttd, plugins), - PluginsDir = proplists:get_value(dir, PluginEnv, "./plugins"), + PluginsDir = env(dir), AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir), Plugins = [plugin(filename:join(PluginsDir, AppFile)) || AppFile <- AppFiles], - StartedApps = [Name || {Name, _Descr, _Ver} <- application:which_applications()], + StartedApps = apps(started), lists:map(fun(Plugin = #mqtt_plugin{name = Name}) -> - case lists:member(Name, StartedApps) of - true -> Plugin#mqtt_plugin{active = true}; - false -> Plugin - end + case lists:member(Name, StartedApps) of + true -> Plugin#mqtt_plugin{active = true}; + false -> Plugin + end end, Plugins). plugin(AppFile) -> @@ -76,23 +98,20 @@ plugin(AppFile) -> #mqtt_plugin{name = Name, version = Ver, descr = Descr}. %%------------------------------------------------------------------------------ -%% @doc Load Plugin +%% @doc Load One Plugin %% @end %%------------------------------------------------------------------------------ -spec load(atom()) -> ok | {error, any()}. load(PluginName) when is_atom(PluginName) -> - case lists:member(PluginName, apps(started)) of - true -> - lager:info("plugin ~p is started", [PluginName]), + case {lists:member(PluginName, apps(started)), lists:member(PluginName, apps(plugin))} of + {true, _} -> + lager:error("plugin ~p is started", [PluginName]), {error, already_started}; - false -> - case lists:member(PluginName, apps(plugin)) of - true -> - load_plugin(PluginName); - false -> - lager:info("plugin ~p is not found", [PluginName]), - {error, not_foun} - end + {false, true} -> + load_plugin(PluginName); + {false, false} -> + lager:error("plugin ~p is not found", [PluginName]), + {error, not_found} end. -spec load_plugin(App :: atom()) -> {ok, list()} | {error, any()}. @@ -116,20 +135,27 @@ start_app(App) -> end. %%------------------------------------------------------------------------------ -%% @doc UnLoad Plugin +%% @doc UnLoad One Plugin %% @end %%------------------------------------------------------------------------------ -spec unload(atom()) -> ok | {error, any()}. unload(PluginName) when is_atom(PluginName) -> - %% stop plugin - %% write file if plugin is loaded - ok. + case {lists:member(PluginName, apps(started)), lists:member(PluginName, apps(plugin))} of + {false, _} -> + lager:error("plugin ~p is not started", [PluginName]), + {error, not_started}; + {true, true} -> + unload_plugin(PluginName); + {true, false} -> + lager:error("~s is not a plugin, cannot unload it", [PluginName]), + {error, not_found} + end. -spec unload_plugin(App :: atom()) -> ok | {error, any()}. unload_plugin(App) -> case stop_app(App) of ok -> - unload_app(App); + plugin_unloaded(App), ok; {error, Reason} -> {error, Reason} end. @@ -144,32 +170,10 @@ stop_app(App) -> lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} end. -unload_app(App) -> - case application:unload(App) of - ok -> - lager:info("unload plugin ~p successfully~n", [App]), ok; - {error, {not_loaded, App}} -> - lager:info("load plugin ~p is not loaded~n", [App]), ok; - {error, Reason} -> - lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -stop() -> - %% stop all plugins - PluginApps = application:get_env(emqttd, plugins, []), - %%[{App, unload_plugin(App)} || App <- PluginApps]. - ok. - %%%============================================================================= %%% Internal functions %%%============================================================================= -start_apps(Apps) -> - [start_app(App) || App <- Apps]. - -stop_apps(Apps) -> - [stop_app(App) || App <- Apps]. - apps(plugin) -> [Name || #mqtt_plugin{name = Name} <- list()]; @@ -181,7 +185,7 @@ plugin_loaded(Name) -> {ok, Names} -> case lists:member(Name, Names) of true -> - ok; + ignore; false -> %% write file if plugin is loaded write_loaded(lists:append(Names, Name)) @@ -190,17 +194,24 @@ plugin_loaded(Name) -> lager:error("Cannot read loaded plugins: ~p", [Error]) end. - +plugin_unloaded(Name) -> + case read_loaded() of + {ok, Names} -> + case lists:member(Name, Names) of + true -> + write_loaded(lists:delete(Name, Names)); + false -> + lager:error("Cannot find ~s in loaded_file", [Name]) + end; + {error, Error} -> + lager:error("Cannot read loaded plugins: ~p", [Error]) + end. read_loaded() -> - {ok, PluginEnv} = application:get_env(emqttd, plugins), - LoadedFile = proplists:get_value(loaded_file, PluginEnv, "./data/loaded_plugins"), - file:consult(LoadedFile). + file:consult(env(loaded_file)). write_loaded(AppNames) -> - {ok, PluginEnv} = application:get_env(emqttd, plugins), - LoadedFile = proplists:get_value(loaded_file, PluginEnv, "./data/loaded_plugins"), - case file:open(LoadedFile, [binary, write]) of + case file:open(env(loaded_file), [binary, write]) of {ok, Fd} -> Line = list_to_binary(io_lib:format("~w.~n", [AppNames])), file:write(Fd, Line); @@ -208,3 +219,12 @@ write_loaded(AppNames) -> {error, Error} end. +env(dir) -> + proplists:get_value(dir, env(), "./plugins"); + +env(loaded_file) -> + proplists:get_value(loaded_file, env(), "./data/loaded_plugins"). + +env() -> + {ok, PluginsEnv} = application:get_env(emqttd, plugins), PluginsEnv. +