diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 0f8158c58..385ca134e 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -16,18 +16,30 @@ -module(emqttd_app). --include("emqttd_cli.hrl"). - -behaviour(application). +-include("emqttd_cli.hrl"). + %% Application callbacks -export([start/2, stop/1]). +-export([start_listener/1, stop_listener/1, is_mod_enabled/1]). + +%% MQTT SockOpts +-define(MQTT_SOCKOPTS, [ + binary, + {packet, raw}, + {reuseaddr, true}, + {backlog, 512}, + {nodelay, true}]). + +-type listener() :: {atom(), inet:port_number(), [esockd:option()]}. + %%-------------------------------------------------------------------- %% Application callbacks %%-------------------------------------------------------------------- --spec start(StartType, StartArgs) -> {ok, pid()} | {ok, pid(), State} | {error, Reason} when +-spec start(StartType, StartArgs) -> {ok, pid()} | {ok, pid(), State} | {error, Reason} when StartType :: normal | {takeover, node()} | {failover, node()}, StartArgs :: term(), State :: term(), @@ -38,13 +50,21 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd_cli:load(), - emqttd:load_all_mods(), + load_all_mods(), emqttd_plugins:load(), - emqttd:start_listeners(), + start_listeners(), register(emqttd, self()), print_vsn(), {ok, Sup}. +-spec stop(State :: term()) -> term(). +stop(_State) -> + catch stop_listeners(). + +%%-------------------------------------------------------------------- +%% Print Banner +%%-------------------------------------------------------------------- + print_banner() -> ?PRINT("starting emqttd on node '~s'~n", [node()]). @@ -53,14 +73,18 @@ print_vsn() -> {ok, Desc} = application:get_key(description), ?PRINT("~s ~s is running now~n", [Desc, Vsn]). +%%-------------------------------------------------------------------- +%% Start Servers +%%-------------------------------------------------------------------- + start_servers(Sup) -> Servers = [{"emqttd ctl", emqttd_ctl}, - {"emqttd trace", {supervisor, emqttd_trace_sup}}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd stats", emqttd_stats}, {"emqttd metrics", emqttd_metrics}, {"emqttd retainer", emqttd_retainer}, {"emqttd pooler", {supervisor, emqttd_pooler}}, + {"emqttd trace", {supervisor, emqttd_trace_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd session manager", {supervisor, emqttd_sm_sup}}, {"emqttd session supervisor", {supervisor, emqttd_session_sup}}, @@ -117,7 +141,64 @@ worker_spec(Module, Opts) when is_atom(Module) -> worker_spec(M, F, A) -> {M, {M, F, A}, permanent, 10000, worker, [M]}. --spec stop(State :: term()) -> term(). -stop(_State) -> - catch emqttd:stop_listeners(). +%%-------------------------------------------------------------------- +%% Load Modules +%%-------------------------------------------------------------------- + +%% @doc load all modules +load_all_mods() -> + lists:foreach(fun load_mod/1, emqttd:env(modules)). + +load_mod({Name, Opts}) -> + Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)), + case catch Mod:load(Opts) of + ok -> lager:info("Load module ~s successfully", [Name]); + {error, Error} -> lager:error("Load module ~s error: ~p", [Name, Error]); + {'EXIT', Reason} -> lager:error("Load module ~s error: ~p", [Name, Reason]) + end. + +%% @doc Is module enabled? +-spec is_mod_enabled(Name :: atom()) -> boolean(). +is_mod_enabled(Name) -> emqttd:env(modules, Name) =/= undefined. + +%%-------------------------------------------------------------------- +%% Start Listeners +%%-------------------------------------------------------------------- + +%% @doc Start Listeners of the broker. +-spec start_listeners() -> any(). +start_listeners() -> lists:foreach(fun start_listener/1, emqttd:env(listeners)). + +%% Start mqtt listener +-spec start_listener(listener()) -> any(). +start_listener({mqtt, Port, Opts}) -> start_listener(mqtt, Port, Opts); + +%% Start mqtt(SSL) listener +start_listener({mqtts, Port, Opts}) -> start_listener(mqtts, Port, Opts); + +%% Start http listener +start_listener({http, Port, Opts}) -> + mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []}); + +%% Start https listener +start_listener({https, Port, Opts}) -> + mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []}). + +start_listener(Protocol, Port, Opts) -> + MFArgs = {emqttd_client, start_link, [emqttd:env(mqtt)]}, + esockd:open(Protocol, Port, merge_sockopts(Opts), MFArgs). + +merge_sockopts(Options) -> + SockOpts = emqttd_opts:merge(?MQTT_SOCKOPTS, + proplists:get_value(sockopts, Options, [])), + emqttd_opts:merge(Options, [{sockopts, SockOpts}]). + +%%-------------------------------------------------------------------- +%% Stop Listeners +%%-------------------------------------------------------------------- + +%% @doc Stop Listeners +stop_listeners() -> lists:foreach(fun stop_listener/1, emqttd:env(listeners)). + +stop_listener({Protocol, Port, _Opts}) -> esockd:close({Protocol, Port}).