fix(emqx_slow_subs): fix threshold related bugs
1. limit the interval between calling hooks 2. improve the code of update threshold
This commit is contained in:
parent
9965288947
commit
e651becd99
|
@ -151,8 +151,8 @@ init_topk_tab() ->
|
||||||
init([Conf]) ->
|
init([Conf]) ->
|
||||||
notice_tick(Conf),
|
notice_tick(Conf),
|
||||||
expire_tick(Conf),
|
expire_tick(Conf),
|
||||||
MaxSize = get_value(top_k_num, Conf),
|
update_threshold(Conf),
|
||||||
load(MaxSize),
|
load(Conf),
|
||||||
{ok, #{config => Conf,
|
{ok, #{config => Conf,
|
||||||
last_tick_at => ?NOW,
|
last_tick_at => ?NOW,
|
||||||
enable => true}}.
|
enable => true}}.
|
||||||
|
@ -163,8 +163,8 @@ handle_call({enable, Enable}, _From,
|
||||||
IsEnable ->
|
IsEnable ->
|
||||||
State;
|
State;
|
||||||
true ->
|
true ->
|
||||||
MaxSize = get_value(max_topk_num, Cfg),
|
update_threshold(Cfg),
|
||||||
load(MaxSize),
|
load(Cfg),
|
||||||
State#{enable := true};
|
State#{enable := true};
|
||||||
_ ->
|
_ ->
|
||||||
unload(),
|
unload(),
|
||||||
|
@ -274,7 +274,8 @@ publish(TickTime, Cfg, Notices) ->
|
||||||
_ = emqx_broker:safe_publish(Msg),
|
_ = emqx_broker:safe_publish(Msg),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
load(MaxSize) ->
|
load(Cfg) ->
|
||||||
|
MaxSize = get_value(top_k_num, Cfg),
|
||||||
_ = emqx:hook('message.slow_subs_stats',
|
_ = emqx:hook('message.slow_subs_stats',
|
||||||
fun ?MODULE:on_stats_update/2,
|
fun ?MODULE:on_stats_update/2,
|
||||||
[#{max_size => MaxSize}]),
|
[#{max_size => MaxSize}]),
|
||||||
|
@ -319,3 +320,8 @@ try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) ->
|
||||||
ets:delete(?TOPK_TAB, First)
|
ets:delete(?TOPK_TAB, First)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
update_threshold(Conf) ->
|
||||||
|
Threshold = proplists:get_value(threshold, Conf),
|
||||||
|
_ = emqx_message_latency_stats:update_threshold(Threshold),
|
||||||
|
ok.
|
||||||
|
|
|
@ -37,8 +37,6 @@
|
||||||
|
|
||||||
-spec(load(list()) -> ok).
|
-spec(load(list()) -> ok).
|
||||||
load(Env) ->
|
load(Env) ->
|
||||||
Threshold = proplists:get_value(threshold, Env),
|
|
||||||
_ = emqx_message_latency_stats:update_threshold(Threshold),
|
|
||||||
emqx_mod_sup:start_child(?LIB, worker, [Env]),
|
emqx_mod_sup:start_child(?LIB, worker, [Env]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -98,6 +98,10 @@ call_hook(_, _, _, Latency, S)
|
||||||
when Latency =< ?MINIMUM_THRESHOLD ->
|
when Latency =< ?MINIMUM_THRESHOLD ->
|
||||||
S;
|
S;
|
||||||
|
|
||||||
|
call_hook(_, Now, _, _, #{last_access_time := LIT} = S)
|
||||||
|
when Now =< LIT + ?MINIMUM_INSERT_INTERVAL ->
|
||||||
|
S;
|
||||||
|
|
||||||
call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) ->
|
call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) ->
|
||||||
case Latency =< get_threshold() of
|
case Latency =< get_threshold() of
|
||||||
true ->
|
true ->
|
||||||
|
|
Loading…
Reference in New Issue