From 696acbfc5cc99a4755471cff93c110e6c0aaef4c Mon Sep 17 00:00:00 2001 From: lafirest Date: Mon, 17 Jan 2022 16:15:16 +0800 Subject: [PATCH] fix(emqx_slow_subs): change on_publish_completed to on_delivery_completed --- .../src/emqx_slow_subs/emqx_slow_subs.erl | 24 +++++++++---------- src/emqx_session.erl | 12 +++++----- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl index 254d902bd..50769a8fc 100644 --- a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl +++ b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl @@ -24,7 +24,7 @@ -logger_header("[SLOW Subs]"). --export([ start_link/1, on_publish_completed/3, enable/0 +-export([ start_link/1, on_delivery_completed/3, enable/0 , disable/0, clear_history/0, init_tab/0 ]). @@ -81,18 +81,18 @@ start_link(Env) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []). -on_publish_completed(#message{timestamp = Ts}, #{session_birth_time := BirthTime}, _Cfg) +on_delivery_completed(#message{timestamp = Ts}, #{session_birth_time := BirthTime}, _Cfg) when Ts =< BirthTime -> ok; -on_publish_completed(Msg, Env, Cfg) -> - on_publish_completed(Msg, Env, erlang:system_time(millisecond), Cfg). +on_delivery_completed(Msg, Env, Cfg) -> + on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg). -on_publish_completed(#message{topic = Topic} = Msg, - #{clientid := ClientId}, - Now, #{threshold := Threshold, - stats_type := StatsType, - max_size := MaxSize}) -> +on_delivery_completed(#message{topic = Topic} = Msg, + #{clientid := ClientId}, + Now, #{threshold := Threshold, + stats_type := StatsType, + max_size := MaxSize}) -> TimeSpan = calc_timespan(StatsType, Msg, Now), case TimeSpan =< Threshold of true -> ok; @@ -190,8 +190,8 @@ load(Cfg) -> MaxSize = get_value(top_k_num, Cfg), StatsType = get_value(stats_type, Cfg), Threshold = get_value(threshold, Cfg), - _ = emqx:hook('message.publish_completed', - fun ?MODULE:on_publish_completed/3, + _ = emqx:hook('delivery.completed', + fun ?MODULE:on_delivery_completed/3, [#{max_size => MaxSize, stats_type => StatsType, threshold => Threshold @@ -199,7 +199,7 @@ load(Cfg) -> ok. unload() -> - emqx:unhook('message.publish_completed', fun ?MODULE:on_publish_completed/3 ), + emqx:unhook('delivery.completed', fun ?MODULE:on_delivery_completed/3 ), do_clear_history(). do_clear(Cfg, Logs) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index e4487234c..ecd5dbcda 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -335,7 +335,7 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> - on_publish_completed(Msg, Session), + on_delivery_completed(Msg, Session), Inflight1 = emqx_inflight:delete(PacketId, Inflight), return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1})); {value, _Other} -> @@ -393,7 +393,7 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Pubrel, Msg}} when is_record(Pubrel, pubrel_await) -> - on_publish_completed(Msg, Session), + on_delivery_completed(Msg, Session), Inflight1 = emqx_inflight:delete(PacketId, Inflight), dequeue(ClientInfo, Session#session{inflight = Inflight1}); {value, _Other} -> @@ -460,7 +460,7 @@ do_deliver(ClientInfo, [Msg | More], Acc, Session) -> end. deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) -> - on_publish_completed(Msg, Session), + on_delivery_completed(Msg, Session), {ok, [{undefined, maybe_ack(Msg)}], Session}; deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = @@ -712,9 +712,9 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) -> %%-------------------------------------------------------------------- %% Message Latency Stats %%-------------------------------------------------------------------- -on_publish_completed(Msg, - #session{clientid = ClientId, created_at = CreateAt}) -> - emqx:run_hook('message.publish_completed', +on_delivery_completed(Msg, + #session{clientid = ClientId, created_at = CreateAt}) -> + emqx:run_hook('delivery.completed', [Msg, #{ session_birth_time => CreateAt , clientid => ClientId }]).