From 01b9115fd8bafbebcf50b6646b303dafae9226da Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 3 Aug 2022 16:30:34 +0200 Subject: [PATCH] fix: keep alive check According to MQTT spec, MQTT Server should check a complete MQTT message recv in last keep-alive time frame instead of number of received bytes from the socket. This commit change to check the recv pkt counter from process dict instead. Also it could save some calls to erlang port. --- apps/emqx/src/emqx_connection.erl | 11 +++-------- apps/emqx/test/emqx_connection_SUITE.erl | 5 ----- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 1caf345e6..4ebc5b5e6 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -708,8 +708,6 @@ handle_timeout( TRef, keepalive, State = #state{ - transport = Transport, - socket = Socket, channel = Channel } ) -> @@ -717,12 +715,9 @@ handle_timeout( disconnected -> {ok, State}; _ -> - case Transport:getstat(Socket, [recv_oct]) of - {ok, [{recv_oct, RecvOct}]} -> - handle_timeout(TRef, {keepalive, RecvOct}, State); - {error, Reason} -> - handle_info({sock_error, Reason}, State) - end + %% recv_pkt: valid MQTT message + RecvCnt = emqx_pd:get_counter(recv_pkt), + handle_timeout(TRef, {keepalive, RecvCnt}, State) end; handle_timeout(TRef, Msg, State) -> with_channel(handle_timeout, [TRef, Msg], State). diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index c5dfdf34a..344d81f8c 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -316,11 +316,6 @@ t_handle_timeout(_) -> emqx_connection:handle_timeout(TRef, keepalive, State) ), - ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end), - ?assertMatch( - {stop, {shutdown, for_testing}, _NState}, - emqx_connection:handle_timeout(TRef, keepalive, State) - ), ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, undefined, State)). t_parse_incoming(_) ->