Fix unexpected timeout
This commit is contained in:
parent
7f807c0b11
commit
597558fee8
|
@ -786,9 +786,8 @@ handle_info(Info, Channel) ->
|
||||||
-> {ok, channel()}
|
-> {ok, channel()}
|
||||||
| {ok, replies(), channel()}
|
| {ok, replies(), channel()}
|
||||||
| {shutdown, Reason :: term(), channel()}).
|
| {shutdown, Reason :: term(), channel()}).
|
||||||
handle_timeout(TRef, {keepalive, StatVal},
|
handle_timeout(_TRef, {keepalive, StatVal},
|
||||||
Channel = #channel{keepalive = Keepalive,
|
Channel = #channel{keepalive = Keepalive}) ->
|
||||||
timers = #{alive_timer := TRef}}) ->
|
|
||||||
case emqx_keepalive:check(StatVal, Keepalive) of
|
case emqx_keepalive:check(StatVal, Keepalive) of
|
||||||
{ok, NKeepalive} ->
|
{ok, NKeepalive} ->
|
||||||
NChannel = Channel#channel{keepalive = NKeepalive},
|
NChannel = Channel#channel{keepalive = NKeepalive},
|
||||||
|
@ -797,9 +796,8 @@ handle_timeout(TRef, {keepalive, StatVal},
|
||||||
handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel)
|
handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_timeout(TRef, retry_delivery,
|
handle_timeout(_TRef, retry_delivery,
|
||||||
Channel = #channel{session = Session,
|
Channel = #channel{session = Session}) ->
|
||||||
timers = #{retry_timer := TRef}}) ->
|
|
||||||
case emqx_session:retry(Session) of
|
case emqx_session:retry(Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
|
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
|
||||||
|
@ -811,9 +809,8 @@ handle_timeout(TRef, retry_delivery,
|
||||||
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
|
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_timeout(TRef, expire_awaiting_rel,
|
handle_timeout(_TRef, expire_awaiting_rel,
|
||||||
Channel = #channel{session = Session,
|
Channel = #channel{session = Session}) ->
|
||||||
timers = #{await_timer := TRef}}) ->
|
|
||||||
case emqx_session:expire(awaiting_rel, Session) of
|
case emqx_session:expire(awaiting_rel, Session) of
|
||||||
{ok, Session} ->
|
{ok, Session} ->
|
||||||
{ok, clean_timer(await_timer, Channel#channel{session = Session})};
|
{ok, clean_timer(await_timer, Channel#channel{session = Session})};
|
||||||
|
@ -821,11 +818,10 @@ handle_timeout(TRef, expire_awaiting_rel,
|
||||||
{ok, reset_timer(await_timer, Timeout, Channel#channel{session = Session})}
|
{ok, reset_timer(await_timer, Timeout, Channel#channel{session = Session})}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_timeout(TRef, expire_session, Channel = #channel{timers = #{expire_timer := TRef}}) ->
|
handle_timeout(_TRef, expire_session, Channel) ->
|
||||||
shutdown(expired, Channel);
|
shutdown(expired, Channel);
|
||||||
|
|
||||||
handle_timeout(TRef, will_message, Channel = #channel{will_msg = WillMsg,
|
handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) ->
|
||||||
timers = #{will_timer := TRef}}) ->
|
|
||||||
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
||||||
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
|
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
|
||||||
|
|
||||||
|
|
|
@ -459,17 +459,17 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle timeout
|
%% Handle timeout
|
||||||
|
|
||||||
handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
|
handle_timeout(_TRef, idle_timeout, State) ->
|
||||||
shutdown(idle_timeout, State);
|
shutdown(idle_timeout, State);
|
||||||
|
|
||||||
handle_timeout(TRef, limit_timeout, State = #state{limit_timer = TRef}) ->
|
handle_timeout(_TRef, limit_timeout, State) ->
|
||||||
NState = State#state{sockstate = idle,
|
NState = State#state{sockstate = idle,
|
||||||
limit_timer = undefined
|
limit_timer = undefined
|
||||||
},
|
},
|
||||||
handle_info(activate_socket, NState);
|
handle_info(activate_socket, NState);
|
||||||
|
|
||||||
handle_timeout(TRef, emit_stats, State =
|
handle_timeout(_TRef, emit_stats, State =
|
||||||
#state{stats_timer = TRef, channel = Channel}) ->
|
#state{channel = Channel}) ->
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||||
{ok, State#state{stats_timer = undefined}};
|
{ok, State#state{stats_timer = undefined}};
|
||||||
|
|
Loading…
Reference in New Issue