Merge pull request #6366 from lafirest/feat/dynamic_threshold
feat(emqx_slow_subs): add dyanamic threshold
This commit is contained in:
commit
acb63eeb7c
|
@ -37,6 +37,8 @@
|
|||
|
||||
-spec(load(list()) -> ok).
|
||||
load(Env) ->
|
||||
Threshold = proplists:get_value(threshold, Env),
|
||||
_ = emqx_message_latency_stats:update_threshold(Threshold),
|
||||
emqx_mod_sup:start_child(?LIB, worker, [Env]),
|
||||
ok.
|
||||
|
||||
|
|
|
@ -88,7 +88,8 @@ t_log_and_pub(_) ->
|
|||
[Client ! stop || Client <- Clients],
|
||||
ok.
|
||||
base_conf() ->
|
||||
[ {top_k_num, 5}
|
||||
[ {threshold, 500}
|
||||
, {top_k_num, 5}
|
||||
, {expire_interval, timer:seconds(3)}
|
||||
, {notice_interval, 1500}
|
||||
, {notice_qos, 0}
|
||||
|
|
|
@ -57,7 +57,8 @@ end_per_testcase(_, Config) ->
|
|||
Config.
|
||||
|
||||
base_conf() ->
|
||||
[ {top_k_num, 5}
|
||||
[ {threshold, 500}
|
||||
, {top_k_num, 5}
|
||||
, {expire_interval, timer:seconds(60)}
|
||||
, {notice_interval, 0}
|
||||
, {notice_qos, 0}
|
||||
|
|
|
@ -1013,12 +1013,6 @@ end}.
|
|||
{datatype, integer}
|
||||
]}.
|
||||
|
||||
%% @doc Threshold for slow subscription statistics
|
||||
{mapping, "zone.$name.latency_stats_threshold", "emqx.zones", [
|
||||
{default, "100ms"},
|
||||
{datatype, {duration, ms}}
|
||||
]}.
|
||||
|
||||
%% @doc Max Packets that Awaiting PUBREL, 0 means no limit
|
||||
{mapping, "zone.$name.max_awaiting_rel", "emqx.zones", [
|
||||
{default, 0},
|
||||
|
@ -2230,6 +2224,11 @@ end}.
|
|||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{mapping, "module.slow_subs.threshold", "emqx.modules", [
|
||||
{default, "500ms"},
|
||||
{datatype, {duration, ms}}
|
||||
]}.
|
||||
|
||||
{mapping, "module.slow_subs.expire_interval", "emqx.modules", [
|
||||
{default, "5m"},
|
||||
{datatype, {duration, ms}}
|
||||
|
|
|
@ -17,17 +17,19 @@
|
|||
-module(emqx_message_latency_stats).
|
||||
|
||||
%% API
|
||||
-export([ new/1, new/2, update/3
|
||||
, check_expire/4, latency/1]).
|
||||
-export([ new/1, update/3, check_expire/4, latency/1]).
|
||||
|
||||
-export([get_threshold/0, update_threshold/1]).
|
||||
|
||||
-define(NOW, erlang:system_time(millisecond)).
|
||||
-define(MINIMUM_INSERT_INTERVAL, 1000).
|
||||
-define(MINIMUM_THRESHOLD, 100).
|
||||
-define(DEFAULT_THRESHOLD, 500).
|
||||
-define(THRESHOLD_KEY, {?MODULE, threshold}).
|
||||
|
||||
-opaque stats() :: #{ threshold := number()
|
||||
, ema := emqx_moving_average:ema()
|
||||
-opaque stats() :: #{ ema := emqx_moving_average:ema()
|
||||
, last_update_time := timestamp()
|
||||
, last_access_time := timestamp() %% timestamp of last access top-k
|
||||
, last_access_time := timestamp() %% timestamp of last try to call hook
|
||||
, last_insert_value := non_neg_integer()
|
||||
}.
|
||||
|
||||
|
@ -44,22 +46,19 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
%%--------------------------------------------------------------------
|
||||
-spec new(emqx_types:zone()) -> stats().
|
||||
new(Zone) ->
|
||||
Samples = get_env(Zone, latency_samples, 1),
|
||||
Threshold = get_env(Zone, latency_stats_threshold, ?MINIMUM_THRESHOLD),
|
||||
new(Samples, Threshold).
|
||||
|
||||
-spec new(non_neg_integer(), number()) -> stats().
|
||||
new(SamplesT, ThresholdT) ->
|
||||
-spec new(non_neg_integer() | emqx_types:zone()) -> stats().
|
||||
new(SamplesT) when is_integer(SamplesT) ->
|
||||
Samples = erlang:max(1, SamplesT),
|
||||
Threshold = erlang:max(?MINIMUM_THRESHOLD, ThresholdT),
|
||||
#{ ema => emqx_moving_average:new(exponential, #{period => Samples})
|
||||
, threshold => Threshold
|
||||
, last_update_time => 0
|
||||
, last_access_time => 0
|
||||
, last_insert_value => 0
|
||||
}.
|
||||
};
|
||||
|
||||
new(Zone) ->
|
||||
Samples = get_env(Zone, latency_samples, 1),
|
||||
new(Samples).
|
||||
|
||||
-spec update(emqx_types:clientid(), number(), stats()) -> stats().
|
||||
update(ClientId, Val, #{ema := EMA} = Stats) ->
|
||||
|
@ -82,25 +81,35 @@ check_expire(ClientId, Now, _Interval, #{last_update_time := LUT} = S) ->
|
|||
latency(#{ema := #{average := Average}}) ->
|
||||
Average.
|
||||
|
||||
-spec update_threshold(pos_integer()) -> pos_integer().
|
||||
update_threshold(Threshold) ->
|
||||
Val = erlang:max(Threshold, ?MINIMUM_THRESHOLD),
|
||||
persistent_term:put(?THRESHOLD_KEY, Val),
|
||||
Val.
|
||||
|
||||
get_threshold() ->
|
||||
persistent_term:get(?THRESHOLD_KEY, ?DEFAULT_THRESHOLD).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
-spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats().
|
||||
call_hook(_, Now, _, _, #{last_access_time := LIT} = S)
|
||||
when LIT >= Now - ?MINIMUM_INSERT_INTERVAL ->
|
||||
S;
|
||||
|
||||
call_hook(_, _, _, Latency, #{threshold := Threshold} = S)
|
||||
when Latency =< Threshold ->
|
||||
call_hook(_, _, _, Latency, S)
|
||||
when Latency =< ?MINIMUM_THRESHOLD ->
|
||||
S;
|
||||
|
||||
call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) ->
|
||||
ToInsert = erlang:floor(Latency),
|
||||
Arg = #{clientid => ClientId,
|
||||
latency => ToInsert,
|
||||
type => Type,
|
||||
last_insert_value => LIV,
|
||||
update_time => Now},
|
||||
emqx:run_hook('message.slow_subs_stats', [Arg]),
|
||||
Stats#{last_insert_value := ToInsert,
|
||||
last_access_time := Now}.
|
||||
case Latency =< get_threshold() of
|
||||
true ->
|
||||
Stats#{last_access_time := Now};
|
||||
_ ->
|
||||
ToInsert = erlang:floor(Latency),
|
||||
Arg = #{clientid => ClientId,
|
||||
latency => ToInsert,
|
||||
type => Type,
|
||||
last_insert_value => LIV,
|
||||
update_time => Now},
|
||||
emqx:run_hook('message.slow_subs_stats', [Arg]),
|
||||
Stats#{last_insert_value := ToInsert,
|
||||
last_access_time := Now}
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue