fix(emqx_slow_subs): change on_publish_completed to on_delivery_completed
This commit is contained in:
parent
0a85e71e09
commit
696acbfc5c
|
@ -24,7 +24,7 @@
|
|||
|
||||
-logger_header("[SLOW Subs]").
|
||||
|
||||
-export([ start_link/1, on_publish_completed/3, enable/0
|
||||
-export([ start_link/1, on_delivery_completed/3, enable/0
|
||||
, disable/0, clear_history/0, init_tab/0
|
||||
]).
|
||||
|
||||
|
@ -81,14 +81,14 @@
|
|||
start_link(Env) ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).
|
||||
|
||||
on_publish_completed(#message{timestamp = Ts}, #{session_birth_time := BirthTime}, _Cfg)
|
||||
on_delivery_completed(#message{timestamp = Ts}, #{session_birth_time := BirthTime}, _Cfg)
|
||||
when Ts =< BirthTime ->
|
||||
ok;
|
||||
|
||||
on_publish_completed(Msg, Env, Cfg) ->
|
||||
on_publish_completed(Msg, Env, erlang:system_time(millisecond), Cfg).
|
||||
on_delivery_completed(Msg, Env, Cfg) ->
|
||||
on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg).
|
||||
|
||||
on_publish_completed(#message{topic = Topic} = Msg,
|
||||
on_delivery_completed(#message{topic = Topic} = Msg,
|
||||
#{clientid := ClientId},
|
||||
Now, #{threshold := Threshold,
|
||||
stats_type := StatsType,
|
||||
|
@ -190,8 +190,8 @@ load(Cfg) ->
|
|||
MaxSize = get_value(top_k_num, Cfg),
|
||||
StatsType = get_value(stats_type, Cfg),
|
||||
Threshold = get_value(threshold, Cfg),
|
||||
_ = emqx:hook('message.publish_completed',
|
||||
fun ?MODULE:on_publish_completed/3,
|
||||
_ = emqx:hook('delivery.completed',
|
||||
fun ?MODULE:on_delivery_completed/3,
|
||||
[#{max_size => MaxSize,
|
||||
stats_type => StatsType,
|
||||
threshold => Threshold
|
||||
|
@ -199,7 +199,7 @@ load(Cfg) ->
|
|||
ok.
|
||||
|
||||
unload() ->
|
||||
emqx:unhook('message.publish_completed', fun ?MODULE:on_publish_completed/3 ),
|
||||
emqx:unhook('delivery.completed', fun ?MODULE:on_delivery_completed/3 ),
|
||||
do_clear_history().
|
||||
|
||||
do_clear(Cfg, Logs) ->
|
||||
|
|
|
@ -335,7 +335,7 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
|
|||
puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
||||
on_publish_completed(Msg, Session),
|
||||
on_delivery_completed(Msg, Session),
|
||||
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
||||
return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1}));
|
||||
{value, _Other} ->
|
||||
|
@ -393,7 +393,7 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
|
|||
pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||
{value, {Pubrel, Msg}} when is_record(Pubrel, pubrel_await) ->
|
||||
on_publish_completed(Msg, Session),
|
||||
on_delivery_completed(Msg, Session),
|
||||
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
||||
dequeue(ClientInfo, Session#session{inflight = Inflight1});
|
||||
{value, _Other} ->
|
||||
|
@ -460,7 +460,7 @@ do_deliver(ClientInfo, [Msg | More], Acc, Session) ->
|
|||
end.
|
||||
|
||||
deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
|
||||
on_publish_completed(Msg, Session),
|
||||
on_delivery_completed(Msg, Session),
|
||||
{ok, [{undefined, maybe_ack(Msg)}], Session};
|
||||
|
||||
deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
|
||||
|
@ -712,9 +712,9 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Message Latency Stats
|
||||
%%--------------------------------------------------------------------
|
||||
on_publish_completed(Msg,
|
||||
on_delivery_completed(Msg,
|
||||
#session{clientid = ClientId, created_at = CreateAt}) ->
|
||||
emqx:run_hook('message.publish_completed',
|
||||
emqx:run_hook('delivery.completed',
|
||||
[Msg, #{ session_birth_time => CreateAt
|
||||
, clientid => ClientId
|
||||
}]).
|
||||
|
|
Loading…
Reference in New Issue