From 86001765cb5b7acbb0025d2690753955e216890b Mon Sep 17 00:00:00 2001 From: lafirest Date: Thu, 30 Dec 2021 14:40:25 +0800 Subject: [PATCH] fix(emqx_retainer): fix config update error --- apps/emqx_retainer/src/emqx_retainer.erl | 26 +++++++++++------- apps/emqx_retainer/src/emqx_retainer_api.erl | 2 +- .../test/emqx_retainer_SUITE.erl | 27 +++++++++++++++++++ 3 files changed, 44 insertions(+), 11 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 9be449b60..e1780cc08 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -36,7 +36,8 @@ , update_config/1 , clean/0 , delete/1 - , page_read/3]). + , page_read/3 + , post_config_update/5]). %% gen_server callbacks -export([ init/1 @@ -165,24 +166,31 @@ get_expiry_time(#message{timestamp = Ts}) -> get_stop_publish_clear_msg() -> emqx_conf:get([?APP, stop_publish_clear_msg], false). --spec update_config(hocon:config()) -> ok. +-spec update_config(hocon:config()) -> {ok, _} | {error, _}. update_config(Conf) -> - gen_server:call(?MODULE, {?FUNCTION_NAME, Conf}). + emqx_conf:update([emqx_retainer], Conf, #{override_to => cluster}). clean() -> - gen_server:call(?MODULE, ?FUNCTION_NAME). + call(?FUNCTION_NAME). delete(Topic) -> - gen_server:call(?MODULE, {?FUNCTION_NAME, Topic}). + call({?FUNCTION_NAME, Topic}). page_read(Topic, Page, Limit) -> - gen_server:call(?MODULE, {?FUNCTION_NAME, Topic, Page, Limit}). + call({?FUNCTION_NAME, Topic, Page, Limit}). + +post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) -> + call({update_config, NewConf, OldConf}). + +call(Req) -> + gen_server:call(?MODULE, Req, infinity). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> + emqx_conf:add_handler([emqx_retainer], ?MODULE), init_shared_context(), State = new_state(), #{enable := Enable} = Cfg = emqx:get_config([?APP]), @@ -194,9 +202,7 @@ init([]) -> State end}. -handle_call({update_config, Conf}, _, State) -> - OldConf = emqx:get_config([?APP]), - {ok, #{config := NewConf}} = emqx:update_config([?APP], Conf), +handle_call({update_config, NewConf, OldConf}, _, State) -> State2 = update_config(State, NewConf, OldConf), {reply, ok, State2}; @@ -326,7 +332,7 @@ require_semaphore(Semaphore, Id) -> -spec wait_semaphore(non_neg_integer(), pos_integer()) -> boolean(). wait_semaphore(X, Id) when X < 0 -> - gen_server:call(?MODULE, {?FUNCTION_NAME, Id}, infinity); + call({?FUNCTION_NAME, Id}); wait_semaphore(_, _) -> true. diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index 5424629d9..b60ac5627 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -130,7 +130,7 @@ config(get, _) -> config(put, #{body := Body}) -> try - ok = emqx_retainer:update_config(Body), + {ok, _} = emqx_retainer:update_config(Body), {200, emqx:get_raw_config([emqx_retainer])} catch _:Reason:_ -> {400, diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 7191bacc0..51db25ab2 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -define(APP, emqx_retainer). +-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -49,13 +50,39 @@ emqx_retainer { %%-------------------------------------------------------------------- init_per_suite(Config) -> + application:load(emqx_conf), + ok = ekka:start(), + ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), + meck:new(emqx_alarm, [non_strict, passthrough, no_link]), + meck:expect(emqx_alarm, activate, 3, ok), + meck:expect(emqx_alarm, deactivate, 3, ok), + ok = emqx_config:init_load(emqx_retainer_schema, ?BASE_CONF), emqx_common_test_helpers:start_apps([emqx_retainer]), Config. end_per_suite(_Config) -> + ekka:stop(), + mria:stop(), + mria_mnesia:delete_schema(), + meck:unload(emqx_alarm), + emqx_common_test_helpers:stop_apps([emqx_retainer]). +init_per_testcase(_, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(), + timer:sleep(200), + Config. + +end_per_testcase(_, Config) -> + case erlang:whereis(node()) of + undefined -> ok; + P -> + erlang:unlink(P), + erlang:exit(P, kill) + end, + Config. + %%-------------------------------------------------------------------- %% Test Cases %%--------------------------------------------------------------------