diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl index 445465e1f..10cb2a4e7 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -23,7 +23,7 @@ -include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). -export([ start_link/0, on_stats_update/2, update_settings/1 - , clear_history/0, init_topk_tab/0 + , clear_history/0, init_topk_tab/0, post_config_update/5 ]). %% gen_server callbacks @@ -123,8 +123,8 @@ on_stats_update(#{clientid := ClientId, clear_history() -> gen_server:call(?MODULE, ?FUNCTION_NAME, ?DEF_CALL_TIMEOUT). -update_settings(Enable) -> - gen_server:call(?MODULE, {?FUNCTION_NAME, Enable}, ?DEF_CALL_TIMEOUT). +update_settings(Conf) -> + emqx_conf:update([emqx_slow_subs], Conf, #{override_to => cluster}). init_topk_tab() -> case ets:whereis(?TOPK_TAB) of @@ -138,11 +138,16 @@ init_topk_tab() -> ?TOPK_TAB end. +post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) -> + gen_server:call(?MODULE, {update_settings, NewConf}, ?DEF_CALL_TIMEOUT). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> + emqx_conf:add_handler([emqx_slow_subs], ?MODULE), + InitState = #{enable => false, last_tick_at => 0, expire_timer => undefined, @@ -152,7 +157,8 @@ init([]) -> Enable = emqx:get_config([emqx_slow_subs, enable]), {ok, check_enable(Enable, InitState)}. -handle_call({update_settings, Enable}, _From, State) -> +handle_call({update_settings, #{enable := Enable} = Conf}, _From, State) -> + emqx_config:put([emqx_slow_subs], Conf), State2 = check_enable(Enable, State), {reply, ok, State2}; diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl index 4c5873ab9..fb102d80c 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl @@ -107,6 +107,5 @@ settings(get, _) -> {200, emqx:get_raw_config([?APP_NAME], #{})}; settings(put, #{body := Body}) -> - {ok, #{config := #{enable := Enable}}} = emqx:update_config([?APP], Body), - _ = emqx_slow_subs:update_settings(Enable), + _ = emqx_slow_subs:update_settings(Body), {200, emqx:get_raw_config([?APP_NAME], #{})}. diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl index 009feda01..01bfd7f26 100644 --- a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl @@ -32,6 +32,7 @@ -define(BASE_PATH, "api"). -define(NOW, erlang:system_time(millisecond)). +-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CONF_DEFAULT, <<""" emqx_slow_subs @@ -49,23 +50,42 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + application:load(emqx_conf), + ok = ekka:start(), + ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), + meck:new(emqx_alarm, [non_strict, passthrough, no_link]), + meck:expect(emqx_alarm, activate, 3, ok), + meck:expect(emqx_alarm, deactivate, 3, ok), + ok = emqx_config:init_load(emqx_slow_subs_schema, ?CONF_DEFAULT), emqx_mgmt_api_test_util:init_suite([emqx_slow_subs]), {ok, _} = application:ensure_all_started(emqx_authn), Config. end_per_suite(Config) -> + ekka:stop(), + mria:stop(), + mria_mnesia:delete_schema(), + meck:unload(emqx_alarm), + application:stop(emqx_authn), emqx_mgmt_api_test_util:end_suite([emqx_slow_subs]), Config. init_per_testcase(_, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(), application:ensure_all_started(emqx_slow_subs), timer:sleep(500), Config. end_per_testcase(_, Config) -> application:stop(emqx_slow_subs), + case erlang:whereis(node()) of + undefined -> ok; + P -> + erlang:unlink(P), + erlang:exit(P, kill) + end, Config. t_get_history(_) -> @@ -119,6 +139,8 @@ t_settting(_) -> auth_header_() ), + timer:sleep(1000), + GetReturn = decode_json(GetData), ?assertEqual(Conf2, GetReturn),