diff --git a/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl b/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl index 21cf2692e..f22aec41c 100644 --- a/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl +++ b/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl @@ -24,7 +24,7 @@ -logger_header("[SLOW TOPICS]"). --export([ start_link/1, on_publish_done/5, enable/0 +-export([ start_link/1, on_publish_done/3, enable/0 , disable/0, clear_history/0 ]). @@ -42,7 +42,7 @@ -type state() :: #{ config := proplist:proplist() , period := pos_integer() , last_tick_at := pos_integer() - , counter := counters:counter_ref() + , counter := counters:counters_ref() , enable := boolean() }. @@ -70,6 +70,13 @@ -type slow_log() :: #slow_log{}. -type top_k_map() :: #{emqx_types:topic() => top_k()}. +-type publish_done_env() :: #{ ignore_before_create := boolean() + , threshold := pos_integer() + , counter := counters:counters_ref() + }. + +-type publish_done_args() :: #{session_rebirth_time => pos_integer()}. + -ifdef(TEST). -define(TOPK_ACCESS, public). -else. @@ -90,13 +97,16 @@ start_link(Env) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []). --spec on_publish_done(message(), - pos_integer(), boolean(), pos_integer(), counters:counters_ref()) -> ok. -on_publish_done(#message{timestamp = Timestamp}, Created, IgnoreBeforeCreate, _, _) +-spec on_publish_done(message(), publish_done_args(), publish_done_env()) -> ok. +on_publish_done(#message{timestamp = Timestamp}, + #{session_rebirth_time := Created}, + #{ignore_before_create := IgnoreBeforeCreate}) when IgnoreBeforeCreate, Timestamp < Created -> ok; -on_publish_done(#message{timestamp = Timestamp} = Msg, _, _, Threshold, Counter) -> +on_publish_done(#message{timestamp = Timestamp} = Msg, + _, + #{threshold := Threshold, counter := Counter}) -> case ?NOW - Timestamp of Elapsed when Elapsed > Threshold -> case get_log_quota(Counter) of @@ -202,7 +212,7 @@ init_topk_tab(_) -> , {read_concurrency, true} ]). --spec get_log_quota(counter:counter_ref()) -> boolean(). +-spec get_log_quota(counters:counters_ref()) -> boolean(). get_log_quota(Counter) -> case counters:get(Counter, ?QUOTA_IDX) of Quota when Quota > 0 -> @@ -212,7 +222,7 @@ get_log_quota(Counter) -> false end. --spec set_log_quota(proplists:proplist(), counter:counter_ref()) -> ok. +-spec set_log_quota(proplists:proplist(), counters:counters_ref()) -> ok. set_log_quota(Cfg, Counter) -> MaxLogNum = get_value(max_log_num, Cfg), counters:put(Counter, ?QUOTA_IDX, MaxLogNum). @@ -328,12 +338,15 @@ publish(TickTime, Cfg, Notices) -> load(IgnoreBeforeCreate, Threshold, Counter) -> _ = emqx:hook('message.publish_done', - fun ?MODULE:on_publish_done/5, - [IgnoreBeforeCreate, Threshold, Counter]), + fun ?MODULE:on_publish_done/3, + [#{ignore_before_create => IgnoreBeforeCreate, + threshold => Threshold, + counter => Counter} + ]), ok. unload() -> - emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/5). + emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3). -spec get_topic(proplists:proplist()) -> binary(). get_topic(Cfg) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d4b671e71..6122982ae 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -320,7 +320,8 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, puback(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> - emqx:run_hook('message.publish_done', [Msg, CreatedAt]), + emqx:run_hook('message.publish_done', + [Msg, #{session_rebirth_time => CreatedAt}]), Inflight1 = emqx_inflight:delete(PacketId, Inflight), return_with(Msg, dequeue(Session#session{inflight = Inflight1})); {value, {_Pubrel, _Ts}} -> @@ -346,7 +347,8 @@ pubrec(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt} case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> %% execute hook here, because message record will be replaced by pubrel - emqx:run_hook('message.publish_done', [Msg, CreatedAt]), + emqx:run_hook('message.publish_done', + [Msg, #{session_rebirth_time => CreatedAt}]), Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight), {ok, Msg, Session#session{inflight = Inflight1}}; {value, {pubrel, _Ts}} -> @@ -443,7 +445,8 @@ deliver([Msg | More], Acc, Session) -> end. deliver_msg(Msg = #message{qos = ?QOS_0}, Session) -> - emqx:run_hook('message.publish_done', [Msg, Session#session.created_at]), + emqx:run_hook('message.publish_done', + [Msg, #{session_rebirth_time => Session#session.created_at}]), {ok, [{undefined, maybe_ack(Msg)}], Session}; deliver_msg(Msg = #message{qos = QoS}, Session =