From 78a8ccd0f2bdad31aecac0790410cbbae4de5947 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 30 Aug 2018 21:17:20 +0800 Subject: [PATCH] Only store packet_id and timestamp for qos2 message --- src/emqx_session.erl | 53 ++++++++++++++++---------------------------- 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index b291bd1fa..65bce85aa 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -376,7 +376,7 @@ handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) -> {stop, {shutdown, conflict}, ok, State}; %% PUBLISH: -handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From, +handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}}, _From, State = #state{awaiting_rel = AwaitingRel}) -> reply(case is_awaiting_full(State) of false -> @@ -384,13 +384,12 @@ handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From, true -> {{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State}; false -> - State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)}, + State1 = State#state{awaiting_rel = maps:put(PacketId, Ts, AwaitingRel)}, {emqx_broker:publish(Msg), ensure_await_rel_timer(State1)} end; true -> emqx_metrics:inc('messages/qos2/dropped'), - ?LOG(warning, "Dropped message for too many awaiting_rel: ~p", - [emqx_message:format(Msg)], State), + ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State), {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State} end); @@ -408,7 +407,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In %% PUBREL: handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel = AwaitingRel}) -> reply(case maps:take(PacketId, AwaitingRel) of - {_, AwaitingRel1} -> + {_Ts, AwaitingRel1} -> {ok, State#state{awaiting_rel = AwaitingRel1}}; error -> emqx_metrics:inc('packets/pubrel/missed'), @@ -639,8 +638,9 @@ retry_delivery(Force, State = #state{inflight = Inflight}) -> case emqx_inflight:is_empty(Inflight) of true -> State; false -> - InflightMsgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)), - retry_delivery(Force, InflightMsgs, os:timestamp(), State) + SortFun = fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end, + Msgs = lists:sort(SortFun, emqx_inflight:values(Inflight)), + retry_delivery(Force, Msgs, os:timestamp(), State) end. retry_delivery(_Force, [], _Now, State) -> @@ -650,9 +650,9 @@ retry_delivery(_Force, [], _Now, State) -> retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, State = #state{inflight = Inflight, retry_interval = Interval}) -> %% Microseconds -> MilliSeconds - Diff = timer:now_diff(Now, Ts) div 1000, + Age = timer:now_diff(Now, Ts) div 1000, if - Force orelse (Diff >= Interval) -> + Force orelse (Age >= Interval) -> Inflight1 = case {Type, Msg0} of {publish, {PacketId, Msg}} -> case emqx_message:is_expired(Msg) of @@ -669,7 +669,7 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, end, retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); true -> - ensure_retry_timer(Interval - Diff, State) + ensure_retry_timer(Interval - max(0, Age), State) end. %%------------------------------------------------------------------------------ @@ -679,36 +679,21 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) -> case maps:size(AwaitingRel) of 0 -> State; - _ -> Msgs = lists:sort(sortfun(awaiting_rel), maps:to_list(AwaitingRel)), - expire_awaiting_rel(Msgs, os:timestamp(), State) + _ -> expire_awaiting_rel(lists:keysort(2, maps:to_list(AwaitingRel)), os:timestamp(), State) end. expire_awaiting_rel([], _Now, State) -> State#state{await_rel_timer = undefined}; -expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs], Now, +expire_awaiting_rel([{PacketId, Ts} | More], Now, State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> - case (timer:now_diff(Now, TS) div 1000) of - Diff when Diff >= Timeout -> - emqx_metrics:inc('messages/qos2/dropped'), - ?LOG(warning, "Dropped message for await_rel_timeout: ~p", - [emqx_message:format(Msg)], State), - expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); - Diff -> - ensure_await_rel_timer(Timeout - Diff, State) - end. - -%%------------------------------------------------------------------------------ -%% Sort Inflight, AwaitingRel -%%------------------------------------------------------------------------------ - -sortfun(inflight) -> - fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end; - -sortfun(awaiting_rel) -> - fun({_, #message{timestamp = Ts1}}, - {_, #message{timestamp = Ts2}}) -> - Ts1 < Ts2 + case (timer:now_diff(Now, Ts) div 1000) of + Age when Age >= Timeout -> + emqx_metrics:inc('messages/qos2/expired'), + ?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId], State), + expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); + Age -> + ensure_await_rel_timer(Timeout - max(0, Age), State) end. %%------------------------------------------------------------------------------