diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 9c28884c9..e9bc0ff98 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -417,7 +417,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From, {{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State}; false -> State1 = State#state{awaiting_rel = maps:put(PacketId, Ts, AwaitingRel)}, - {ok, ensure_await_rel_timer(State1)} + {ok, ensure_stats_timer(ensure_await_rel_timer(State1))} end; true -> ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State), @@ -430,7 +430,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In reply( case emqx_inflight:contain(PacketId, Inflight) of true -> - {ok, acked(pubrec, PacketId, State)}; + {ok, ensure_stats_timer(acked(pubrec, PacketId, State))}; false -> ?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State), emqx_metrics:trans(inc, 'packets/pubrec/missed'), @@ -442,7 +442,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel reply( case maps:take(PacketId, AwaitingRel) of {_Ts, AwaitingRel1} -> - {ok, State#state{awaiting_rel = AwaitingRel1}}; + {ok, ensure_stats_timer(State#state{awaiting_rel = AwaitingRel1})}; error -> ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId], State), emqx_metrics:trans(inc, 'packets/pubrel/missed'), @@ -477,7 +477,7 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, end} end, {[], Subscriptions}, TopicFilters), suback(FromPid, PacketId, ReasonCodes), - noreply(State#state{subscriptions = Subscriptions1}); + noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1})); %% UNSUBSCRIBE: handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, @@ -494,14 +494,14 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, end end, {[], Subscriptions}, TopicFilters), unsuback(From, PacketId, ReasonCodes), - noreply(State#state{subscriptions = Subscriptions1}); + noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1})); %% PUBACK: handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) -> noreply( case emqx_inflight:contain(PacketId, Inflight) of true -> - dequeue(acked(puback, PacketId, State)); + ensure_stats_timer(dequeue(acked(puback, PacketId, State))); false -> ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State), emqx_metrics:trans(inc, 'packets/puback/missed'), @@ -513,7 +513,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight noreply( case emqx_inflight:contain(PacketId, Inflight) of true -> - dequeue(acked(pubcomp, PacketId, State)); + ensure_stats_timer(dequeue(acked(pubcomp, PacketId, State))); false -> ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State), emqx_metrics:trans(inc, 'packets/pubcomp/missed'), @@ -565,7 +565,7 @@ handle_cast({resume, #{conn_pid := ConnPid, emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]), %% Replay delivery and Dequeue pending messages - noreply(dequeue(retry_delivery(true, State1))); + noreply(ensure_stats_timer(dequeue(retry_delivery(true, State1)))); handle_cast({update_expiry_interval, Interval}, State) -> {noreply, State#state{expiry_interval = Interval}}; @@ -590,7 +590,7 @@ handle_info({dispatch, Topic, Msg = #message{}}, State) -> ok = emqx_shared_sub:nack_no_connection(Msg), {noreply, State}; false -> - noreply(handle_dispatch(Topic, Msg, State)) + noreply(ensure_stats_timer(handle_dispatch(Topic, Msg, State))) end; %% Do nothing if the client has been disconnected. @@ -601,11 +601,11 @@ handle_info({timeout, Timer, retry_delivery}, State = #state{retry_timer = Timer noreply(retry_delivery(false, State#state{retry_timer = undefined})); handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer = Timer}) -> - noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined})); + State1 = State#state{await_rel_timer = undefined}, + noreply(ensure_stats_timer(expire_awaiting_rel(State1))); handle_info({timeout, Timer, emit_stats}, - State = #state{client_id = ClientId, - stats_timer = Timer}) -> + State = #state{client_id = ClientId, stats_timer = Timer}) -> emqx_metrics:commit(), _ = emqx_sm:set_session_stats(ClientId, stats(State)), NewState = State#state{stats_timer = undefined}, @@ -931,8 +931,7 @@ dequeue(State = #state{inflight = Inflight}) -> dequeue2(State = #state{mqueue = Q}) -> case emqx_mqueue:out(Q) of - {empty, _Q} -> - State; + {empty, _Q} -> State; {{value, Msg}, Q1} -> %% Dequeue more dequeue(dispatch(Msg, State#state{mqueue = Q1})) @@ -972,7 +971,8 @@ ensure_will_delay_timer(State = #state{will_msg = #message{headers = #{'Will-Del ensure_will_delay_timer(State) -> State. -ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, +ensure_stats_timer(State = #state{enable_stats = true, + stats_timer = undefined, idle_timeout = IdleTimeout}) -> State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; ensure_stats_timer(State) -> @@ -1010,11 +1010,11 @@ reply({Reply, State}) -> reply(Reply, State). reply(Reply, State) -> - {reply, Reply, ensure_stats_timer(State)}. + {reply, Reply, State}. noreply(State) -> - {noreply, ensure_stats_timer(State)}. + {noreply, State}. shutdown(Reason, State) -> - {stop, {shutdown, Reason}, State}. + {stop, Reason, State}.