feat(emqx_slow_subs): add dynamic threshold
This commit is contained in:
parent
fb1bfcac8e
commit
48f8c735ea
|
@ -37,6 +37,8 @@
|
||||||
|
|
||||||
-spec(load(list()) -> ok).
|
-spec(load(list()) -> ok).
|
||||||
load(Env) ->
|
load(Env) ->
|
||||||
|
Threshold = proplists:get_value(threshold, Env),
|
||||||
|
_ = emqx_message_latency_stats:update_threshold(Threshold),
|
||||||
emqx_mod_sup:start_child(?LIB, worker, [Env]),
|
emqx_mod_sup:start_child(?LIB, worker, [Env]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -88,7 +88,8 @@ t_log_and_pub(_) ->
|
||||||
[Client ! stop || Client <- Clients],
|
[Client ! stop || Client <- Clients],
|
||||||
ok.
|
ok.
|
||||||
base_conf() ->
|
base_conf() ->
|
||||||
[ {top_k_num, 5}
|
[ {threshold, 500}
|
||||||
|
, {top_k_num, 5}
|
||||||
, {expire_interval, timer:seconds(3)}
|
, {expire_interval, timer:seconds(3)}
|
||||||
, {notice_interval, 1500}
|
, {notice_interval, 1500}
|
||||||
, {notice_qos, 0}
|
, {notice_qos, 0}
|
||||||
|
|
|
@ -57,7 +57,8 @@ end_per_testcase(_, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
base_conf() ->
|
base_conf() ->
|
||||||
[ {top_k_num, 5}
|
[ {threshold, 500}
|
||||||
|
, {top_k_num, 5}
|
||||||
, {expire_interval, timer:seconds(60)}
|
, {expire_interval, timer:seconds(60)}
|
||||||
, {notice_interval, 0}
|
, {notice_interval, 0}
|
||||||
, {notice_qos, 0}
|
, {notice_qos, 0}
|
||||||
|
|
|
@ -1013,12 +1013,6 @@ end}.
|
||||||
{datatype, integer}
|
{datatype, integer}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
%% @doc Threshold for slow subscription statistics
|
|
||||||
{mapping, "zone.$name.latency_stats_threshold", "emqx.zones", [
|
|
||||||
{default, "100ms"},
|
|
||||||
{datatype, {duration, ms}}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
%% @doc Max Packets that Awaiting PUBREL, 0 means no limit
|
%% @doc Max Packets that Awaiting PUBREL, 0 means no limit
|
||||||
{mapping, "zone.$name.max_awaiting_rel", "emqx.zones", [
|
{mapping, "zone.$name.max_awaiting_rel", "emqx.zones", [
|
||||||
{default, 0},
|
{default, 0},
|
||||||
|
@ -2230,6 +2224,11 @@ end}.
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
{mapping, "module.slow_subs.threshold", "emqx.modules", [
|
||||||
|
{default, "500ms"},
|
||||||
|
{datatype, {duration, ms}}
|
||||||
|
]}.
|
||||||
|
|
||||||
{mapping, "module.slow_subs.expire_interval", "emqx.modules", [
|
{mapping, "module.slow_subs.expire_interval", "emqx.modules", [
|
||||||
{default, "5m"},
|
{default, "5m"},
|
||||||
{datatype, {duration, ms}}
|
{datatype, {duration, ms}}
|
||||||
|
|
|
@ -17,17 +17,18 @@
|
||||||
-module(emqx_message_latency_stats).
|
-module(emqx_message_latency_stats).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([ new/1, new/2, update/3
|
-export([ new/1, update/3, check_expire/4, latency/1]).
|
||||||
, check_expire/4, latency/1]).
|
|
||||||
|
-export([get_threshold/0, update_threshold/1]).
|
||||||
|
|
||||||
-define(NOW, erlang:system_time(millisecond)).
|
-define(NOW, erlang:system_time(millisecond)).
|
||||||
-define(MINIMUM_INSERT_INTERVAL, 1000).
|
-define(MINIMUM_INSERT_INTERVAL, 1000).
|
||||||
-define(MINIMUM_THRESHOLD, 100).
|
-define(MINIMUM_THRESHOLD, 500).
|
||||||
|
-define(THRESHOLD_KEY, {?MODULE, threshold}).
|
||||||
|
|
||||||
-opaque stats() :: #{ threshold := number()
|
-opaque stats() :: #{ ema := emqx_moving_average:ema()
|
||||||
, ema := emqx_moving_average:ema()
|
|
||||||
, last_update_time := timestamp()
|
, last_update_time := timestamp()
|
||||||
, last_access_time := timestamp() %% timestamp of last access top-k
|
, last_access_time := timestamp() %% timestamp of last try to call hook
|
||||||
, last_insert_value := non_neg_integer()
|
, last_insert_value := non_neg_integer()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
@ -44,22 +45,19 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-spec new(emqx_types:zone()) -> stats().
|
|
||||||
new(Zone) ->
|
|
||||||
Samples = get_env(Zone, latency_samples, 1),
|
|
||||||
Threshold = get_env(Zone, latency_stats_threshold, ?MINIMUM_THRESHOLD),
|
|
||||||
new(Samples, Threshold).
|
|
||||||
|
|
||||||
-spec new(non_neg_integer(), number()) -> stats().
|
-spec new(non_neg_integer() | emqx_types:zone()) -> stats().
|
||||||
new(SamplesT, ThresholdT) ->
|
new(SamplesT) when is_integer(SamplesT) ->
|
||||||
Samples = erlang:max(1, SamplesT),
|
Samples = erlang:max(1, SamplesT),
|
||||||
Threshold = erlang:max(?MINIMUM_THRESHOLD, ThresholdT),
|
|
||||||
#{ ema => emqx_moving_average:new(exponential, #{period => Samples})
|
#{ ema => emqx_moving_average:new(exponential, #{period => Samples})
|
||||||
, threshold => Threshold
|
|
||||||
, last_update_time => 0
|
, last_update_time => 0
|
||||||
, last_access_time => 0
|
, last_access_time => 0
|
||||||
, last_insert_value => 0
|
, last_insert_value => 0
|
||||||
}.
|
};
|
||||||
|
|
||||||
|
new(Zone) ->
|
||||||
|
Samples = get_env(Zone, latency_samples, 1),
|
||||||
|
new(Samples).
|
||||||
|
|
||||||
-spec update(emqx_types:clientid(), number(), stats()) -> stats().
|
-spec update(emqx_types:clientid(), number(), stats()) -> stats().
|
||||||
update(ClientId, Val, #{ema := EMA} = Stats) ->
|
update(ClientId, Val, #{ema := EMA} = Stats) ->
|
||||||
|
@ -82,25 +80,35 @@ check_expire(ClientId, Now, _Interval, #{last_update_time := LUT} = S) ->
|
||||||
latency(#{ema := #{average := Average}}) ->
|
latency(#{ema := #{average := Average}}) ->
|
||||||
Average.
|
Average.
|
||||||
|
|
||||||
|
-spec update_threshold(pos_integer()) -> pos_integer().
|
||||||
|
update_threshold(Threshold) ->
|
||||||
|
Val = erlang:max(Threshold, ?MINIMUM_THRESHOLD),
|
||||||
|
persistent_term:put(?THRESHOLD_KEY, Val),
|
||||||
|
Val.
|
||||||
|
|
||||||
|
get_threshold() ->
|
||||||
|
persistent_term:get(?THRESHOLD_KEY, ?MINIMUM_THRESHOLD).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats().
|
-spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats().
|
||||||
call_hook(_, Now, _, _, #{last_access_time := LIT} = S)
|
call_hook(_, _, _, Latency, S)
|
||||||
when LIT >= Now - ?MINIMUM_INSERT_INTERVAL ->
|
when Latency =< ?MINIMUM_THRESHOLD ->
|
||||||
S;
|
|
||||||
|
|
||||||
call_hook(_, _, _, Latency, #{threshold := Threshold} = S)
|
|
||||||
when Latency =< Threshold ->
|
|
||||||
S;
|
S;
|
||||||
|
|
||||||
call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) ->
|
call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) ->
|
||||||
ToInsert = erlang:floor(Latency),
|
case get_threshold() >= Latency of
|
||||||
Arg = #{clientid => ClientId,
|
true ->
|
||||||
latency => ToInsert,
|
Stats#{last_access_time := Now};
|
||||||
type => Type,
|
_ ->
|
||||||
last_insert_value => LIV,
|
ToInsert = erlang:floor(Latency),
|
||||||
update_time => Now},
|
Arg = #{clientid => ClientId,
|
||||||
emqx:run_hook('message.slow_subs_stats', [Arg]),
|
latency => ToInsert,
|
||||||
Stats#{last_insert_value := ToInsert,
|
type => Type,
|
||||||
last_access_time := Now}.
|
last_insert_value => LIV,
|
||||||
|
update_time => Now},
|
||||||
|
emqx:run_hook('message.slow_subs_stats', [Arg]),
|
||||||
|
Stats#{last_insert_value := ToInsert,
|
||||||
|
last_access_time := Now}
|
||||||
|
end.
|
||||||
|
|
Loading…
Reference in New Issue