Merge pull request #1771 from emqtt/emqx30-feng
Only store packet_id and timestamp for qos2 message
This commit is contained in:
commit
4a1fdddc31
|
@ -614,7 +614,7 @@ zone.external.max_awaiting_rel = 100
|
||||||
## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout.
|
## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout.
|
||||||
##
|
##
|
||||||
## Value: Duration
|
## Value: Duration
|
||||||
zone.external.await_rel_timeout = 60s
|
zone.external.await_rel_timeout = 300s
|
||||||
|
|
||||||
## Default session expiry interval for MQTT V3.1.1 connections.
|
## Default session expiry interval for MQTT V3.1.1 connections.
|
||||||
##
|
##
|
||||||
|
|
|
@ -774,7 +774,7 @@ end}.
|
||||||
|
|
||||||
%% @doc Awaiting PUBREL timeout
|
%% @doc Awaiting PUBREL timeout
|
||||||
{mapping, "zone.$name.await_rel_timeout", "emqx.zones", [
|
{mapping, "zone.$name.await_rel_timeout", "emqx.zones", [
|
||||||
{default, "60s"},
|
{default, "300s"},
|
||||||
{datatype, {duration, ms}}
|
{datatype, {duration, ms}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
|
|
@ -73,6 +73,7 @@
|
||||||
{counter, 'messages/qos1/received'}, % QoS1 Messages received
|
{counter, 'messages/qos1/received'}, % QoS1 Messages received
|
||||||
{counter, 'messages/qos1/sent'}, % QoS1 Messages sent
|
{counter, 'messages/qos1/sent'}, % QoS1 Messages sent
|
||||||
{counter, 'messages/qos2/received'}, % QoS2 Messages received
|
{counter, 'messages/qos2/received'}, % QoS2 Messages received
|
||||||
|
{counter, 'messages/qos2/expired'}, % QoS2 Messages expired
|
||||||
{counter, 'messages/qos2/sent'}, % QoS2 Messages sent
|
{counter, 'messages/qos2/sent'}, % QoS2 Messages sent
|
||||||
{counter, 'messages/qos2/dropped'}, % QoS2 Messages dropped
|
{counter, 'messages/qos2/dropped'}, % QoS2 Messages dropped
|
||||||
{gauge, 'messages/retained'}, % Messagea retained
|
{gauge, 'messages/retained'}, % Messagea retained
|
||||||
|
|
|
@ -376,7 +376,7 @@ handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) ->
|
||||||
{stop, {shutdown, conflict}, ok, State};
|
{stop, {shutdown, conflict}, ok, State};
|
||||||
|
|
||||||
%% PUBLISH:
|
%% PUBLISH:
|
||||||
handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From,
|
handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}}, _From,
|
||||||
State = #state{awaiting_rel = AwaitingRel}) ->
|
State = #state{awaiting_rel = AwaitingRel}) ->
|
||||||
reply(case is_awaiting_full(State) of
|
reply(case is_awaiting_full(State) of
|
||||||
false ->
|
false ->
|
||||||
|
@ -384,13 +384,12 @@ handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From,
|
||||||
true ->
|
true ->
|
||||||
{{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State};
|
{{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State};
|
||||||
false ->
|
false ->
|
||||||
State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)},
|
State1 = State#state{awaiting_rel = maps:put(PacketId, Ts, AwaitingRel)},
|
||||||
{emqx_broker:publish(Msg), ensure_await_rel_timer(State1)}
|
{emqx_broker:publish(Msg), ensure_await_rel_timer(State1)}
|
||||||
end;
|
end;
|
||||||
true ->
|
true ->
|
||||||
emqx_metrics:inc('messages/qos2/dropped'),
|
emqx_metrics:inc('messages/qos2/dropped'),
|
||||||
?LOG(warning, "Dropped message for too many awaiting_rel: ~p",
|
?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State),
|
||||||
[emqx_message:format(Msg)], State),
|
|
||||||
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
|
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
|
||||||
end);
|
end);
|
||||||
|
|
||||||
|
@ -408,7 +407,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In
|
||||||
%% PUBREL:
|
%% PUBREL:
|
||||||
handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel = AwaitingRel}) ->
|
handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel = AwaitingRel}) ->
|
||||||
reply(case maps:take(PacketId, AwaitingRel) of
|
reply(case maps:take(PacketId, AwaitingRel) of
|
||||||
{_, AwaitingRel1} ->
|
{_Ts, AwaitingRel1} ->
|
||||||
{ok, State#state{awaiting_rel = AwaitingRel1}};
|
{ok, State#state{awaiting_rel = AwaitingRel1}};
|
||||||
error ->
|
error ->
|
||||||
emqx_metrics:inc('packets/pubrel/missed'),
|
emqx_metrics:inc('packets/pubrel/missed'),
|
||||||
|
@ -639,8 +638,9 @@ retry_delivery(Force, State = #state{inflight = Inflight}) ->
|
||||||
case emqx_inflight:is_empty(Inflight) of
|
case emqx_inflight:is_empty(Inflight) of
|
||||||
true -> State;
|
true -> State;
|
||||||
false ->
|
false ->
|
||||||
InflightMsgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)),
|
SortFun = fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end,
|
||||||
retry_delivery(Force, InflightMsgs, os:timestamp(), State)
|
Msgs = lists:sort(SortFun, emqx_inflight:values(Inflight)),
|
||||||
|
retry_delivery(Force, Msgs, os:timestamp(), State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
retry_delivery(_Force, [], _Now, State) ->
|
retry_delivery(_Force, [], _Now, State) ->
|
||||||
|
@ -650,9 +650,9 @@ retry_delivery(_Force, [], _Now, State) ->
|
||||||
retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
|
retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
|
||||||
State = #state{inflight = Inflight, retry_interval = Interval}) ->
|
State = #state{inflight = Inflight, retry_interval = Interval}) ->
|
||||||
%% Microseconds -> MilliSeconds
|
%% Microseconds -> MilliSeconds
|
||||||
Diff = timer:now_diff(Now, Ts) div 1000,
|
Age = timer:now_diff(Now, Ts) div 1000,
|
||||||
if
|
if
|
||||||
Force orelse (Diff >= Interval) ->
|
Force orelse (Age >= Interval) ->
|
||||||
Inflight1 = case {Type, Msg0} of
|
Inflight1 = case {Type, Msg0} of
|
||||||
{publish, {PacketId, Msg}} ->
|
{publish, {PacketId, Msg}} ->
|
||||||
case emqx_message:is_expired(Msg) of
|
case emqx_message:is_expired(Msg) of
|
||||||
|
@ -669,7 +669,7 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
|
||||||
end,
|
end,
|
||||||
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
|
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
|
||||||
true ->
|
true ->
|
||||||
ensure_retry_timer(Interval - Diff, State)
|
ensure_retry_timer(Interval - max(0, Age), State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -679,36 +679,21 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
|
||||||
expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) ->
|
expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) ->
|
||||||
case maps:size(AwaitingRel) of
|
case maps:size(AwaitingRel) of
|
||||||
0 -> State;
|
0 -> State;
|
||||||
_ -> Msgs = lists:sort(sortfun(awaiting_rel), maps:to_list(AwaitingRel)),
|
_ -> expire_awaiting_rel(lists:keysort(2, maps:to_list(AwaitingRel)), os:timestamp(), State)
|
||||||
expire_awaiting_rel(Msgs, os:timestamp(), State)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
expire_awaiting_rel([], _Now, State) ->
|
expire_awaiting_rel([], _Now, State) ->
|
||||||
State#state{await_rel_timer = undefined};
|
State#state{await_rel_timer = undefined};
|
||||||
|
|
||||||
expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs], Now,
|
expire_awaiting_rel([{PacketId, Ts} | More], Now,
|
||||||
State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
|
State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
|
||||||
case (timer:now_diff(Now, TS) div 1000) of
|
case (timer:now_diff(Now, Ts) div 1000) of
|
||||||
Diff when Diff >= Timeout ->
|
Age when Age >= Timeout ->
|
||||||
emqx_metrics:inc('messages/qos2/dropped'),
|
emqx_metrics:inc('messages/qos2/expired'),
|
||||||
?LOG(warning, "Dropped message for await_rel_timeout: ~p",
|
?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId], State),
|
||||||
[emqx_message:format(Msg)], State),
|
expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
|
||||||
expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
|
Age ->
|
||||||
Diff ->
|
ensure_await_rel_timer(Timeout - max(0, Age), State)
|
||||||
ensure_await_rel_timer(Timeout - Diff, State)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% Sort Inflight, AwaitingRel
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
sortfun(inflight) ->
|
|
||||||
fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end;
|
|
||||||
|
|
||||||
sortfun(awaiting_rel) ->
|
|
||||||
fun({_, #message{timestamp = Ts1}},
|
|
||||||
{_, #message{timestamp = Ts2}}) ->
|
|
||||||
Ts1 < Ts2
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue