diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl index bd61ff5a6..acb8c2b9c 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -299,13 +299,16 @@ do_clear_history() -> ets:delete_all_objects(?TOPK_TAB). check_enable(Enable, #{enable := IsEnable} = State) -> - case Enable of - IsEnable -> - State; - true -> + case {IsEnable, Enable} of + {false, true} -> load(State); + {true, false} -> + unload(State); + {true, true} -> + S1 = unload(State), + load(S1); _ -> - unload(State) + State end. start_timer(Name, Fun, State) -> diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl index 10d858d44..1ddf59058 100644 --- a/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl @@ -25,6 +25,7 @@ -include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). -define(NOW, erlang:system_time(millisecond)). +-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(BASE_CONF, <<""" slow_subs { @@ -38,14 +39,28 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + application:load(emqx_conf), + ok = ekka:start(), + ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), + meck:new(emqx_alarm, [non_strict, passthrough, no_link]), + meck:expect(emqx_alarm, activate, 3, ok), + meck:expect(emqx_alarm, deactivate, 3, ok), + + ok = emqx_common_test_helpers:load_config(emqx_slow_subs_schema, ?BASE_CONF), emqx_common_test_helpers:start_apps([emqx_slow_subs]), Config. end_per_suite(_Config) -> + ekka:stop(), + mria:stop(), + mria_mnesia:delete_schema(), + meck:unload(emqx_alarm), + emqx_common_test_helpers:stop_apps([emqx_slow_subs]). init_per_testcase(t_expire, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(), Cfg = emqx_config:get([slow_subs]), emqx_slow_subs:update_settings(Cfg#{expire_interval := 1500}), Config; @@ -54,6 +69,12 @@ init_per_testcase(_, Config) -> Config. end_per_testcase(_, _) -> + case erlang:whereis(node()) of + undefined -> ok; + P -> + erlang:unlink(P), + erlang:exit(P, kill) + end, ok. %%-------------------------------------------------------------------- @@ -93,7 +114,7 @@ t_expire(_) -> Each = fun(I) -> ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)), - last_update_time = Now}) + last_update_time = Now - timer:minutes(5)}) end, lists:foreach(Each, lists:seq(1, 5)),