184 lines
7.2 KiB
Erlang
184 lines
7.2 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Start/Stop MQTT listeners.
|
|
-module(emqx_listeners).
|
|
|
|
-include("emqx_mqtt.hrl").
|
|
|
|
%% APIs
|
|
-export([ start/0
|
|
, restart/0
|
|
, stop/0
|
|
]).
|
|
|
|
-export([ start_listener/1
|
|
, start_listener/3
|
|
, stop_listener/1
|
|
, stop_listener/3
|
|
, restart_listener/1
|
|
, restart_listener/3
|
|
]).
|
|
|
|
%% A listener definition as handled by this module: the protocol, the
%% port or {Address, Port} pair to listen on, and the listener's option list.
-type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% APIs
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Start every configured listener.
%% The listener definitions are read with emqx:get_env(listeners, []) and
%% each one is passed, in order, to start_listener/1.
-spec(start() -> ok).
start() ->
    Listeners = emqx:get_env(listeners, []),
    lists:foreach(fun(Listener) -> start_listener(Listener) end, Listeners).
|
|
|
|
%% @doc Start the listener described by a `{Proto, ListenOn, Options}' triple
%% and report the outcome on the console.
%%
%% On success a confirmation line is written to standard output and the
%% result of io:format/2 (`ok') is returned. On `{error, Reason}' the failure
%% is written to standard_error and the call raises `error(Reason)'.
-spec(start_listener(listener()) -> ok).
start_listener({Proto, ListenOn, Options}) ->
    case start_listener(Proto, ListenOn, Options) of
        {ok, _} ->
            io:format("Start mqtt:~s listener on ~s successfully.~n",
                      [Proto, format(ListenOn)]);
        {error, Reason} ->
            %% ~n is kept as the final directive so that nothing spills onto
            %% the beginning of the next console line; ~0p keeps the reason
            %% on a single line.
            io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~0p~n",
                      [Proto, format(ListenOn), Reason]),
            error(Reason)
    end.
|
|
|
|
%% Start one listener for the given protocol and return the result of the
%% underlying start call:
%%   tcp         -> esockd listener named 'mqtt:tcp'
%%   ssl | tls   -> esockd listener named 'mqtt:ssl'
%%   http | ws   -> cowboy clear listener based on 'mqtt:ws'
%%   https | wss -> cowboy TLS listener based on 'mqtt:wss'
%% There is no clause for any other protocol, so such a call fails with
%% function_clause.
-spec(start_listener(esockd:proto(), esockd:listen_on(), [esockd:option()])
      -> {ok, pid()} | {error, term()}).
%% MQTT over TCP
start_listener(tcp, ListenOn, Options) ->
    start_mqtt_listener('mqtt:tcp', ListenOn, Options);

%% MQTT over TLS
start_listener(Proto, ListenOn, Options) when Proto =:= ssl orelse Proto =:= tls ->
    start_mqtt_listener('mqtt:ssl', ListenOn, Options);

%% MQTT over WebSocket
start_listener(Proto, ListenOn, Options) when Proto =:= http orelse Proto =:= ws ->
    RanchOpts = ranch_opts(Options),
    ProtoOpts = ws_opts(Options),
    start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, RanchOpts, ProtoOpts);

%% MQTT over secure WebSocket
start_listener(Proto, ListenOn, Options) when Proto =:= https orelse Proto =:= wss ->
    RanchOpts = ranch_opts(Options),
    ProtoOpts = ws_opts(Options),
    start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, RanchOpts, ProtoOpts).
|
|
|
|
%% Open an esockd listener called Name on ListenOn.
%% The options returned by esockd:parse_opt/1 are passed to esockd after
%% merge_default/1 has been applied to them; whatever remains of Options
%% once those are removed becomes the single extra argument of
%% emqx_connection:start_link.
start_mqtt_listener(Name, ListenOn, Options) ->
    SockOpts = esockd:parse_opt(Options),
    EsockdOpts = merge_default(SockOpts),
    ConnOpts = Options -- SockOpts,
    ConnMFA = {emqx_connection, start_link, [ConnOpts]},
    esockd:open(Name, ListenOn, EsockdOpts, ConnMFA).
|
|
|
|
%% Invoke the given 3-arity start fun with the listener reference derived
%% from Name and ListenOn, the ranch options extended with the listening
%% address/port, and the protocol options.
start_http_listener(Start, Name, ListenOn, RanchOpts, ProtoOpts) ->
    Ref = ws_name(Name, ListenOn),
    TransOpts = with_port(ListenOn, RanchOpts),
    Start(Ref, TransOpts, ProtoOpts).
|
|
|
|
%% Return the value of the mqtt_path option, or "/mqtt" when the option is
%% not present in Options.
mqtt_path(Options) ->
    DefaultPath = "/mqtt",
    proplists:get_value(mqtt_path, Options, DefaultPath).
|
|
|
|
%% Build the protocol options map handed to cowboy for a WebSocket listener.
%% A single route (the configured MQTT path, handled by emqx_ws_connection
%% with Options as its initial state) is compiled under the '_' host
%% pattern; proxy_header is taken from the proxy_protocol option and
%% defaults to false.
ws_opts(Options) ->
    Route = {mqtt_path(Options), emqx_ws_connection, Options},
    Dispatch = cowboy_router:compile([{'_', [Route]}]),
    #{env => #{dispatch => Dispatch},
      proxy_header => proplists:get_value(proxy_protocol, Options, false)}.
|
|
|
|
%% Translate listener Options into a ranch options map.
%%   acceptors       -> num_acceptors   (default 4)
%%   max_connections -> max_connections (default 1024)
%%   tcp_options     -> socket_opts     (default [])
%% When ssl_options is present its entries are appended to the TCP options
%% in socket_opts.
ranch_opts(Options) ->
    TcpOpts = proplists:get_value(tcp_options, Options, []),
    SocketOpts = case proplists:get_value(ssl_options, Options) of
                     undefined -> TcpOpts;
                     SslOpts -> TcpOpts ++ SslOpts
                 end,
    #{num_acceptors => proplists:get_value(acceptors, Options, 4),
      max_connections => proplists:get_value(max_connections, Options, 1024),
      socket_opts => SocketOpts}.
|
|
|
|
%% Prepend the listening address to the socket_opts of a ranch options map.
%% An integer adds {port, Port}; an {Addr, Port} pair adds {ip, Addr} and
%% {port, Port}. The map must already contain a socket_opts key.
with_port(Port, #{socket_opts := SockOpts} = RanchOpts) when is_integer(Port) ->
    RanchOpts#{socket_opts := [{port, Port} | SockOpts]};
with_port({Addr, Port}, #{socket_opts := SockOpts} = RanchOpts) ->
    RanchOpts#{socket_opts := [{ip, Addr}, {port, Port} | SockOpts]}.
|
|
|
|
%% @doc Restart every configured listener.
%% The listener definitions are read with emqx:get_env(listeners, []) and
%% each one is passed, in order, to restart_listener/1.
-spec(restart() -> ok).
restart() ->
    Listeners = emqx:get_env(listeners, []),
    lists:foreach(fun(Listener) -> restart_listener(Listener) end, Listeners).
|
|
|
|
%% @doc Restart the listener described by a `{Proto, ListenOn, Options}'
%% triple by unpacking it and delegating to restart_listener/3.
-spec(restart_listener(listener()) -> any()).
restart_listener({Protocol, Endpoint, ListenerOpts}) ->
    restart_listener(Protocol, Endpoint, ListenerOpts).
|
|
|
|
%% Restart one listener for the given protocol.
%% tcp and ssl/tls listeners are reopened through esockd under their
%% 'mqtt:tcp' / 'mqtt:ssl' names. WebSocket listeners are stopped in cowboy
%% (the stop result is not inspected) and then started again with
%% start_listener/3. Any other protocol name is passed to esockd:reopen/2
%% unchanged.
-spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) -> any()).
restart_listener(tcp, ListenOn, _Options) ->
    esockd:reopen('mqtt:tcp', ListenOn);
restart_listener(Proto, ListenOn, _Options) when Proto =:= ssl orelse Proto =:= tls ->
    esockd:reopen('mqtt:ssl', ListenOn);
restart_listener(Proto, ListenOn, Options) when Proto =:= http orelse Proto =:= ws ->
    WsRef = ws_name('mqtt:ws', ListenOn),
    cowboy:stop_listener(WsRef),
    start_listener(Proto, ListenOn, Options);
restart_listener(Proto, ListenOn, Options) when Proto =:= https orelse Proto =:= wss ->
    WssRef = ws_name('mqtt:wss', ListenOn),
    cowboy:stop_listener(WssRef),
    start_listener(Proto, ListenOn, Options);
restart_listener(Proto, ListenOn, _Opts) ->
    esockd:reopen(Proto, ListenOn).
|
|
|
|
%% @doc Stop every configured listener.
%% The listener definitions are read with emqx:get_env(listeners, []) and
%% each one is passed, in order, to stop_listener/1.
-spec(stop() -> ok).
stop() ->
    Listeners = emqx:get_env(listeners, []),
    lists:foreach(fun(Listener) -> stop_listener(Listener) end, Listeners).
|
|
|
|
%% @doc Stop the listener described by a `{Proto, ListenOn, Options}' triple
%% and report the outcome on the console.
%%
%% A confirmation line is written to standard output on `ok'; on
%% `{error, Reason}' the failure is written to standard_error. In both cases
%% the result of stop_listener/3 is returned unchanged, so a failed stop
%% does not raise.
-spec(stop_listener(listener()) -> ok | {error, term()}).
stop_listener({Proto, ListenOn, Opts}) ->
    StopRet = stop_listener(Proto, ListenOn, Opts),
    case StopRet of
        ok ->
            io:format("Stop mqtt:~s listener on ~s successfully.~n",
                      [Proto, format(ListenOn)]);
        {error, Reason} ->
            %% ~n is kept as the final directive so that nothing spills onto
            %% the beginning of the next console line.
            io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p~n",
                      [Proto, format(ListenOn), Reason])
    end,
    StopRet.
|
|
|
|
%% Stop one listener for the given protocol.
%% tcp and ssl/tls listeners are closed through esockd under their
%% 'mqtt:tcp' / 'mqtt:ssl' names; WebSocket listeners are stopped in cowboy
%% using the reference built by ws_name/2. Any other protocol name is
%% passed to esockd:close/2 unchanged.
-spec(stop_listener(esockd:proto(), esockd:listen_on(), [esockd:option()])
      -> ok | {error, term()}).
stop_listener(tcp, ListenOn, _Opts) ->
    esockd:close('mqtt:tcp', ListenOn);
stop_listener(Proto, ListenOn, _Opts) when Proto =:= ssl orelse Proto =:= tls ->
    esockd:close('mqtt:ssl', ListenOn);
stop_listener(Proto, ListenOn, _Opts) when Proto =:= http orelse Proto =:= ws ->
    WsRef = ws_name('mqtt:ws', ListenOn),
    cowboy:stop_listener(WsRef);
stop_listener(Proto, ListenOn, _Opts) when Proto =:= https orelse Proto =:= wss ->
    WssRef = ws_name('mqtt:wss', ListenOn),
    cowboy:stop_listener(WssRef);
stop_listener(Proto, ListenOn, _Opts) ->
    esockd:close(Proto, ListenOn).
|
|
|
|
%% Ensure Options carries a tcp_options entry based on ?MQTT_SOCKOPTS.
%% If tcp_options is absent, the defaults are prepended as-is; otherwise the
%% existing entry is removed and replaced by the result of
%% emqx_misc:merge_opts(?MQTT_SOCKOPTS, TcpOpts), placed at the head of the
%% list.
merge_default(Options) ->
    case lists:keytake(tcp_options, 1, Options) of
        false ->
            [{tcp_options, ?MQTT_SOCKOPTS} | Options];
        {value, {tcp_options, UserTcpOpts}, Rest} ->
            Merged = emqx_misc:merge_opts(?MQTT_SOCKOPTS, UserTcpOpts),
            [{tcp_options, Merged} | Rest]
    end.
|
|
|
|
%% Render a listen address as chardata of the form "Host:Port" for console
%% output. An address tuple is converted with inet:ntoa/1, a string address
%% is used verbatim, and a bare port is shown with the host "0.0.0.0".
format({Addr, Port}) when is_tuple(Addr) ->
    Host = inet:ntoa(Addr),
    io_lib:format("~s:~w", [Host, Port]);
format({Addr, Port}) when is_list(Addr) ->
    io_lib:format("~s:~w", [Addr, Port]);
format(Port) when is_integer(Port) ->
    io_lib:format("0.0.0.0:~w", [Port]).
|
|
|
|
%% Build the atom used as the cowboy listener reference: Name and the port
%% joined by ":". For an {Addr, Port} pair only the port takes part in the
%% name.
ws_name(Name, {_Addr, Port}) ->
    ws_name(Name, Port);
ws_name(Name, Port) ->
    Label = lists:concat([Name, ":", Port]),
    list_to_atom(Label).
|