fix(emqx_slow_subs): add compatibility for old code

This commit is contained in:
lafirest 2022-01-18 10:19:23 +08:00
parent 696acbfc5c
commit 6414f7e55a
1 changed files with 8 additions and 3 deletions

View File

@ -650,7 +650,7 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
-spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}).
replay(ClientInfo, Session = #session{inflight = Inflight}) ->
Pubs = lists:map(fun({PacketId, {Pubrel, _Ts}}) when is_record(Pubrel, pubrel_await) ->
Pubs = lists:map(fun({PacketId, {Pubrel, _Msg}}) when is_record(Pubrel, pubrel_await) ->
{pubrel, PacketId};
({PacketId, {Msg, _Ts}}) ->
{PacketId, emqx_message:set_flag(dup, true, Msg)}
@ -713,11 +713,16 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
%% Message Latency Stats
%%--------------------------------------------------------------------
on_delivery_completed(Msg,
#session{clientid = ClientId, created_at = CreateAt}) ->
#session{clientid = ClientId,
created_at = CreateAt}) when is_record(Msg, message) ->
emqx:run_hook('delivery.completed',
[Msg, #{ session_birth_time => CreateAt
, clientid => ClientId
}]).
}]);
%% in 4.4.0, timestamp are stored in pubrel_await, not message
on_delivery_completed(_Ts, _Session) ->
ok.
mark_begin_deliver(Msg) ->
emqx_message:set_header(deliver_begin_at, erlang:system_time(second), Msg).