From dc3e7dc21c07384b7c287d401e942c315a4c9632 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 7 Dec 2019 16:45:20 +0800 Subject: [PATCH] Ignore the expired messages --- src/emqx_session.erl | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index a6917536a..107a539f3 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -400,9 +400,16 @@ dequeue(Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of {empty, _Q} -> dequeue(0, Msgs, Q); {{value, Msg = #message{qos = ?QOS_0}}, Q1} -> - dequeue(Cnt, [Msg|Msgs], Q1); + dequeue(Cnt, acc_msg(Msg, Msgs), Q1); {{value, Msg}, Q1} -> - dequeue(Cnt-1, [Msg|Msgs], Q1) + dequeue(Cnt-1, acc_msg(Msg, Msgs), Q1) + end. + +-compile({inline, [acc_msg/2]}). +acc_msg(Msg, Msgs) -> + case emqx_message:is_expired(Msg) of + true -> Msgs; + false -> [Msg|Msgs] end. %%-------------------------------------------------------------------- @@ -590,19 +597,17 @@ resume(ClientId, #session{subscriptions = Subs}) -> -spec(redeliver(session()) -> {ok, replies(), session()}). redeliver(Session = #session{inflight = Inflight}) -> - Pubs = replay(Inflight), + Pubs = lists:map(fun to_pub/1, emqx_inflight:to_list(Inflight)), case dequeue(Session) of {ok, NSession} -> {ok, Pubs, NSession}; {ok, More, NSession} -> {ok, lists:append(Pubs, More), NSession} end. -replay(Inflight) -> - lists:map(fun({PacketId, {pubrel, _Ts}}) -> - {pubrel, PacketId}; - ({PacketId, {Msg, _Ts}}) -> - {PacketId, emqx_message:set_flag(dup, true, Msg)} - end, emqx_inflight:to_list(Inflight)). +to_pub({PacketId, {pubrel, _Ts}}) -> + {pubrel, PacketId}; +to_pub({PacketId, {Msg, _Ts}}) -> + {PacketId, emqx_message:set_flag(dup, true, Msg)}. %%-------------------------------------------------------------------- %% Next Packet Id