From 590a87a72b74e8d1b4e136cc41fbe35cad5c20b3 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 15 Jun 2022 11:16:34 +0800 Subject: [PATCH] fix: the delayed module is disbled after ./bin/emqx stop --- apps/emqx_modules/src/emqx_delayed.erl | 94 ++++++++----------- apps/emqx_modules/src/emqx_modules_app.erl | 4 +- apps/emqx_modules/test/emqx_delayed_SUITE.erl | 12 +-- .../test/emqx_delayed_api_SUITE.erl | 8 +- 4 files changed, 53 insertions(+), 65 deletions(-) diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 72fd15d88..99ae97dd3 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -45,20 +45,24 @@ %% gen_server callbacks -export([ - enable/0, - disable/0, - set_max_delayed_messages/1, + load/0, + unload/0, + load_or_unload/1, + get_conf/1, update_config/1, list/1, get_delayed_message/1, get_delayed_message/2, delete_delayed_message/1, delete_delayed_message/2, - post_config_update/5, cluster_list/1, cluster_query/4 ]). +-export([ + post_config_update/5 +]). + -export([format_delayed/1]). %% exported for `emqx_telemetry' @@ -75,8 +79,7 @@ publish_timer := maybe(timer:tref()), publish_at := non_neg_integer(), stats_timer := maybe(reference()), - stats_fun := maybe(fun((pos_integer()) -> ok)), - max_delayed_messages := non_neg_integer() + stats_fun := maybe(fun((pos_integer()) -> ok)) }. %% sync ms with record change @@ -138,21 +141,23 @@ on_message_publish(Msg) -> -spec start_link() -> emqx_types:startlink_ret(). start_link() -> - Opts = emqx_conf:get([delayed], #{}), - gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []). + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -spec store(delayed_message()) -> ok | {error, atom()}. store(DelayedMsg) -> gen_server:call(?SERVER, {store, DelayedMsg}, infinity). -enable() -> - enable(true). +get_conf(Key) -> + emqx_conf:get([delayed, Key]). -disable() -> - enable(false). +load() -> + load_or_unload(true). -set_max_delayed_messages(Max) -> - gen_server:call(?SERVER, {set_max_delayed_messages, Max}). +unload() -> + load_or_unload(false). + +load_or_unload(Bool) -> + gen_server:call(?SERVER, {do_load_or_unload, Bool}). list(Params) -> emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN). @@ -240,54 +245,46 @@ delete_delayed_message(Node, Id) -> update_config(Config) -> emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}). -post_config_update(_KeyPath, Config, _NewConf, _OldConf, _AppEnvs) -> - gen_server:call(?SERVER, {update_config, Config}). +post_config_update(_KeyPath, _ConfigReq, NewConf, _OldConf, _AppEnvs) -> + Enable = maps:get(enable, NewConf, undefined), + load_or_unload(Enable). %%-------------------------------------------------------------------- %% gen_server callback %%-------------------------------------------------------------------- -init([Opts]) -> +init([]) -> + ok = mria:wait_for_tables([?TAB]), erlang:process_flag(trap_exit, true), emqx_conf:add_handler([delayed], ?MODULE), - MaxDelayedMessages = maps:get(max_delayed_messages, Opts, 0), State = ensure_stats_event( ensure_publish_timer(#{ publish_timer => undefined, publish_at => 0, stats_timer => undefined, - stats_fun => undefined, - max_delayed_messages => MaxDelayedMessages + stats_fun => undefined }) ), - {ok, ensure_enable(emqx:get_config([delayed, enable]), State)}. + {ok, do_load_or_unload(emqx:get_config([delayed, enable]), State)}. -handle_call({set_max_delayed_messages, Max}, _From, State) -> - {reply, ok, State#{max_delayed_messages => Max}}; -handle_call( - {store, DelayedMsg = #delayed_message{key = Key}}, _From, State = #{max_delayed_messages := 0} -) -> - ok = mria:dirty_write(?TAB, DelayedMsg), - emqx_metrics:inc('messages.delayed'), - {reply, ok, ensure_publish_timer(Key, State)}; -handle_call( - {store, DelayedMsg = #delayed_message{key = Key}}, _From, State = #{max_delayed_messages := Max} -) -> +handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) -> Size = mnesia:table_info(?TAB, size), - case Size >= Max of - true -> + case get_conf(max_delayed_messages) of + 0 -> + ok = mria:dirty_write(?TAB, DelayedMsg), + emqx_metrics:inc('messages.delayed'), + {reply, ok, ensure_publish_timer(Key, State)}; + Max when Size >= Max -> {reply, {error, max_delayed_messages_full}, State}; - false -> + Max when Size < Max -> ok = mria:dirty_write(?TAB, DelayedMsg), emqx_metrics:inc('messages.delayed'), {reply, ok, ensure_publish_timer(Key, State)} end; -handle_call({update_config, Config}, _From, #{max_delayed_messages := Max} = State) -> - Max2 = maps:get(<<"max_delayed_messages">>, Config, Max), - State2 = State#{max_delayed_messages := Max2}, - State3 = ensure_enable(maps:get(<<"enable">>, Config, undefined), State2), - {reply, ok, State3}; +handle_call({do_load_or_unload, Bool}, _From, State0) -> + State = do_load_or_unload(Bool, State0), + {reply, ok, State}; handle_call(Req, _From, State) -> ?tp(error, emqx_delayed_unexpected_call, #{call => Req}), {reply, ignored, State}. @@ -312,7 +309,7 @@ handle_info(Info, State) -> terminate(_Reason, #{stats_timer := StatsTimer} = State) -> emqx_conf:remove_handler([delayed]), emqx_misc:cancel_timer(StatsTimer), - ensure_enable(false, State). + do_load_or_unload(false, State). code_change(_Vsn, State, _Extra) -> {ok, State}. @@ -381,22 +378,13 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> -spec delayed_count() -> non_neg_integer(). delayed_count() -> mnesia:table_info(?TAB, size). -enable(Enable) -> - case emqx_conf:get_raw([delayed]) of - #{<<"enable">> := Enable} -> - ok; - Cfg -> - {ok, _} = update_config(Cfg#{<<"enable">> := Enable}), - ok - end. - -ensure_enable(true, State) -> +do_load_or_unload(true, State) -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}), State; -ensure_enable(false, #{publish_timer := PubTimer} = State) -> +do_load_or_unload(false, #{publish_timer := PubTimer} = State) -> emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), emqx_misc:cancel_timer(PubTimer), ets:delete_all_objects(?TAB), State#{publish_timer := undefined, publish_at := 0}; -ensure_enable(_, State) -> +do_load_or_unload(_, State) -> State. diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index 3f5d4b187..5b5188f2b 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -33,7 +33,7 @@ stop(_State) -> ok. maybe_enable_modules() -> - emqx_conf:get([delayed, enable], true) andalso emqx_delayed:enable(), + emqx_conf:get([delayed, enable], true) andalso emqx_delayed:load(), emqx_modules_conf:is_telemetry_enabled() andalso emqx_telemetry:enable(), emqx_observer_cli:enable(), emqx_conf_cli:load(), @@ -42,7 +42,7 @@ maybe_enable_modules() -> emqx_modules_conf:load(). maybe_disable_modules() -> - emqx_conf:get([delayed, enable], true) andalso emqx_delayed:disable(), + emqx_conf:get([delayed, enable], true) andalso emqx_delayed:unload(), emqx_modules_conf:is_telemetry_enabled() andalso emqx_telemetry:disable(), emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(), emqx_rewrite:disable(), diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index c38d34641..a476b6f75 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -52,24 +52,24 @@ init_per_testcase(t_load_case, Config) -> Config; init_per_testcase(_Case, Config) -> {atomic, ok} = mria:clear_table(emqx_delayed), - ok = emqx_delayed:enable(), + ok = emqx_delayed:load(), Config. end_per_testcase(_Case, _Config) -> {atomic, ok} = mria:clear_table(emqx_delayed), - ok = emqx_delayed:disable(). + ok = emqx_delayed:unload(). %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- t_enable_disable_case(_) -> - emqx_delayed:disable(), + emqx_delayed:unload(), Hooks = emqx_hooks:lookup('message.publish'), MFA = {emqx_delayed, on_message_publish, []}, ?assertEqual(false, lists:keyfind(MFA, 2, Hooks)), - ok = emqx_delayed:enable(), + ok = emqx_delayed:load(), Hooks1 = emqx_hooks:lookup('message.publish'), ?assertNotEqual(false, lists:keyfind(MFA, 2, Hooks1)), @@ -80,7 +80,7 @@ t_enable_disable_case(_) -> _ = on_message_publish(DelayedMsg0), ?assertMatch(#{data := Datas} when Datas =/= [], emqx_delayed:list(#{})), - emqx_delayed:disable(), + emqx_delayed:unload(), ?assertEqual(false, lists:keyfind(MFA, 2, Hooks)), ?assertMatch(#{data := []}, emqx_delayed:list(#{})), ok. @@ -144,7 +144,7 @@ t_list(_) -> ). t_max(_) -> - emqx_delayed:set_max_delayed_messages(1), + emqx:update_config([delayed, max_delayed_messages], 1), DelayedMsg0 = emqx_message:make(?MODULE, 1, <<"$delayed/10/t0">>, <<"delayed0">>), DelayedMsg1 = emqx_message:make(?MODULE, 1, <<"$delayed/10/t1">>, <<"delayed1">>), diff --git a/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl index dbed931a2..b29cd54f4 100644 --- a/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl @@ -40,11 +40,11 @@ init_per_suite(Config) -> [emqx_conf, emqx_modules, emqx_dashboard], fun set_special_configs/1 ), - emqx_delayed:enable(), + emqx_delayed:load(), Config. end_per_suite(Config) -> - ok = emqx_delayed:disable(), + ok = emqx_delayed:unload(), emqx_common_test_helpers:stop_apps([emqx_conf, emqx_dashboard, emqx_modules]), Config. @@ -100,7 +100,7 @@ t_status(_Config) -> t_messages(_) -> clear_all_record(), - emqx_delayed:enable(), + emqx_delayed:load(), {ok, C1} = emqtt:start_link([{clean_start, true}]), {ok, _} = emqtt:connect(C1), @@ -200,7 +200,7 @@ t_messages(_) -> t_large_payload(_) -> clear_all_record(), - emqx_delayed:enable(), + emqx_delayed:load(), {ok, C1} = emqtt:start_link([{clean_start, true}]), {ok, _} = emqtt:connect(C1),