Use cowboy to replace minirest

This commit is contained in:
Feng Lee 2018-08-09 14:27:49 +08:00
parent 3ac4be84e4
commit 919eb9fa1e
2 changed files with 38 additions and 30 deletions

View File

@ -31,7 +31,7 @@ start(_Type, _Args) ->
emqx_modules:load(), emqx_modules:load(),
emqx_plugins:init(), emqx_plugins:init(),
emqx_plugins:load(), emqx_plugins:load(),
emqx_listeners:start_all(), emqx_listeners:start(),
start_autocluster(), start_autocluster(),
register(emqx, self()), register(emqx, self()),
print_vsn(), print_vsn(),

View File

@ -12,75 +12,83 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%% @doc start/stop MQTT listeners. %% @doc Start/Stop MQTT listeners.
-module(emqx_listeners). -module(emqx_listeners).
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([start_all/0, restart_all/0, stop_all/0]). -export([start/0, restart/0, stop/0]).
-export([start_listener/1, stop_listener/1, restart_listener/1]). -export([start_listener/1, stop_listener/1, restart_listener/1]).
-type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}). -type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}).
%% @doc Start all listeners %% @doc Start all listeners
-spec(start_all() -> ok). -spec(start() -> ok).
start_all() -> start() ->
lists:foreach(fun start_listener/1, emqx_config:get_env(listeners, [])). lists:foreach(fun start_listener/1, emqx_config:get_env(listeners, [])).
%% Start MQTT/TCP listener %% Start MQTT/TCP listener
-spec(start_listener(listener()) -> {ok, pid()} | {error, term()}). -spec(start_listener(listener()) -> {ok, pid()} | {error, term()}).
start_listener({tcp, ListenOn, Options}) -> start_listener({tcp, ListenOn, Options}) ->
start_mqtt_listener('mqtt:tcp', ListenOn, Options); start_mqtt_listener('mqtt:tcp', ListenOn, Options);
%% Start MQTT/TLS listener %% Start MQTT/TLS listener
start_listener({Proto, ListenOn, Options}) when Proto == ssl; Proto == tls -> start_listener({Proto, ListenOn, Options}) when Proto == ssl; Proto == tls ->
start_mqtt_listener('mqtt:ssl', ListenOn, Options); start_mqtt_listener('mqtt:ssl', ListenOn, Options);
%% Start MQTT/WS listener %% Start MQTT/WS listener
start_listener({Proto, ListenOn, Options}) when Proto == http; Proto == ws -> start_listener({Proto, ListenOn, Options}) when Proto == http; Proto == ws ->
Dispatch = [{"/mqtt", emqx_ws, []}], Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws, []}]}]),
NumAcceptors = proplists:get_value(acceptors, Options, 4), NumAcceptors = proplists:get_value(acceptors, Options, 4),
MaxConnections = proplists:get_value(max_clients, Options, 1024), MaxConnections = proplists:get_value(max_connections, Options, 1024),
TcpOptions = proplists:get_value(tcp_options, Options, []), TcpOptions = proplists:get_value(tcp_options, Options, []),
Options1 = [{port, ListenOn}, RanchOpts = [{num_acceptors, NumAcceptors},
{num_acceptors, NumAcceptors}, {max_connections, MaxConnections} | TcpOptions],
{max_connections, MaxConnections} | TcpOptions], cowboy:start_clear('mqtt:ws', with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}});
minirest:start_http(Proto, Options1, Dispatch);
%% Start MQTT/WSS listener %% Start MQTT/WSS listener
start_listener({Proto, ListenOn, Options}) when Proto == https; Proto == wss -> start_listener({Proto, ListenOn, Options}) when Proto == https; Proto == wss ->
Dispatch = [{"/mqtt", emqx_ws, []}], Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws, []}]}]),
NumAcceptors = proplists:get_value(acceptors, Options, 4), NumAcceptors = proplists:get_value(acceptors, Options, 4),
MaxConnections = proplists:get_value(max_clients, Options, 1024), MaxConnections = proplists:get_value(max_clients, Options, 1024),
TcpOptions = proplists:get_value(tcp_options, Options, []), TcpOptions = proplists:get_value(tcp_options, Options, []),
SslOptions = proplists:get_value(ssl_options, Options, []), SslOptions = proplists:get_value(ssl_options, Options, []),
Options1 = [{port, ListenOn}, RanchOpts = [{num_acceptors, NumAcceptors},
{num_acceptors, NumAcceptors}, {max_connections, MaxConnections} | TcpOptions ++ SslOptions],
{max_connections, MaxConnections} | TcpOptions ++ SslOptions], cowboy:start_tls('mqtt:wss', with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}).
minirest:start_https(Proto, Options1, Dispatch).
start_mqtt_listener(Name, ListenOn, Options) -> start_mqtt_listener(Name, ListenOn, Options) ->
SockOpts = esockd:parse_opt(Options), SockOpts = esockd:parse_opt(Options),
MFA = {emqx_connection, start_link, [Options -- SockOpts]}, MFA = {emqx_connection, start_link, [Options -- SockOpts]},
{ok, _} = esockd:open(Name, ListenOn, merge_default(SockOpts), MFA). {ok, _} = esockd:open(Name, ListenOn, merge_default(SockOpts), MFA).
with_port(Port, Opts) when is_integer(Port) ->
[{port, Port}|Opts];
with_port({Addr, Port}, Opts) ->
[{ip, Addr}, {port, Port}|Opts].
%% @doc Restart all listeners %% @doc Restart all listeners
-spec(restart_all() -> ok). -spec(restart() -> ok).
restart_all() -> restart() ->
lists:foreach(fun restart_listener/1, emqx_config:get_env(listeners, [])). lists:foreach(fun restart_listener/1, emqx_config:get_env(listeners, [])).
-spec(restart_listener(listener()) -> any()). -spec(restart_listener(listener()) -> any()).
restart_listener({tcp, ListenOn, _Opts}) -> restart_listener({tcp, ListenOn, _Options}) ->
esockd:reopen('mqtt:tcp', ListenOn); esockd:reopen('mqtt:tcp', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) when Proto == ssl; Proto == tls -> restart_listener({Proto, ListenOn, _Options}) when Proto == ssl; Proto == tls ->
esockd:reopen('mqtt:ssl', ListenOn); esockd:reopen('mqtt:ssl', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> restart_listener({Proto, ListenOn, Options}) when Proto == http; Proto == ws ->
mochiweb:restart_http('mqtt:ws', ListenOn); cowboy:stop_listener('mqtt:ws'),
restart_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> start_listener({Proto, ListenOn, Options});
mochiweb:restart_http('mqtt:wss', ListenOn); restart_listener({Proto, ListenOn, Options}) when Proto == https; Proto == wss ->
cowboy:stop_listener('mqtt:wss'),
start_listener({Proto, ListenOn, Options});
restart_listener({Proto, ListenOn, _Opts}) -> restart_listener({Proto, ListenOn, _Opts}) ->
esockd:reopen(Proto, ListenOn). esockd:reopen(Proto, ListenOn).
%% @doc Stop all listeners %% @doc Stop all listeners
-spec(stop_all() -> ok). -spec(stop() -> ok).
stop_all() -> stop() ->
lists:foreach(fun stop_listener/1, emqx_config:get_env(listeners, [])). lists:foreach(fun stop_listener/1, emqx_config:get_env(listeners, [])).
-spec(stop_listener(listener()) -> ok | {error, any()}). -spec(stop_listener(listener()) -> ok | {error, any()}).
@ -88,10 +96,10 @@ stop_listener({tcp, ListenOn, _Opts}) ->
esockd:close('mqtt:tcp', ListenOn); esockd:close('mqtt:tcp', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) when Proto == ssl; Proto == tls -> stop_listener({Proto, ListenOn, _Opts}) when Proto == ssl; Proto == tls ->
esockd:close('mqtt:ssl', ListenOn); esockd:close('mqtt:ssl', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> stop_listener({Proto, _ListenOn, _Opts}) when Proto == http; Proto == ws ->
mochiweb:stop_http('mqtt:ws', ListenOn); cowboy:stop_listener('mqtt:ws');
stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> stop_listener({Proto, _ListenOn, _Opts}) when Proto == https; Proto == wss ->
mochiweb:stop_http('mqtt:wss', ListenOn); cowboy:stop_listener('mqtt:wss');
stop_listener({Proto, ListenOn, _Opts}) -> stop_listener({Proto, ListenOn, _Opts}) ->
esockd:close(Proto, ListenOn). esockd:close(Proto, ListenOn).