diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 7a5426bae..d5ca8f6ae 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -31,7 +31,7 @@ start(_Type, _Args) -> emqx_modules:load(), emqx_plugins:init(), emqx_plugins:load(), - emqx_listeners:start_all(), + emqx_listeners:start(), start_autocluster(), register(emqx, self()), print_vsn(), diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 257820fb1..78fd5db80 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -12,75 +12,83 @@ %% See the License for the specific language governing permissions and %% limitations under the License. -%% @doc start/stop MQTT listeners. +%% @doc Start/Stop MQTT listeners. -module(emqx_listeners). -include("emqx_mqtt.hrl"). --export([start_all/0, restart_all/0, stop_all/0]). +-export([start/0, restart/0, stop/0]). -export([start_listener/1, stop_listener/1, restart_listener/1]). -type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}). %% @doc Start all listeners --spec(start_all() -> ok). -start_all() -> +-spec(start() -> ok). +start() -> lists:foreach(fun start_listener/1, emqx_config:get_env(listeners, [])). %% Start MQTT/TCP listener -spec(start_listener(listener()) -> {ok, pid()} | {error, term()}). start_listener({tcp, ListenOn, Options}) -> start_mqtt_listener('mqtt:tcp', ListenOn, Options); + %% Start MQTT/TLS listener start_listener({Proto, ListenOn, Options}) when Proto == ssl; Proto == tls -> start_mqtt_listener('mqtt:ssl', ListenOn, Options); + %% Start MQTT/WS listener start_listener({Proto, ListenOn, Options}) when Proto == http; Proto == ws -> - Dispatch = [{"/mqtt", emqx_ws, []}], + Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws, []}]}]), NumAcceptors = proplists:get_value(acceptors, Options, 4), - MaxConnections = proplists:get_value(max_clients, Options, 1024), + MaxConnections = proplists:get_value(max_connections, Options, 1024), TcpOptions = proplists:get_value(tcp_options, Options, []), - Options1 = [{port, ListenOn}, - {num_acceptors, NumAcceptors}, - {max_connections, MaxConnections} | TcpOptions], - minirest:start_http(Proto, Options1, Dispatch); + RanchOpts = [{num_acceptors, NumAcceptors}, + {max_connections, MaxConnections} | TcpOptions], + cowboy:start_clear('mqtt:ws', with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}); + %% Start MQTT/WSS listener start_listener({Proto, ListenOn, Options}) when Proto == https; Proto == wss -> - Dispatch = [{"/mqtt", emqx_ws, []}], + Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws, []}]}]), NumAcceptors = proplists:get_value(acceptors, Options, 4), MaxConnections = proplists:get_value(max_clients, Options, 1024), TcpOptions = proplists:get_value(tcp_options, Options, []), SslOptions = proplists:get_value(ssl_options, Options, []), - Options1 = [{port, ListenOn}, - {num_acceptors, NumAcceptors}, - {max_connections, MaxConnections} | TcpOptions ++ SslOptions], - minirest:start_https(Proto, Options1, Dispatch). + RanchOpts = [{num_acceptors, NumAcceptors}, + {max_connections, MaxConnections} | TcpOptions ++ SslOptions], + cowboy:start_tls('mqtt:wss', with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}). start_mqtt_listener(Name, ListenOn, Options) -> SockOpts = esockd:parse_opt(Options), MFA = {emqx_connection, start_link, [Options -- SockOpts]}, {ok, _} = esockd:open(Name, ListenOn, merge_default(SockOpts), MFA). +with_port(Port, Opts) when is_integer(Port) -> + [{port, Port}|Opts]; +with_port({Addr, Port}, Opts) -> + [{ip, Addr}, {port, Port}|Opts]. + %% @doc Restart all listeners --spec(restart_all() -> ok). -restart_all() -> +-spec(restart() -> ok). +restart() -> lists:foreach(fun restart_listener/1, emqx_config:get_env(listeners, [])). -spec(restart_listener(listener()) -> any()). -restart_listener({tcp, ListenOn, _Opts}) -> +restart_listener({tcp, ListenOn, _Options}) -> esockd:reopen('mqtt:tcp', ListenOn); -restart_listener({Proto, ListenOn, _Opts}) when Proto == ssl; Proto == tls -> +restart_listener({Proto, ListenOn, _Options}) when Proto == ssl; Proto == tls -> esockd:reopen('mqtt:ssl', ListenOn); -restart_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> - mochiweb:restart_http('mqtt:ws', ListenOn); -restart_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> - mochiweb:restart_http('mqtt:wss', ListenOn); +restart_listener({Proto, ListenOn, Options}) when Proto == http; Proto == ws -> + cowboy:stop_listener('mqtt:ws'), + start_listener({Proto, ListenOn, Options}); +restart_listener({Proto, ListenOn, Options}) when Proto == https; Proto == wss -> + cowboy:stop_listener('mqtt:wss'), + start_listener({Proto, ListenOn, Options}); restart_listener({Proto, ListenOn, _Opts}) -> esockd:reopen(Proto, ListenOn). %% @doc Stop all listeners --spec(stop_all() -> ok). -stop_all() -> +-spec(stop() -> ok). +stop() -> lists:foreach(fun stop_listener/1, emqx_config:get_env(listeners, [])). -spec(stop_listener(listener()) -> ok | {error, any()}). @@ -88,10 +96,10 @@ stop_listener({tcp, ListenOn, _Opts}) -> esockd:close('mqtt:tcp', ListenOn); stop_listener({Proto, ListenOn, _Opts}) when Proto == ssl; Proto == tls -> esockd:close('mqtt:ssl', ListenOn); -stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> - mochiweb:stop_http('mqtt:ws', ListenOn); -stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> - mochiweb:stop_http('mqtt:wss', ListenOn); +stop_listener({Proto, _ListenOn, _Opts}) when Proto == http; Proto == ws -> + cowboy:stop_listener('mqtt:ws'); +stop_listener({Proto, _ListenOn, _Opts}) when Proto == https; Proto == wss -> + cowboy:stop_listener('mqtt:wss'); stop_listener({Proto, ListenOn, _Opts}) -> esockd:close(Proto, ListenOn).