diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 4188bd59f..873ae6fed 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -52,6 +52,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), {ok, Listeners} = application:get_env(listeners), + emqttd_plugin_manager:load_all_plugins(), emqttd:open(Listeners), register(emqttd, self()), print_vsn(), @@ -137,5 +138,6 @@ worker_spec(Name, Opts) -> stop(_State) -> {ok, Listeners} = application:get_env(listeners), emqttd:close(Listeners), + emqttd_plugin_manager:unload_all_plugins(), ok. diff --git a/apps/emqttd/src/emqttd_ctl.erl b/apps/emqttd/src/emqttd_ctl.erl index c32835eec..f16344dcc 100644 --- a/apps/emqttd/src/emqttd_ctl.erl +++ b/apps/emqttd/src/emqttd_ctl.erl @@ -155,20 +155,17 @@ bridges(["stop", SNode, Topic]) -> end. plugins(["list"]) -> - Plugins = emqttd_plugin_manager:list(), - lists:foreach(fun({Name, Attrs}) -> - ?PRINT("plugin ~s~n", [Name]), - [?PRINT(" ~s:~p~n", [Attr, Val]) || {Attr, Val} <- Attrs] - end, Plugins); + Plugins = emqttd_plugin_manager:loaded_plugins(), + lists:foreach(fun(Plugin) -> ?PRINT("~p~n", [Plugin]) end, Plugins); plugins(["load", Name]) -> - case emqttd_plugin_manager:load(list_to_atom(Name)) of + case emqttd_plugin_manager:load_plugin(list_to_atom(Name)) of ok -> ?PRINT("plugin ~s is loaded successfully.~n", [Name]); {error, Reason} -> ?PRINT("error: ~s~n", [Reason]) end; plugins(["unload", Name]) -> - case emqttd_plugin_manager:load(list_to_atom(Name)) of + case emqttd_plugin_manager:unload_plugin(list_to_atom(Name)) of ok -> ?PRINT("plugin ~s is unloaded successfully.~n", [Name]); {error, Reason} -> ?PRINT("error: ~s~n", [Reason]) end. diff --git a/apps/emqttd/src/emqttd_plugin.erl b/apps/emqttd/src/emqttd_plugin.erl deleted file mode 100644 index f90a96361..000000000 --- a/apps/emqttd/src/emqttd_plugin.erl +++ /dev/null @@ -1,74 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd plugin. -%%% -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_plugin). - --author("Feng Lee "). - --behaviour(gen_server). - --define(SERVER, ?MODULE). - -%% API Function Exports --export([start_link/0]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -%% ------------------------------------------------------------------ -%% API Function Definitions -%% ------------------------------------------------------------------ - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%% ------------------------------------------------------------------ -%% gen_server Function Definitions -%% ------------------------------------------------------------------ -init(Args) -> - {ok, Args}. - -handle_call(_Request, _From, State) -> - {reply, ok, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%% ------------------------------------------------------------------ -%% Internal Function Definitions -%% ------------------------------------------------------------------ - - diff --git a/apps/emqttd/src/emqttd_plugin_manager.erl b/apps/emqttd/src/emqttd_plugin_manager.erl index 244018d5d..268d82870 100644 --- a/apps/emqttd/src/emqttd_plugin_manager.erl +++ b/apps/emqttd/src/emqttd_plugin_manager.erl @@ -26,28 +26,131 @@ %%%----------------------------------------------------------------------------- -module(emqttd_plugin_manager). +%%TODO: rewrite this module later... + -author("Feng Lee "). --export([list/0, load/1, unload/1]). +-export([load_all_plugins/0, unload_all_plugins/0]). + +-export([load_plugin/1, unload_plugin/1]). + +-export([loaded_plugins/0]). %%------------------------------------------------------------------------------ -%% @doc List all loaded plugins +%% @doc Load all plugins %% @end %%------------------------------------------------------------------------------ -list() -> - []. +load_all_plugins() -> + {ok, [PluginApps]} = file:consult("etc/plugins.config"), + LoadedPlugins = load_plugins(PluginApps), + RunningPlugins = start_plugins(lists:reverse(LoadedPlugins)), + %% save to application env? + application:set_env(emqttd, loaded_plugins, lists:reverse(RunningPlugins)). + +load_plugins(PluginApps) -> + lists:foldl(fun({App, _Env}, Acc) -> + case application:load(App) of + ok -> + io:format("load plugin ~p successfully~n", [App]), + [App | Acc]; + {error, {already_loaded, App}} -> + io:format("load plugin ~p is already loaded~n", [App]), + [App | Acc]; + {error, Reason} -> + io:format("load plugin ~p error: ~p~n", [App, Reason]), + Acc + end + end, [], PluginApps). + +start_plugins(PluginApps) -> + lists:foldl(fun(App, Acc) -> + case application:start(App) of + ok -> + io:format("start plugin ~p successfully~n", [App]), + [App | Acc]; + {error, {already_started, App}} -> + io:format("plugin ~p is already started~n", [App]), + [App | Acc]; + {error, Reason} -> + io:format("start plugin ~p error: ~p~n", [App, Reason]), + Acc + end + end, [], PluginApps). + +%%------------------------------------------------------------------------------ +%% @doc Loaded plugins +%% @end +%%------------------------------------------------------------------------------ +loaded_plugins() -> + LoadedPluginApps = application:get_env(emqttd, loaded_plugins, []), + lager:info("loaded plugins: ~p", [LoadedPluginApps]), + [App || App = {Name, _Descr, _Vsn} <- application:which_applications(), + lists:member(Name, LoadedPluginApps)]. + +%%------------------------------------------------------------------------------ +%% @doc Unload all plugins +%% @end +%%------------------------------------------------------------------------------ +unload_all_plugins() -> + LoadedPluginApps = application:get_env(emqttd, loaded_plugins, []), + StoppedApps = stop_plugins(lists:reverse(LoadedPluginApps)), + UnloadedApps = unload_plugins(lists:reverse(StoppedApps)), + case LoadedPluginApps -- UnloadedApps of + [] -> + application:unset_env(emqttd, loaded_plugins); + LeftApps -> + lager:error("cannot unload plugins: ~p", [LeftApps]), + application:set_env(emqttd, loaded_plugins, LeftApps) + end. + +stop_plugins(PluginApps) -> + lists:foldl(fun(App, Acc) -> + case application:stop(App) of + ok -> + io:format("stop plugin ~p successfully~n", [App]), + [App | Acc]; + {error, {not_started, App}} -> + io:format("plugin ~p is not started~n", [App]), + [App | Acc]; + {error, Reason} -> + io:format("stop plugin ~p error: ~p~n", [App, Reason]), + Acc + end + end, [], PluginApps). + +unload_plugins(PluginApps) -> + lists:foldl(fun({App, _Env}, Acc) -> + case application:unload(App) of + ok -> + io:format("unload plugin ~p successfully~n", [App]), + [App | Acc]; + {error, {not_loaded, App}} -> + io:format("load plugin ~p is not loaded~n", [App]), + [App | Acc]; + {error, Reason} -> + io:format("unload plugin ~p error: ~p~n", [App, Reason]), + Acc + end + end, [], PluginApps). %%------------------------------------------------------------------------------ %% @doc Load Plugin %% @end %%------------------------------------------------------------------------------ -load(Name) when is_atom(Name) -> - ok. +load_plugin(Name) when is_atom(Name) -> + %% load app + %% start app + %% set env... + application:start(Name). %%------------------------------------------------------------------------------ %% @doc Unload Plugin %% @end %%------------------------------------------------------------------------------ -unload(Name) when is_atom(Name) -> - ok. +unload_plugin(Name) when is_atom(Name) -> + %% stop app + %% unload app + %% set env + application:stop(Name), + application:unload(Name). diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 98079ab7e..016d93584 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -103,7 +103,13 @@ start_link(Id, Opts) -> %%------------------------------------------------------------------------------ -spec create(Topic :: binary()) -> ok | {error, Error :: any()}. create(Topic) when is_binary(Topic) -> - call({create, Topic}). + TopicR = #mqtt_topic{topic = Topic, node = node()}, + case mnesia:transaction(fun add_topic/1, [TopicR]) of + {atomic, ok} -> + setstats(topics), ok; + {aborted, Error} -> + {error, Error} + end. %%------------------------------------------------------------------------------ %% @doc Subscribe topic @@ -193,16 +199,6 @@ init([Id, _Opts]) -> gproc_pool:connect_worker(pubsub, {?MODULE, Id}), {ok, #state{id = Id, submap = maps:new()}}. -handle_call({create, Topic}, _From, State) -> - TopicR = #mqtt_topic{topic = Topic, node = node()}, - Reply = - case mnesia:transaction(fun add_topic/1, [TopicR]) of - {atomic, ok} -> ok; - {aborted, Error} -> {error, Error} - end, - setstats(topics), - {reply, Reply, State}; - handle_call({subscribe, SubPid, Topics}, _From, State) -> TopicSubs = lists:map(fun({Topic, Qos}) -> {#mqtt_topic{topic = Topic, node = node()},