Ignore the expired messages

This commit is contained in:
Feng Lee 2019-12-07 16:45:20 +08:00
parent bbcd2bffc5
commit dc3e7dc21c
1 changed files with 14 additions and 9 deletions

View File

@ -400,9 +400,16 @@ dequeue(Cnt, Msgs, Q) ->
case emqx_mqueue:out(Q) of case emqx_mqueue:out(Q) of
{empty, _Q} -> dequeue(0, Msgs, Q); {empty, _Q} -> dequeue(0, Msgs, Q);
{{value, Msg = #message{qos = ?QOS_0}}, Q1} -> {{value, Msg = #message{qos = ?QOS_0}}, Q1} ->
dequeue(Cnt, [Msg|Msgs], Q1); dequeue(Cnt, acc_msg(Msg, Msgs), Q1);
{{value, Msg}, Q1} -> {{value, Msg}, Q1} ->
dequeue(Cnt-1, [Msg|Msgs], Q1) dequeue(Cnt-1, acc_msg(Msg, Msgs), Q1)
end.
-compile({inline, [acc_msg/2]}).
acc_msg(Msg, Msgs) ->
case emqx_message:is_expired(Msg) of
true -> Msgs;
false -> [Msg|Msgs]
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -590,19 +597,17 @@ resume(ClientId, #session{subscriptions = Subs}) ->
-spec(redeliver(session()) -> {ok, replies(), session()}). -spec(redeliver(session()) -> {ok, replies(), session()}).
redeliver(Session = #session{inflight = Inflight}) -> redeliver(Session = #session{inflight = Inflight}) ->
Pubs = replay(Inflight), Pubs = lists:map(fun to_pub/1, emqx_inflight:to_list(Inflight)),
case dequeue(Session) of case dequeue(Session) of
{ok, NSession} -> {ok, Pubs, NSession}; {ok, NSession} -> {ok, Pubs, NSession};
{ok, More, NSession} -> {ok, More, NSession} ->
{ok, lists:append(Pubs, More), NSession} {ok, lists:append(Pubs, More), NSession}
end. end.
replay(Inflight) -> to_pub({PacketId, {pubrel, _Ts}}) ->
lists:map(fun({PacketId, {pubrel, _Ts}}) ->
{pubrel, PacketId}; {pubrel, PacketId};
({PacketId, {Msg, _Ts}}) -> to_pub({PacketId, {Msg, _Ts}}) ->
{PacketId, emqx_message:set_flag(dup, true, Msg)} {PacketId, emqx_message:set_flag(dup, true, Msg)}.
end, emqx_inflight:to_list(Inflight)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Next Packet Id %% Next Packet Id