diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl index b8a3e1e47..e44c1f462 100644 --- a/apps/emqttd/src/emqttd.erl +++ b/apps/emqttd/src/emqttd.erl @@ -28,7 +28,12 @@ -author("Feng Lee "). --export([start/0, open/1, close/1, is_running/1]). +-export([start/0, + open_listeners/1, close_listeners/1, + load_all_plugins/0, unload_all_plugins/0, + load_plugin/1, unload_plugin/1, + loaded_plugins/0, + is_running/1]). -define(MQTT_SOCKOPTS, [ binary, @@ -52,24 +57,24 @@ start() -> %% @doc Open Listeners %% @end %%------------------------------------------------------------------------------ --spec open([listener()] | listener()) -> any(). -open(Listeners) when is_list(Listeners) -> - [open(Listener) || Listener <- Listeners]; +-spec open_listeners([listener()]) -> any(). +open_listeners(Listeners) when is_list(Listeners) -> + [open_listener(Listener) || Listener <- Listeners]. %% open mqtt port -open({mqtt, Port, Options}) -> - open(mqtt, Port, Options); +open_listener({mqtt, Port, Options}) -> + open_listener(mqtt, Port, Options); %% open mqtt(SSL) port -open({mqtts, Port, Options}) -> - open(mqtts, Port, Options); +open_listener({mqtts, Port, Options}) -> + open_listener(mqtts, Port, Options); %% open http port -open({http, Port, Options}) -> +open_listener({http, Port, Options}) -> MFArgs = {emqttd_http, handle, []}, mochiweb:start_http(Port, Options, MFArgs). -open(Protocol, Port, Options) -> +open_listener(Protocol, Port, Options) -> {ok, PktOpts} = application:get_env(emqttd, mqtt_packet), MFArgs = {emqttd_client, start_link, [PktOpts]}, esockd:open(Protocol, Port, emqttd_opts:merge(?MQTT_SOCKOPTS, Options) , MFArgs). @@ -78,13 +83,114 @@ open(Protocol, Port, Options) -> %% @doc Close Listeners %% @end %%------------------------------------------------------------------------------ --spec close([listener()] | listener()) -> any(). -close(Listeners) when is_list(Listeners) -> - [close(Listener) || Listener <- Listeners]; +-spec close_listeners([listener()]) -> any(). +close_listeners(Listeners) when is_list(Listeners) -> + [close_listener(Listener) || Listener <- Listeners]. -close({Protocol, Port, _Options}) -> +close_listener({Protocol, Port, _Options}) -> esockd:close({Protocol, Port}). +%%------------------------------------------------------------------------------ +%% @doc Load all plugins +%% @end +%%------------------------------------------------------------------------------ +-spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. +load_all_plugins() -> + %% save first + {ok, [PluginApps]} = file:consult("etc/plugins.config"), + application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]), + [{App, load_plugin(App)} || {App, _Env} <- PluginApps]. + +%%------------------------------------------------------------------------------ +%% @doc Load plugin +%% @end +%%------------------------------------------------------------------------------ +-spec load_plugin(App :: atom()) -> ok | {error, any()}. +load_plugin(App) -> + case load_app(App) of + ok -> + start_app(App); + {error, Reason} -> + {error, Reason} + end. + +load_app(App) -> + case application:load(App) of + ok -> + lager:info("load plugin ~p successfully", [App]), ok; + {error, {already_loaded, App}} -> + lager:info("load plugin ~p is already loaded", [App]), ok; + {error, Reason} -> + lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason} + end. + +start_app(App) -> + case application:start(App) of + ok -> + lager:info("start plugin ~p successfully", [App]), ok; + {error, {already_started, App}} -> + lager:error("plugin ~p is already started", [App]), ok; + {error, Reason} -> + lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason} + end. + +%%------------------------------------------------------------------------------ +%% @doc Loaded plugins +%% @end +%%------------------------------------------------------------------------------ +loaded_plugins() -> + PluginApps = application:get_env(emqttd, plugins, []), + [App || App = {Name, _Descr, _Vsn} <- application:which_applications(), + lists:member(Name, PluginApps)]. + +%%------------------------------------------------------------------------------ +%% @doc Unload all plugins +%% @end +%%------------------------------------------------------------------------------ +-spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. +unload_all_plugins() -> + PluginApps = application:get_env(emqttd, plugins, []), + [{App, unload_plugin(App)} || {App, _Env} <- PluginApps]. + + +%%------------------------------------------------------------------------------ +%% @doc Unload plugin +%% @end +%%------------------------------------------------------------------------------ +-spec unload_plugin(App :: atom()) -> ok | {error, any()}. +unload_plugin(App) -> + case stop_app(App) of + ok -> + unload_app(App); + {error, Reason} -> + {error, Reason} + end. + +stop_app(App) -> + case application:stop(App) of + ok -> + lager:info("stop plugin ~p successfully~n", [App]), ok; + {error, {not_started, App}} -> + lager:error("plugin ~p is not started~n", [App]), ok; + {error, Reason} -> + lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} + end. + +unload_app(App) -> + case application:unload(App) of + ok -> + lager:info("unload plugin ~p successfully~n", [App]), ok; + {error, {not_loaded, App}} -> + lager:info("load plugin ~p is not loaded~n", [App]), ok; + {error, Reason} -> + lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} + end. + + +%%------------------------------------------------------------------------------ +%% @doc Is running? +%% @end +%%------------------------------------------------------------------------------ is_running(Node) -> case rpc:call(Node, erlang, whereis, [emqttd]) of {badrpc, _} -> false; diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 873ae6fed..28b430f93 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -52,8 +52,8 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), {ok, Listeners} = application:get_env(listeners), - emqttd_plugin_manager:load_all_plugins(), - emqttd:open(Listeners), + emqttd:load_all_plugins(), + emqttd:open_listeners(Listeners), register(emqttd, self()), print_vsn(), {ok, Sup}. @@ -137,7 +137,7 @@ worker_spec(Name, Opts) -> -spec stop(State :: term()) -> term(). stop(_State) -> {ok, Listeners} = application:get_env(listeners), - emqttd:close(Listeners), - emqttd_plugin_manager:unload_all_plugins(), + emqttd:close_listeners(Listeners), + emqttd:unload_all_plugins(), ok. diff --git a/apps/emqttd/src/emqttd_ctl.erl b/apps/emqttd/src/emqttd_ctl.erl index f16344dcc..0da5003b9 100644 --- a/apps/emqttd/src/emqttd_ctl.erl +++ b/apps/emqttd/src/emqttd_ctl.erl @@ -155,17 +155,17 @@ bridges(["stop", SNode, Topic]) -> end. plugins(["list"]) -> - Plugins = emqttd_plugin_manager:loaded_plugins(), + Plugins = emqttd:loaded_plugins(), lists:foreach(fun(Plugin) -> ?PRINT("~p~n", [Plugin]) end, Plugins); plugins(["load", Name]) -> - case emqttd_plugin_manager:load_plugin(list_to_atom(Name)) of + case emqttd:load_plugin(list_to_atom(Name)) of ok -> ?PRINT("plugin ~s is loaded successfully.~n", [Name]); {error, Reason} -> ?PRINT("error: ~s~n", [Reason]) end; plugins(["unload", Name]) -> - case emqttd_plugin_manager:unload_plugin(list_to_atom(Name)) of + case emqttd:unload_plugin(list_to_atom(Name)) of ok -> ?PRINT("plugin ~s is unloaded successfully.~n", [Name]); {error, Reason} -> ?PRINT("error: ~s~n", [Reason]) end. diff --git a/apps/emqttd/src/emqttd_plugin_manager.erl b/apps/emqttd/src/emqttd_plugin_manager.erl deleted file mode 100644 index 268d82870..000000000 --- a/apps/emqttd/src/emqttd_plugin_manager.erl +++ /dev/null @@ -1,156 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd plugin manager. -%%% -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_plugin_manager). - -%%TODO: rewrite this module later... - --author("Feng Lee "). - --export([load_all_plugins/0, unload_all_plugins/0]). - --export([load_plugin/1, unload_plugin/1]). - --export([loaded_plugins/0]). - -%%------------------------------------------------------------------------------ -%% @doc Load all plugins -%% @end -%%------------------------------------------------------------------------------ -load_all_plugins() -> - {ok, [PluginApps]} = file:consult("etc/plugins.config"), - LoadedPlugins = load_plugins(PluginApps), - RunningPlugins = start_plugins(lists:reverse(LoadedPlugins)), - %% save to application env? - application:set_env(emqttd, loaded_plugins, lists:reverse(RunningPlugins)). - -load_plugins(PluginApps) -> - lists:foldl(fun({App, _Env}, Acc) -> - case application:load(App) of - ok -> - io:format("load plugin ~p successfully~n", [App]), - [App | Acc]; - {error, {already_loaded, App}} -> - io:format("load plugin ~p is already loaded~n", [App]), - [App | Acc]; - {error, Reason} -> - io:format("load plugin ~p error: ~p~n", [App, Reason]), - Acc - end - end, [], PluginApps). - -start_plugins(PluginApps) -> - lists:foldl(fun(App, Acc) -> - case application:start(App) of - ok -> - io:format("start plugin ~p successfully~n", [App]), - [App | Acc]; - {error, {already_started, App}} -> - io:format("plugin ~p is already started~n", [App]), - [App | Acc]; - {error, Reason} -> - io:format("start plugin ~p error: ~p~n", [App, Reason]), - Acc - end - end, [], PluginApps). - -%%------------------------------------------------------------------------------ -%% @doc Loaded plugins -%% @end -%%------------------------------------------------------------------------------ -loaded_plugins() -> - LoadedPluginApps = application:get_env(emqttd, loaded_plugins, []), - lager:info("loaded plugins: ~p", [LoadedPluginApps]), - [App || App = {Name, _Descr, _Vsn} <- application:which_applications(), - lists:member(Name, LoadedPluginApps)]. - -%%------------------------------------------------------------------------------ -%% @doc Unload all plugins -%% @end -%%------------------------------------------------------------------------------ -unload_all_plugins() -> - LoadedPluginApps = application:get_env(emqttd, loaded_plugins, []), - StoppedApps = stop_plugins(lists:reverse(LoadedPluginApps)), - UnloadedApps = unload_plugins(lists:reverse(StoppedApps)), - case LoadedPluginApps -- UnloadedApps of - [] -> - application:unset_env(emqttd, loaded_plugins); - LeftApps -> - lager:error("cannot unload plugins: ~p", [LeftApps]), - application:set_env(emqttd, loaded_plugins, LeftApps) - end. - -stop_plugins(PluginApps) -> - lists:foldl(fun(App, Acc) -> - case application:stop(App) of - ok -> - io:format("stop plugin ~p successfully~n", [App]), - [App | Acc]; - {error, {not_started, App}} -> - io:format("plugin ~p is not started~n", [App]), - [App | Acc]; - {error, Reason} -> - io:format("stop plugin ~p error: ~p~n", [App, Reason]), - Acc - end - end, [], PluginApps). - -unload_plugins(PluginApps) -> - lists:foldl(fun({App, _Env}, Acc) -> - case application:unload(App) of - ok -> - io:format("unload plugin ~p successfully~n", [App]), - [App | Acc]; - {error, {not_loaded, App}} -> - io:format("load plugin ~p is not loaded~n", [App]), - [App | Acc]; - {error, Reason} -> - io:format("unload plugin ~p error: ~p~n", [App, Reason]), - Acc - end - end, [], PluginApps). - -%%------------------------------------------------------------------------------ -%% @doc Load Plugin -%% @end -%%------------------------------------------------------------------------------ -load_plugin(Name) when is_atom(Name) -> - %% load app - %% start app - %% set env... - application:start(Name). - -%%------------------------------------------------------------------------------ -%% @doc Unload Plugin -%% @end -%%------------------------------------------------------------------------------ -unload_plugin(Name) when is_atom(Name) -> - %% stop app - %% unload app - %% set env - application:stop(Name), - application:unload(Name). - diff --git a/rel/files/app.config b/rel/files/app.config index c46c2034c..33c2b5868 100644 --- a/rel/files/app.config +++ b/rel/files/app.config @@ -112,11 +112,6 @@ {max_clients, 512}, {access, [{allow, "127.0.0.1"}]} ]} - ]}, - % Plugins - {plugins, [ - {emqttd_auth_ldap, [ldap_params]}, - {emqttd_auth_mysql, [mysql_params]} ]} ]} ].