diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 3b08a77ef..74636c9b5 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -808,6 +808,9 @@ handle_info(Info, Channel) -> handle_timeout(_TRef, {keepalive, _StatVal}, Channel = #channel{keepalive = undefined}) -> {ok, Channel}; +handle_timeout(_TRef, {keepalive, _StatVal}, + Channel = #channel{conn_state = disconnected}) -> + {ok, Channel}; handle_timeout(_TRef, {keepalive, StatVal}, Channel = #channel{keepalive = Keepalive}) -> case emqx_keepalive:check(StatVal, Keepalive) of @@ -818,6 +821,9 @@ handle_timeout(_TRef, {keepalive, StatVal}, handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel) end; +handle_timeout(_TRef, retry_delivery, + Channel = #channel{conn_state = disconnected}) -> + {ok, Channel}; handle_timeout(_TRef, retry_delivery, Channel = #channel{session = Session}) -> case emqx_session:retry(Session) of @@ -831,6 +837,9 @@ handle_timeout(_TRef, retry_delivery, handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) end; +handle_timeout(_TRef, expire_awaiting_rel, + Channel = #channel{conn_state = disconnected}) -> + {ok, Channel}; handle_timeout(_TRef, expire_awaiting_rel, Channel = #channel{session = Session}) -> case emqx_session:expire(awaiting_rel, Session) of diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index a516b8757..1f595d3fa 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -476,13 +476,18 @@ handle_timeout(_TRef, emit_stats, State = emqx_cm:set_chan_stats(ClientId, stats(State)), {ok, State#state{stats_timer = undefined}}; -handle_timeout(TRef, keepalive, State = - #state{transport = Transport, socket = Socket}) -> - case Transport:getstat(Socket, [recv_oct]) of - {ok, [{recv_oct, RecvOct}]} -> - handle_timeout(TRef, {keepalive, RecvOct}, State); - {error, Reason} -> - handle_info({sock_error, Reason}, State) +handle_timeout(TRef, keepalive, State = #state{transport = Transport, + socket = Socket, + channel = Channel})-> + case emqx_channel:info(conn_state, Channel) of + disconnected -> {ok, State}; + _ -> + case Transport:getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> + handle_timeout(TRef, {keepalive, RecvOct}, State); + {error, Reason} -> + handle_info({sock_error, Reason}, State) + end end; handle_timeout(TRef, Msg, State) ->