Only store packet_id and timestamp for qos2 message

This commit is contained in:
Feng Lee 2018-08-30 21:17:20 +08:00
parent bbb58dad68
commit 78a8ccd0f2
1 changed files with 19 additions and 34 deletions

View File

@ -376,7 +376,7 @@ handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) ->
{stop, {shutdown, conflict}, ok, State}; {stop, {shutdown, conflict}, ok, State};
%% PUBLISH: %% PUBLISH:
handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From, handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}}, _From,
State = #state{awaiting_rel = AwaitingRel}) -> State = #state{awaiting_rel = AwaitingRel}) ->
reply(case is_awaiting_full(State) of reply(case is_awaiting_full(State) of
false -> false ->
@ -384,13 +384,12 @@ handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From,
true -> true ->
{{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State}; {{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State};
false -> false ->
State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)}, State1 = State#state{awaiting_rel = maps:put(PacketId, Ts, AwaitingRel)},
{emqx_broker:publish(Msg), ensure_await_rel_timer(State1)} {emqx_broker:publish(Msg), ensure_await_rel_timer(State1)}
end; end;
true -> true ->
emqx_metrics:inc('messages/qos2/dropped'), emqx_metrics:inc('messages/qos2/dropped'),
?LOG(warning, "Dropped message for too many awaiting_rel: ~p", ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State),
[emqx_message:format(Msg)], State),
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State} {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
end); end);
@ -408,7 +407,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In
%% PUBREL: %% PUBREL:
handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel = AwaitingRel}) -> handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel = AwaitingRel}) ->
reply(case maps:take(PacketId, AwaitingRel) of reply(case maps:take(PacketId, AwaitingRel) of
{_, AwaitingRel1} -> {_Ts, AwaitingRel1} ->
{ok, State#state{awaiting_rel = AwaitingRel1}}; {ok, State#state{awaiting_rel = AwaitingRel1}};
error -> error ->
emqx_metrics:inc('packets/pubrel/missed'), emqx_metrics:inc('packets/pubrel/missed'),
@ -639,8 +638,9 @@ retry_delivery(Force, State = #state{inflight = Inflight}) ->
case emqx_inflight:is_empty(Inflight) of case emqx_inflight:is_empty(Inflight) of
true -> State; true -> State;
false -> false ->
InflightMsgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)), SortFun = fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end,
retry_delivery(Force, InflightMsgs, os:timestamp(), State) Msgs = lists:sort(SortFun, emqx_inflight:values(Inflight)),
retry_delivery(Force, Msgs, os:timestamp(), State)
end. end.
retry_delivery(_Force, [], _Now, State) -> retry_delivery(_Force, [], _Now, State) ->
@ -650,9 +650,9 @@ retry_delivery(_Force, [], _Now, State) ->
retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
State = #state{inflight = Inflight, retry_interval = Interval}) -> State = #state{inflight = Inflight, retry_interval = Interval}) ->
%% Microseconds -> MilliSeconds %% Microseconds -> MilliSeconds
Diff = timer:now_diff(Now, Ts) div 1000, Age = timer:now_diff(Now, Ts) div 1000,
if if
Force orelse (Diff >= Interval) -> Force orelse (Age >= Interval) ->
Inflight1 = case {Type, Msg0} of Inflight1 = case {Type, Msg0} of
{publish, {PacketId, Msg}} -> {publish, {PacketId, Msg}} ->
case emqx_message:is_expired(Msg) of case emqx_message:is_expired(Msg) of
@ -669,7 +669,7 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
end, end,
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
true -> true ->
ensure_retry_timer(Interval - Diff, State) ensure_retry_timer(Interval - max(0, Age), State)
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -679,36 +679,21 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) -> expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) ->
case maps:size(AwaitingRel) of case maps:size(AwaitingRel) of
0 -> State; 0 -> State;
_ -> Msgs = lists:sort(sortfun(awaiting_rel), maps:to_list(AwaitingRel)), _ -> expire_awaiting_rel(lists:keysort(2, maps:to_list(AwaitingRel)), os:timestamp(), State)
expire_awaiting_rel(Msgs, os:timestamp(), State)
end. end.
expire_awaiting_rel([], _Now, State) -> expire_awaiting_rel([], _Now, State) ->
State#state{await_rel_timer = undefined}; State#state{await_rel_timer = undefined};
expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs], Now, expire_awaiting_rel([{PacketId, Ts} | More], Now,
State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
case (timer:now_diff(Now, TS) div 1000) of case (timer:now_diff(Now, Ts) div 1000) of
Diff when Diff >= Timeout -> Age when Age >= Timeout ->
emqx_metrics:inc('messages/qos2/dropped'), emqx_metrics:inc('messages/qos2/expired'),
?LOG(warning, "Dropped message for await_rel_timeout: ~p", ?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId], State),
[emqx_message:format(Msg)], State), expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); Age ->
Diff -> ensure_await_rel_timer(Timeout - max(0, Age), State)
ensure_await_rel_timer(Timeout - Diff, State)
end.
%%------------------------------------------------------------------------------
%% Sort Inflight, AwaitingRel
%%------------------------------------------------------------------------------
sortfun(inflight) ->
fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end;
sortfun(awaiting_rel) ->
fun({_, #message{timestamp = Ts1}},
{_, #message{timestamp = Ts2}}) ->
Ts1 < Ts2
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------