diff --git a/lib-ce/emqx_modules/src/emqx_mod_slow_subs.erl b/lib-ce/emqx_modules/src/emqx_mod_slow_subs.erl index b9117fe8b..1f5ebebf6 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_slow_subs.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_slow_subs.erl @@ -37,6 +37,8 @@ -spec(load(list()) -> ok). load(Env) -> + Threshold = proplists:get_value(threshold, Env), + _ = emqx_message_latency_stats:update_threshold(Threshold), emqx_mod_sup:start_child(?LIB, worker, [Env]), ok. diff --git a/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl b/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl index 46f144a57..f2763ae6c 100644 --- a/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl @@ -88,7 +88,8 @@ t_log_and_pub(_) -> [Client ! stop || Client <- Clients], ok. base_conf() -> - [ {top_k_num, 5} + [ {threshold, 500} + , {top_k_num, 5} , {expire_interval, timer:seconds(3)} , {notice_interval, 1500} , {notice_qos, 0} diff --git a/lib-ce/emqx_modules/test/emqx_slow_subs_api_SUITE.erl b/lib-ce/emqx_modules/test/emqx_slow_subs_api_SUITE.erl index 2efc7230a..c8f6b9cfc 100644 --- a/lib-ce/emqx_modules/test/emqx_slow_subs_api_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_slow_subs_api_SUITE.erl @@ -57,7 +57,8 @@ end_per_testcase(_, Config) -> Config. base_conf() -> - [ {top_k_num, 5} + [ {threshold, 500} + , {top_k_num, 5} , {expire_interval, timer:seconds(60)} , {notice_interval, 0} , {notice_qos, 0} diff --git a/priv/emqx.schema b/priv/emqx.schema index 61a98f824..facd86455 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1013,12 +1013,6 @@ end}. {datatype, integer} ]}. -%% @doc Threshold for slow subscription statistics -{mapping, "zone.$name.latency_stats_threshold", "emqx.zones", [ - {default, "100ms"}, - {datatype, {duration, ms}} -]}. - %% @doc Max Packets that Awaiting PUBREL, 0 means no limit {mapping, "zone.$name.max_awaiting_rel", "emqx.zones", [ {default, 0}, @@ -2230,6 +2224,11 @@ end}. {datatype, string} ]}. +{mapping, "module.slow_subs.threshold", "emqx.modules", [ + {default, "500ms"}, + {datatype, {duration, ms}} +]}. + {mapping, "module.slow_subs.expire_interval", "emqx.modules", [ {default, "5m"}, {datatype, {duration, ms}} diff --git a/src/emqx_slow_subs/emqx_message_latency_stats.erl b/src/emqx_slow_subs/emqx_message_latency_stats.erl index dfeba1c68..54fe04760 100644 --- a/src/emqx_slow_subs/emqx_message_latency_stats.erl +++ b/src/emqx_slow_subs/emqx_message_latency_stats.erl @@ -17,17 +17,18 @@ -module(emqx_message_latency_stats). %% API --export([ new/1, new/2, update/3 - , check_expire/4, latency/1]). +-export([ new/1, update/3, check_expire/4, latency/1]). + +-export([get_threshold/0, update_threshold/1]). -define(NOW, erlang:system_time(millisecond)). -define(MINIMUM_INSERT_INTERVAL, 1000). --define(MINIMUM_THRESHOLD, 100). +-define(MINIMUM_THRESHOLD, 500). +-define(THRESHOLD_KEY, {?MODULE, threshold}). --opaque stats() :: #{ threshold := number() - , ema := emqx_moving_average:ema() +-opaque stats() :: #{ ema := emqx_moving_average:ema() , last_update_time := timestamp() - , last_access_time := timestamp() %% timestamp of last access top-k + , last_access_time := timestamp() %% timestamp of last try to call hook , last_insert_value := non_neg_integer() }. @@ -44,22 +45,19 @@ %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- --spec new(emqx_types:zone()) -> stats(). -new(Zone) -> - Samples = get_env(Zone, latency_samples, 1), - Threshold = get_env(Zone, latency_stats_threshold, ?MINIMUM_THRESHOLD), - new(Samples, Threshold). --spec new(non_neg_integer(), number()) -> stats(). -new(SamplesT, ThresholdT) -> +-spec new(non_neg_integer() | emqx_types:zone()) -> stats(). +new(SamplesT) when is_integer(SamplesT) -> Samples = erlang:max(1, SamplesT), - Threshold = erlang:max(?MINIMUM_THRESHOLD, ThresholdT), #{ ema => emqx_moving_average:new(exponential, #{period => Samples}) - , threshold => Threshold , last_update_time => 0 , last_access_time => 0 , last_insert_value => 0 - }. + }; + +new(Zone) -> + Samples = get_env(Zone, latency_samples, 1), + new(Samples). -spec update(emqx_types:clientid(), number(), stats()) -> stats(). update(ClientId, Val, #{ema := EMA} = Stats) -> @@ -82,25 +80,35 @@ check_expire(ClientId, Now, _Interval, #{last_update_time := LUT} = S) -> latency(#{ema := #{average := Average}}) -> Average. +-spec update_threshold(pos_integer()) -> pos_integer(). +update_threshold(Threshold) -> + Val = erlang:max(Threshold, ?MINIMUM_THRESHOLD), + persistent_term:put(?THRESHOLD_KEY, Val), + Val. + +get_threshold() -> + persistent_term:get(?THRESHOLD_KEY, ?MINIMUM_THRESHOLD). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- -spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats(). -call_hook(_, Now, _, _, #{last_access_time := LIT} = S) - when LIT >= Now - ?MINIMUM_INSERT_INTERVAL -> - S; - -call_hook(_, _, _, Latency, #{threshold := Threshold} = S) - when Latency =< Threshold -> +call_hook(_, _, _, Latency, S) + when Latency =< ?MINIMUM_THRESHOLD -> S; call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) -> - ToInsert = erlang:floor(Latency), - Arg = #{clientid => ClientId, - latency => ToInsert, - type => Type, - last_insert_value => LIV, - update_time => Now}, - emqx:run_hook('message.slow_subs_stats', [Arg]), - Stats#{last_insert_value := ToInsert, - last_access_time := Now}. + case get_threshold() >= Latency of + true -> + Stats#{last_access_time := Now}; + _ -> + ToInsert = erlang:floor(Latency), + Arg = #{clientid => ClientId, + latency => ToInsert, + type => Type, + last_insert_value => LIV, + update_time => Now}, + emqx:run_hook('message.slow_subs_stats', [Arg]), + Stats#{last_insert_value := ToInsert, + last_access_time := Now} + end.