diff --git a/src/emqx_session.erl b/src/emqx_session.erl index ecd5dbcda..7d903d77d 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -650,7 +650,7 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = -spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}). replay(ClientInfo, Session = #session{inflight = Inflight}) -> - Pubs = lists:map(fun({PacketId, {Pubrel, _Ts}}) when is_record(Pubrel, pubrel_await) -> + Pubs = lists:map(fun({PacketId, {Pubrel, _Msg}}) when is_record(Pubrel, pubrel_await) -> {pubrel, PacketId}; ({PacketId, {Msg, _Ts}}) -> {PacketId, emqx_message:set_flag(dup, true, Msg)} @@ -713,11 +713,16 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) -> %% Message Latency Stats %%-------------------------------------------------------------------- on_delivery_completed(Msg, - #session{clientid = ClientId, created_at = CreateAt}) -> + #session{clientid = ClientId, + created_at = CreateAt}) when is_record(Msg, message) -> emqx:run_hook('delivery.completed', [Msg, #{ session_birth_time => CreateAt , clientid => ClientId - }]). + }]); + +%% in 4.4.0, timestamp are stored in pubrel_await, not message +on_delivery_completed(_Ts, _Session) -> + ok. mark_begin_deliver(Msg) -> emqx_message:set_header(deliver_begin_at, erlang:system_time(second), Msg).