From b701c9ec4fe45281405f54cb0f1106b00694d026 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 28 Jul 2015 11:22:10 +0800 Subject: [PATCH 01/25] try ... catch --- src/emqttd_pooler.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_pooler.erl b/src/emqttd_pooler.erl index b284c240e..be0699506 100644 --- a/src/emqttd_pooler.erl +++ b/src/emqttd_pooler.erl @@ -75,7 +75,7 @@ handle_call(_Req, _From, State) -> {reply, ok, State}. handle_cast({async_submit, Fun}, State) -> - run(Fun), + try run(Fun) catch _:Error -> lager:error("Pooler Error: ~p", [Error]) end, {noreply, State}; handle_cast(_Msg, State) -> From 0cef1a3096d97e25a080f2a2fe982cc765dd185e Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 30 Jul 2015 00:15:40 +0800 Subject: [PATCH 02/25] running_nodes --- src/emqttd_ctl.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index acccb5b5a..797775631 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -68,7 +68,7 @@ status([]) -> %% @end %%------------------------------------------------------------------------------ cluster([]) -> - Nodes = [node()|nodes()], + Nodes = emqttd_broker:running_nodes(), ?PRINT("cluster nodes: ~p~n", [Nodes]); cluster([SNode]) -> From 48534df6c05ff6b154bd6602982cfd37bbd80afb Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 1 Aug 2015 21:00:40 +0800 Subject: [PATCH 03/25] comment 'min_heap_size' --- src/emqttd_pubsub.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index b2604d3cc..1420fb2df 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -228,7 +228,7 @@ match(Topic) when is_binary(Topic) -> %%%============================================================================= init([Id, _Opts]) -> - process_flag(min_heap_size, 1024*1024), + %%process_flag(min_heap_size, 1024*1024), gproc_pool:connect_worker(pubsub, {?MODULE, Id}), {ok, #state{id = Id, submap = maps:new()}}. From 686fbfd5bbcdb94aec25452652a30cb89c077647 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 1 Aug 2015 22:12:34 +0800 Subject: [PATCH 04/25] comment autosub --- rel/files/emqttd.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index ab252b9a0..73cd9c995 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -156,7 +156,7 @@ {presence, [{qos, 0}]}, %% Subscribe topics automatically when client connected - {autosub, [{"$Q/client/$c", 0}]} + %% {autosub, [{"$Q/client/$c", 0}]} %% Rewrite rules %% {rewrite, [{file, "etc/rewrite.config"}]} From 1992b79f88e9bd02f78bb58696ea29e0224e3f0d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 2 Aug 2015 12:39:36 +0800 Subject: [PATCH 05/25] plugins --- rel/files/emqttd.config | 5 +++++ rel/files/loaded_plugins | 1 + rel/reltool.config | 3 ++- 3 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 rel/files/loaded_plugins diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 73cd9c995..498416fef 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -162,6 +162,11 @@ %% {rewrite, [{file, "etc/rewrite.config"}]} ]}, + %% Plugins + {plugins, [ + {dir, "./plugins"}, + {loaded_file, "./data/loaded_plugins"} + ]}, %% Listeners {listeners, [ {mqtt, 1883, [ diff --git a/rel/files/loaded_plugins b/rel/files/loaded_plugins new file mode 100644 index 000000000..bf38a149a --- /dev/null +++ b/rel/files/loaded_plugins @@ -0,0 +1 @@ +[emqttd_dashboard]. diff --git a/rel/reltool.config b/rel/reltool.config index 42e4755ef..768b793f0 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -88,5 +88,6 @@ {template, "files/rewrite.config", "etc/rewrite.config"}, {template, "files/clients.config", "etc/clients.config"}, {template, "files/plugins.config", "etc/plugins.config"}, - {template, "files/vm.args", "etc/vm.args"} + {template, "files/vm.args", "etc/vm.args"}, + {copy, "files/loaded_plugins", "data/loaded_plugins"} ]}. From c0d93549326e81a3e1a7c3ad97f99f79d9a75e24 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 2 Aug 2015 21:52:11 +0800 Subject: [PATCH 06/25] plugins --- include/emqttd.hrl | 39 +++++--- rel/files/emqttd.config | 2 +- src/emqttd.erl | 103 -------------------- src/emqttd_app.erl | 2 +- src/emqttd_ctl.erl | 2 +- src/emqttd_plugin_manager.erl | 172 ++++++++++++++++++++++++++++++++++ 6 files changed, 202 insertions(+), 118 deletions(-) create mode 100644 src/emqttd_plugin_manager.erl diff --git a/include/emqttd.hrl b/include/emqttd.hrl index d142fdd7b..fc5ade8e8 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -129,18 +129,6 @@ -type mqtt_message() :: #mqtt_message{}. -%%------------------------------------------------------------------------------ -%% MQTT Plugin -%%------------------------------------------------------------------------------ --record(mqtt_plugin, { - name, - version, - attrs, - description -}). - --type mqtt_plugin() :: #mqtt_plugin{}. - %%------------------------------------------------------------------------------ %% MQTT Alarm %%------------------------------------------------------------------------------ @@ -154,3 +142,30 @@ -type mqtt_alarm() :: #mqtt_alarm{}. +%%------------------------------------------------------------------------------ +%% MQTT Plugin +%%------------------------------------------------------------------------------ +-record(mqtt_plugin, { + name, + version, + descr, + active = false +}). + +-type mqtt_plugin() :: #mqtt_plugin{}. + +%%------------------------------------------------------------------------------ +%% MQTT CLI Command +%% For example: 'broker metrics' +%%------------------------------------------------------------------------------ +-record(mqtt_cli, { + name, + action, + args = [], + opts = [], + usage, + descr +}). + +-type mqtt_cli() :: #mqtt_cli{}. + diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 498416fef..7ff3e49ba 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -153,7 +153,7 @@ {modules, [ %% Client presence management module. %% Publish messages when client connected or disconnected - {presence, [{qos, 0}]}, + {presence, [{qos, 0}]} %% Subscribe topics automatically when client connected %% {autosub, [{"$Q/client/$c", 0}]} diff --git a/src/emqttd.erl b/src/emqttd.erl index 074e8f3e2..e48aee560 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -30,10 +30,7 @@ -export([start/0, env/1, env/2, open_listeners/1, close_listeners/1, - load_all_plugins/0, unload_all_plugins/0, - load_plugin/1, unload_plugin/1, load_all_mods/0, is_mod_enabled/1, - loaded_plugins/0, is_running/1]). -define(MQTT_SOCKOPTS, [ @@ -112,106 +109,6 @@ close_listeners(Listeners) when is_list(Listeners) -> close_listener({Protocol, Port, _Options}) -> esockd:close({Protocol, Port}). -%%------------------------------------------------------------------------------ -%% @doc Load all plugins -%% @end -%%------------------------------------------------------------------------------ --spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -load_all_plugins() -> - %% save first - case file:consult("etc/plugins.config") of - {ok, [PluginApps]} -> - application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]), - [{App, load_plugin(App)} || {App, _Env} <- PluginApps]; - {error, enoent} -> - lager:error("etc/plugins.config not found!"); - {error, Error} -> - lager:error("Load etc/plugins.config error: ~p", [Error]) - end. - -%%------------------------------------------------------------------------------ -%% @doc Load plugin -%% @end -%%------------------------------------------------------------------------------ --spec load_plugin(App :: atom()) -> ok | {error, any()}. -load_plugin(App) -> - case load_app(App) of - ok -> - start_app(App); - {error, Reason} -> - {error, Reason} - end. - -load_app(App) -> - case application:load(App) of - ok -> - lager:info("load plugin ~p successfully", [App]), ok; - {error, {already_loaded, App}} -> - lager:info("load plugin ~p is already loaded", [App]), ok; - {error, Reason} -> - lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -start_app(App) -> - case application:start(App) of - ok -> - lager:info("start plugin ~p successfully", [App]), ok; - {error, {already_started, App}} -> - lager:error("plugin ~p is already started", [App]), ok; - {error, Reason} -> - lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -%%------------------------------------------------------------------------------ -%% @doc Loaded plugins -%% @end -%%------------------------------------------------------------------------------ -loaded_plugins() -> - PluginApps = application:get_env(emqttd, plugins, []), - [App || App = {Name, _Descr, _Vsn} <- application:which_applications(), - lists:member(Name, PluginApps)]. - -%%------------------------------------------------------------------------------ -%% @doc Unload all plugins -%% @end -%%------------------------------------------------------------------------------ --spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -unload_all_plugins() -> - PluginApps = application:get_env(emqttd, plugins, []), - [{App, unload_plugin(App)} || App <- PluginApps]. - -%%------------------------------------------------------------------------------ -%% @doc Unload plugin -%% @end -%%------------------------------------------------------------------------------ --spec unload_plugin(App :: atom()) -> ok | {error, any()}. -unload_plugin(App) -> - case stop_app(App) of - ok -> - unload_app(App); - {error, Reason} -> - {error, Reason} - end. - -stop_app(App) -> - case application:stop(App) of - ok -> - lager:info("stop plugin ~p successfully~n", [App]), ok; - {error, {not_started, App}} -> - lager:error("plugin ~p is not started~n", [App]), ok; - {error, Reason} -> - lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} - end. - -unload_app(App) -> - case application:unload(App) of - ok -> - lager:info("unload plugin ~p successfully~n", [App]), ok; - {error, {not_loaded, App}} -> - lager:info("load plugin ~p is not loaded~n", [App]), ok; - {error, Reason} -> - lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. load_all_mods() -> Mods = application:get_env(emqttd, modules, []), diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index f9434b385..e07a157c2 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -52,7 +52,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd:load_all_mods(), - emqttd:load_all_plugins(), + %% emqttd:load_all_plugins(), {ok, Listeners} = application:get_env(listeners), emqttd:open_listeners(Listeners), register(emqttd, self()), diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 797775631..d7273e8a6 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -78,7 +78,7 @@ cluster([SNode]) -> case emqttd:is_running(Node) of true -> %%TODO: should not unload here. - emqttd:unload_all_plugins(), + %% emqttd:unload_all_plugins(), application:stop(emqttd), application:stop(esockd), application:stop(gproc), diff --git a/src/emqttd_plugin_manager.erl b/src/emqttd_plugin_manager.erl new file mode 100644 index 000000000..de92029b5 --- /dev/null +++ b/src/emqttd_plugin_manager.erl @@ -0,0 +1,172 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd plugin manager. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_plugin_manager). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-export([start/0, list/0, load/1, unload/1, stop/0]). + +start() -> + %% start all plugins + %% + ok. + +%%------------------------------------------------------------------------------ +%% @doc Load all plugins +%% @end +%%------------------------------------------------------------------------------ +-spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. +load_all_plugins() -> + %% save first + case file:consult("etc/plugins.config") of + {ok, [PluginApps]} -> + ok; + %% application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]), + %% [{App, load_plugin(App)} || {App, _Env} <- PluginApps]; + {error, enoent} -> + lager:error("etc/plugins.config not found!"); + {error, Error} -> + lager:error("Load etc/plugins.config error: ~p", [Error]) + end. + +%%------------------------------------------------------------------------------ +%% List all available plugins +%%------------------------------------------------------------------------------ +list() -> + {ok, PluginEnv} = application:get_env(emqttd, plugins), + PluginsDir = proplists:get_value(dir, PluginEnv, "./plugins"), + AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir), + Plugins = [plugin(filename:join(PluginsDir, AppFile)) || AppFile <- AppFiles], + StartedApps = [Name || {Name, _Descr, _Ver} <- application:which_applications()], + lists:map(fun(Plugin = #mqtt_plugin{name = Name}) -> + case lists:member(Name, StartedApps) of + true -> Plugin#mqtt_plugin{active = true}; + false -> Plugin + end + end, Plugins). + +plugin(AppFile) -> + {ok, [{application, Name, Attrs}]} = file:consult(AppFile), + Ver = proplists:get_value(vsn, Attrs), + Descr = proplists:get_value(description, Attrs, ""), + #mqtt_plugin{name = Name, version = Ver, descr = Descr}. + +%%------------------------------------------------------------------------------ +%% @doc Load Plugin +%% @end +%%------------------------------------------------------------------------------ +-spec load(atom()) -> ok | {error, any()}. +load(PluginName) when is_atom(PluginName) -> + %% start plugin + %% write file if plugin is loaded + ok. + +-spec load_plugin(App :: atom()) -> ok | {error, any()}. +load_plugin(App) -> + case load_app(App) of + ok -> + start_app(App); + {error, Reason} -> + {error, Reason} + end. + +load_app(App) -> + case application:load(App) of + ok -> + lager:info("load plugin ~p successfully", [App]), ok; + {error, {already_loaded, App}} -> + lager:info("load plugin ~p is already loaded", [App]), ok; + {error, Reason} -> + lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason} + end. + +start_app(App) -> + case application:start(App) of + ok -> + lager:info("start plugin ~p successfully", [App]), ok; + {error, {already_started, App}} -> + lager:error("plugin ~p is already started", [App]), ok; + {error, Reason} -> + lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason} + end. + + +%%------------------------------------------------------------------------------ +%% @doc UnLoad Plugin +%% @end +%%------------------------------------------------------------------------------ +-spec unload(atom()) -> ok | {error, any()}. +unload(PluginName) when is_atom(PluginName) -> + %% stop plugin + %% write file if plugin is loaded + ok. + +-spec unload_plugin(App :: atom()) -> ok | {error, any()}. +unload_plugin(App) -> + case stop_app(App) of + ok -> + unload_app(App); + {error, Reason} -> + {error, Reason} + end. + +stop_app(App) -> + case application:stop(App) of + ok -> + lager:info("stop plugin ~p successfully~n", [App]), ok; + {error, {not_started, App}} -> + lager:error("plugin ~p is not started~n", [App]), ok; + {error, Reason} -> + lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} + end. + +unload_app(App) -> + case application:unload(App) of + ok -> + lager:info("unload plugin ~p successfully~n", [App]), ok; + {error, {not_loaded, App}} -> + lager:info("load plugin ~p is not loaded~n", [App]), ok; + {error, Reason} -> + lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} + end. + +stop() -> + %% stop all plugins + ok. + +%%------------------------------------------------------------------------------ +%% @doc Unload all plugins +%% @end +%%------------------------------------------------------------------------------ +-spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. +unload_all_plugins() -> + PluginApps = application:get_env(emqttd, plugins, []). + %%[{App, unload_plugin(App)} || App <- PluginApps]. + From 5f6b0fc624b724011c4c01c385b62702ae515d79 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 3 Aug 2015 23:50:48 +0800 Subject: [PATCH 07/25] rm apps/ --- Makefile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index a3b509386..2d8b5b01a 100644 --- a/Makefile +++ b/Makefile @@ -44,12 +44,12 @@ APPS = erts kernel stdlib sasl crypto ssl os_mon syntax_tools \ check_plt: compile dialyzer --check_plt --plt $(PLT) --apps $(APPS) \ - deps/*/ebin apps/*/ebin + deps/*/ebin ./ebin build_plt: compile dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) \ - deps/*/ebin apps/*/ebin + deps/*/ebin ./ebin dialyzer: compile - dialyzer -Wno_return --plt $(PLT) deps/*/ebin apps/*/ebin + dialyzer -Wno_return --plt $(PLT) deps/*/ebin ./ebin From dd1d48360d506e29aa1689a35c9b79d973df967a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 4 Aug 2015 09:10:54 +0800 Subject: [PATCH 08/25] rm apps --- Makefile | 6 +++--- src/{emqttd_plugin_manager.erl => emqttd_plugin_mgr.erl} | 0 2 files changed, 3 insertions(+), 3 deletions(-) rename src/{emqttd_plugin_manager.erl => emqttd_plugin_mgr.erl} (100%) diff --git a/Makefile b/Makefile index a3b509386..2d8b5b01a 100644 --- a/Makefile +++ b/Makefile @@ -44,12 +44,12 @@ APPS = erts kernel stdlib sasl crypto ssl os_mon syntax_tools \ check_plt: compile dialyzer --check_plt --plt $(PLT) --apps $(APPS) \ - deps/*/ebin apps/*/ebin + deps/*/ebin ./ebin build_plt: compile dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) \ - deps/*/ebin apps/*/ebin + deps/*/ebin ./ebin dialyzer: compile - dialyzer -Wno_return --plt $(PLT) deps/*/ebin apps/*/ebin + dialyzer -Wno_return --plt $(PLT) deps/*/ebin ./ebin diff --git a/src/emqttd_plugin_manager.erl b/src/emqttd_plugin_mgr.erl similarity index 100% rename from src/emqttd_plugin_manager.erl rename to src/emqttd_plugin_mgr.erl From bf75dbf794692e32ea8324a5d2781c089aaff1e2 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 4 Aug 2015 09:11:18 +0800 Subject: [PATCH 09/25] sync with mac air --- src/emqttd_plugin_mgr.erl | 148 ++++++++++++++++++++++++-------------- 1 file changed, 93 insertions(+), 55 deletions(-) diff --git a/src/emqttd_plugin_mgr.erl b/src/emqttd_plugin_mgr.erl index de92029b5..55216f7c2 100644 --- a/src/emqttd_plugin_mgr.erl +++ b/src/emqttd_plugin_mgr.erl @@ -25,7 +25,7 @@ %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_plugin_manager). +-module(emqttd_plugin_mgr). -author("Feng Lee "). @@ -33,31 +33,28 @@ -export([start/0, list/0, load/1, unload/1, stop/0]). -start() -> - %% start all plugins - %% - ok. - %%------------------------------------------------------------------------------ %% @doc Load all plugins %% @end %%------------------------------------------------------------------------------ --spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -load_all_plugins() -> - %% save first - case file:consult("etc/plugins.config") of - {ok, [PluginApps]} -> - ok; - %% application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]), - %% [{App, load_plugin(App)} || {App, _Env} <- PluginApps]; - {error, enoent} -> - lager:error("etc/plugins.config not found!"); +-spec start() -> ok | {error, any()}. +start() -> + case read_loaded() of + {ok, AppNames} -> + NotFound = AppNames -- apps(plugin), + case NotFound of + [] -> ok; + NotFound -> lager:error("Cannot find plugins: ~p", [NotFound]) + end, + {ok, start_apps(AppNames -- NotFound -- apps(started))}; {error, Error} -> - lager:error("Load etc/plugins.config error: ~p", [Error]) + lager:error("Read loaded_plugins file error: ~p", [Error]), + {error, Error} end. %%------------------------------------------------------------------------------ -%% List all available plugins +%% @doc List all available plugins +%% @end %%------------------------------------------------------------------------------ list() -> {ok, PluginEnv} = application:get_env(emqttd, plugins), @@ -74,7 +71,7 @@ list() -> plugin(AppFile) -> {ok, [{application, Name, Attrs}]} = file:consult(AppFile), - Ver = proplists:get_value(vsn, Attrs), + Ver = proplists:get_value(vsn, Attrs, "0"), Descr = proplists:get_value(description, Attrs, ""), #mqtt_plugin{name = Name, version = Ver, descr = Descr}. @@ -84,40 +81,40 @@ plugin(AppFile) -> %%------------------------------------------------------------------------------ -spec load(atom()) -> ok | {error, any()}. load(PluginName) when is_atom(PluginName) -> - %% start plugin - %% write file if plugin is loaded - ok. - --spec load_plugin(App :: atom()) -> ok | {error, any()}. -load_plugin(App) -> - case load_app(App) of - ok -> - start_app(App); - {error, Reason} -> - {error, Reason} + case lists:member(PluginName, apps(started)) of + true -> + lager:info("plugin ~p is started", [PluginName]), + {error, already_started}; + false -> + case lists:member(PluginName, apps(plugin)) of + true -> + load_plugin(PluginName); + false -> + lager:info("plugin ~p is not found", [PluginName]), + {error, not_foun} + end end. - -load_app(App) -> - case application:load(App) of - ok -> - lager:info("load plugin ~p successfully", [App]), ok; - {error, {already_loaded, App}} -> - lager:info("load plugin ~p is already loaded", [App]), ok; - {error, Reason} -> - lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason} + +-spec load_plugin(App :: atom()) -> {ok, list()} | {error, any()}. +load_plugin(PluginName) -> + case start_app(PluginName) of + {ok, Started} -> + plugin_loaded(PluginName), + {ok, Started}; + {error, Error} -> + {error, Error} end. start_app(App) -> - case application:start(App) of - ok -> - lager:info("start plugin ~p successfully", [App]), ok; - {error, {already_started, App}} -> - lager:error("plugin ~p is already started", [App]), ok; - {error, Reason} -> - lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason} + case application:ensure_all_started(App) of + {ok, Started} -> + lager:info("started apps: ~p, load plugin ~p successfully", [Started, App]), + {ok, Started}; + {error, {ErrApp, Reason}} -> + lager:error("load plugin ~p error, cannot start app ~s for ~p", [App, ErrApp, Reason]), + {error, {ErrApp, Reason}} end. - %%------------------------------------------------------------------------------ %% @doc UnLoad Plugin %% @end @@ -159,14 +156,55 @@ unload_app(App) -> stop() -> %% stop all plugins + PluginApps = application:get_env(emqttd, plugins, []), + %%[{App, unload_plugin(App)} || App <- PluginApps]. ok. -%%------------------------------------------------------------------------------ -%% @doc Unload all plugins -%% @end -%%------------------------------------------------------------------------------ --spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -unload_all_plugins() -> - PluginApps = application:get_env(emqttd, plugins, []). - %%[{App, unload_plugin(App)} || App <- PluginApps]. +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +start_apps(Apps) -> + [start_app(App) || App <- Apps]. + +stop_apps(Apps) -> + [stop_app(App) || App <- Apps]. + +apps(plugin) -> + [Name || #mqtt_plugin{name = Name} <- list()]; + +apps(started) -> + [Name || {Name, _Descr, _Ver} <- application:which_applications()]. + +plugin_loaded(Name) -> + case read_loaded() of + {ok, Names} -> + case lists:member(Name, Names) of + true -> + ok; + false -> + %% write file if plugin is loaded + write_loaded(lists:append(Names, Name)) + end; + {error, Error} -> + lager:error("Cannot read loaded plugins: ~p", [Error]) + end. + + + +read_loaded() -> + {ok, PluginEnv} = application:get_env(emqttd, plugins), + LoadedFile = proplists:get_value(loaded_file, PluginEnv, "./data/loaded_plugins"), + file:consult(LoadedFile). + +write_loaded(AppNames) -> + {ok, PluginEnv} = application:get_env(emqttd, plugins), + LoadedFile = proplists:get_value(loaded_file, PluginEnv, "./data/loaded_plugins"), + case file:open(LoadedFile, [binary, write]) of + {ok, Fd} -> + Line = list_to_binary(io_lib:format("~w.~n", [AppNames])), + file:write(Fd, Line); + {error, Error} -> + {error, Error} + end. From f0109c7af7b009e7061d705dde1e6235f1676961 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 4 Aug 2015 11:39:30 +0800 Subject: [PATCH 10/25] plugin --- include/emqttd.hrl | 39 +++++--- rel/files/emqttd.config | 9 +- rel/files/loaded_plugins | 1 + rel/reltool.config | 3 +- src/emqttd.erl | 103 -------------------- src/emqttd_app.erl | 2 +- src/emqttd_ctl.erl | 4 +- src/emqttd_plugin_manager.erl | 172 ++++++++++++++++++++++++++++++++++ src/emqttd_pooler.erl | 2 +- src/emqttd_pubsub.erl | 2 +- 10 files changed, 214 insertions(+), 123 deletions(-) create mode 100644 rel/files/loaded_plugins create mode 100644 src/emqttd_plugin_manager.erl diff --git a/include/emqttd.hrl b/include/emqttd.hrl index d142fdd7b..fc5ade8e8 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -129,18 +129,6 @@ -type mqtt_message() :: #mqtt_message{}. -%%------------------------------------------------------------------------------ -%% MQTT Plugin -%%------------------------------------------------------------------------------ --record(mqtt_plugin, { - name, - version, - attrs, - description -}). - --type mqtt_plugin() :: #mqtt_plugin{}. - %%------------------------------------------------------------------------------ %% MQTT Alarm %%------------------------------------------------------------------------------ @@ -154,3 +142,30 @@ -type mqtt_alarm() :: #mqtt_alarm{}. +%%------------------------------------------------------------------------------ +%% MQTT Plugin +%%------------------------------------------------------------------------------ +-record(mqtt_plugin, { + name, + version, + descr, + active = false +}). + +-type mqtt_plugin() :: #mqtt_plugin{}. + +%%------------------------------------------------------------------------------ +%% MQTT CLI Command +%% For example: 'broker metrics' +%%------------------------------------------------------------------------------ +-record(mqtt_cli, { + name, + action, + args = [], + opts = [], + usage, + descr +}). + +-type mqtt_cli() :: #mqtt_cli{}. + diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index ab252b9a0..7ff3e49ba 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -153,15 +153,20 @@ {modules, [ %% Client presence management module. %% Publish messages when client connected or disconnected - {presence, [{qos, 0}]}, + {presence, [{qos, 0}]} %% Subscribe topics automatically when client connected - {autosub, [{"$Q/client/$c", 0}]} + %% {autosub, [{"$Q/client/$c", 0}]} %% Rewrite rules %% {rewrite, [{file, "etc/rewrite.config"}]} ]}, + %% Plugins + {plugins, [ + {dir, "./plugins"}, + {loaded_file, "./data/loaded_plugins"} + ]}, %% Listeners {listeners, [ {mqtt, 1883, [ diff --git a/rel/files/loaded_plugins b/rel/files/loaded_plugins new file mode 100644 index 000000000..bf38a149a --- /dev/null +++ b/rel/files/loaded_plugins @@ -0,0 +1 @@ +[emqttd_dashboard]. diff --git a/rel/reltool.config b/rel/reltool.config index 42e4755ef..768b793f0 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -88,5 +88,6 @@ {template, "files/rewrite.config", "etc/rewrite.config"}, {template, "files/clients.config", "etc/clients.config"}, {template, "files/plugins.config", "etc/plugins.config"}, - {template, "files/vm.args", "etc/vm.args"} + {template, "files/vm.args", "etc/vm.args"}, + {copy, "files/loaded_plugins", "data/loaded_plugins"} ]}. diff --git a/src/emqttd.erl b/src/emqttd.erl index 074e8f3e2..e48aee560 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -30,10 +30,7 @@ -export([start/0, env/1, env/2, open_listeners/1, close_listeners/1, - load_all_plugins/0, unload_all_plugins/0, - load_plugin/1, unload_plugin/1, load_all_mods/0, is_mod_enabled/1, - loaded_plugins/0, is_running/1]). -define(MQTT_SOCKOPTS, [ @@ -112,106 +109,6 @@ close_listeners(Listeners) when is_list(Listeners) -> close_listener({Protocol, Port, _Options}) -> esockd:close({Protocol, Port}). -%%------------------------------------------------------------------------------ -%% @doc Load all plugins -%% @end -%%------------------------------------------------------------------------------ --spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -load_all_plugins() -> - %% save first - case file:consult("etc/plugins.config") of - {ok, [PluginApps]} -> - application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]), - [{App, load_plugin(App)} || {App, _Env} <- PluginApps]; - {error, enoent} -> - lager:error("etc/plugins.config not found!"); - {error, Error} -> - lager:error("Load etc/plugins.config error: ~p", [Error]) - end. - -%%------------------------------------------------------------------------------ -%% @doc Load plugin -%% @end -%%------------------------------------------------------------------------------ --spec load_plugin(App :: atom()) -> ok | {error, any()}. -load_plugin(App) -> - case load_app(App) of - ok -> - start_app(App); - {error, Reason} -> - {error, Reason} - end. - -load_app(App) -> - case application:load(App) of - ok -> - lager:info("load plugin ~p successfully", [App]), ok; - {error, {already_loaded, App}} -> - lager:info("load plugin ~p is already loaded", [App]), ok; - {error, Reason} -> - lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -start_app(App) -> - case application:start(App) of - ok -> - lager:info("start plugin ~p successfully", [App]), ok; - {error, {already_started, App}} -> - lager:error("plugin ~p is already started", [App]), ok; - {error, Reason} -> - lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -%%------------------------------------------------------------------------------ -%% @doc Loaded plugins -%% @end -%%------------------------------------------------------------------------------ -loaded_plugins() -> - PluginApps = application:get_env(emqttd, plugins, []), - [App || App = {Name, _Descr, _Vsn} <- application:which_applications(), - lists:member(Name, PluginApps)]. - -%%------------------------------------------------------------------------------ -%% @doc Unload all plugins -%% @end -%%------------------------------------------------------------------------------ --spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -unload_all_plugins() -> - PluginApps = application:get_env(emqttd, plugins, []), - [{App, unload_plugin(App)} || App <- PluginApps]. - -%%------------------------------------------------------------------------------ -%% @doc Unload plugin -%% @end -%%------------------------------------------------------------------------------ --spec unload_plugin(App :: atom()) -> ok | {error, any()}. -unload_plugin(App) -> - case stop_app(App) of - ok -> - unload_app(App); - {error, Reason} -> - {error, Reason} - end. - -stop_app(App) -> - case application:stop(App) of - ok -> - lager:info("stop plugin ~p successfully~n", [App]), ok; - {error, {not_started, App}} -> - lager:error("plugin ~p is not started~n", [App]), ok; - {error, Reason} -> - lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} - end. - -unload_app(App) -> - case application:unload(App) of - ok -> - lager:info("unload plugin ~p successfully~n", [App]), ok; - {error, {not_loaded, App}} -> - lager:info("load plugin ~p is not loaded~n", [App]), ok; - {error, Reason} -> - lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. load_all_mods() -> Mods = application:get_env(emqttd, modules, []), diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index f9434b385..e07a157c2 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -52,7 +52,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd:load_all_mods(), - emqttd:load_all_plugins(), + %% emqttd:load_all_plugins(), {ok, Listeners} = application:get_env(listeners), emqttd:open_listeners(Listeners), register(emqttd, self()), diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index acccb5b5a..d7273e8a6 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -68,7 +68,7 @@ status([]) -> %% @end %%------------------------------------------------------------------------------ cluster([]) -> - Nodes = [node()|nodes()], + Nodes = emqttd_broker:running_nodes(), ?PRINT("cluster nodes: ~p~n", [Nodes]); cluster([SNode]) -> @@ -78,7 +78,7 @@ cluster([SNode]) -> case emqttd:is_running(Node) of true -> %%TODO: should not unload here. - emqttd:unload_all_plugins(), + %% emqttd:unload_all_plugins(), application:stop(emqttd), application:stop(esockd), application:stop(gproc), diff --git a/src/emqttd_plugin_manager.erl b/src/emqttd_plugin_manager.erl new file mode 100644 index 000000000..de92029b5 --- /dev/null +++ b/src/emqttd_plugin_manager.erl @@ -0,0 +1,172 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd plugin manager. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_plugin_manager). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-export([start/0, list/0, load/1, unload/1, stop/0]). + +start() -> + %% start all plugins + %% + ok. + +%%------------------------------------------------------------------------------ +%% @doc Load all plugins +%% @end +%%------------------------------------------------------------------------------ +-spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. +load_all_plugins() -> + %% save first + case file:consult("etc/plugins.config") of + {ok, [PluginApps]} -> + ok; + %% application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]), + %% [{App, load_plugin(App)} || {App, _Env} <- PluginApps]; + {error, enoent} -> + lager:error("etc/plugins.config not found!"); + {error, Error} -> + lager:error("Load etc/plugins.config error: ~p", [Error]) + end. + +%%------------------------------------------------------------------------------ +%% List all available plugins +%%------------------------------------------------------------------------------ +list() -> + {ok, PluginEnv} = application:get_env(emqttd, plugins), + PluginsDir = proplists:get_value(dir, PluginEnv, "./plugins"), + AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir), + Plugins = [plugin(filename:join(PluginsDir, AppFile)) || AppFile <- AppFiles], + StartedApps = [Name || {Name, _Descr, _Ver} <- application:which_applications()], + lists:map(fun(Plugin = #mqtt_plugin{name = Name}) -> + case lists:member(Name, StartedApps) of + true -> Plugin#mqtt_plugin{active = true}; + false -> Plugin + end + end, Plugins). + +plugin(AppFile) -> + {ok, [{application, Name, Attrs}]} = file:consult(AppFile), + Ver = proplists:get_value(vsn, Attrs), + Descr = proplists:get_value(description, Attrs, ""), + #mqtt_plugin{name = Name, version = Ver, descr = Descr}. + +%%------------------------------------------------------------------------------ +%% @doc Load Plugin +%% @end +%%------------------------------------------------------------------------------ +-spec load(atom()) -> ok | {error, any()}. +load(PluginName) when is_atom(PluginName) -> + %% start plugin + %% write file if plugin is loaded + ok. + +-spec load_plugin(App :: atom()) -> ok | {error, any()}. +load_plugin(App) -> + case load_app(App) of + ok -> + start_app(App); + {error, Reason} -> + {error, Reason} + end. + +load_app(App) -> + case application:load(App) of + ok -> + lager:info("load plugin ~p successfully", [App]), ok; + {error, {already_loaded, App}} -> + lager:info("load plugin ~p is already loaded", [App]), ok; + {error, Reason} -> + lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason} + end. + +start_app(App) -> + case application:start(App) of + ok -> + lager:info("start plugin ~p successfully", [App]), ok; + {error, {already_started, App}} -> + lager:error("plugin ~p is already started", [App]), ok; + {error, Reason} -> + lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason} + end. + + +%%------------------------------------------------------------------------------ +%% @doc UnLoad Plugin +%% @end +%%------------------------------------------------------------------------------ +-spec unload(atom()) -> ok | {error, any()}. +unload(PluginName) when is_atom(PluginName) -> + %% stop plugin + %% write file if plugin is loaded + ok. + +-spec unload_plugin(App :: atom()) -> ok | {error, any()}. +unload_plugin(App) -> + case stop_app(App) of + ok -> + unload_app(App); + {error, Reason} -> + {error, Reason} + end. + +stop_app(App) -> + case application:stop(App) of + ok -> + lager:info("stop plugin ~p successfully~n", [App]), ok; + {error, {not_started, App}} -> + lager:error("plugin ~p is not started~n", [App]), ok; + {error, Reason} -> + lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} + end. + +unload_app(App) -> + case application:unload(App) of + ok -> + lager:info("unload plugin ~p successfully~n", [App]), ok; + {error, {not_loaded, App}} -> + lager:info("load plugin ~p is not loaded~n", [App]), ok; + {error, Reason} -> + lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} + end. + +stop() -> + %% stop all plugins + ok. + +%%------------------------------------------------------------------------------ +%% @doc Unload all plugins +%% @end +%%------------------------------------------------------------------------------ +-spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. +unload_all_plugins() -> + PluginApps = application:get_env(emqttd, plugins, []). + %%[{App, unload_plugin(App)} || App <- PluginApps]. + diff --git a/src/emqttd_pooler.erl b/src/emqttd_pooler.erl index 3f0245250..147c19ee5 100644 --- a/src/emqttd_pooler.erl +++ b/src/emqttd_pooler.erl @@ -75,7 +75,7 @@ handle_call(_Req, _From, State) -> {reply, ok, State}. handle_cast({async_submit, Fun}, State) -> - run(Fun), + try run(Fun) catch _:Error -> lager:error("Pooler Error: ~p", [Error]) end, {noreply, State}; handle_cast(_Msg, State) -> diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index b2604d3cc..1420fb2df 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -228,7 +228,7 @@ match(Topic) when is_binary(Topic) -> %%%============================================================================= init([Id, _Opts]) -> - process_flag(min_heap_size, 1024*1024), + %%process_flag(min_heap_size, 1024*1024), gproc_pool:connect_worker(pubsub, {?MODULE, Id}), {ok, #state{id = Id, submap = maps:new()}}. From 1a64e4e37325b4dadf5962ad9969a29973b851af Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 4 Aug 2015 13:21:59 +0800 Subject: [PATCH 11/25] plugins --- .../src/emqttd_plugin_demo.app.src | 12 ++ .../src/emqttd_plugin_demo_app.erl | 16 ++ .../src/emqttd_plugin_demo_sup.erl | 27 +++ src/emqttd_app.erl | 26 ++- src/emqttd_ctl.erl | 17 +- src/emqttd_plugin_manager.erl | 172 ------------------ ...qttd_plugin_mgr.erl => emqttd_plugins.erl} | 146 ++++++++------- 7 files changed, 165 insertions(+), 251 deletions(-) create mode 100644 plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src create mode 100644 plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl create mode 100644 plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl delete mode 100644 src/emqttd_plugin_manager.erl rename src/{emqttd_plugin_mgr.erl => emqttd_plugins.erl} (67%) diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src new file mode 100644 index 000000000..1fd8ce62d --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src @@ -0,0 +1,12 @@ +{application, emqttd_plugin_demo, + [ + {description, ""}, + {vsn, "1"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {mod, { emqttd_plugin_demo_app, []}}, + {env, []} + ]}. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl new file mode 100644 index 000000000..9042a449a --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl @@ -0,0 +1,16 @@ +-module(emqttd_plugin_demo_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + emqttd_plugin_demo_sup:start_link(). + +stop(_State) -> + ok. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl new file mode 100644 index 000000000..65dad7a60 --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl @@ -0,0 +1,27 @@ +-module(emqttd_plugin_demo_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + {ok, { {one_for_one, 5, 10}, []} }. + diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index e07a157c2..159062b16 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -52,9 +52,8 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd:load_all_mods(), - %% emqttd:load_all_plugins(), - {ok, Listeners} = application:get_env(listeners), - emqttd:open_listeners(Listeners), + emqttd_plugins:load(), + start_listeners(), register(emqttd, self()), print_vsn(), {ok, Sup}. @@ -67,6 +66,10 @@ print_vsn() -> {ok, Desc} = application:get_key(description), ?PRINT("~s ~s is running now~n", [Desc, Vsn]). +start_listeners() -> + {ok, Listeners} = application:get_env(listeners), + emqttd:open_listeners(Listeners). + start_servers(Sup) -> Servers = [{"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, @@ -131,14 +134,23 @@ worker_spec(Name, Opts) -> %% close all listeners first... prep_stop(State) -> - %%TODO: esockd app should be running... - {ok, Listeners} = application:get_env(listeners), - emqttd:close_listeners(Listeners), + stop_listeners(), + timer:sleep(2), + emqttd_plugins:unload(), timer:sleep(2), State. +stop_listeners() -> + %% ensure that esockd applications is started? + case lists:keyfind(esockd, 1, application:which_applications()) of + false -> + ignore; + _Tuple -> + {ok, Listeners} = application:get_env(listeners), + emqttd:close_listeners(Listeners) + end. + -spec stop(State :: term()) -> term(). stop(_State) -> ok. - diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index d7273e8a6..048af3431 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -77,8 +77,6 @@ cluster([SNode]) -> pong -> case emqttd:is_running(Node) of true -> - %%TODO: should not unload here. - %% emqttd:unload_all_plugins(), application:stop(emqttd), application:stop(esockd), application:stop(gproc), @@ -180,19 +178,20 @@ bridges(["stop", SNode, Topic]) -> end. plugins(["list"]) -> - Plugins = emqttd:loaded_plugins(), - lists:foreach(fun(Plugin) -> ?PRINT("~p~n", [Plugin]) end, Plugins); + lists:foreach(fun(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> + ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", [Name, Ver, Descr, Active]) + end, emqttd_plugins:list()); plugins(["load", Name]) -> - case emqttd:load_plugin(list_to_atom(Name)) of - ok -> ?PRINT("plugin ~s is loaded successfully.~n", [Name]); - {error, Reason} -> ?PRINT("error: ~s~n", [Reason]) + case emqttd_plugins:load(list_to_atom(Name)) of + {ok, StartedApps} -> ?PRINT("start apps: ~p, plugin ~s is loaded successfully.~n", [StartedApps, Name]); + {error, Reason} -> ?PRINT("load plugin error: ~s~n", [Reason]) end; plugins(["unload", Name]) -> - case emqttd:unload_plugin(list_to_atom(Name)) of + case emqttd_plugins:unload(list_to_atom(Name)) of ok -> ?PRINT("plugin ~s is unloaded successfully.~n", [Name]); - {error, Reason} -> ?PRINT("error: ~s~n", [Reason]) + {error, Reason} -> ?PRINT("unload plugin error: ~s~n", [Reason]) end. trace(["list"]) -> diff --git a/src/emqttd_plugin_manager.erl b/src/emqttd_plugin_manager.erl deleted file mode 100644 index de92029b5..000000000 --- a/src/emqttd_plugin_manager.erl +++ /dev/null @@ -1,172 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd plugin manager. -%%% -%%% @end -%%%----------------------------------------------------------------------------- - --module(emqttd_plugin_manager). - --author("Feng Lee "). - --include("emqttd.hrl"). - --export([start/0, list/0, load/1, unload/1, stop/0]). - -start() -> - %% start all plugins - %% - ok. - -%%------------------------------------------------------------------------------ -%% @doc Load all plugins -%% @end -%%------------------------------------------------------------------------------ --spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -load_all_plugins() -> - %% save first - case file:consult("etc/plugins.config") of - {ok, [PluginApps]} -> - ok; - %% application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]), - %% [{App, load_plugin(App)} || {App, _Env} <- PluginApps]; - {error, enoent} -> - lager:error("etc/plugins.config not found!"); - {error, Error} -> - lager:error("Load etc/plugins.config error: ~p", [Error]) - end. - -%%------------------------------------------------------------------------------ -%% List all available plugins -%%------------------------------------------------------------------------------ -list() -> - {ok, PluginEnv} = application:get_env(emqttd, plugins), - PluginsDir = proplists:get_value(dir, PluginEnv, "./plugins"), - AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir), - Plugins = [plugin(filename:join(PluginsDir, AppFile)) || AppFile <- AppFiles], - StartedApps = [Name || {Name, _Descr, _Ver} <- application:which_applications()], - lists:map(fun(Plugin = #mqtt_plugin{name = Name}) -> - case lists:member(Name, StartedApps) of - true -> Plugin#mqtt_plugin{active = true}; - false -> Plugin - end - end, Plugins). - -plugin(AppFile) -> - {ok, [{application, Name, Attrs}]} = file:consult(AppFile), - Ver = proplists:get_value(vsn, Attrs), - Descr = proplists:get_value(description, Attrs, ""), - #mqtt_plugin{name = Name, version = Ver, descr = Descr}. - -%%------------------------------------------------------------------------------ -%% @doc Load Plugin -%% @end -%%------------------------------------------------------------------------------ --spec load(atom()) -> ok | {error, any()}. -load(PluginName) when is_atom(PluginName) -> - %% start plugin - %% write file if plugin is loaded - ok. - --spec load_plugin(App :: atom()) -> ok | {error, any()}. -load_plugin(App) -> - case load_app(App) of - ok -> - start_app(App); - {error, Reason} -> - {error, Reason} - end. - -load_app(App) -> - case application:load(App) of - ok -> - lager:info("load plugin ~p successfully", [App]), ok; - {error, {already_loaded, App}} -> - lager:info("load plugin ~p is already loaded", [App]), ok; - {error, Reason} -> - lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -start_app(App) -> - case application:start(App) of - ok -> - lager:info("start plugin ~p successfully", [App]), ok; - {error, {already_started, App}} -> - lager:error("plugin ~p is already started", [App]), ok; - {error, Reason} -> - lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - - -%%------------------------------------------------------------------------------ -%% @doc UnLoad Plugin -%% @end -%%------------------------------------------------------------------------------ --spec unload(atom()) -> ok | {error, any()}. -unload(PluginName) when is_atom(PluginName) -> - %% stop plugin - %% write file if plugin is loaded - ok. - --spec unload_plugin(App :: atom()) -> ok | {error, any()}. -unload_plugin(App) -> - case stop_app(App) of - ok -> - unload_app(App); - {error, Reason} -> - {error, Reason} - end. - -stop_app(App) -> - case application:stop(App) of - ok -> - lager:info("stop plugin ~p successfully~n", [App]), ok; - {error, {not_started, App}} -> - lager:error("plugin ~p is not started~n", [App]), ok; - {error, Reason} -> - lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} - end. - -unload_app(App) -> - case application:unload(App) of - ok -> - lager:info("unload plugin ~p successfully~n", [App]), ok; - {error, {not_loaded, App}} -> - lager:info("load plugin ~p is not loaded~n", [App]), ok; - {error, Reason} -> - lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -stop() -> - %% stop all plugins - ok. - -%%------------------------------------------------------------------------------ -%% @doc Unload all plugins -%% @end -%%------------------------------------------------------------------------------ --spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -unload_all_plugins() -> - PluginApps = application:get_env(emqttd, plugins, []). - %%[{App, unload_plugin(App)} || App <- PluginApps]. - diff --git a/src/emqttd_plugin_mgr.erl b/src/emqttd_plugins.erl similarity index 67% rename from src/emqttd_plugin_mgr.erl rename to src/emqttd_plugins.erl index 55216f7c2..4372ca3c7 100644 --- a/src/emqttd_plugin_mgr.erl +++ b/src/emqttd_plugins.erl @@ -20,53 +20,75 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd plugin manager. +%%% emqttd plugin admin. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_plugin_mgr). +-module(emqttd_plugins). -author("Feng Lee "). -include("emqttd.hrl"). --export([start/0, list/0, load/1, unload/1, stop/0]). +-export([load/0, unload/0]). + +-export([list/0, load/1, unload/1]). %%------------------------------------------------------------------------------ -%% @doc Load all plugins +%% @doc Load all plugins when the broker started. %% @end %%------------------------------------------------------------------------------ --spec start() -> ok | {error, any()}. -start() -> +-spec load() -> list() | {error, any()}. +load() -> case read_loaded() of - {ok, AppNames} -> - NotFound = AppNames -- apps(plugin), + {ok, LoadNames} -> + NotFound = LoadNames -- apps(plugin), case NotFound of [] -> ok; NotFound -> lager:error("Cannot find plugins: ~p", [NotFound]) end, - {ok, start_apps(AppNames -- NotFound -- apps(started))}; + start_apps(LoadNames -- NotFound -- apps(started)); {error, Error} -> lager:error("Read loaded_plugins file error: ~p", [Error]), {error, Error} end. +start_apps(Apps) -> + [start_app(App) || App <- Apps]. + +%%------------------------------------------------------------------------------ +%% @doc Unload all plugins before broker stopped. +%% @end +%%------------------------------------------------------------------------------ +-spec unload() -> list() | {error, any()}. +unload() -> + case read_loaded() of + {ok, LoadNames} -> + stop_apps(LoadNames); + {error, Error} -> + lager:error("Read loaded_plugins file error: ~p", [Error]), + {error, Error} + end. + +stop_apps(Apps) -> + [stop_app(App) || App <- Apps]. + %%------------------------------------------------------------------------------ %% @doc List all available plugins %% @end %%------------------------------------------------------------------------------ +-spec list() -> [mqtt_plugin()]. list() -> - {ok, PluginEnv} = application:get_env(emqttd, plugins), - PluginsDir = proplists:get_value(dir, PluginEnv, "./plugins"), + PluginsDir = env(dir), AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir), Plugins = [plugin(filename:join(PluginsDir, AppFile)) || AppFile <- AppFiles], - StartedApps = [Name || {Name, _Descr, _Ver} <- application:which_applications()], + StartedApps = apps(started), lists:map(fun(Plugin = #mqtt_plugin{name = Name}) -> - case lists:member(Name, StartedApps) of - true -> Plugin#mqtt_plugin{active = true}; - false -> Plugin - end + case lists:member(Name, StartedApps) of + true -> Plugin#mqtt_plugin{active = true}; + false -> Plugin + end end, Plugins). plugin(AppFile) -> @@ -76,23 +98,20 @@ plugin(AppFile) -> #mqtt_plugin{name = Name, version = Ver, descr = Descr}. %%------------------------------------------------------------------------------ -%% @doc Load Plugin +%% @doc Load One Plugin %% @end %%------------------------------------------------------------------------------ -spec load(atom()) -> ok | {error, any()}. load(PluginName) when is_atom(PluginName) -> - case lists:member(PluginName, apps(started)) of - true -> - lager:info("plugin ~p is started", [PluginName]), + case {lists:member(PluginName, apps(started)), lists:member(PluginName, apps(plugin))} of + {true, _} -> + lager:error("plugin ~p is started", [PluginName]), {error, already_started}; - false -> - case lists:member(PluginName, apps(plugin)) of - true -> - load_plugin(PluginName); - false -> - lager:info("plugin ~p is not found", [PluginName]), - {error, not_foun} - end + {false, true} -> + load_plugin(PluginName); + {false, false} -> + lager:error("plugin ~p is not found", [PluginName]), + {error, not_found} end. -spec load_plugin(App :: atom()) -> {ok, list()} | {error, any()}. @@ -116,20 +135,27 @@ start_app(App) -> end. %%------------------------------------------------------------------------------ -%% @doc UnLoad Plugin +%% @doc UnLoad One Plugin %% @end %%------------------------------------------------------------------------------ -spec unload(atom()) -> ok | {error, any()}. unload(PluginName) when is_atom(PluginName) -> - %% stop plugin - %% write file if plugin is loaded - ok. + case {lists:member(PluginName, apps(started)), lists:member(PluginName, apps(plugin))} of + {false, _} -> + lager:error("plugin ~p is not started", [PluginName]), + {error, not_started}; + {true, true} -> + unload_plugin(PluginName); + {true, false} -> + lager:error("~s is not a plugin, cannot unload it", [PluginName]), + {error, not_found} + end. -spec unload_plugin(App :: atom()) -> ok | {error, any()}. unload_plugin(App) -> case stop_app(App) of ok -> - unload_app(App); + plugin_unloaded(App), ok; {error, Reason} -> {error, Reason} end. @@ -144,32 +170,10 @@ stop_app(App) -> lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} end. -unload_app(App) -> - case application:unload(App) of - ok -> - lager:info("unload plugin ~p successfully~n", [App]), ok; - {error, {not_loaded, App}} -> - lager:info("load plugin ~p is not loaded~n", [App]), ok; - {error, Reason} -> - lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -stop() -> - %% stop all plugins - PluginApps = application:get_env(emqttd, plugins, []), - %%[{App, unload_plugin(App)} || App <- PluginApps]. - ok. - %%%============================================================================= %%% Internal functions %%%============================================================================= -start_apps(Apps) -> - [start_app(App) || App <- Apps]. - -stop_apps(Apps) -> - [stop_app(App) || App <- Apps]. - apps(plugin) -> [Name || #mqtt_plugin{name = Name} <- list()]; @@ -181,7 +185,7 @@ plugin_loaded(Name) -> {ok, Names} -> case lists:member(Name, Names) of true -> - ok; + ignore; false -> %% write file if plugin is loaded write_loaded(lists:append(Names, Name)) @@ -190,17 +194,24 @@ plugin_loaded(Name) -> lager:error("Cannot read loaded plugins: ~p", [Error]) end. - +plugin_unloaded(Name) -> + case read_loaded() of + {ok, Names} -> + case lists:member(Name, Names) of + true -> + write_loaded(lists:delete(Name, Names)); + false -> + lager:error("Cannot find ~s in loaded_file", [Name]) + end; + {error, Error} -> + lager:error("Cannot read loaded plugins: ~p", [Error]) + end. read_loaded() -> - {ok, PluginEnv} = application:get_env(emqttd, plugins), - LoadedFile = proplists:get_value(loaded_file, PluginEnv, "./data/loaded_plugins"), - file:consult(LoadedFile). + file:consult(env(loaded_file)). write_loaded(AppNames) -> - {ok, PluginEnv} = application:get_env(emqttd, plugins), - LoadedFile = proplists:get_value(loaded_file, PluginEnv, "./data/loaded_plugins"), - case file:open(LoadedFile, [binary, write]) of + case file:open(env(loaded_file), [binary, write]) of {ok, Fd} -> Line = list_to_binary(io_lib:format("~w.~n", [AppNames])), file:write(Fd, Line); @@ -208,3 +219,12 @@ write_loaded(AppNames) -> {error, Error} end. +env(dir) -> + proplists:get_value(dir, env(), "./plugins"); + +env(loaded_file) -> + proplists:get_value(loaded_file, env(), "./data/loaded_plugins"). + +env() -> + {ok, PluginsEnv} = application:get_env(emqttd, plugins), PluginsEnv. + From 7ba5772264ae7b82caf623112ed486ef9eb84359 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 5 Aug 2015 15:39:29 +0800 Subject: [PATCH 12/25] temporary --- src/emqttd_plugins.erl | 5 +++-- src/emqttd_session_sup.erl | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index 4372ca3c7..f8bcc7696 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -213,8 +213,9 @@ read_loaded() -> write_loaded(AppNames) -> case file:open(env(loaded_file), [binary, write]) of {ok, Fd} -> - Line = list_to_binary(io_lib:format("~w.~n", [AppNames])), - file:write(Fd, Line); + lists:foreach(fun(Name) -> + file:write(Fd, iolist_to_binary(io_lib:format("~s.~n", [Name]))) + end, AppNames); {error, Error} -> {error, Error} end. diff --git a/src/emqttd_session_sup.erl b/src/emqttd_session_sup.erl index ff12b5055..1375c9fda 100644 --- a/src/emqttd_session_sup.erl +++ b/src/emqttd_session_sup.erl @@ -57,6 +57,6 @@ start_session(CleanSess, ClientId, ClientPid) -> init([]) -> {ok, {{simple_one_for_one, 10, 10}, [{session, {emqttd_session, start_link, []}, - transient, 10000, worker, [emqttd_session]}]}}. + temporary, 10000, worker, [emqttd_session]}]}}. From 45487523af2d8feb01ff622ddefad4ce154567c3 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Aug 2015 00:01:00 +0800 Subject: [PATCH 13/25] Plugin Guide --- PLUGIN.md | 4 ++++ plugins/README | 4 ++++ rel/files/loaded_plugins | 1 - rel/files/plugins.config | 27 +-------------------------- 4 files changed, 9 insertions(+), 27 deletions(-) create mode 100644 PLUGIN.md create mode 100644 plugins/README delete mode 100644 rel/files/loaded_plugins diff --git a/PLUGIN.md b/PLUGIN.md new file mode 100644 index 000000000..7991de2cd --- /dev/null +++ b/PLUGIN.md @@ -0,0 +1,4 @@ + +Please see [Plugin Design](https://github.com/emqtt/emqttd/wiki/Plugin%20Design). + + diff --git a/plugins/README b/plugins/README new file mode 100644 index 000000000..7991de2cd --- /dev/null +++ b/plugins/README @@ -0,0 +1,4 @@ + +Please see [Plugin Design](https://github.com/emqtt/emqttd/wiki/Plugin%20Design). + + diff --git a/rel/files/loaded_plugins b/rel/files/loaded_plugins deleted file mode 100644 index bf38a149a..000000000 --- a/rel/files/loaded_plugins +++ /dev/null @@ -1 +0,0 @@ -[emqttd_dashboard]. diff --git a/rel/files/plugins.config b/rel/files/plugins.config index 0a5655c66..bf38a149a 100644 --- a/rel/files/plugins.config +++ b/rel/files/plugins.config @@ -1,26 +1 @@ -[ -% {emysql, [ -% {pool_size, 4}, -% {host, "localhost"}, -% {port, 3306}, -% {username, "root"}, -% {password, "public"}, -% {database, "mqtt"}, -% {encoding, utf8} -% ]}, -% {emqttd_auth_mysql, [ -% {user_table, mqtt_users}, -% {password_hash, plain}, -% {field_mapper, [ -% {username, username}, -% {password, password} -% ]} -% ]}, -% {emqttd_dashboard, [ -% {listener, -% {emqttd_dashboard, 18083, [ -% {acceptors, 4}, -% {max_clients, 512}]}} -% ]} -% -]. +[emqttd_dashboard]. From 2e01595be731d67fd84d11f1c72a3d15b0fb31ff Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Aug 2015 00:01:25 +0800 Subject: [PATCH 14/25] plugins --- Makefile | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 2d8b5b01a..b319d3928 100644 --- a/Makefile +++ b/Makefile @@ -29,11 +29,14 @@ edoc: @$(REBAR) doc rel: compile - @cd rel && ../rebar generate -f + @cd rel && $(REBAR) generate -f plugins: @for plugin in ./plugins/* ; do \ + if [ -d $${plugin} ]; then \ + echo "copy $${plugin}"; \ cp -R $${plugin} $(DIST)/plugins/ && rm -rf $(DIST)/$${plugin}/src/ ; \ + fi \ done dist: rel plugins From ef4dbd525d3833bc22eb0fc35c6098bceba13ee5 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Aug 2015 00:01:44 +0800 Subject: [PATCH 15/25] rm --- plugins/README | 4 ---- 1 file changed, 4 deletions(-) delete mode 100644 plugins/README diff --git a/plugins/README b/plugins/README deleted file mode 100644 index 7991de2cd..000000000 --- a/plugins/README +++ /dev/null @@ -1,4 +0,0 @@ - -Please see [Plugin Design](https://github.com/emqtt/emqttd/wiki/Plugin%20Design). - - From 1a67a75423374b62c1c1ac0d89687c1b08098570 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Aug 2015 00:02:19 +0800 Subject: [PATCH 16/25] rm plugins.config --- rel/files/emqttd | 23 +---------------------- 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/rel/files/emqttd b/rel/files/emqttd index 83b0368d0..8574571c1 100755 --- a/rel/files/emqttd +++ b/rel/files/emqttd @@ -141,13 +141,6 @@ case "$1" in echo $RES exit 1 fi - # Sanity check the plugins.config file - RES=`$NODETOOL_LITE chkconfig $RUNNER_ETC_DIR/plugins.config` - if [ $? != 0 ]; then - echo "Error reading $RUNNER_ETC_DIR/plugins.config" - echo $RES - exit 1 - fi HEART_COMMAND="$RUNNER_SCRIPT_DIR/$SCRIPT start" export HEART_COMMAND mkdir -p $PIPE_DIR @@ -265,13 +258,6 @@ case "$1" in echo $RES exit 1 fi - # Sanity check the plugins.config file - RES=`$NODETOOL_LITE chkconfig $RUNNER_ETC_DIR/plugins.config` - if [ $? != 0 ]; then - echo "Error reading $RUNNER_ETC_DIR/plugins.config" - echo $RES - exit 1 - fi # Setup beam-required vars ROOTDIR=$RUNNER_BASE_DIR ERL_LIBS=$ROOTDIR/plugins @@ -279,7 +265,7 @@ case "$1" in EMU=beam PROGNAME=`echo $0 | sed 's/.*\///'` CMD="$BINDIR/erlexec -boot $RUNNER_BASE_DIR/releases/$APP_VSN/$SCRIPT \ - -embedded -config $RUNNER_ETC_DIR/emqttd.config -config $RUNNER_ETC_DIR/plugins.config \ + -embedded -config $RUNNER_ETC_DIR/emqttd.config \ -pa $RUNNER_LIB_DIR/basho-patches \ -args_file $RUNNER_ETC_DIR/vm.args -- ${1+"$@"}" export EMU @@ -305,13 +291,6 @@ case "$1" in echo $RES exit 1 fi - # Sanity check the plugins.config file - RES=`$NODETOOL_LITE chkconfig $RUNNER_ETC_DIR/plugins.config` - if [ $? != 0 ]; then - echo "Error reading $RUNNER_ETC_DIR/plugins.config" - echo $RES - exit 1 - fi echo "config is OK" ;; escript) From 6a7c940da2961f178b7db7c4f84e0e083802386d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Aug 2015 00:02:36 +0800 Subject: [PATCH 17/25] plugins --- rel/files/emqttd.config | 2 +- rel/files/plugins.config | 2 +- rel/reltool.config | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 7ff3e49ba..f49718ff6 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -165,7 +165,7 @@ %% Plugins {plugins, [ {dir, "./plugins"}, - {loaded_file, "./data/loaded_plugins"} + {loaded_file, "./data/plugins.config"} ]}, %% Listeners {listeners, [ diff --git a/rel/files/plugins.config b/rel/files/plugins.config index bf38a149a..68ba6a41d 100644 --- a/rel/files/plugins.config +++ b/rel/files/plugins.config @@ -1 +1 @@ -[emqttd_dashboard]. +emqttd_dashboard. diff --git a/rel/reltool.config b/rel/reltool.config index 768b793f0..19c11600a 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -87,7 +87,6 @@ {template, "files/acl.config", "etc/acl.config"}, {template, "files/rewrite.config", "etc/rewrite.config"}, {template, "files/clients.config", "etc/clients.config"}, - {template, "files/plugins.config", "etc/plugins.config"}, {template, "files/vm.args", "etc/vm.args"}, - {copy, "files/loaded_plugins", "data/loaded_plugins"} + {copy, "files/plugins.config", "data/plugins.config"} ]}. From f91909703ea76d907db34e4fc39ce5b66d2f8a54 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Aug 2015 00:42:02 +0800 Subject: [PATCH 18/25] dashboard --- plugins/emqttd_dashboard | 1 + 1 file changed, 1 insertion(+) create mode 160000 plugins/emqttd_dashboard diff --git a/plugins/emqttd_dashboard b/plugins/emqttd_dashboard new file mode 160000 index 000000000..2d3c9aeab --- /dev/null +++ b/plugins/emqttd_dashboard @@ -0,0 +1 @@ +Subproject commit 2d3c9aeabeb5289b9ae27c503f017ad71bd81174 From 0bb228be470defa4cfe0c087b89758bd87398fc9 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Aug 2015 00:43:21 +0800 Subject: [PATCH 19/25] mysql plugin --- .gitmodules | 6 ++++++ plugins/emqttd_plugin_mysql | 1 + plugins/emysql | 1 + 3 files changed, 8 insertions(+) create mode 160000 plugins/emqttd_plugin_mysql create mode 160000 plugins/emysql diff --git a/.gitmodules b/.gitmodules index 093a4a897..4015d5e02 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,9 @@ [submodule "plugins/emqttd_dashboard"] path = plugins/emqttd_dashboard url = https://github.com/emqtt/emqttd_dashboard.git +[submodule "plugins/emysql"] + path = plugins/emysql + url = https://github.com/erylee/emysql.git +[submodule "plugins/emqttd_plugin_mysql"] + path = plugins/emqttd_plugin_mysql + url = https://github.com/emqtt/emqttd_plugin_mysql.git diff --git a/plugins/emqttd_plugin_mysql b/plugins/emqttd_plugin_mysql new file mode 160000 index 000000000..892810dbc --- /dev/null +++ b/plugins/emqttd_plugin_mysql @@ -0,0 +1 @@ +Subproject commit 892810dbc853ba147f9acabecd52bb51218275e2 diff --git a/plugins/emysql b/plugins/emysql new file mode 160000 index 000000000..38927104b --- /dev/null +++ b/plugins/emysql @@ -0,0 +1 @@ +Subproject commit 38927104b44b3f8d237bcf3a2b50f2e0608291b3 From acb9e2a7cf3c8d37da1f013d13066c1936560156 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 6 Aug 2015 00:55:18 +0800 Subject: [PATCH 20/25] rm --- rel/files/loaded_plugins | 1 - 1 file changed, 1 deletion(-) delete mode 100644 rel/files/loaded_plugins diff --git a/rel/files/loaded_plugins b/rel/files/loaded_plugins deleted file mode 100644 index bf38a149a..000000000 --- a/rel/files/loaded_plugins +++ /dev/null @@ -1 +0,0 @@ -[emqttd_dashboard]. From 207c50bf1481e180a0c8df980cbf3a347615e195 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 7 Aug 2015 16:34:28 +0800 Subject: [PATCH 21/25] lib_dirs --- rebar.config | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rebar.config b/rebar.config index fb8d1bddc..37f3894b9 100644 --- a/rebar.config +++ b/rebar.config @@ -18,6 +18,10 @@ {validate_app_modules, true}. +%% plugins cannot find emqttd.hrl without ".." lib dirs:( +%% but this setting will make deps apps collision +{lib_dirs, ["../"]}. + {sub_dirs, [ "rel", "plugins/*/"]}. From 31a85ff0d69c1079a055e153efe0a1909df848bc Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 7 Aug 2015 16:40:54 +0800 Subject: [PATCH 22/25] etc --- plugins/emqttd_plugin_demo/etc/plugin.config | 1 + rel/files/{plugins.config => loaded_plugins} | 0 2 files changed, 1 insertion(+) create mode 100644 plugins/emqttd_plugin_demo/etc/plugin.config rename rel/files/{plugins.config => loaded_plugins} (100%) diff --git a/plugins/emqttd_plugin_demo/etc/plugin.config b/plugins/emqttd_plugin_demo/etc/plugin.config new file mode 100644 index 000000000..57afcca04 --- /dev/null +++ b/plugins/emqttd_plugin_demo/etc/plugin.config @@ -0,0 +1 @@ +[]. diff --git a/rel/files/plugins.config b/rel/files/loaded_plugins similarity index 100% rename from rel/files/plugins.config rename to rel/files/loaded_plugins From c6ccacd990196a0473c38684278bc80c7bf39170 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 7 Aug 2015 17:24:03 +0800 Subject: [PATCH 23/25] plugins --- include/emqttd.hrl | 1 + plugins/emqttd_plugin_mysql | 2 +- rel/files/emqttd.config | 7 +- rel/reltool.config | 2 +- src/emqttd_plugins.erl | 219 +++++++++++++++++++++++------------- 5 files changed, 150 insertions(+), 81 deletions(-) diff --git a/include/emqttd.hrl b/include/emqttd.hrl index fc5ade8e8..6c4b55776 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -149,6 +149,7 @@ name, version, descr, + config, active = false }). diff --git a/plugins/emqttd_plugin_mysql b/plugins/emqttd_plugin_mysql index 892810dbc..04baf44a4 160000 --- a/plugins/emqttd_plugin_mysql +++ b/plugins/emqttd_plugin_mysql @@ -1 +1 @@ -Subproject commit 892810dbc853ba147f9acabecd52bb51218275e2 +Subproject commit 04baf44a465c1513e75dfdcc2f6507e7a315d2d5 diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index f49718ff6..9ee5fcb29 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -164,8 +164,11 @@ ]}, %% Plugins {plugins, [ - {dir, "./plugins"}, - {loaded_file, "./data/plugins.config"} + %% Plugin App Library Dir + {plugins_dir, "./plugins"}, + + %% File to store loaded plugin names. + {loaded_file, "./data/loaded_plugins"} ]}, %% Listeners {listeners, [ diff --git a/rel/reltool.config b/rel/reltool.config index 19c11600a..72dfede6e 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -88,5 +88,5 @@ {template, "files/rewrite.config", "etc/rewrite.config"}, {template, "files/clients.config", "etc/clients.config"}, {template, "files/vm.args", "etc/vm.args"}, - {copy, "files/plugins.config", "data/plugins.config"} + {copy, "files/loaded_plugins", "data/loaded_plugins"} ]}. diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index f8bcc7696..9f3a9e2ef 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd plugin admin. +%%% emqttd plugins. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -33,29 +33,42 @@ -export([load/0, unload/0]). --export([list/0, load/1, unload/1]). +-export([load/1, unload/1]). + +-export([list/0]). %%------------------------------------------------------------------------------ %% @doc Load all plugins when the broker started. %% @end %%------------------------------------------------------------------------------ + -spec load() -> list() | {error, any()}. load() -> - case read_loaded() of - {ok, LoadNames} -> - NotFound = LoadNames -- apps(plugin), - case NotFound of - [] -> ok; - NotFound -> lager:error("Cannot find plugins: ~p", [NotFound]) - end, - start_apps(LoadNames -- NotFound -- apps(started)); + case env(loaded_file) of + {ok, File} -> + with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end); + undefined -> + %% No plugins available + ignore + end. + +with_loaded_file(File, SuccFun) -> + case read_loaded(File) of + {ok, Names} -> + SuccFun(Names); {error, Error} -> - lager:error("Read loaded_plugins file error: ~p", [Error]), + lager:error("Failed to read: ~p, error: ~p", [File, Error]), {error, Error} end. -start_apps(Apps) -> - [start_app(App) || App <- Apps]. +load_plugins(Names, Persistent) -> + Plugins = list(), NotFound = Names -- names(Plugins), + case NotFound of + [] -> ok; + NotFound -> lager:error("Cannot find plugins: ~p", [NotFound]) + end, + NeedToLoad = Names -- NotFound -- names(started_app), + [load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad]. %%------------------------------------------------------------------------------ %% @doc Unload all plugins before broker stopped. @@ -63,16 +76,16 @@ start_apps(Apps) -> %%------------------------------------------------------------------------------ -spec unload() -> list() | {error, any()}. unload() -> - case read_loaded() of - {ok, LoadNames} -> - stop_apps(LoadNames); - {error, Error} -> - lager:error("Read loaded_plugins file error: ~p", [Error]), - {error, Error} + case env(loaded_file) of + {ok, File} -> + with_loaded_file(File, fun(Names) -> stop_plugins(Names) end); + undefined -> + ignore end. -stop_apps(Apps) -> - [stop_app(App) || App <- Apps]. +%% stop plugins +stop_plugins(Names) -> + [stop_app(App) || App <- Names]. %%------------------------------------------------------------------------------ %% @doc List all available plugins @@ -80,82 +93,119 @@ stop_apps(Apps) -> %%------------------------------------------------------------------------------ -spec list() -> [mqtt_plugin()]. list() -> - PluginsDir = env(dir), - AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir), - Plugins = [plugin(filename:join(PluginsDir, AppFile)) || AppFile <- AppFiles], - StartedApps = apps(started), - lists:map(fun(Plugin = #mqtt_plugin{name = Name}) -> - case lists:member(Name, StartedApps) of - true -> Plugin#mqtt_plugin{active = true}; - false -> Plugin - end - end, Plugins). + case env(plugins_dir) of + {ok, PluginsDir} -> + AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir), + Plugins = [plugin(PluginsDir, AppFile) || AppFile <- AppFiles], + StartedApps = names(started_app), + lists:map(fun(Plugin = #mqtt_plugin{name = Name}) -> + case lists:member(Name, StartedApps) of + true -> Plugin#mqtt_plugin{active = true}; + false -> Plugin + end + end, Plugins); + undefined -> + [] + end. -plugin(AppFile) -> +plugin(PluginsDir, AppFile0) -> + AppFile = filename:join(PluginsDir, AppFile0), {ok, [{application, Name, Attrs}]} = file:consult(AppFile), + CfgFile = filename:join([PluginsDir, Name, "etc/plugin.config"]), + AppsEnv1 = + case filelib:is_file(CfgFile) of + true -> + {ok, [AppsEnv]} = file:consult(CfgFile), + AppsEnv; + false -> + [] + end, Ver = proplists:get_value(vsn, Attrs, "0"), Descr = proplists:get_value(description, Attrs, ""), - #mqtt_plugin{name = Name, version = Ver, descr = Descr}. + #mqtt_plugin{name = Name, version = Ver, config = AppsEnv1, descr = Descr}. %%------------------------------------------------------------------------------ %% @doc Load One Plugin %% @end %%------------------------------------------------------------------------------ + -spec load(atom()) -> ok | {error, any()}. load(PluginName) when is_atom(PluginName) -> - case {lists:member(PluginName, apps(started)), lists:member(PluginName, apps(plugin))} of - {true, _} -> - lager:error("plugin ~p is started", [PluginName]), + case lists:member(PluginName, names(started_app)) of + true -> + lager:error("Plugin ~p is already started", [PluginName]), {error, already_started}; - {false, true} -> - load_plugin(PluginName); - {false, false} -> - lager:error("plugin ~p is not found", [PluginName]), - {error, not_found} + false -> + case find_plugin(PluginName) of + false -> + lager:error("Plugin ~s not found", [PluginName]), + {error, not_found}; + Plugin -> + load_plugin(Plugin, true) + end end. - --spec load_plugin(App :: atom()) -> {ok, list()} | {error, any()}. -load_plugin(PluginName) -> - case start_app(PluginName) of - {ok, Started} -> - plugin_loaded(PluginName), - {ok, Started}; + +load_plugin(#mqtt_plugin{name = Name, config = Config}, Persistent) -> + case load_app(Name, Config) of + ok -> + start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end); {error, Error} -> {error, Error} end. -start_app(App) -> +load_app(App, Config) -> + case application:load(App) of + ok -> + set_config(Config); + {error, Error} -> + {error, Error} + end. + +%% This trick is awesome:) +set_config([]) -> + ok; +set_config([{AppName, Envs} | Config]) -> + [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs], + set_config(Config). + +start_app(App, SuccFun) -> case application:ensure_all_started(App) of {ok, Started} -> - lager:info("started apps: ~p, load plugin ~p successfully", [Started, App]), + lager:info("Started Apps: ~p, load plugin ~p successfully", [Started, App]), + SuccFun(App), {ok, Started}; {error, {ErrApp, Reason}} -> lager:error("load plugin ~p error, cannot start app ~s for ~p", [App, ErrApp, Reason]), {error, {ErrApp, Reason}} end. +find_plugin(Name) -> + find_plugin(Name, list()). + +find_plugin(Name, Plugins) -> + lists:keyfind(Name, 2, Plugins). + %%------------------------------------------------------------------------------ %% @doc UnLoad One Plugin %% @end %%------------------------------------------------------------------------------ -spec unload(atom()) -> ok | {error, any()}. unload(PluginName) when is_atom(PluginName) -> - case {lists:member(PluginName, apps(started)), lists:member(PluginName, apps(plugin))} of - {false, _} -> - lager:error("plugin ~p is not started", [PluginName]), - {error, not_started}; + case {lists:member(PluginName, names(started_app)), lists:member(PluginName, names(plugin))} of {true, true} -> - unload_plugin(PluginName); + unload_plugin(PluginName, true); + {false, _} -> + lager:error("Plugin ~p is not started", [PluginName]), + {error, not_started}; {true, false} -> lager:error("~s is not a plugin, cannot unload it", [PluginName]), {error, not_found} end. --spec unload_plugin(App :: atom()) -> ok | {error, any()}. -unload_plugin(App) -> +unload_plugin(App, Persistent) -> case stop_app(App) of ok -> - plugin_unloaded(App), ok; + plugin_unloaded(App, Persistent), ok; {error, Reason} -> {error, Reason} end. @@ -174,27 +224,34 @@ stop_app(App) -> %%% Internal functions %%%============================================================================= -apps(plugin) -> - [Name || #mqtt_plugin{name = Name} <- list()]; +names(plugin) -> + names(list()); -apps(started) -> - [Name || {Name, _Descr, _Ver} <- application:which_applications()]. +names(started_app) -> + [Name || {Name, _Descr, _Ver} <- application:which_applications()]; -plugin_loaded(Name) -> +names(Plugins) -> + [Name || #mqtt_plugin{name = Name} <- Plugins]. + +plugin_loaded(_Name, false) -> + ok; +plugin_loaded(Name, true) -> case read_loaded() of {ok, Names} -> case lists:member(Name, Names) of - true -> - ignore; false -> %% write file if plugin is loaded - write_loaded(lists:append(Names, Name)) + write_loaded(lists:append(Names, Name)); + true -> + ignore end; {error, Error} -> lager:error("Cannot read loaded plugins: ~p", [Error]) end. -plugin_unloaded(Name) -> +plugin_unloaded(_Name, false) -> + ok; +plugin_unloaded(Name, true) -> case read_loaded() of {ok, Names} -> case lists:member(Name, Names) of @@ -204,11 +261,15 @@ plugin_unloaded(Name) -> lager:error("Cannot find ~s in loaded_file", [Name]) end; {error, Error} -> - lager:error("Cannot read loaded plugins: ~p", [Error]) + lager:error("Cannot read loaded_plugins: ~p", [Error]) end. read_loaded() -> - file:consult(env(loaded_file)). + {ok, File} = env(loaded_file), + read_loaded(File). + +read_loaded(File) -> + file:consult(File). write_loaded(AppNames) -> case file:open(env(loaded_file), [binary, write]) of @@ -220,12 +281,16 @@ write_loaded(AppNames) -> {error, Error} end. -env(dir) -> - proplists:get_value(dir, env(), "./plugins"); - -env(loaded_file) -> - proplists:get_value(loaded_file, env(), "./data/loaded_plugins"). - -env() -> - {ok, PluginsEnv} = application:get_env(emqttd, plugins), PluginsEnv. +env(Name) -> + case application:get_env(emqttd, plugins) of + {ok, PluginsEnv} -> + case proplists:get_value(Name, PluginsEnv) of + undefined -> + undefined; + Val -> + {ok, Val} + end; + undefined -> + undefined + end. From 0f602d67ba71d4f4a769359e1c3c0877266975c6 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 8 Aug 2015 18:47:04 +0800 Subject: [PATCH 24/25] fix plugins code bug --- src/emqttd_ctl.erl | 8 ++++---- src/emqttd_plugins.erl | 8 ++++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 048af3431..201641407 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -184,14 +184,14 @@ plugins(["list"]) -> plugins(["load", Name]) -> case emqttd_plugins:load(list_to_atom(Name)) of - {ok, StartedApps} -> ?PRINT("start apps: ~p, plugin ~s is loaded successfully.~n", [StartedApps, Name]); - {error, Reason} -> ?PRINT("load plugin error: ~s~n", [Reason]) + {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); + {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason]) end; plugins(["unload", Name]) -> case emqttd_plugins:unload(list_to_atom(Name)) of - ok -> ?PRINT("plugin ~s is unloaded successfully.~n", [Name]); - {error, Reason} -> ?PRINT("unload plugin error: ~s~n", [Reason]) + ok -> ?PRINT("Plugin ~s unloaded successfully.~n", [Name]); + {error, Reason} -> ?PRINT("unload plugin error: ~p~n", [Reason]) end. trace(["list"]) -> diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index 9f3a9e2ef..b3ccc122d 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -157,6 +157,8 @@ load_app(App, Config) -> case application:load(App) of ok -> set_config(Config); + {error, {already_loaded, App}} -> + set_config(Config); {error, Error} -> {error, Error} end. @@ -241,7 +243,7 @@ plugin_loaded(Name, true) -> case lists:member(Name, Names) of false -> %% write file if plugin is loaded - write_loaded(lists:append(Names, Name)); + write_loaded(lists:append(Names, [Name])); true -> ignore end; @@ -272,12 +274,14 @@ read_loaded(File) -> file:consult(File). write_loaded(AppNames) -> - case file:open(env(loaded_file), [binary, write]) of + {ok, File} = env(loaded_file), + case file:open(File, [binary, write]) of {ok, Fd} -> lists:foreach(fun(Name) -> file:write(Fd, iolist_to_binary(io_lib:format("~s.~n", [Name]))) end, AppNames); {error, Error} -> + lager:error("Open File ~p Error: ~p", [File, Error]), {error, Error} end. From 0f06fdd1c4738b6543448a54fd4de9884b4e129a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 8 Aug 2015 19:33:14 +0800 Subject: [PATCH 25/25] 0.10.0 --- src/emqttd.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 9aae391fb..bbde5e17f 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.9.3"}, + {vsn, "0.10.0"}, {modules, []}, {registered, []}, {applications, [kernel,