diff --git a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl index e96c2a907..11d25380e 100644 --- a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl +++ b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl @@ -151,8 +151,8 @@ init_topk_tab() -> init([Conf]) -> notice_tick(Conf), expire_tick(Conf), - MaxSize = get_value(top_k_num, Conf), - load(MaxSize), + update_threshold(Conf), + load(Conf), {ok, #{config => Conf, last_tick_at => ?NOW, enable => true}}. @@ -163,8 +163,8 @@ handle_call({enable, Enable}, _From, IsEnable -> State; true -> - MaxSize = get_value(max_topk_num, Cfg), - load(MaxSize), + update_threshold(Cfg), + load(Cfg), State#{enable := true}; _ -> unload(), @@ -274,7 +274,8 @@ publish(TickTime, Cfg, Notices) -> _ = emqx_broker:safe_publish(Msg), ok. -load(MaxSize) -> +load(Cfg) -> + MaxSize = get_value(top_k_num, Cfg), _ = emqx:hook('message.slow_subs_stats', fun ?MODULE:on_stats_update/2, [#{max_size => MaxSize}]), @@ -319,3 +320,8 @@ try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) -> ets:delete(?TOPK_TAB, First) end end. + +update_threshold(Conf) -> + Threshold = proplists:get_value(threshold, Conf), + _ = emqx_message_latency_stats:update_threshold(Threshold), + ok. diff --git a/lib-ce/emqx_modules/src/emqx_mod_slow_subs.erl b/lib-ce/emqx_modules/src/emqx_mod_slow_subs.erl index 1f5ebebf6..b9117fe8b 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_slow_subs.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_slow_subs.erl @@ -37,8 +37,6 @@ -spec(load(list()) -> ok). load(Env) -> - Threshold = proplists:get_value(threshold, Env), - _ = emqx_message_latency_stats:update_threshold(Threshold), emqx_mod_sup:start_child(?LIB, worker, [Env]), ok. diff --git a/src/emqx_slow_subs/emqx_message_latency_stats.erl b/src/emqx_slow_subs/emqx_message_latency_stats.erl index 49f0c8f1e..1d6158d59 100644 --- a/src/emqx_slow_subs/emqx_message_latency_stats.erl +++ b/src/emqx_slow_subs/emqx_message_latency_stats.erl @@ -98,6 +98,10 @@ call_hook(_, _, _, Latency, S) when Latency =< ?MINIMUM_THRESHOLD -> S; +call_hook(_, Now, _, _, #{last_access_time := LIT} = S) + when Now =< LIT + ?MINIMUM_INSERT_INTERVAL -> + S; + call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) -> case Latency =< get_threshold() of true ->