diff --git a/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl b/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl index ac1604c9b..21cf2692e 100644 --- a/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl +++ b/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl @@ -20,11 +20,11 @@ -include_lib("include/emqx.hrl"). -include_lib("include/logger.hrl"). --include("include/emqx_st_statistics.hrl"). +-include_lib("emqx_plugin_libs/include/emqx_st_statistics.hrl"). -logger_header("[SLOW TOPICS]"). --export([ start_link/1, on_publish_done/3, enable/0 +-export([ start_link/1, on_publish_done/5, enable/0 , disable/0, clear_history/0 ]). @@ -90,8 +90,13 @@ start_link(Env) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []). --spec on_publish_done(message(), pos_integer(), counters:counters_ref()) -> ok. -on_publish_done(#message{timestamp = Timestamp} = Msg, Threshold, Counter) -> +-spec on_publish_done(message(), + pos_integer(), boolean(), pos_integer(), counters:counters_ref()) -> ok. +on_publish_done(#message{timestamp = Timestamp}, Created, IgnoreBeforeCreate, _, _) + when IgnoreBeforeCreate, Timestamp < Created -> + ok; + +on_publish_done(#message{timestamp = Timestamp} = Msg, _, _, Threshold, Counter) -> case ?NOW - Timestamp of Elapsed when Elapsed > Threshold -> case get_log_quota(Counter) of @@ -125,7 +130,8 @@ init([Env]) -> Counter = counters:new(1, [write_concurrency]), set_log_quota(Env, Counter), Threshold = get_value(threshold_time, Env), - load(Threshold, Counter), + IgnoreBeforeCreate = get_value(ignore_before_create, Env), + load(IgnoreBeforeCreate, Threshold, Counter), {ok, #{config => Env, period => 1, last_tick_at => ?NOW, @@ -139,7 +145,8 @@ handle_call({enable, Enable}, _From, State; true -> Threshold = get_value(threshold_time, Cfg), - load(Threshold, Counter), + IgnoreBeforeCreate = get_value(ignore_before_create, Cfg), + load(IgnoreBeforeCreate, Threshold, Counter), State#{enable := true}; _ -> unload(), @@ -319,12 +326,14 @@ publish(TickTime, Cfg, Notices) -> }), ok. -load(Threshold, Counter) -> - _ = emqx:hook('message.publish_done', fun ?MODULE:on_publish_done/3, [Threshold, Counter]), +load(IgnoreBeforeCreate, Threshold, Counter) -> + _ = emqx:hook('message.publish_done', + fun ?MODULE:on_publish_done/5, + [IgnoreBeforeCreate, Threshold, Counter]), ok. unload() -> - emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3). + emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/5). -spec get_topic(proplists:proplist()) -> binary(). get_topic(Cfg) -> diff --git a/etc/emqx.conf b/etc/emqx.conf index a94afd961..7b81e40cc 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2219,6 +2219,11 @@ module.presence.qos = 1 ## Default: 10 seconds #module.st_statistics.threshold_time = 10S +## ignore the messages that before than session created +## +## Default: true +#module.st_statistics.ignore_before_create = true + ## Time window of slow topics statistics ## ## Value: 5 minutes diff --git a/priv/emqx.schema b/priv/emqx.schema index 69f3acc41..602de56b9 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2193,6 +2193,11 @@ end}. {datatype, {duration, ms}} ]}. +{mapping, "module.st_statistics.ignore_before_create", "emqx.modules", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + {mapping, "module.st_statistics.time_window", "emqx.modules", [ {default, "5M"}, {datatype, {duration, ms}} diff --git a/src/emqx_session.erl b/src/emqx_session.erl index db09bf8c9..d4b671e71 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -269,7 +269,8 @@ unsubscribe(ClientInfo, TopicFilter, UnSubOpts, Session = #session{subscriptions case maps:find(TopicFilter, Subs) of {ok, SubOpts} -> ok = emqx_broker:unsubscribe(TopicFilter), - ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]), + ok = emqx_hooks:run('session.unsubscribed', + [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]), {ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}}; error -> {error, ?RC_NO_SUBSCRIPTION_EXISTED} @@ -316,10 +317,10 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, -> {ok, emqx_types:message(), session()} | {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}). -puback(PacketId, Session = #session{inflight = Inflight}) -> +puback(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> - emqx:run_hook('message.publish_done', [Msg]), + emqx:run_hook('message.publish_done', [Msg, CreatedAt]), Inflight1 = emqx_inflight:delete(PacketId, Inflight), return_with(Msg, dequeue(Session#session{inflight = Inflight1})); {value, {_Pubrel, _Ts}} -> @@ -341,11 +342,11 @@ return_with(Msg, {ok, Publishes, Session}) -> -spec(pubrec(emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), session()} | {error, emqx_types:reason_code()}). -pubrec(PacketId, Session = #session{inflight = Inflight}) -> +pubrec(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> %% execute hook here, because message record will be replaced by pubrel - emqx:run_hook('message.publish_done', [Msg]), + emqx:run_hook('message.publish_done', [Msg, CreatedAt]), Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight), {ok, Msg, Session#session{inflight = Inflight1}}; {value, {pubrel, _Ts}} -> @@ -408,7 +409,7 @@ dequeue(Cnt, Msgs, Q) -> case emqx_message:is_expired(Msg) of true -> ok = inc_expired_cnt(delivery), dequeue(Cnt, Msgs, Q1); - false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1) + false -> dequeue(acc_cnt(Msg, Cnt), [Msg | Msgs], Q1) end end. @@ -438,11 +439,11 @@ deliver([Msg | More], Acc, Session) -> {ok, Session1} -> deliver(More, Acc, Session1); {ok, [Publish], Session1} -> - deliver(More, [Publish|Acc], Session1) + deliver(More, [Publish | Acc], Session1) end. deliver_msg(Msg = #message{qos = ?QOS_0}, Session) -> - emqx:run_hook('message.publish_done', [Msg]), + emqx:run_hook('message.publish_done', [Msg, Session#session.created_at]), {ok, [{undefined, maybe_ack(Msg)}], Session}; deliver_msg(Msg = #message{qos = QoS}, Session = @@ -461,7 +462,7 @@ deliver_msg(Msg = #message{qos = QoS}, Session = {ok, [Publish], next_pkt_id(Session1)} end. --spec(enqueue(list(emqx_types:deliver())|emqx_types:message(), +-spec(enqueue(list(emqx_types:deliver()) | emqx_types:message(), session()) -> session()). enqueue([Deliver], Session) -> %% Optimize Enrich = enrich_fun(Session), @@ -512,23 +513,23 @@ get_subopts(Topic, SubMap) -> end. enrich_subopts([], Msg, _Session) -> Msg; -enrich_subopts([{nl, 1}|Opts], Msg, Session) -> +enrich_subopts([{nl, 1} | Opts], Msg, Session) -> enrich_subopts(Opts, emqx_message:set_flag(nl, Msg), Session); -enrich_subopts([{nl, 0}|Opts], Msg, Session) -> +enrich_subopts([{nl, 0} | Opts], Msg, Session) -> enrich_subopts(Opts, Msg, Session); -enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, +enrich_subopts([{qos, SubQoS} | Opts], Msg = #message{qos = PubQoS}, Session = #session{upgrade_qos = true}) -> enrich_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session); -enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, +enrich_subopts([{qos, SubQoS} | Opts], Msg = #message{qos = PubQoS}, Session = #session{upgrade_qos = false}) -> enrich_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session); -enrich_subopts([{rap, 1}|Opts], Msg, Session) -> +enrich_subopts([{rap, 1} | Opts], Msg, Session) -> enrich_subopts(Opts, Msg, Session); -enrich_subopts([{rap, 0}|Opts], Msg = #message{headers = #{retained := true}}, Session) -> +enrich_subopts([{rap, 0} | Opts], Msg = #message{headers = #{retained := true}}, Session) -> enrich_subopts(Opts, Msg, Session); -enrich_subopts([{rap, 0}|Opts], Msg, Session) -> +enrich_subopts([{rap, 0} | Opts], Msg, Session) -> enrich_subopts(Opts, emqx_message:set_flag(retain, false, Msg), Session); -enrich_subopts([{subid, SubId}|Opts], Msg, Session) -> +enrich_subopts([{subid, SubId} | Opts], Msg, Session) -> Props = emqx_message:get_header(properties, Msg, #{}), Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg), enrich_subopts(Opts, Msg1, Session). @@ -556,8 +557,8 @@ retry(Session = #session{inflight = Inflight}) -> retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) -> {ok, lists:reverse(Acc), Interval, Session}; -retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session = - #session{retry_interval = Interval, inflight = Inflight}) -> +retry_delivery([{PacketId, {Msg, Ts}} | More], Acc, Now, Session = + #session{retry_interval = Interval, inflight = Inflight}) -> case (Age = age(Now, Ts)) >= Interval of true -> {Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight), @@ -574,12 +575,12 @@ retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) - false -> Msg1 = emqx_message:set_flag(dup, true, Msg), Inflight1 = emqx_inflight:update(PacketId, {Msg1, Now}, Inflight), - {[{PacketId, Msg1}|Acc], Inflight1} + {[{PacketId, Msg1} | Acc], Inflight1} end; retry_delivery(PacketId, pubrel, Now, Acc, Inflight) -> Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight), - {[{pubrel, PacketId}|Acc], Inflight1}. + {[{pubrel, PacketId} | Acc], Inflight1}. %%-------------------------------------------------------------------- %% Expire Awaiting Rel