diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 5a2fc0dc8..78198c4e4 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -568,16 +568,22 @@ handle_timeout(_TRef, limit_timeout, State) -> limit_timer = undefined }, handle_info(activate_socket, NState); -handle_timeout(TRef, keepalive, State = #state{ +handle_timeout(TRef, Keepalive, State = #state{ chann_mod = ChannMod, socket = Socket, - channel = Channel})-> + channel = Channel}) + when Keepalive == keepalive; + Keepalive == keepalive_send -> + Stat = case Keepalive of + keepalive -> recv_oct; + keepalive_send -> send_oct + end, case ChannMod:info(conn_state, Channel) of disconnected -> {ok, State}; _ -> - case esockd_getstat(Socket, [recv_oct, send_oct]) of - {ok, [{recv_oct, RecvOct}, {send_oct, SendOct}]} -> - handle_timeout(TRef, {keepalive, {RecvOct, SendOct}}, State); + case esockd_getstat(Socket, [Stat]) of + {ok, [{Stat, RecvOct}]} -> + handle_timeout(TRef, {Keepalive, RecvOct}, State); {error, Reason} -> handle_info({sock_error, Reason}, State) end diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index c83a65e64..bfa8f6fba 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -1272,7 +1272,7 @@ handle_timeout(_TRef, {keepalive, _StatVal}, when ConnState =:= disconnected; ConnState =:= asleep -> {ok, Channel}; -handle_timeout(_TRef, {keepalive, {StatVal, _}}, +handle_timeout(_TRef, {keepalive, StatVal}, Channel = #channel{keepalive = Keepalive}) -> case emqx_keepalive:check(StatVal, Keepalive) of {ok, NKeepalive} -> diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index e071df433..9027c515e 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -42,6 +42,7 @@ ]). -export([ handle_call/2 + , handle_cast/2 , handle_info/2 ]). @@ -84,8 +85,8 @@ -type(replies() :: stomp_frame() | reply() | [reply()]). -define(TIMER_TABLE, #{ - incoming_timer => incoming, - outgoing_timer => outgoing, + incoming_timer => keepalive, + outgoing_timer => keepalive_send, clean_trans_timer => clean_trans }). @@ -740,7 +741,7 @@ handle_deliver(Delivers, | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()}). -handle_timeout(_TRef, {incoming, NewVal}, +handle_timeout(_TRef, {keepalive, NewVal}, Channel = #channel{heartbeat = HrtBt}) -> case emqx_stomp_heartbeat:check(incoming, NewVal, HrtBt) of {error, timeout} -> @@ -751,7 +752,7 @@ handle_timeout(_TRef, {incoming, NewVal}, )} end; -handle_timeout(_TRef, {outgoing, NewVal}, +handle_timeout(_TRef, {keepalive_send, NewVal}, Channel = #channel{heartbeat = HrtBt}) -> case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of {error, timeout} ->