diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 92778f7a8..1d17cac03 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -21,6 +21,7 @@ -include("emqx_mqtt.hrl"). -include("logger.hrl"). -include("types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -logger_header("[MQTT]"). @@ -449,7 +450,7 @@ handle_msg(Msg, State) -> -spec terminate(any(), state()) -> no_return(). terminate(Reason, State = #state{channel = Channel, transport = Transport, socket = Socket}) -> - ?LOG(debug, "Terminated due to ~p", [Reason]), + ?tp(debug, terminate, #{reason => Reason}), Channel1 = emqx_channel:set_conn_state(disconnected, Channel), emqx_congestion:cancel_alarms(Socket, Transport, Channel1), emqx_channel:terminate(Reason, Channel1), @@ -686,6 +687,7 @@ run_gc(Stats, State = #state{gc_state = GcSt}) -> check_oom(State = #state{channel = Channel}) -> Zone = emqx_channel:info(zone, Channel), OomPolicy = emqx_zone:oom_policy(Zone), + ?tp(debug, check_oom, #{policy => OomPolicy}), case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of {shutdown, Reason} -> %% triggers terminate/2 callback immediately diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 22abd2ca8..ae1587d7b 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). all() -> emqx_ct:all(?MODULE). @@ -88,11 +89,18 @@ init_per_testcase(TestCase, Config) when ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end), ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end), ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end), - Config; + case erlang:function_exported(?MODULE, TestCase, 2) of + true -> ?MODULE:TestCase(init, Config); + _ -> Config + end; init_per_testcase(_, Config) -> Config. -end_per_testcase(_TestCase, Config) -> +end_per_testcase(TestCase, Config) -> + case erlang:function_exported(?MODULE, TestCase, 2) of + true -> ?MODULE:TestCase('end', Config); + false -> ok + end, Config. %%-------------------------------------------------------------------- @@ -386,6 +394,40 @@ t_get_conn_info(_) -> }, SockInfo) end). +t_oom_shutdown(init, Config) -> + ok = snabbkaffe:start_trace(), + ok = meck:new(emqx_misc, [non_strict, passthrough, no_history, no_link]), + ok = meck:new(emqx_zone, [non_strict, passthrough, no_history, no_link]), + meck:expect(emqx_zone, oom_policy, + fun(_Zone) -> #{message_queue_len => 10, max_heap_size => 8000000} end), + meck:expect(emqx_misc, check_oom, + fun(_) -> {shutdown, "fake_oom"} end), + Config; +t_oom_shutdown('end', _Config) -> + snabbkaffe:stop(), + meck:unload(emqx_misc), + meck:unload(emqx_zone), + ok. + +t_oom_shutdown(_) -> + Opts = #{trap_exit => true}, + with_conn( + fun(Pid) -> + Pid ! {tcp_passive, foo}, + ?block_until(#{?snk_kind := check_oom}, 100), + ?block_until(#{?snk_kind := terminate}, 10), + Trace = snabbkaffe:collect_trace(), + ?assertEqual(1, length(?of_kind(terminate, Trace))), + receive + {'EXIT', Pid, Reason} -> + ?assertEqual({shutdown, "fake_oom"}, Reason) + after 1000 -> + error(timeout) + end, + ?assertNot(erlang:is_process_alive(Pid)) + end, Opts), + ok. + %%-------------------------------------------------------------------- %% Helper functions %%--------------------------------------------------------------------