diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl index e0eb5e3eb..1da28d4fa 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -301,13 +301,16 @@ do_clear_history() -> ets:delete_all_objects(?TOPK_TAB). check_enable(Enable, #{enable := IsEnable} = State) -> - case Enable of - IsEnable -> - State; - true -> + case {IsEnable, Enable} of + {false, true} -> load(State); + {true, false} -> + unload(State); + {true, true} -> + S1 = unload(State), + load(S1); _ -> - unload(State) + State end. start_timer(Name, Fun, State) -> diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl index 21ca43e87..1ddf59058 100644 --- a/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl @@ -22,15 +22,16 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). --define(TOPK_TAB, emqx_slow_subs_topk). -define(NOW, erlang:system_time(millisecond)). +-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(BASE_CONF, <<""" slow_subs { enable = true top_k_num = 5, - expire_interval = 3000 + expire_interval = 5m stats_type = whole }""">>). @@ -38,23 +39,48 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + application:load(emqx_conf), + ok = ekka:start(), + ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), + meck:new(emqx_alarm, [non_strict, passthrough, no_link]), + meck:expect(emqx_alarm, activate, 3, ok), + meck:expect(emqx_alarm, deactivate, 3, ok), + + ok = emqx_common_test_helpers:load_config(emqx_slow_subs_schema, ?BASE_CONF), emqx_common_test_helpers:start_apps([emqx_slow_subs]), Config. end_per_suite(_Config) -> + ekka:stop(), + mria:stop(), + mria_mnesia:delete_schema(), + meck:unload(emqx_alarm), + emqx_common_test_helpers:stop_apps([emqx_slow_subs]). +init_per_testcase(t_expire, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(), + Cfg = emqx_config:get([slow_subs]), + emqx_slow_subs:update_settings(Cfg#{expire_interval := 1500}), + Config; + init_per_testcase(_, Config) -> Config. end_per_testcase(_, _) -> + case erlang:whereis(node()) of + undefined -> ok; + P -> + erlang:unlink(P), + erlang:exit(P, kill) + end, ok. %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- -t_log_and_pub(_) -> +t_pub(_) -> %% Sub topic first Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}], Clients = start_client(Subs), @@ -78,14 +104,26 @@ t_log_and_pub(_) -> timer:sleep(1000), Size = ets:info(?TOPK_TAB, size), - %% some time record maybe delete due to it expired - ?assert(Size =< 6 andalso Size > 3), + ?assert(Size =< 6 andalso Size >= 5), - timer:sleep(4000), - ?assert(ets:info(?TOPK_TAB, size) =:= 0), [Client ! stop || Client <- Clients], ok. +t_expire(_) -> + Now = ?NOW, + Each = fun(I) -> + ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), + ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)), + last_update_time = Now - timer:minutes(5)}) + end, + + lists:foreach(Each, lists:seq(1, 5)), + + timer:sleep(3000), + Size = ets:info(?TOPK_TAB, size), + ?assertEqual(0, Size), + ok. + start_client(Subs) -> [spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)].