Remove the ensure_stats_timer/1 call from reply/2 and noreply/1
This commit is contained in:
parent
42fc8f5811
commit
d827604213
|
@ -417,7 +417,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From,
|
||||||
{{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State};
|
{{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State};
|
||||||
false ->
|
false ->
|
||||||
State1 = State#state{awaiting_rel = maps:put(PacketId, Ts, AwaitingRel)},
|
State1 = State#state{awaiting_rel = maps:put(PacketId, Ts, AwaitingRel)},
|
||||||
{ok, ensure_await_rel_timer(State1)}
|
{ok, ensure_stats_timer(ensure_await_rel_timer(State1))}
|
||||||
end;
|
end;
|
||||||
true ->
|
true ->
|
||||||
?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State),
|
?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State),
|
||||||
|
@ -430,7 +430,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In
|
||||||
reply(
|
reply(
|
||||||
case emqx_inflight:contain(PacketId, Inflight) of
|
case emqx_inflight:contain(PacketId, Inflight) of
|
||||||
true ->
|
true ->
|
||||||
{ok, acked(pubrec, PacketId, State)};
|
{ok, ensure_stats_timer(acked(pubrec, PacketId, State))};
|
||||||
false ->
|
false ->
|
||||||
?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State),
|
?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State),
|
||||||
emqx_metrics:trans(inc, 'packets/pubrec/missed'),
|
emqx_metrics:trans(inc, 'packets/pubrec/missed'),
|
||||||
|
@ -442,7 +442,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel
|
||||||
reply(
|
reply(
|
||||||
case maps:take(PacketId, AwaitingRel) of
|
case maps:take(PacketId, AwaitingRel) of
|
||||||
{_Ts, AwaitingRel1} ->
|
{_Ts, AwaitingRel1} ->
|
||||||
{ok, State#state{awaiting_rel = AwaitingRel1}};
|
{ok, ensure_stats_timer(State#state{awaiting_rel = AwaitingRel1})};
|
||||||
error ->
|
error ->
|
||||||
?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId], State),
|
?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId], State),
|
||||||
emqx_metrics:trans(inc, 'packets/pubrel/missed'),
|
emqx_metrics:trans(inc, 'packets/pubrel/missed'),
|
||||||
|
@ -477,7 +477,7 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
|
||||||
end}
|
end}
|
||||||
end, {[], Subscriptions}, TopicFilters),
|
end, {[], Subscriptions}, TopicFilters),
|
||||||
suback(FromPid, PacketId, ReasonCodes),
|
suback(FromPid, PacketId, ReasonCodes),
|
||||||
noreply(State#state{subscriptions = Subscriptions1});
|
noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1}));
|
||||||
|
|
||||||
%% UNSUBSCRIBE:
|
%% UNSUBSCRIBE:
|
||||||
handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
|
handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
|
||||||
|
@ -494,14 +494,14 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
|
||||||
end
|
end
|
||||||
end, {[], Subscriptions}, TopicFilters),
|
end, {[], Subscriptions}, TopicFilters),
|
||||||
unsuback(From, PacketId, ReasonCodes),
|
unsuback(From, PacketId, ReasonCodes),
|
||||||
noreply(State#state{subscriptions = Subscriptions1});
|
noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1}));
|
||||||
|
|
||||||
%% PUBACK:
|
%% PUBACK:
|
||||||
handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
|
handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
|
||||||
noreply(
|
noreply(
|
||||||
case emqx_inflight:contain(PacketId, Inflight) of
|
case emqx_inflight:contain(PacketId, Inflight) of
|
||||||
true ->
|
true ->
|
||||||
dequeue(acked(puback, PacketId, State));
|
ensure_stats_timer(dequeue(acked(puback, PacketId, State)));
|
||||||
false ->
|
false ->
|
||||||
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State),
|
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State),
|
||||||
emqx_metrics:trans(inc, 'packets/puback/missed'),
|
emqx_metrics:trans(inc, 'packets/puback/missed'),
|
||||||
|
@ -513,7 +513,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
|
||||||
noreply(
|
noreply(
|
||||||
case emqx_inflight:contain(PacketId, Inflight) of
|
case emqx_inflight:contain(PacketId, Inflight) of
|
||||||
true ->
|
true ->
|
||||||
dequeue(acked(pubcomp, PacketId, State));
|
ensure_stats_timer(dequeue(acked(pubcomp, PacketId, State)));
|
||||||
false ->
|
false ->
|
||||||
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State),
|
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State),
|
||||||
emqx_metrics:trans(inc, 'packets/pubcomp/missed'),
|
emqx_metrics:trans(inc, 'packets/pubcomp/missed'),
|
||||||
|
@ -565,7 +565,7 @@ handle_cast({resume, #{conn_pid := ConnPid,
|
||||||
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
|
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
|
||||||
|
|
||||||
%% Replay delivery and Dequeue pending messages
|
%% Replay delivery and Dequeue pending messages
|
||||||
noreply(dequeue(retry_delivery(true, State1)));
|
noreply(ensure_stats_timer(dequeue(retry_delivery(true, State1))));
|
||||||
|
|
||||||
handle_cast({update_expiry_interval, Interval}, State) ->
|
handle_cast({update_expiry_interval, Interval}, State) ->
|
||||||
{noreply, State#state{expiry_interval = Interval}};
|
{noreply, State#state{expiry_interval = Interval}};
|
||||||
|
@ -590,7 +590,7 @@ handle_info({dispatch, Topic, Msg = #message{}}, State) ->
|
||||||
ok = emqx_shared_sub:nack_no_connection(Msg),
|
ok = emqx_shared_sub:nack_no_connection(Msg),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
false ->
|
false ->
|
||||||
noreply(handle_dispatch(Topic, Msg, State))
|
noreply(ensure_stats_timer(handle_dispatch(Topic, Msg, State)))
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% Do nothing if the client has been disconnected.
|
%% Do nothing if the client has been disconnected.
|
||||||
|
@ -601,11 +601,11 @@ handle_info({timeout, Timer, retry_delivery}, State = #state{retry_timer = Timer
|
||||||
noreply(retry_delivery(false, State#state{retry_timer = undefined}));
|
noreply(retry_delivery(false, State#state{retry_timer = undefined}));
|
||||||
|
|
||||||
handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer = Timer}) ->
|
handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer = Timer}) ->
|
||||||
noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined}));
|
State1 = State#state{await_rel_timer = undefined},
|
||||||
|
noreply(ensure_stats_timer(expire_awaiting_rel(State1)));
|
||||||
|
|
||||||
handle_info({timeout, Timer, emit_stats},
|
handle_info({timeout, Timer, emit_stats},
|
||||||
State = #state{client_id = ClientId,
|
State = #state{client_id = ClientId, stats_timer = Timer}) ->
|
||||||
stats_timer = Timer}) ->
|
|
||||||
emqx_metrics:commit(),
|
emqx_metrics:commit(),
|
||||||
_ = emqx_sm:set_session_stats(ClientId, stats(State)),
|
_ = emqx_sm:set_session_stats(ClientId, stats(State)),
|
||||||
NewState = State#state{stats_timer = undefined},
|
NewState = State#state{stats_timer = undefined},
|
||||||
|
@ -931,8 +931,7 @@ dequeue(State = #state{inflight = Inflight}) ->
|
||||||
|
|
||||||
dequeue2(State = #state{mqueue = Q}) ->
|
dequeue2(State = #state{mqueue = Q}) ->
|
||||||
case emqx_mqueue:out(Q) of
|
case emqx_mqueue:out(Q) of
|
||||||
{empty, _Q} ->
|
{empty, _Q} -> State;
|
||||||
State;
|
|
||||||
{{value, Msg}, Q1} ->
|
{{value, Msg}, Q1} ->
|
||||||
%% Dequeue more
|
%% Dequeue more
|
||||||
dequeue(dispatch(Msg, State#state{mqueue = Q1}))
|
dequeue(dispatch(Msg, State#state{mqueue = Q1}))
|
||||||
|
@ -972,7 +971,8 @@ ensure_will_delay_timer(State = #state{will_msg = #message{headers = #{'Will-Del
|
||||||
ensure_will_delay_timer(State) ->
|
ensure_will_delay_timer(State) ->
|
||||||
State.
|
State.
|
||||||
|
|
||||||
ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined,
|
ensure_stats_timer(State = #state{enable_stats = true,
|
||||||
|
stats_timer = undefined,
|
||||||
idle_timeout = IdleTimeout}) ->
|
idle_timeout = IdleTimeout}) ->
|
||||||
State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
|
State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
|
||||||
ensure_stats_timer(State) ->
|
ensure_stats_timer(State) ->
|
||||||
|
@ -1010,11 +1010,11 @@ reply({Reply, State}) ->
|
||||||
reply(Reply, State).
|
reply(Reply, State).
|
||||||
|
|
||||||
reply(Reply, State) ->
|
reply(Reply, State) ->
|
||||||
{reply, Reply, ensure_stats_timer(State)}.
|
{reply, Reply, State}.
|
||||||
|
|
||||||
noreply(State) ->
|
noreply(State) ->
|
||||||
{noreply, ensure_stats_timer(State)}.
|
{noreply, State}.
|
||||||
|
|
||||||
shutdown(Reason, State) ->
|
shutdown(Reason, State) ->
|
||||||
{stop, {shutdown, Reason}, State}.
|
{stop, Reason, State}.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue