diff --git a/.gitmodules b/.gitmodules index 093a4a897..4015d5e02 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,9 @@ [submodule "plugins/emqttd_dashboard"] path = plugins/emqttd_dashboard url = https://github.com/emqtt/emqttd_dashboard.git +[submodule "plugins/emysql"] + path = plugins/emysql + url = https://github.com/erylee/emysql.git +[submodule "plugins/emqttd_plugin_mysql"] + path = plugins/emqttd_plugin_mysql + url = https://github.com/emqtt/emqttd_plugin_mysql.git diff --git a/Makefile b/Makefile index a3b509386..b319d3928 100644 --- a/Makefile +++ b/Makefile @@ -29,11 +29,14 @@ edoc: @$(REBAR) doc rel: compile - @cd rel && ../rebar generate -f + @cd rel && $(REBAR) generate -f plugins: @for plugin in ./plugins/* ; do \ + if [ -d $${plugin} ]; then \ + echo "copy $${plugin}"; \ cp -R $${plugin} $(DIST)/plugins/ && rm -rf $(DIST)/$${plugin}/src/ ; \ + fi \ done dist: rel plugins @@ -44,12 +47,12 @@ APPS = erts kernel stdlib sasl crypto ssl os_mon syntax_tools \ check_plt: compile dialyzer --check_plt --plt $(PLT) --apps $(APPS) \ - deps/*/ebin apps/*/ebin + deps/*/ebin ./ebin build_plt: compile dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) \ - deps/*/ebin apps/*/ebin + deps/*/ebin ./ebin dialyzer: compile - dialyzer -Wno_return --plt $(PLT) deps/*/ebin apps/*/ebin + dialyzer -Wno_return --plt $(PLT) deps/*/ebin ./ebin diff --git a/PLUGIN.md b/PLUGIN.md new file mode 100644 index 000000000..7991de2cd --- /dev/null +++ b/PLUGIN.md @@ -0,0 +1,4 @@ + +Please see [Plugin Design](https://github.com/emqtt/emqttd/wiki/Plugin%20Design). + + diff --git a/include/emqttd.hrl b/include/emqttd.hrl index d142fdd7b..6c4b55776 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -129,18 +129,6 @@ -type mqtt_message() :: #mqtt_message{}. -%%------------------------------------------------------------------------------ -%% MQTT Plugin -%%------------------------------------------------------------------------------ --record(mqtt_plugin, { - name, - version, - attrs, - description -}). - --type mqtt_plugin() :: #mqtt_plugin{}. - %%------------------------------------------------------------------------------ %% MQTT Alarm %%------------------------------------------------------------------------------ @@ -154,3 +142,31 @@ -type mqtt_alarm() :: #mqtt_alarm{}. +%%------------------------------------------------------------------------------ +%% MQTT Plugin +%%------------------------------------------------------------------------------ +-record(mqtt_plugin, { + name, + version, + descr, + config, + active = false +}). + +-type mqtt_plugin() :: #mqtt_plugin{}. + +%%------------------------------------------------------------------------------ +%% MQTT CLI Command +%% For example: 'broker metrics' +%%------------------------------------------------------------------------------ +-record(mqtt_cli, { + name, + action, + args = [], + opts = [], + usage, + descr +}). + +-type mqtt_cli() :: #mqtt_cli{}. + diff --git a/plugins/emqttd_dashboard b/plugins/emqttd_dashboard new file mode 160000 index 000000000..2d3c9aeab --- /dev/null +++ b/plugins/emqttd_dashboard @@ -0,0 +1 @@ +Subproject commit 2d3c9aeabeb5289b9ae27c503f017ad71bd81174 diff --git a/plugins/emqttd_plugin_demo/etc/plugin.config b/plugins/emqttd_plugin_demo/etc/plugin.config new file mode 100644 index 000000000..57afcca04 --- /dev/null +++ b/plugins/emqttd_plugin_demo/etc/plugin.config @@ -0,0 +1 @@ +[]. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src new file mode 100644 index 000000000..1fd8ce62d --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src @@ -0,0 +1,12 @@ +{application, emqttd_plugin_demo, + [ + {description, ""}, + {vsn, "1"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {mod, { emqttd_plugin_demo_app, []}}, + {env, []} + ]}. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl new file mode 100644 index 000000000..9042a449a --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl @@ -0,0 +1,16 @@ +-module(emqttd_plugin_demo_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + emqttd_plugin_demo_sup:start_link(). + +stop(_State) -> + ok. diff --git a/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl new file mode 100644 index 000000000..65dad7a60 --- /dev/null +++ b/plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl @@ -0,0 +1,27 @@ +-module(emqttd_plugin_demo_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + {ok, { {one_for_one, 5, 10}, []} }. + diff --git a/plugins/emqttd_plugin_mysql b/plugins/emqttd_plugin_mysql new file mode 160000 index 000000000..04baf44a4 --- /dev/null +++ b/plugins/emqttd_plugin_mysql @@ -0,0 +1 @@ +Subproject commit 04baf44a465c1513e75dfdcc2f6507e7a315d2d5 diff --git a/plugins/emysql b/plugins/emysql new file mode 160000 index 000000000..38927104b --- /dev/null +++ b/plugins/emysql @@ -0,0 +1 @@ +Subproject commit 38927104b44b3f8d237bcf3a2b50f2e0608291b3 diff --git a/rebar.config b/rebar.config index fb8d1bddc..37f3894b9 100644 --- a/rebar.config +++ b/rebar.config @@ -18,6 +18,10 @@ {validate_app_modules, true}. +%% plugins cannot find emqttd.hrl without ".." lib dirs:( +%% but this setting will make deps apps collision +{lib_dirs, ["../"]}. + {sub_dirs, [ "rel", "plugins/*/"]}. diff --git a/rel/files/emqttd b/rel/files/emqttd index 83b0368d0..8574571c1 100755 --- a/rel/files/emqttd +++ b/rel/files/emqttd @@ -141,13 +141,6 @@ case "$1" in echo $RES exit 1 fi - # Sanity check the plugins.config file - RES=`$NODETOOL_LITE chkconfig $RUNNER_ETC_DIR/plugins.config` - if [ $? != 0 ]; then - echo "Error reading $RUNNER_ETC_DIR/plugins.config" - echo $RES - exit 1 - fi HEART_COMMAND="$RUNNER_SCRIPT_DIR/$SCRIPT start" export HEART_COMMAND mkdir -p $PIPE_DIR @@ -265,13 +258,6 @@ case "$1" in echo $RES exit 1 fi - # Sanity check the plugins.config file - RES=`$NODETOOL_LITE chkconfig $RUNNER_ETC_DIR/plugins.config` - if [ $? != 0 ]; then - echo "Error reading $RUNNER_ETC_DIR/plugins.config" - echo $RES - exit 1 - fi # Setup beam-required vars ROOTDIR=$RUNNER_BASE_DIR ERL_LIBS=$ROOTDIR/plugins @@ -279,7 +265,7 @@ case "$1" in EMU=beam PROGNAME=`echo $0 | sed 's/.*\///'` CMD="$BINDIR/erlexec -boot $RUNNER_BASE_DIR/releases/$APP_VSN/$SCRIPT \ - -embedded -config $RUNNER_ETC_DIR/emqttd.config -config $RUNNER_ETC_DIR/plugins.config \ + -embedded -config $RUNNER_ETC_DIR/emqttd.config \ -pa $RUNNER_LIB_DIR/basho-patches \ -args_file $RUNNER_ETC_DIR/vm.args -- ${1+"$@"}" export EMU @@ -305,13 +291,6 @@ case "$1" in echo $RES exit 1 fi - # Sanity check the plugins.config file - RES=`$NODETOOL_LITE chkconfig $RUNNER_ETC_DIR/plugins.config` - if [ $? != 0 ]; then - echo "Error reading $RUNNER_ETC_DIR/plugins.config" - echo $RES - exit 1 - fi echo "config is OK" ;; escript) diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index ab252b9a0..9ee5fcb29 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -153,15 +153,23 @@ {modules, [ %% Client presence management module. %% Publish messages when client connected or disconnected - {presence, [{qos, 0}]}, + {presence, [{qos, 0}]} %% Subscribe topics automatically when client connected - {autosub, [{"$Q/client/$c", 0}]} + %% {autosub, [{"$Q/client/$c", 0}]} %% Rewrite rules %% {rewrite, [{file, "etc/rewrite.config"}]} ]}, + %% Plugins + {plugins, [ + %% Plugin App Library Dir + {plugins_dir, "./plugins"}, + + %% File to store loaded plugin names. + {loaded_file, "./data/loaded_plugins"} + ]}, %% Listeners {listeners, [ {mqtt, 1883, [ diff --git a/rel/files/loaded_plugins b/rel/files/loaded_plugins new file mode 100644 index 000000000..68ba6a41d --- /dev/null +++ b/rel/files/loaded_plugins @@ -0,0 +1 @@ +emqttd_dashboard. diff --git a/rel/files/plugins.config b/rel/files/plugins.config deleted file mode 100644 index 0a5655c66..000000000 --- a/rel/files/plugins.config +++ /dev/null @@ -1,26 +0,0 @@ -[ -% {emysql, [ -% {pool_size, 4}, -% {host, "localhost"}, -% {port, 3306}, -% {username, "root"}, -% {password, "public"}, -% {database, "mqtt"}, -% {encoding, utf8} -% ]}, -% {emqttd_auth_mysql, [ -% {user_table, mqtt_users}, -% {password_hash, plain}, -% {field_mapper, [ -% {username, username}, -% {password, password} -% ]} -% ]}, -% {emqttd_dashboard, [ -% {listener, -% {emqttd_dashboard, 18083, [ -% {acceptors, 4}, -% {max_clients, 512}]}} -% ]} -% -]. diff --git a/rel/reltool.config b/rel/reltool.config index 42e4755ef..72dfede6e 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -87,6 +87,6 @@ {template, "files/acl.config", "etc/acl.config"}, {template, "files/rewrite.config", "etc/rewrite.config"}, {template, "files/clients.config", "etc/clients.config"}, - {template, "files/plugins.config", "etc/plugins.config"}, - {template, "files/vm.args", "etc/vm.args"} + {template, "files/vm.args", "etc/vm.args"}, + {copy, "files/loaded_plugins", "data/loaded_plugins"} ]}. diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 9aae391fb..bbde5e17f 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.9.3"}, + {vsn, "0.10.0"}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/src/emqttd.erl b/src/emqttd.erl index 074e8f3e2..e48aee560 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -30,10 +30,7 @@ -export([start/0, env/1, env/2, open_listeners/1, close_listeners/1, - load_all_plugins/0, unload_all_plugins/0, - load_plugin/1, unload_plugin/1, load_all_mods/0, is_mod_enabled/1, - loaded_plugins/0, is_running/1]). -define(MQTT_SOCKOPTS, [ @@ -112,106 +109,6 @@ close_listeners(Listeners) when is_list(Listeners) -> close_listener({Protocol, Port, _Options}) -> esockd:close({Protocol, Port}). -%%------------------------------------------------------------------------------ -%% @doc Load all plugins -%% @end -%%------------------------------------------------------------------------------ --spec load_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -load_all_plugins() -> - %% save first - case file:consult("etc/plugins.config") of - {ok, [PluginApps]} -> - application:set_env(emqttd, plugins, [App || {App, _Env} <- PluginApps]), - [{App, load_plugin(App)} || {App, _Env} <- PluginApps]; - {error, enoent} -> - lager:error("etc/plugins.config not found!"); - {error, Error} -> - lager:error("Load etc/plugins.config error: ~p", [Error]) - end. - -%%------------------------------------------------------------------------------ -%% @doc Load plugin -%% @end -%%------------------------------------------------------------------------------ --spec load_plugin(App :: atom()) -> ok | {error, any()}. -load_plugin(App) -> - case load_app(App) of - ok -> - start_app(App); - {error, Reason} -> - {error, Reason} - end. - -load_app(App) -> - case application:load(App) of - ok -> - lager:info("load plugin ~p successfully", [App]), ok; - {error, {already_loaded, App}} -> - lager:info("load plugin ~p is already loaded", [App]), ok; - {error, Reason} -> - lager:error("load plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -start_app(App) -> - case application:start(App) of - ok -> - lager:info("start plugin ~p successfully", [App]), ok; - {error, {already_started, App}} -> - lager:error("plugin ~p is already started", [App]), ok; - {error, Reason} -> - lager:error("start plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. - -%%------------------------------------------------------------------------------ -%% @doc Loaded plugins -%% @end -%%------------------------------------------------------------------------------ -loaded_plugins() -> - PluginApps = application:get_env(emqttd, plugins, []), - [App || App = {Name, _Descr, _Vsn} <- application:which_applications(), - lists:member(Name, PluginApps)]. - -%%------------------------------------------------------------------------------ -%% @doc Unload all plugins -%% @end -%%------------------------------------------------------------------------------ --spec unload_all_plugins() -> [{App :: atom(), ok | {error, any()}}]. -unload_all_plugins() -> - PluginApps = application:get_env(emqttd, plugins, []), - [{App, unload_plugin(App)} || App <- PluginApps]. - -%%------------------------------------------------------------------------------ -%% @doc Unload plugin -%% @end -%%------------------------------------------------------------------------------ --spec unload_plugin(App :: atom()) -> ok | {error, any()}. -unload_plugin(App) -> - case stop_app(App) of - ok -> - unload_app(App); - {error, Reason} -> - {error, Reason} - end. - -stop_app(App) -> - case application:stop(App) of - ok -> - lager:info("stop plugin ~p successfully~n", [App]), ok; - {error, {not_started, App}} -> - lager:error("plugin ~p is not started~n", [App]), ok; - {error, Reason} -> - lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} - end. - -unload_app(App) -> - case application:unload(App) of - ok -> - lager:info("unload plugin ~p successfully~n", [App]), ok; - {error, {not_loaded, App}} -> - lager:info("load plugin ~p is not loaded~n", [App]), ok; - {error, Reason} -> - lager:error("unload plugin ~p error: ~p", [App, Reason]), {error, Reason} - end. load_all_mods() -> Mods = application:get_env(emqttd, modules, []), diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index f9434b385..159062b16 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -52,9 +52,8 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd:load_all_mods(), - emqttd:load_all_plugins(), - {ok, Listeners} = application:get_env(listeners), - emqttd:open_listeners(Listeners), + emqttd_plugins:load(), + start_listeners(), register(emqttd, self()), print_vsn(), {ok, Sup}. @@ -67,6 +66,10 @@ print_vsn() -> {ok, Desc} = application:get_key(description), ?PRINT("~s ~s is running now~n", [Desc, Vsn]). +start_listeners() -> + {ok, Listeners} = application:get_env(listeners), + emqttd:open_listeners(Listeners). + start_servers(Sup) -> Servers = [{"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, @@ -131,14 +134,23 @@ worker_spec(Name, Opts) -> %% close all listeners first... prep_stop(State) -> - %%TODO: esockd app should be running... - {ok, Listeners} = application:get_env(listeners), - emqttd:close_listeners(Listeners), + stop_listeners(), + timer:sleep(2), + emqttd_plugins:unload(), timer:sleep(2), State. +stop_listeners() -> + %% ensure that esockd applications is started? + case lists:keyfind(esockd, 1, application:which_applications()) of + false -> + ignore; + _Tuple -> + {ok, Listeners} = application:get_env(listeners), + emqttd:close_listeners(Listeners) + end. + -spec stop(State :: term()) -> term(). stop(_State) -> ok. - diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index acccb5b5a..201641407 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -68,7 +68,7 @@ status([]) -> %% @end %%------------------------------------------------------------------------------ cluster([]) -> - Nodes = [node()|nodes()], + Nodes = emqttd_broker:running_nodes(), ?PRINT("cluster nodes: ~p~n", [Nodes]); cluster([SNode]) -> @@ -77,8 +77,6 @@ cluster([SNode]) -> pong -> case emqttd:is_running(Node) of true -> - %%TODO: should not unload here. - emqttd:unload_all_plugins(), application:stop(emqttd), application:stop(esockd), application:stop(gproc), @@ -180,19 +178,20 @@ bridges(["stop", SNode, Topic]) -> end. plugins(["list"]) -> - Plugins = emqttd:loaded_plugins(), - lists:foreach(fun(Plugin) -> ?PRINT("~p~n", [Plugin]) end, Plugins); + lists:foreach(fun(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> + ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", [Name, Ver, Descr, Active]) + end, emqttd_plugins:list()); plugins(["load", Name]) -> - case emqttd:load_plugin(list_to_atom(Name)) of - ok -> ?PRINT("plugin ~s is loaded successfully.~n", [Name]); - {error, Reason} -> ?PRINT("error: ~s~n", [Reason]) + case emqttd_plugins:load(list_to_atom(Name)) of + {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); + {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason]) end; plugins(["unload", Name]) -> - case emqttd:unload_plugin(list_to_atom(Name)) of - ok -> ?PRINT("plugin ~s is unloaded successfully.~n", [Name]); - {error, Reason} -> ?PRINT("error: ~s~n", [Reason]) + case emqttd_plugins:unload(list_to_atom(Name)) of + ok -> ?PRINT("Plugin ~s unloaded successfully.~n", [Name]); + {error, Reason} -> ?PRINT("unload plugin error: ~p~n", [Reason]) end. trace(["list"]) -> diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl new file mode 100644 index 000000000..b3ccc122d --- /dev/null +++ b/src/emqttd_plugins.erl @@ -0,0 +1,300 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd plugins. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_plugins). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-export([load/0, unload/0]). + +-export([load/1, unload/1]). + +-export([list/0]). + +%%------------------------------------------------------------------------------ +%% @doc Load all plugins when the broker started. +%% @end +%%------------------------------------------------------------------------------ + +-spec load() -> list() | {error, any()}. +load() -> + case env(loaded_file) of + {ok, File} -> + with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end); + undefined -> + %% No plugins available + ignore + end. + +with_loaded_file(File, SuccFun) -> + case read_loaded(File) of + {ok, Names} -> + SuccFun(Names); + {error, Error} -> + lager:error("Failed to read: ~p, error: ~p", [File, Error]), + {error, Error} + end. + +load_plugins(Names, Persistent) -> + Plugins = list(), NotFound = Names -- names(Plugins), + case NotFound of + [] -> ok; + NotFound -> lager:error("Cannot find plugins: ~p", [NotFound]) + end, + NeedToLoad = Names -- NotFound -- names(started_app), + [load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad]. + +%%------------------------------------------------------------------------------ +%% @doc Unload all plugins before broker stopped. +%% @end +%%------------------------------------------------------------------------------ +-spec unload() -> list() | {error, any()}. +unload() -> + case env(loaded_file) of + {ok, File} -> + with_loaded_file(File, fun(Names) -> stop_plugins(Names) end); + undefined -> + ignore + end. + +%% stop plugins +stop_plugins(Names) -> + [stop_app(App) || App <- Names]. + +%%------------------------------------------------------------------------------ +%% @doc List all available plugins +%% @end +%%------------------------------------------------------------------------------ +-spec list() -> [mqtt_plugin()]. +list() -> + case env(plugins_dir) of + {ok, PluginsDir} -> + AppFiles = filelib:wildcard("*/ebin/*.app", PluginsDir), + Plugins = [plugin(PluginsDir, AppFile) || AppFile <- AppFiles], + StartedApps = names(started_app), + lists:map(fun(Plugin = #mqtt_plugin{name = Name}) -> + case lists:member(Name, StartedApps) of + true -> Plugin#mqtt_plugin{active = true}; + false -> Plugin + end + end, Plugins); + undefined -> + [] + end. + +plugin(PluginsDir, AppFile0) -> + AppFile = filename:join(PluginsDir, AppFile0), + {ok, [{application, Name, Attrs}]} = file:consult(AppFile), + CfgFile = filename:join([PluginsDir, Name, "etc/plugin.config"]), + AppsEnv1 = + case filelib:is_file(CfgFile) of + true -> + {ok, [AppsEnv]} = file:consult(CfgFile), + AppsEnv; + false -> + [] + end, + Ver = proplists:get_value(vsn, Attrs, "0"), + Descr = proplists:get_value(description, Attrs, ""), + #mqtt_plugin{name = Name, version = Ver, config = AppsEnv1, descr = Descr}. + +%%------------------------------------------------------------------------------ +%% @doc Load One Plugin +%% @end +%%------------------------------------------------------------------------------ + +-spec load(atom()) -> ok | {error, any()}. +load(PluginName) when is_atom(PluginName) -> + case lists:member(PluginName, names(started_app)) of + true -> + lager:error("Plugin ~p is already started", [PluginName]), + {error, already_started}; + false -> + case find_plugin(PluginName) of + false -> + lager:error("Plugin ~s not found", [PluginName]), + {error, not_found}; + Plugin -> + load_plugin(Plugin, true) + end + end. + +load_plugin(#mqtt_plugin{name = Name, config = Config}, Persistent) -> + case load_app(Name, Config) of + ok -> + start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end); + {error, Error} -> + {error, Error} + end. + +load_app(App, Config) -> + case application:load(App) of + ok -> + set_config(Config); + {error, {already_loaded, App}} -> + set_config(Config); + {error, Error} -> + {error, Error} + end. + +%% This trick is awesome:) +set_config([]) -> + ok; +set_config([{AppName, Envs} | Config]) -> + [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs], + set_config(Config). + +start_app(App, SuccFun) -> + case application:ensure_all_started(App) of + {ok, Started} -> + lager:info("Started Apps: ~p, load plugin ~p successfully", [Started, App]), + SuccFun(App), + {ok, Started}; + {error, {ErrApp, Reason}} -> + lager:error("load plugin ~p error, cannot start app ~s for ~p", [App, ErrApp, Reason]), + {error, {ErrApp, Reason}} + end. + +find_plugin(Name) -> + find_plugin(Name, list()). + +find_plugin(Name, Plugins) -> + lists:keyfind(Name, 2, Plugins). + +%%------------------------------------------------------------------------------ +%% @doc UnLoad One Plugin +%% @end +%%------------------------------------------------------------------------------ +-spec unload(atom()) -> ok | {error, any()}. +unload(PluginName) when is_atom(PluginName) -> + case {lists:member(PluginName, names(started_app)), lists:member(PluginName, names(plugin))} of + {true, true} -> + unload_plugin(PluginName, true); + {false, _} -> + lager:error("Plugin ~p is not started", [PluginName]), + {error, not_started}; + {true, false} -> + lager:error("~s is not a plugin, cannot unload it", [PluginName]), + {error, not_found} + end. + +unload_plugin(App, Persistent) -> + case stop_app(App) of + ok -> + plugin_unloaded(App, Persistent), ok; + {error, Reason} -> + {error, Reason} + end. + +stop_app(App) -> + case application:stop(App) of + ok -> + lager:info("stop plugin ~p successfully~n", [App]), ok; + {error, {not_started, App}} -> + lager:error("plugin ~p is not started~n", [App]), ok; + {error, Reason} -> + lager:error("stop plugin ~p error: ~p", [App]), {error, Reason} + end. + +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +names(plugin) -> + names(list()); + +names(started_app) -> + [Name || {Name, _Descr, _Ver} <- application:which_applications()]; + +names(Plugins) -> + [Name || #mqtt_plugin{name = Name} <- Plugins]. + +plugin_loaded(_Name, false) -> + ok; +plugin_loaded(Name, true) -> + case read_loaded() of + {ok, Names} -> + case lists:member(Name, Names) of + false -> + %% write file if plugin is loaded + write_loaded(lists:append(Names, [Name])); + true -> + ignore + end; + {error, Error} -> + lager:error("Cannot read loaded plugins: ~p", [Error]) + end. + +plugin_unloaded(_Name, false) -> + ok; +plugin_unloaded(Name, true) -> + case read_loaded() of + {ok, Names} -> + case lists:member(Name, Names) of + true -> + write_loaded(lists:delete(Name, Names)); + false -> + lager:error("Cannot find ~s in loaded_file", [Name]) + end; + {error, Error} -> + lager:error("Cannot read loaded_plugins: ~p", [Error]) + end. + +read_loaded() -> + {ok, File} = env(loaded_file), + read_loaded(File). + +read_loaded(File) -> + file:consult(File). + +write_loaded(AppNames) -> + {ok, File} = env(loaded_file), + case file:open(File, [binary, write]) of + {ok, Fd} -> + lists:foreach(fun(Name) -> + file:write(Fd, iolist_to_binary(io_lib:format("~s.~n", [Name]))) + end, AppNames); + {error, Error} -> + lager:error("Open File ~p Error: ~p", [File, Error]), + {error, Error} + end. + +env(Name) -> + case application:get_env(emqttd, plugins) of + {ok, PluginsEnv} -> + case proplists:get_value(Name, PluginsEnv) of + undefined -> + undefined; + Val -> + {ok, Val} + end; + undefined -> + undefined + end. + diff --git a/src/emqttd_pooler.erl b/src/emqttd_pooler.erl index 3f0245250..147c19ee5 100644 --- a/src/emqttd_pooler.erl +++ b/src/emqttd_pooler.erl @@ -75,7 +75,7 @@ handle_call(_Req, _From, State) -> {reply, ok, State}. handle_cast({async_submit, Fun}, State) -> - run(Fun), + try run(Fun) catch _:Error -> lager:error("Pooler Error: ~p", [Error]) end, {noreply, State}; handle_cast(_Msg, State) -> diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index b2604d3cc..1420fb2df 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -228,7 +228,7 @@ match(Topic) when is_binary(Topic) -> %%%============================================================================= init([Id, _Opts]) -> - process_flag(min_heap_size, 1024*1024), + %%process_flag(min_heap_size, 1024*1024), gproc_pool:connect_worker(pubsub, {?MODULE, Id}), {ok, #state{id = Id, submap = maps:new()}}. diff --git a/src/emqttd_session_sup.erl b/src/emqttd_session_sup.erl index ff12b5055..1375c9fda 100644 --- a/src/emqttd_session_sup.erl +++ b/src/emqttd_session_sup.erl @@ -57,6 +57,6 @@ start_session(CleanSess, ClientId, ClientPid) -> init([]) -> {ok, {{simple_one_for_one, 10, 10}, [{session, {emqttd_session, start_link, []}, - transient, 10000, worker, [emqttd_session]}]}}. + temporary, 10000, worker, [emqttd_session]}]}}.