Rename emqx_mqtt module to emqx_listeners

This commit is contained in:
Feng Lee 2018-06-12 10:04:01 +08:00
parent 055de617fc
commit 5d45d40db5
1 changed files with 29 additions and 41 deletions

View File

@ -1,45 +1,36 @@
%%-------------------------------------------------------------------- %%%===================================================================
%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%%
%% Licensed under the Apache License, Version 2.0 (the "License"); %%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at %%% You may obtain a copy of the License at
%% %%%
%% http://www.apache.org/licenses/LICENSE-2.0 %%% http://www.apache.org/licenses/LICENSE-2.0
%% %%%
%% Unless required by applicable law or agreed to in writing, software %%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, %%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %%% See the License for the specific language governing permissions and
%% limitations under the License. %%% limitations under the License.
%%-------------------------------------------------------------------- %%%===================================================================
-module(emqx_mqtt). -module(emqx_listeners).
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([bootstrap/0, shutdown/0]). -export([start/0, restart/0, stop/0]).
-export([start_listener/1, stop_listener/1, restart_listener/1]).
-export([start_listeners/0, start_listener/1]). -export([all/0]).
-export([stop_listeners/0, stop_listener/1]).
-export([restart_listeners/0, restart_listener/1]).
-export([listeners/0]).
-type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}). -type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}).
bootstrap() ->
start_listeners().
shutdown() ->
stop_listeners().
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Start/Stop Listeners %% Start/Stop Listeners
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Start Listeners. %% @doc Start Listeners.
-spec(start_listeners() -> ok). -spec(start() -> ok).
start_listeners() -> start() ->
lists:foreach(fun start_listener/1, emqx_config:get_env(listeners, [])). lists:foreach(fun start_listener/1, emqx_config:get_env(listeners, [])).
%% Start mqtt listener %% Start mqtt listener
@ -65,7 +56,7 @@ start_listener(Proto, ListenOn, Opts) ->
MFArgs = {emqx_connection, start_link, [Env]}, MFArgs = {emqx_connection, start_link, [Env]},
{ok, _} = esockd:open(Proto, ListenOn, merge_sockopts(Opts), MFArgs). {ok, _} = esockd:open(Proto, ListenOn, merge_sockopts(Opts), MFArgs).
listeners() -> all() ->
[Listener || Listener = {{Proto, _}, _Pid} <- esockd:listeners(), is_mqtt(Proto)]. [Listener || Listener = {{Proto, _}, _Pid} <- esockd:listeners(), is_mqtt(Proto)].
is_mqtt('mqtt:tcp') -> true; is_mqtt('mqtt:tcp') -> true;
@ -75,8 +66,8 @@ is_mqtt('mqtt:wss') -> true;
is_mqtt(_Proto) -> false. is_mqtt(_Proto) -> false.
%% @doc Stop Listeners %% @doc Stop Listeners
-spec(stop_listeners() -> ok). -spec(stop() -> ok).
stop_listeners() -> stop() ->
lists:foreach(fun stop_listener/1, emqx_config:get_env(listeners, [])). lists:foreach(fun stop_listener/1, emqx_config:get_env(listeners, [])).
-spec(stop_listener(listener()) -> ok | {error, any()}). -spec(stop_listener(listener()) -> ok | {error, any()}).
@ -88,16 +79,13 @@ stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws ->
mochiweb:stop_http('mqtt:ws', ListenOn); mochiweb:stop_http('mqtt:ws', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss ->
mochiweb:stop_http('mqtt:wss', ListenOn); mochiweb:stop_http('mqtt:wss', ListenOn);
% stop_listener({Proto, ListenOn, _Opts}) when Proto == api ->
% mochiweb:stop_http('mqtt:api', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) -> stop_listener({Proto, ListenOn, _Opts}) ->
esockd:close(Proto, ListenOn). esockd:close(Proto, ListenOn).
%% @doc Restart Listeners %% @doc Restart Listeners
-spec(restart_listeners() -> ok). -spec(restart() -> ok).
restart_listeners() -> restart() ->
lists:foreach(fun restart_listener/1, lists:foreach(fun restart_listener/1, emqx_config:get_env(listeners, [])).
emqx_config:get_env(listeners, [])).
-spec(restart_listener(listener()) -> any()). -spec(restart_listener(listener()) -> any()).
restart_listener({tcp, ListenOn, _Opts}) -> restart_listener({tcp, ListenOn, _Opts}) ->
@ -108,13 +96,13 @@ restart_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws ->
mochiweb:restart_http('mqtt:ws', ListenOn); mochiweb:restart_http('mqtt:ws', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> restart_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss ->
mochiweb:restart_http('mqtt:wss', ListenOn); mochiweb:restart_http('mqtt:wss', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) when Proto == api ->
mochiweb:restart_http('mqtt:api', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) -> restart_listener({Proto, ListenOn, _Opts}) ->
esockd:reopen(Proto, ListenOn). esockd:reopen(Proto, ListenOn).
merge_sockopts(Options) -> merge_sockopts(Options) ->
%%TODO: tcp_options?
SockOpts = emqx_misc:merge_opts( SockOpts = emqx_misc:merge_opts(
?MQTT_SOCKOPTS, proplists:get_value(sockopts, Options, [])), ?MQTT_SOCKOPTS, proplists:get_value(sockopts, Options, [])),
emqx_misc:merge_opts(Options, [{sockopts, SockOpts}]). emqx_misc:merge_opts(Options, [{sockopts, SockOpts}]).