rm emqttd_plugin_manager, emqttd.erl to support plugins management

This commit is contained in:
Feng Lee 2015-04-23 00:49:53 +08:00
parent 74217bf9ea
commit cec8ab6b4a
5 changed files with 127 additions and 182 deletions

View File

@ -28,7 +28,12 @@
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-export([start/0, open/1, close/1, is_running/1]). -export([start/0,
open_listeners/1, close_listeners/1,
load_all_plugins/0, unload_all_plugins/0,
load_plugin/1, unload_plugin/1,
loaded_plugins/0,
is_running/1]).
-define(MQTT_SOCKOPTS, [ -define(MQTT_SOCKOPTS, [
binary, binary,
@ -52,24 +57,24 @@ start() ->
%% @doc Open Listeners %% @doc Open Listeners
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec open([listener()] | listener()) -> any(). -spec open_listeners([listener()]) -> any().
open(Listeners) when is_list(Listeners) -> open_listeners(Listeners) when is_list(Listeners) ->
[open(Listener) || Listener <- Listeners]; [open_listener(Listener) || Listener <- Listeners].
%% open mqtt port %% open mqtt port
open({mqtt, Port, Options}) -> open_listener({mqtt, Port, Options}) ->
open(mqtt, Port, Options); open_listener(mqtt, Port, Options);
%% open mqtt(SSL) port %% open mqtt(SSL) port
open({mqtts, Port, Options}) -> open_listener({mqtts, Port, Options}) ->
open(mqtts, Port, Options); open_listener(mqtts, Port, Options);
%% open http port %% open http port
open({http, Port, Options}) -> open_listener({http, Port, Options}) ->
MFArgs = {emqttd_http, handle, []}, MFArgs = {emqttd_http, handle, []},
mochiweb:start_http(Port, Options, MFArgs). mochiweb:start_http(Port, Options, MFArgs).
open(Protocol, Port, Options) -> open_listener(Protocol, Port, Options) ->
{ok, PktOpts} = application:get_env(emqttd, mqtt_packet), {ok, PktOpts} = application:get_env(emqttd, mqtt_packet),
MFArgs = {emqttd_client, start_link, [PktOpts]}, MFArgs = {emqttd_client, start_link, [PktOpts]},
esockd:open(Protocol, Port, emqttd_opts:merge(?MQTT_SOCKOPTS, Options) , MFArgs). esockd:open(Protocol, Port, emqttd_opts:merge(?MQTT_SOCKOPTS, Options) , MFArgs).
@ -78,13 +83,114 @@ open(Protocol, Port, Options) ->
%% @doc Close Listeners %% @doc Close Listeners
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec close([listener()] | listener()) -> any(). -spec close_listeners([listener()]) -> any().
close(Listeners) when is_list(Listeners) -> close_listeners(Listeners) when is_list(Listeners) ->
[close(Listener) || Listener <- Listeners]; [close_listener(Listener) || Listener <- Listeners].
close({Protocol, Port, _Options}) -> close_listener({Protocol, Port, _Options}) ->
esockd:close({Protocol, Port}). esockd:close({Protocol, Port}).
%%------------------------------------------------------------------------------
%% @doc Load all plugins
%% @end
%%------------------------------------------------------------------------------
-spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}].
load_all_plugins() ->
%% save first
{ok, [PluginApps]} = file:consult("etc/plugins.config"),
application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]),
[{App, load_plugin(App)} || {App, _Env} <- PluginApps].
%%------------------------------------------------------------------------------
%% @doc Load plugin
%% @end
%%------------------------------------------------------------------------------
-spec load_plugin(App :: atom()) -> ok | {error, any()}.
load_plugin(App) ->
case load_app(App) of
ok ->
start_app(App);
{error, Reason} ->
{error, Reason}
end.
load_app(App) ->
case application:load(App) of
ok ->
lager:info("load plugin ~p successfully", [App]), ok;
{error, {already_loaded, App}} ->
lager:info("load plugin ~p is already loaded", [App]), ok;
{error, Reason} ->
lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason}
end.
start_app(App) ->
case application:start(App) of
ok ->
lager:info("start plugin ~p successfully", [App]), ok;
{error, {already_started, App}} ->
lager:error("plugin ~p is already started", [App]), ok;
{error, Reason} ->
lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason}
end.
%%------------------------------------------------------------------------------
%% @doc Loaded plugins
%% @end
%%------------------------------------------------------------------------------
loaded_plugins() ->
PluginApps = application:get_env(emqttd, plugins, []),
[App || App = {Name, _Descr, _Vsn} <- application:which_applications(),
lists:member(Name, PluginApps)].
%%------------------------------------------------------------------------------
%% @doc Unload all plugins
%% @end
%%------------------------------------------------------------------------------
-spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}].
unload_all_plugins() ->
PluginApps = application:get_env(emqttd, plugins, []),
[{App, unload_plugin(App)} || {App, _Env} <- PluginApps].
%%------------------------------------------------------------------------------
%% @doc Unload plugin
%% @end
%%------------------------------------------------------------------------------
-spec unload_plugin(App :: atom()) -> ok | {error, any()}.
unload_plugin(App) ->
case stop_app(App) of
ok ->
unload_app(App);
{error, Reason} ->
{error, Reason}
end.
stop_app(App) ->
case application:stop(App) of
ok ->
lager:info("stop plugin ~p successfully~n", [App]), ok;
{error, {not_started, App}} ->
lager:error("plugin ~p is not started~n", [App]), ok;
{error, Reason} ->
lager:error("stop plugin ~p error: ~p", [App]), {error, Reason}
end.
unload_app(App) ->
case application:unload(App) of
ok ->
lager:info("unload plugin ~p successfully~n", [App]), ok;
{error, {not_loaded, App}} ->
lager:info("load plugin ~p is not loaded~n", [App]), ok;
{error, Reason} ->
lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason}
end.
%%------------------------------------------------------------------------------
%% @doc Is running?
%% @end
%%------------------------------------------------------------------------------
is_running(Node) -> is_running(Node) ->
case rpc:call(Node, erlang, whereis, [emqttd]) of case rpc:call(Node, erlang, whereis, [emqttd]) of
{badrpc, _} -> false; {badrpc, _} -> false;

View File

@ -52,8 +52,8 @@ start(_StartType, _StartArgs) ->
{ok, Sup} = emqttd_sup:start_link(), {ok, Sup} = emqttd_sup:start_link(),
start_servers(Sup), start_servers(Sup),
{ok, Listeners} = application:get_env(listeners), {ok, Listeners} = application:get_env(listeners),
emqttd_plugin_manager:load_all_plugins(), emqttd:load_all_plugins(),
emqttd:open(Listeners), emqttd:open_listeners(Listeners),
register(emqttd, self()), register(emqttd, self()),
print_vsn(), print_vsn(),
{ok, Sup}. {ok, Sup}.
@ -137,7 +137,7 @@ worker_spec(Name, Opts) ->
-spec stop(State :: term()) -> term(). -spec stop(State :: term()) -> term().
stop(_State) -> stop(_State) ->
{ok, Listeners} = application:get_env(listeners), {ok, Listeners} = application:get_env(listeners),
emqttd:close(Listeners), emqttd:close_listeners(Listeners),
emqttd_plugin_manager:unload_all_plugins(), emqttd:unload_all_plugins(),
ok. ok.

View File

@ -155,17 +155,17 @@ bridges(["stop", SNode, Topic]) ->
end. end.
plugins(["list"]) -> plugins(["list"]) ->
Plugins = emqttd_plugin_manager:loaded_plugins(), Plugins = emqttd:loaded_plugins(),
lists:foreach(fun(Plugin) -> ?PRINT("~p~n", [Plugin]) end, Plugins); lists:foreach(fun(Plugin) -> ?PRINT("~p~n", [Plugin]) end, Plugins);
plugins(["load", Name]) -> plugins(["load", Name]) ->
case emqttd_plugin_manager:load_plugin(list_to_atom(Name)) of case emqttd:load_plugin(list_to_atom(Name)) of
ok -> ?PRINT("plugin ~s is loaded successfully.~n", [Name]); ok -> ?PRINT("plugin ~s is loaded successfully.~n", [Name]);
{error, Reason} -> ?PRINT("error: ~s~n", [Reason]) {error, Reason} -> ?PRINT("error: ~s~n", [Reason])
end; end;
plugins(["unload", Name]) -> plugins(["unload", Name]) ->
case emqttd_plugin_manager:unload_plugin(list_to_atom(Name)) of case emqttd:unload_plugin(list_to_atom(Name)) of
ok -> ?PRINT("plugin ~s is unloaded successfully.~n", [Name]); ok -> ?PRINT("plugin ~s is unloaded successfully.~n", [Name]);
{error, Reason} -> ?PRINT("error: ~s~n", [Reason]) {error, Reason} -> ?PRINT("error: ~s~n", [Reason])
end. end.

View File

@ -1,156 +0,0 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd plugin manager.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_plugin_manager).
%%TODO: rewrite this module later...
-author("Feng Lee <feng@emqtt.io>").
-export([load_all_plugins/0, unload_all_plugins/0]).
-export([load_plugin/1, unload_plugin/1]).
-export([loaded_plugins/0]).
%%------------------------------------------------------------------------------
%% @doc Load all plugins
%% @end
%%------------------------------------------------------------------------------
load_all_plugins() ->
{ok, [PluginApps]} = file:consult("etc/plugins.config"),
LoadedPlugins = load_plugins(PluginApps),
RunningPlugins = start_plugins(lists:reverse(LoadedPlugins)),
%% save to application env?
application:set_env(emqttd, loaded_plugins, lists:reverse(RunningPlugins)).
load_plugins(PluginApps) ->
lists:foldl(fun({App, _Env}, Acc) ->
case application:load(App) of
ok ->
io:format("load plugin ~p successfully~n", [App]),
[App | Acc];
{error, {already_loaded, App}} ->
io:format("load plugin ~p is already loaded~n", [App]),
[App | Acc];
{error, Reason} ->
io:format("load plugin ~p error: ~p~n", [App, Reason]),
Acc
end
end, [], PluginApps).
start_plugins(PluginApps) ->
lists:foldl(fun(App, Acc) ->
case application:start(App) of
ok ->
io:format("start plugin ~p successfully~n", [App]),
[App | Acc];
{error, {already_started, App}} ->
io:format("plugin ~p is already started~n", [App]),
[App | Acc];
{error, Reason} ->
io:format("start plugin ~p error: ~p~n", [App, Reason]),
Acc
end
end, [], PluginApps).
%%------------------------------------------------------------------------------
%% @doc Loaded plugins
%% @end
%%------------------------------------------------------------------------------
loaded_plugins() ->
LoadedPluginApps = application:get_env(emqttd, loaded_plugins, []),
lager:info("loaded plugins: ~p", [LoadedPluginApps]),
[App || App = {Name, _Descr, _Vsn} <- application:which_applications(),
lists:member(Name, LoadedPluginApps)].
%%------------------------------------------------------------------------------
%% @doc Unload all plugins
%% @end
%%------------------------------------------------------------------------------
unload_all_plugins() ->
LoadedPluginApps = application:get_env(emqttd, loaded_plugins, []),
StoppedApps = stop_plugins(lists:reverse(LoadedPluginApps)),
UnloadedApps = unload_plugins(lists:reverse(StoppedApps)),
case LoadedPluginApps -- UnloadedApps of
[] ->
application:unset_env(emqttd, loaded_plugins);
LeftApps ->
lager:error("cannot unload plugins: ~p", [LeftApps]),
application:set_env(emqttd, loaded_plugins, LeftApps)
end.
stop_plugins(PluginApps) ->
lists:foldl(fun(App, Acc) ->
case application:stop(App) of
ok ->
io:format("stop plugin ~p successfully~n", [App]),
[App | Acc];
{error, {not_started, App}} ->
io:format("plugin ~p is not started~n", [App]),
[App | Acc];
{error, Reason} ->
io:format("stop plugin ~p error: ~p~n", [App, Reason]),
Acc
end
end, [], PluginApps).
unload_plugins(PluginApps) ->
lists:foldl(fun({App, _Env}, Acc) ->
case application:unload(App) of
ok ->
io:format("unload plugin ~p successfully~n", [App]),
[App | Acc];
{error, {not_loaded, App}} ->
io:format("load plugin ~p is not loaded~n", [App]),
[App | Acc];
{error, Reason} ->
io:format("unload plugin ~p error: ~p~n", [App, Reason]),
Acc
end
end, [], PluginApps).
%%------------------------------------------------------------------------------
%% @doc Load Plugin
%% @end
%%------------------------------------------------------------------------------
load_plugin(Name) when is_atom(Name) ->
%% load app
%% start app
%% set env...
application:start(Name).
%%------------------------------------------------------------------------------
%% @doc Unload Plugin
%% @end
%%------------------------------------------------------------------------------
unload_plugin(Name) when is_atom(Name) ->
%% stop app
%% unload app
%% set env
application:stop(Name),
application:unload(Name).

View File

@ -112,11 +112,6 @@
{max_clients, 512}, {max_clients, 512},
{access, [{allow, "127.0.0.1"}]} {access, [{allow, "127.0.0.1"}]}
]} ]}
]},
% Plugins
{plugins, [
{emqttd_auth_ldap, [ldap_params]},
{emqttd_auth_mysql, [mysql_params]}
]} ]}
]} ]}
]. ].