diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 2b80000dc..acdb9ff96 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -277,9 +277,8 @@ restart_listener(Type, ListenerName, Conf) -> restart_listener(Type, ListenerName, Conf, Conf). restart_listener(Type, ListenerName, OldConf, NewConf) -> - case do_stop_listener(Type, ListenerName, OldConf) of + case stop_listener(Type, ListenerName, OldConf) of ok -> start_listener(Type, ListenerName, NewConf); - {error, not_found} -> start_listener(Type, ListenerName, NewConf); {error, Reason} -> {error, Reason} end. @@ -296,42 +295,63 @@ stop_listener(ListenerId) -> apply_on_listener(ListenerId, fun stop_listener/3). stop_listener(Type, ListenerName, #{bind := Bind} = Conf) -> - case do_stop_listener(Type, ListenerName, Conf) of + Id = listener_id(Type, ListenerName), + ok = del_limiter_bucket(Id, Conf), + case do_stop_listener(Type, Id, Conf) of ok -> console_print( "Listener ~ts on ~ts stopped.~n", - [listener_id(Type, ListenerName), format_bind(Bind)] + [Id, format_bind(Bind)] ), ok; {error, not_found} -> - ?ELOG( - "Failed to stop listener ~ts on ~ts: ~0p~n", - [listener_id(Type, ListenerName), format_bind(Bind), already_stopped] - ), ok; {error, Reason} -> ?ELOG( "Failed to stop listener ~ts on ~ts: ~0p~n", - [listener_id(Type, ListenerName), format_bind(Bind), Reason] + [Id, format_bind(Bind), Reason] ), {error, Reason} end. -spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}. -do_stop_listener(Type, ListenerName, #{bind := ListenOn} = Conf) when Type == tcp; Type == ssl -> - Id = listener_id(Type, ListenerName), - del_limiter_bucket(Id, Conf), +do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == tcp; Type == ssl -> esockd:close(Id, ListenOn); -do_stop_listener(Type, ListenerName, Conf) when Type == ws; Type == wss -> - Id = listener_id(Type, ListenerName), - del_limiter_bucket(Id, Conf), - cowboy:stop_listener(Id); -do_stop_listener(quic, ListenerName, Conf) -> - Id = listener_id(quic, ListenerName), - del_limiter_bucket(Id, Conf), +do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == ws; Type == wss -> + case cowboy:stop_listener(Id) of + ok -> + wait_listener_stopped(ListenOn); + Error -> + Error + end; +do_stop_listener(quic, Id, _Conf) -> quicer:stop_listener(Id). +wait_listener_stopped(ListenOn) -> + % NOTE + % `cowboy:stop_listener/1` will not close the listening socket explicitly, + % it will be closed by the runtime system **only after** the process exits. + Endpoint = maps:from_list(ip_port(ListenOn)), + case + gen_tcp:connect( + maps:get(ip, Endpoint, loopback), + maps:get(port, Endpoint), + [{active, false}] + ) + of + {error, _EConnrefused} -> + %% NOTE + %% We should get `econnrefused` here because acceptors are already dead + %% but don't want to crash if not, because this doesn't make any difference. + ok; + {ok, Socket} -> + %% NOTE + %% Tiny chance to get a connected socket here, when some other process + %% concurrently binds to the same port. + gen_tcp:close(Socket) + end. + -ifndef(TEST). console_print(Fmt, Args) -> ?ULOG(Fmt, Args). -else.