This commit is contained in:
Feng 2015-08-04 11:39:30 +08:00
parent 5f6b0fc624
commit f0109c7af7
10 changed files with 214 additions and 123 deletions

View File

@ -129,18 +129,6 @@
-type mqtt_message() :: #mqtt_message{}. -type mqtt_message() :: #mqtt_message{}.
%%------------------------------------------------------------------------------
%% MQTT Plugin
%%------------------------------------------------------------------------------
-record(mqtt_plugin, {
name,
version,
attrs,
description
}).
-type mqtt_plugin() :: #mqtt_plugin{}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% MQTT Alarm %% MQTT Alarm
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -154,3 +142,30 @@
-type mqtt_alarm() :: #mqtt_alarm{}. -type mqtt_alarm() :: #mqtt_alarm{}.
%%------------------------------------------------------------------------------
%% MQTT Plugin
%%------------------------------------------------------------------------------
-record(mqtt_plugin, {
name,
version,
descr,
active = false
}).
-type mqtt_plugin() :: #mqtt_plugin{}.
%%------------------------------------------------------------------------------
%% MQTT CLI Command
%% For example: 'broker metrics'
%%------------------------------------------------------------------------------
-record(mqtt_cli, {
name,
action,
args = [],
opts = [],
usage,
descr
}).
-type mqtt_cli() :: #mqtt_cli{}.

View File

@ -153,15 +153,20 @@
{modules, [ {modules, [
%% Client presence management module. %% Client presence management module.
%% Publish messages when client connected or disconnected %% Publish messages when client connected or disconnected
{presence, [{qos, 0}]}, {presence, [{qos, 0}]}
%% Subscribe topics automatically when client connected %% Subscribe topics automatically when client connected
{autosub, [{"$Q/client/$c", 0}]} %% {autosub, [{"$Q/client/$c", 0}]}
%% Rewrite rules %% Rewrite rules
%% {rewrite, [{file, "etc/rewrite.config"}]} %% {rewrite, [{file, "etc/rewrite.config"}]}
]}, ]},
%% Plugins
{plugins, [
{dir, "./plugins"},
{loaded_file, "./data/loaded_plugins"}
]},
%% Listeners %% Listeners
{listeners, [ {listeners, [
{mqtt, 1883, [ {mqtt, 1883, [

1
rel/files/loaded_plugins Normal file
View File

@ -0,0 +1 @@
[emqttd_dashboard].

View File

@ -88,5 +88,6 @@
{template, "files/rewrite.config", "etc/rewrite.config"}, {template, "files/rewrite.config", "etc/rewrite.config"},
{template, "files/clients.config", "etc/clients.config"}, {template, "files/clients.config", "etc/clients.config"},
{template, "files/plugins.config", "etc/plugins.config"}, {template, "files/plugins.config", "etc/plugins.config"},
{template, "files/vm.args", "etc/vm.args"} {template, "files/vm.args", "etc/vm.args"},
{copy, "files/loaded_plugins", "data/loaded_plugins"}
]}. ]}.

View File

@ -30,10 +30,7 @@
-export([start/0, env/1, env/2, -export([start/0, env/1, env/2,
open_listeners/1, close_listeners/1, open_listeners/1, close_listeners/1,
load_all_plugins/0, unload_all_plugins/0,
load_plugin/1, unload_plugin/1,
load_all_mods/0, is_mod_enabled/1, load_all_mods/0, is_mod_enabled/1,
loaded_plugins/0,
is_running/1]). is_running/1]).
-define(MQTT_SOCKOPTS, [ -define(MQTT_SOCKOPTS, [
@ -112,106 +109,6 @@ close_listeners(Listeners) when is_list(Listeners) ->
close_listener({Protocol, Port, _Options}) -> close_listener({Protocol, Port, _Options}) ->
esockd:close({Protocol, Port}). esockd:close({Protocol, Port}).
%%------------------------------------------------------------------------------
%% @doc Load all plugins
%% @end
%%------------------------------------------------------------------------------
-spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}].
load_all_plugins() ->
%% save first
case file:consult("etc/plugins.config") of
{ok, [PluginApps]} ->
application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]),
[{App, load_plugin(App)} || {App, _Env} <- PluginApps];
{error, enoent} ->
lager:error("etc/plugins.config not found!");
{error, Error} ->
lager:error("Load etc/plugins.config error: ~p", [Error])
end.
%%------------------------------------------------------------------------------
%% @doc Load plugin
%% @end
%%------------------------------------------------------------------------------
-spec load_plugin(App :: atom()) -> ok | {error, any()}.
load_plugin(App) ->
case load_app(App) of
ok ->
start_app(App);
{error, Reason} ->
{error, Reason}
end.
load_app(App) ->
case application:load(App) of
ok ->
lager:info("load plugin ~p successfully", [App]), ok;
{error, {already_loaded, App}} ->
lager:info("load plugin ~p is already loaded", [App]), ok;
{error, Reason} ->
lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason}
end.
start_app(App) ->
case application:start(App) of
ok ->
lager:info("start plugin ~p successfully", [App]), ok;
{error, {already_started, App}} ->
lager:error("plugin ~p is already started", [App]), ok;
{error, Reason} ->
lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason}
end.
%%------------------------------------------------------------------------------
%% @doc Loaded plugins
%% @end
%%------------------------------------------------------------------------------
loaded_plugins() ->
PluginApps = application:get_env(emqttd, plugins, []),
[App || App = {Name, _Descr, _Vsn} <- application:which_applications(),
lists:member(Name, PluginApps)].
%%------------------------------------------------------------------------------
%% @doc Unload all plugins
%% @end
%%------------------------------------------------------------------------------
-spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}].
unload_all_plugins() ->
PluginApps = application:get_env(emqttd, plugins, []),
[{App, unload_plugin(App)} || App <- PluginApps].
%%------------------------------------------------------------------------------
%% @doc Unload plugin
%% @end
%%------------------------------------------------------------------------------
-spec unload_plugin(App :: atom()) -> ok | {error, any()}.
unload_plugin(App) ->
case stop_app(App) of
ok ->
unload_app(App);
{error, Reason} ->
{error, Reason}
end.
stop_app(App) ->
case application:stop(App) of
ok ->
lager:info("stop plugin ~p successfully~n", [App]), ok;
{error, {not_started, App}} ->
lager:error("plugin ~p is not started~n", [App]), ok;
{error, Reason} ->
lager:error("stop plugin ~p error: ~p", [App]), {error, Reason}
end.
unload_app(App) ->
case application:unload(App) of
ok ->
lager:info("unload plugin ~p successfully~n", [App]), ok;
{error, {not_loaded, App}} ->
lager:info("load plugin ~p is not loaded~n", [App]), ok;
{error, Reason} ->
lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason}
end.
load_all_mods() -> load_all_mods() ->
Mods = application:get_env(emqttd, modules, []), Mods = application:get_env(emqttd, modules, []),

View File

@ -52,7 +52,7 @@ start(_StartType, _StartArgs) ->
{ok, Sup} = emqttd_sup:start_link(), {ok, Sup} = emqttd_sup:start_link(),
start_servers(Sup), start_servers(Sup),
emqttd:load_all_mods(), emqttd:load_all_mods(),
emqttd:load_all_plugins(), %% emqttd:load_all_plugins(),
{ok, Listeners} = application:get_env(listeners), {ok, Listeners} = application:get_env(listeners),
emqttd:open_listeners(Listeners), emqttd:open_listeners(Listeners),
register(emqttd, self()), register(emqttd, self()),

View File

@ -68,7 +68,7 @@ status([]) ->
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
cluster([]) -> cluster([]) ->
Nodes = [node()|nodes()], Nodes = emqttd_broker:running_nodes(),
?PRINT("cluster nodes: ~p~n", [Nodes]); ?PRINT("cluster nodes: ~p~n", [Nodes]);
cluster([SNode]) -> cluster([SNode]) ->
@ -78,7 +78,7 @@ cluster([SNode]) ->
case emqttd:is_running(Node) of case emqttd:is_running(Node) of
true -> true ->
%%TODO: should not unload here. %%TODO: should not unload here.
emqttd:unload_all_plugins(), %% emqttd:unload_all_plugins(),
application:stop(emqttd), application:stop(emqttd),
application:stop(esockd), application:stop(esockd),
application:stop(gproc), application:stop(gproc),

View File

@ -0,0 +1,172 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd plugin manager.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_plugin_manager).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-export([start/0, list/0, load/1, unload/1, stop/0]).
start() ->
%% start all plugins
%%
ok.
%%------------------------------------------------------------------------------
%% @doc Load all plugins
%% @end
%%------------------------------------------------------------------------------
-spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}].
load_all_plugins() ->
%% save first
case file:consult("etc/plugins.config") of
{ok, [PluginApps]} ->
ok;
%% application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]),
%% [{App, load_plugin(App)} || {App, _Env} <- PluginApps];
{error, enoent} ->
lager:error("etc/plugins.config not found!");
{error, Error} ->
lager:error("Load etc/plugins.config error: ~p", [Error])
end.
%%------------------------------------------------------------------------------
%% List all available plugins
%%------------------------------------------------------------------------------
list() ->
{ok, PluginEnv} = application:get_env(emqttd, plugins),
PluginsDir = proplists:get_value(dir, PluginEnv, "./plugins"),
AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir),
Plugins = [plugin(filename:join(PluginsDir, AppFile)) || AppFile <- AppFiles],
StartedApps = [Name || {Name, _Descr, _Ver} <- application:which_applications()],
lists:map(fun(Plugin = #mqtt_plugin{name = Name}) ->
case lists:member(Name, StartedApps) of
true -> Plugin#mqtt_plugin{active = true};
false -> Plugin
end
end, Plugins).
plugin(AppFile) ->
{ok, [{application, Name, Attrs}]} = file:consult(AppFile),
Ver = proplists:get_value(vsn, Attrs),
Descr = proplists:get_value(description, Attrs, ""),
#mqtt_plugin{name = Name, version = Ver, descr = Descr}.
%%------------------------------------------------------------------------------
%% @doc Load Plugin
%% @end
%%------------------------------------------------------------------------------
-spec load(atom()) -> ok | {error, any()}.
load(PluginName) when is_atom(PluginName) ->
%% start plugin
%% write file if plugin is loaded
ok.
-spec load_plugin(App :: atom()) -> ok | {error, any()}.
load_plugin(App) ->
case load_app(App) of
ok ->
start_app(App);
{error, Reason} ->
{error, Reason}
end.
load_app(App) ->
case application:load(App) of
ok ->
lager:info("load plugin ~p successfully", [App]), ok;
{error, {already_loaded, App}} ->
lager:info("load plugin ~p is already loaded", [App]), ok;
{error, Reason} ->
lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason}
end.
start_app(App) ->
case application:start(App) of
ok ->
lager:info("start plugin ~p successfully", [App]), ok;
{error, {already_started, App}} ->
lager:error("plugin ~p is already started", [App]), ok;
{error, Reason} ->
lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason}
end.
%%------------------------------------------------------------------------------
%% @doc UnLoad Plugin
%% @end
%%------------------------------------------------------------------------------
-spec unload(atom()) -> ok | {error, any()}.
unload(PluginName) when is_atom(PluginName) ->
%% stop plugin
%% write file if plugin is loaded
ok.
-spec unload_plugin(App :: atom()) -> ok | {error, any()}.
unload_plugin(App) ->
case stop_app(App) of
ok ->
unload_app(App);
{error, Reason} ->
{error, Reason}
end.
stop_app(App) ->
case application:stop(App) of
ok ->
lager:info("stop plugin ~p successfully~n", [App]), ok;
{error, {not_started, App}} ->
lager:error("plugin ~p is not started~n", [App]), ok;
{error, Reason} ->
lager:error("stop plugin ~p error: ~p", [App]), {error, Reason}
end.
unload_app(App) ->
case application:unload(App) of
ok ->
lager:info("unload plugin ~p successfully~n", [App]), ok;
{error, {not_loaded, App}} ->
lager:info("load plugin ~p is not loaded~n", [App]), ok;
{error, Reason} ->
lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason}
end.
stop() ->
%% stop all plugins
ok.
%%------------------------------------------------------------------------------
%% @doc Unload all plugins
%% @end
%%------------------------------------------------------------------------------
-spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}].
unload_all_plugins() ->
PluginApps = application:get_env(emqttd, plugins, []).
%%[{App, unload_plugin(App)} || App <- PluginApps].

View File

@ -75,7 +75,7 @@ handle_call(_Req, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast({async_submit, Fun}, State) -> handle_cast({async_submit, Fun}, State) ->
run(Fun), try run(Fun) catch _:Error -> lager:error("Pooler Error: ~p", [Error]) end,
{noreply, State}; {noreply, State};
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->

View File

@ -228,7 +228,7 @@ match(Topic) when is_binary(Topic) ->
%%%============================================================================= %%%=============================================================================
init([Id, _Opts]) -> init([Id, _Opts]) ->
process_flag(min_heap_size, 1024*1024), %%process_flag(min_heap_size, 1024*1024),
gproc_pool:connect_worker(pubsub, {?MODULE, Id}), gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
{ok, #state{id = Id, submap = maps:new()}}. {ok, #state{id = Id, submap = maps:new()}}.