From f0109c7af7b009e7061d705dde1e6235f1676961 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 4 Aug 2015 11:39:30 +0800 Subject: [PATCH] plugin --- include/emqttd.hrl | 39 +++++--- rel/files/emqttd.config | 9 +- rel/files/loaded_plugins | 1 + rel/reltool.config | 3 +- src/emqttd.erl | 103 -------------------- src/emqttd_app.erl | 2 +- src/emqttd_ctl.erl | 4 +- src/emqttd_plugin_manager.erl | 172 ++++++++++++++++++++++++++++++++++ src/emqttd_pooler.erl | 2 +- src/emqttd_pubsub.erl | 2 +- 10 files changed, 214 insertions(+), 123 deletions(-) create mode 100644 rel/files/loaded_plugins create mode 100644 src/emqttd_plugin_manager.erl diff --git a/include/emqttd.hrl b/include/emqttd.hrl index d142fdd7b..fc5ade8e8 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -129,18 +129,6 @@ -type mqtt_message() :: #mqtt_message{}. -%%------------------------------------------------------------------------------ -%% MQTT Plugin -%%------------------------------------------------------------------------------ --record(mqtt_plugin, { - name, - version, - attrs, - description -}). - --type mqtt_plugin() :: #mqtt_plugin{}. - %%------------------------------------------------------------------------------ %% MQTT Alarm %%------------------------------------------------------------------------------ @@ -154,3 +142,30 @@ -type mqtt_alarm() :: #mqtt_alarm{}. +%%------------------------------------------------------------------------------ +%% MQTT Plugin +%%------------------------------------------------------------------------------ +-record(mqtt_plugin, { + name, + version, + descr, + active = false +}). + +-type mqtt_plugin() :: #mqtt_plugin{}. + +%%------------------------------------------------------------------------------ +%% MQTT CLI Command +%% For example: 'broker metrics' +%%------------------------------------------------------------------------------ +-record(mqtt_cli, { + name, + action, + args = [], + opts = [], + usage, + descr +}). + +-type mqtt_cli() :: #mqtt_cli{}. + diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index ab252b9a0..7ff3e49ba 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -153,15 +153,20 @@ {modules, [ %% Client presence management module. %% Publish messages when client connected or disconnected - {presence, [{qos, 0}]}, + {presence, [{qos, 0}]} %% Subscribe topics automatically when client connected - {autosub, [{"$Q/client/$c", 0}]} + %% {autosub, [{"$Q/client/$c", 0}]} %% Rewrite rules %% {rewrite, [{file, "etc/rewrite.config"}]} ]}, + %% Plugins + {plugins, [ + {dir, "./plugins"}, + {loaded_file, "./data/loaded_plugins"} + ]}, %% Listeners {listeners, [ {mqtt, 1883, [ diff --git a/rel/files/loaded_plugins b/rel/files/loaded_plugins new file mode 100644 index 000000000..bf38a149a --- /dev/null +++ b/rel/files/loaded_plugins @@ -0,0 +1 @@ +[emqttd_dashboard]. diff --git a/rel/reltool.config b/rel/reltool.config index 42e4755ef..768b793f0 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -88,5 +88,6 @@ {template, "files/rewrite.config", "etc/rewrite.config"}, {template, "files/clients.config", "etc/clients.config"}, {template, "files/plugins.config", "etc/plugins.config"}, - {template, "files/vm.args", "etc/vm.args"} + {template, "files/vm.args", "etc/vm.args"}, + {copy, "files/loaded_plugins", "data/loaded_plugins"} ]}. diff --git a/src/emqttd.erl b/src/emqttd.erl index 074e8f3e2..e48aee560 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -30,10 +30,7 @@ -export([start/0, env/1, env/2, open_listeners/1, close_listeners/1, - load_all_plugins/0, unload_all_plugins/0, - load_plugin/1, unload_plugin/1, load_all_mods/0, is_mod_enabled/1, - loaded_plugins/0, is_running/1]). -define(MQTT_SOCKOPTS, [ @@ -112,106 +109,6 @@ close_listeners(Listeners) when is_list(Listeners) -> close_listener({Protocol, Port, _Options}) -> esockd:close({Protocol, Port}). -%%------------------------------------------------------------------------------ -%% @doc Load all plugins -%% @end -%%------------------------------------------------------------------------------ --spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -load_all_plugins() -> - %% save first - case file:consult("etc/plugins.config") of - {ok, [PluginApps]} -> - application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]), - [{App, load_plugin(App)} || {App, _Env} <- PluginApps]; - {error, enoent} -> - lager:error("etc/plugins.config not found!"); - {error, Error} -> - lager:error("Load etc/plugins.config error: ~p", [Error]) - end. - -%%------------------------------------------------------------------------------ -%% @doc Load plugin -%% @end -%%------------------------------------------------------------------------------ --spec load_plugin(App :: atom()) -> ok | {error, any()}. -load_plugin(App) -> - case load_app(App) of - ok -> - start_app(App); - {error, Reason} -> - {error, Reason} - end. - -load_app(App) -> - case application:load(App) of - ok -> - lager:info("load plugin ~p successfully", [App]), ok; - {error, {already_loaded, App}} -> - lager:info("load plugin ~p is already loaded", [App]), ok; - {error, Reason} -> - lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -start_app(App) -> - case application:start(App) of - ok -> - lager:info("start plugin ~p successfully", [App]), ok; - {error, {already_started, App}} -> - lager:error("plugin ~p is already started", [App]), ok; - {error, Reason} -> - lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -%%------------------------------------------------------------------------------ -%% @doc Loaded plugins -%% @end -%%------------------------------------------------------------------------------ -loaded_plugins() -> - PluginApps = application:get_env(emqttd, plugins, []), - [App || App = {Name, _Descr, _Vsn} <- application:which_applications(), - lists:member(Name, PluginApps)]. - -%%------------------------------------------------------------------------------ -%% @doc Unload all plugins -%% @end -%%------------------------------------------------------------------------------ --spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -unload_all_plugins() -> - PluginApps = application:get_env(emqttd, plugins, []), - [{App, unload_plugin(App)} || App <- PluginApps]. - -%%------------------------------------------------------------------------------ -%% @doc Unload plugin -%% @end -%%------------------------------------------------------------------------------ --spec unload_plugin(App :: atom()) -> ok | {error, any()}. -unload_plugin(App) -> - case stop_app(App) of - ok -> - unload_app(App); - {error, Reason} -> - {error, Reason} - end. - -stop_app(App) -> - case application:stop(App) of - ok -> - lager:info("stop plugin ~p successfully~n", [App]), ok; - {error, {not_started, App}} -> - lager:error("plugin ~p is not started~n", [App]), ok; - {error, Reason} -> - lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} - end. - -unload_app(App) -> - case application:unload(App) of - ok -> - lager:info("unload plugin ~p successfully~n", [App]), ok; - {error, {not_loaded, App}} -> - lager:info("load plugin ~p is not loaded~n", [App]), ok; - {error, Reason} -> - lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. load_all_mods() -> Mods = application:get_env(emqttd, modules, []), diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index f9434b385..e07a157c2 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -52,7 +52,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd:load_all_mods(), - emqttd:load_all_plugins(), + %% emqttd:load_all_plugins(), {ok, Listeners} = application:get_env(listeners), emqttd:open_listeners(Listeners), register(emqttd, self()), diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index acccb5b5a..d7273e8a6 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -68,7 +68,7 @@ status([]) -> %% @end %%------------------------------------------------------------------------------ cluster([]) -> - Nodes = [node()|nodes()], + Nodes = emqttd_broker:running_nodes(), ?PRINT("cluster nodes: ~p~n", [Nodes]); cluster([SNode]) -> @@ -78,7 +78,7 @@ cluster([SNode]) -> case emqttd:is_running(Node) of true -> %%TODO: should not unload here. - emqttd:unload_all_plugins(), + %% emqttd:unload_all_plugins(), application:stop(emqttd), application:stop(esockd), application:stop(gproc), diff --git a/src/emqttd_plugin_manager.erl b/src/emqttd_plugin_manager.erl new file mode 100644 index 000000000..de92029b5 --- /dev/null +++ b/src/emqttd_plugin_manager.erl @@ -0,0 +1,172 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd plugin manager. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_plugin_manager). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-export([start/0, list/0, load/1, unload/1, stop/0]). + +start() -> + %% start all plugins + %% + ok. + +%%------------------------------------------------------------------------------ +%% @doc Load all plugins +%% @end +%%------------------------------------------------------------------------------ +-spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. +load_all_plugins() -> + %% save first + case file:consult("etc/plugins.config") of + {ok, [PluginApps]} -> + ok; + %% application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]), + %% [{App, load_plugin(App)} || {App, _Env} <- PluginApps]; + {error, enoent} -> + lager:error("etc/plugins.config not found!"); + {error, Error} -> + lager:error("Load etc/plugins.config error: ~p", [Error]) + end. + +%%------------------------------------------------------------------------------ +%% List all available plugins +%%------------------------------------------------------------------------------ +list() -> + {ok, PluginEnv} = application:get_env(emqttd, plugins), + PluginsDir = proplists:get_value(dir, PluginEnv, "./plugins"), + AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir), + Plugins = [plugin(filename:join(PluginsDir, AppFile)) || AppFile <- AppFiles], + StartedApps = [Name || {Name, _Descr, _Ver} <- application:which_applications()], + lists:map(fun(Plugin = #mqtt_plugin{name = Name}) -> + case lists:member(Name, StartedApps) of + true -> Plugin#mqtt_plugin{active = true}; + false -> Plugin + end + end, Plugins). + +plugin(AppFile) -> + {ok, [{application, Name, Attrs}]} = file:consult(AppFile), + Ver = proplists:get_value(vsn, Attrs), + Descr = proplists:get_value(description, Attrs, ""), + #mqtt_plugin{name = Name, version = Ver, descr = Descr}. + +%%------------------------------------------------------------------------------ +%% @doc Load Plugin +%% @end +%%------------------------------------------------------------------------------ +-spec load(atom()) -> ok | {error, any()}. +load(PluginName) when is_atom(PluginName) -> + %% start plugin + %% write file if plugin is loaded + ok. + +-spec load_plugin(App :: atom()) -> ok | {error, any()}. +load_plugin(App) -> + case load_app(App) of + ok -> + start_app(App); + {error, Reason} -> + {error, Reason} + end. + +load_app(App) -> + case application:load(App) of + ok -> + lager:info("load plugin ~p successfully", [App]), ok; + {error, {already_loaded, App}} -> + lager:info("load plugin ~p is already loaded", [App]), ok; + {error, Reason} -> + lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason} + end. + +start_app(App) -> + case application:start(App) of + ok -> + lager:info("start plugin ~p successfully", [App]), ok; + {error, {already_started, App}} -> + lager:error("plugin ~p is already started", [App]), ok; + {error, Reason} -> + lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason} + end. + + +%%------------------------------------------------------------------------------ +%% @doc UnLoad Plugin +%% @end +%%------------------------------------------------------------------------------ +-spec unload(atom()) -> ok | {error, any()}. +unload(PluginName) when is_atom(PluginName) -> + %% stop plugin + %% write file if plugin is loaded + ok. + +-spec unload_plugin(App :: atom()) -> ok | {error, any()}. +unload_plugin(App) -> + case stop_app(App) of + ok -> + unload_app(App); + {error, Reason} -> + {error, Reason} + end. + +stop_app(App) -> + case application:stop(App) of + ok -> + lager:info("stop plugin ~p successfully~n", [App]), ok; + {error, {not_started, App}} -> + lager:error("plugin ~p is not started~n", [App]), ok; + {error, Reason} -> + lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} + end. + +unload_app(App) -> + case application:unload(App) of + ok -> + lager:info("unload plugin ~p successfully~n", [App]), ok; + {error, {not_loaded, App}} -> + lager:info("load plugin ~p is not loaded~n", [App]), ok; + {error, Reason} -> + lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} + end. + +stop() -> + %% stop all plugins + ok. + +%%------------------------------------------------------------------------------ +%% @doc Unload all plugins +%% @end +%%------------------------------------------------------------------------------ +-spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. +unload_all_plugins() -> + PluginApps = application:get_env(emqttd, plugins, []). + %%[{App, unload_plugin(App)} || App <- PluginApps]. + diff --git a/src/emqttd_pooler.erl b/src/emqttd_pooler.erl index 3f0245250..147c19ee5 100644 --- a/src/emqttd_pooler.erl +++ b/src/emqttd_pooler.erl @@ -75,7 +75,7 @@ handle_call(_Req, _From, State) -> {reply, ok, State}. handle_cast({async_submit, Fun}, State) -> - run(Fun), + try run(Fun) catch _:Error -> lager:error("Pooler Error: ~p", [Error]) end, {noreply, State}; handle_cast(_Msg, State) -> diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index b2604d3cc..1420fb2df 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -228,7 +228,7 @@ match(Topic) when is_binary(Topic) -> %%%============================================================================= init([Id, _Opts]) -> - process_flag(min_heap_size, 1024*1024), + %%process_flag(min_heap_size, 1024*1024), gproc_pool:connect_worker(pubsub, {?MODULE, Id}), {ok, #state{id = Id, submap = maps:new()}}.