From 9b194baf69a5c1b3aafd2676071bad0cd1c5633f Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sat, 5 Nov 2022 17:11:12 +0100 Subject: [PATCH] fix(emqx_rule_monitor): sleep before retry but not after --- .../emqx_rule_engine/src/emqx_rule_engine.erl | 5 +-- .../src/emqx_rule_monitor.erl | 40 +++++++++++++++---- .../test/emqx_rule_monitor_SUITE.erl | 5 ++- 3 files changed, 37 insertions(+), 13 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index e642932c2..b458469da 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -79,8 +79,6 @@ , action_instance_params/0 ]). --define(T_RETRY, 60000). - %% redefine this macro to confine the appup scope -undef(RAISE). -define(RAISE(_EXP_, _ERROR_CONTEXT_), @@ -684,8 +682,7 @@ init_resource_with_retrier(Module, OnCreate, ResId, Config) -> status = #{is_alive => true}}, emqx_rule_registry:add_resource_params(ResParams) catch Class:Reason:ST -> - Interval = persistent_term:get({emqx_rule_engine, resource_restart_interval}, ?T_RETRY), - emqx_rule_monitor:ensure_resource_retrier(ResId, Interval), + emqx_rule_monitor:ensure_resource_retrier(ResId), erlang:raise(Class, {init_resource, Reason}, ST) end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl index 8af8aa7ff..82a93d0be 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl @@ -32,10 +32,18 @@ -export([ start_link/0 , stop/0 , async_refresh_resources_rules/0 - , ensure_resource_retrier/2 + , ensure_resource_retrier/1 , retry_loop/3 ]). +%% fot test +-export([ put_retry_interval/1 + , get_retry_interval/0 + , erase_retry_interval/0 + ]). + +-define(T_RETRY, 60000). + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -46,10 +54,22 @@ init([]) -> _ = erlang:process_flag(trap_exit, true), {ok, #{retryers => #{}}}. +put_retry_interval(I) when is_integer(I) andalso I >= 10 -> + _ = persistent_term:put({?MODULE, resource_restart_interval}, I), + ok. + +erase_retry_interval() -> + _ = persistent_term:erase({?MODULE, resource_restart_interval}), + ok. + +get_retry_interval() -> + persistent_term:get({?MODULE, resource_restart_interval}, ?T_RETRY). + async_refresh_resources_rules() -> gen_server:cast(?MODULE, async_refresh). -ensure_resource_retrier(ResId, Interval) -> +ensure_resource_retrier(ResId) -> + Interval = get_retry_interval(), gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}). handle_call(_Msg, _From, State) -> @@ -111,11 +131,12 @@ update_object(Tag, Obj, Retryer, State) -> }. create_restart_handler(Tag, Obj, Interval) -> - ?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]), + ?LOG(info, "starting_a_retry_loop for ~p ~p, with delay interval: ~p", [Tag, Obj, Interval]), %% spawn a dedicated process to handle the restarting asynchronously spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]). retry_loop(resource, ResId, Interval) -> + timer:sleep(Interval), case emqx_rule_registry:find_resource(ResId) of {ok, #resource{type = Type, config = Config}} -> try @@ -124,10 +145,15 @@ retry_loop(resource, ResId, Interval) -> ok = emqx_rule_engine:init_resource(M, F, ResId, Config), refresh_and_enable_rules_of_resource(ResId) catch - Err:Reason:ST -> - ?LOG(warning, "init_resource failed: ~p, ~0p", - [{Err, Reason}, ST]), - timer:sleep(Interval), + Err:Reason:Stacktrace -> + %% do not log stacktrace if it's a throw + LogContext = + case Err of + throw -> Reason; + _ -> {Reason, Stacktrace} + end, + ?LOG_SENSITIVE(warning, "init_resource_retry_failed ~p, ~0p", [ResId, LogContext]), + %% keep looping ?MODULE:retry_loop(resource, ResId, Interval) end; not_found -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index 6389c1a2e..fac76d235 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -48,7 +48,7 @@ end_per_suite(_Config) -> ok. init_per_testcase(t_restart_resource, Config) -> - persistent_term:put({emqx_rule_engine, resource_restart_interval}, 100), + emqx_rule_monitor:put_retry_interval(100), Opts = [public, named_table, set, {read_concurrency, true}], _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), ets:new(t_restart_resource, [named_table, public]), @@ -77,7 +77,6 @@ init_per_testcase(_, Config) -> Config. end_per_testcase(t_restart_resource, Config) -> - persistent_term:put({emqx_rule_engine, resource_restart_interval}, 60000), ets:delete(t_restart_resource), common_end_per_testcases(), Config; @@ -91,7 +90,9 @@ end_per_testcase(_, Config) -> common_init_per_testcase() -> {ok, _} = emqx_rule_monitor:start_link(). + common_end_per_testcases() -> + emqx_rule_monitor:erase_retry_interval(), emqx_rule_monitor:stop(). t_restart_resource(_) ->