diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 36de0d7f0..ce559f74a 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -59,7 +59,7 @@ -export([format_bind/1]). -ifdef(TEST). --export([certs_dir/2]). +-export([certs_dir/2, wait_listener_stopped/1]). -endif. -export_type([listener_id/0]). @@ -355,21 +355,33 @@ do_stop_listener(Type, Id, #{bind := ListenOn}) when ?ESOCKD_LISTENER(Type) -> do_stop_listener(Type, Id, #{bind := ListenOn}) when ?COWBOY_LISTENER(Type) -> case cowboy:stop_listener(Id) of ok -> - wait_listener_stopped(ListenOn, 0); + _ = wait_listener_stopped(ListenOn), + ok; Error -> Error end; do_stop_listener(quic, Id, _Conf) -> quicer:terminate_listener(Id). -wait_listener_stopped(ListenOn, RetryMs) -> +wait_listener_stopped(ListenOn) -> + wait_listener_stopped(ListenOn, 0). + +wait_listener_stopped(ListenOn, 3) -> + Log = #{ + msg => "cowboy_listener_not_closed_when_stopping_listener", + listener => ListenOn, + wait_seconds => 9 + }, + ?SLOG(warning, Log), + timeout; +wait_listener_stopped(ListenOn, RetryCount) -> % NOTE % `cowboy:stop_listener/1` will not close the listening socket explicitly, % it will be closed by the runtime system **only after** the process exits. Endpoint = maps:from_list(ip_port(ListenOn)), case gen_tcp:connect( - maps:get(ip, Endpoint, loopback), + maps:get(ip, Endpoint, "127.0.0.1"), maps:get(port, Endpoint), [{active, false}] ) @@ -380,26 +392,18 @@ wait_listener_stopped(ListenOn, RetryMs) -> %% but don't want to crash if not, because this doesn't make any difference. ok; {ok, Socket} -> - %% NOTE - %% Tiny chance to get a connected socket here, when some other process - %% concurrently binds to the same port. - gen_tcp:close(Socket), - Log = #{ - msg => "cowboy_listener_still_open_after_stopping_listener", - listener => ListenOn, - wait_ms => RetryMs - }, - case RetryMs >= 3000 of - true -> - %% Don't stop this listener, in case other processes start it again. - ?SLOG(warning, Log), - ok; - false -> - ?SLOG(info, Log), - Interval = 300, - NewRetryMs = RetryMs + Interval, - timer:sleep(Interval), - wait_listener_stopped(ListenOn, NewRetryMs) + %% cowboy(ws/wss) will close the socket if we don't send packet in 5 seconds. + %% so we only wait 3 second here. + case gen_tcp:recv(Socket, 0, 3000) of + {ok, _} -> + _ = gen_tcp:close(Socket), + wait_listener_stopped(ListenOn, RetryCount + 1); + {error, timeout} -> + _ = gen_tcp:close(Socket), + wait_listener_stopped(ListenOn, RetryCount + 1); + {error, _} -> + _ = gen_tcp:close(Socket), + ok end end. diff --git a/apps/emqx/test/emqx_listeners_SUITE.erl b/apps/emqx/test/emqx_listeners_SUITE.erl index 087e1b062..78c2c2e3a 100644 --- a/apps/emqx/test/emqx_listeners_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_SUITE.erl @@ -41,6 +41,7 @@ end_per_suite(Config) -> init_per_testcase(Case, Config) when Case =:= t_start_stop_listeners; Case =:= t_restart_listeners; + Case =:= t_wait_for_stop_listeners; Case =:= t_restart_listeners_with_hibernate_after_disabled -> ok = emqx_listeners:stop(), @@ -57,6 +58,36 @@ t_start_stop_listeners(_) -> ?assertException(error, _, emqx_listeners:start_listener(ws, {"127.0.0.1", 8083}, #{})), ok = emqx_listeners:stop(). +t_wait_for_stop_listeners(_) -> + ok = emqx_listeners:start(), + meck:new([cowboy], [passthrough, no_history, no_link]), + %% mock stop_listener return ok but listen port is still open + meck:expect(cowboy, stop_listener, fun(_) -> ok end), + List = [ + {<<"ws:default">>, {"127.0.0.1", 8083}}, + {<<"wss:default">>, {"127.0.0.1", 8084}} + ], + lists:foreach( + fun({Id, ListenerOn}) -> + Start = erlang:system_time(seconds), + ok = emqx_listeners:stop_listener(Id), + ?assertEqual(timeout, emqx_listeners:wait_listener_stopped(ListenerOn)), + End = erlang:system_time(seconds), + ?assert(End - Start >= 9, "wait_listener_stopped should wait at least 9 seconds") + end, + List + ), + meck:unload(cowboy), + lists:foreach( + fun({Id, ListenerOn}) -> + ok = emqx_listeners:stop_listener(Id), + ?assertEqual(ok, emqx_listeners:wait_listener_stopped(ListenerOn)) + end, + List + ), + ok = emqx_listeners:stop(), + ok. + t_restart_listeners(_) -> ok = emqx_listeners:start(), ok = emqx_listeners:stop(),