From c22e2a0d185d108ad5f1b0f86ec700df753d6987 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 4 Feb 2023 12:48:12 +0800 Subject: [PATCH] fix: add retry for rules --- .../emqx_rule_engine/src/emqx_rule_engine.erl | 30 ++++--- .../src/emqx_rule_monitor.erl | 83 ++++++++++++++----- .../test/emqx_rule_monitor_SUITE.erl | 4 +- 3 files changed, 82 insertions(+), 35 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 5b3570cf2..b7467a81f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -529,19 +529,27 @@ refresh_resource(#resource{id = ResId, type = Type, config = Config}) -> refresh_rules_when_boot() -> lists:foreach(fun (#rule{enabled = true} = Rule) -> - try refresh_rule(Rule) - catch _:_ -> - %% We set the enable = false when rule init failed to avoid bad rules running - %% without actions created properly. - %% The init failure might be caused by a disconnected resource, in this case the - %% actions can not be created, so the rules won't work. - %% After the user fixed the problem he can enable it manually, - %% doing so will also recreate the actions. - emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup}) - end; - (_) -> ok + ensure_rule_retrier(Rule); + (#rule{enabled = false, state = refresh_failed_at_bootup} = Rule) -> + %% the rule was previously disabled by emqx so we need to retry it + ensure_rule_retrier(Rule); + (#rule{enabled = false, id = RuleId}) -> + ?LOG(warning, "rule ~s was disabled by the user, won't re-enable it", [RuleId]) end, emqx_rule_registry:get_rules()). +ensure_rule_retrier(#rule{id = RuleId} = Rule) -> + try refresh_rule(Rule) + catch _:_ -> + %% We set the enable = false when rule init failed to avoid bad rules running + %% without actions created properly. + %% The init failure might be caused by a disconnected resource, in this case the + %% actions can not be created, so the rules won't work. + %% After the user fixed the problem he can enable it manually, + %% doing so will also recreate the actions. + emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup}), + emqx_rule_monitor:ensure_rule_retrier(RuleId) + end. + refresh_rule(#rule{id = RuleId, for = Topics, actions = Actions}) -> ok = emqx_rule_metrics:create_rule_metrics(RuleId), lists:foreach(fun emqx_rule_events:load/1, Topics), diff --git a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl index f45684053..4e4d4ebe1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl @@ -33,16 +33,21 @@ , stop/0 , async_refresh_resources_rules/0 , ensure_resource_retrier/1 + , ensure_rule_retrier/1 + , retry_loop/2 , retry_loop/3 ]). -%% fot test --export([ put_retry_interval/1 - , get_retry_interval/0 - , erase_retry_interval/0 +-export([ put_resource_retry_interval/1 + , put_rule_retry_interval/1 + , get_resource_retry_interval/0 + , get_rule_retry_interval/0 + , erase_resource_retry_interval/0 + , erase_rule_retry_interval/0 ]). --define(T_RETRY, 60000). +-define(T_RESOURCE_RETRY, 15000). +-define(T_RULE_RETRY, 20000). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -54,23 +59,33 @@ init([]) -> _ = erlang:process_flag(trap_exit, true), {ok, #{retryers => #{}}}. -put_retry_interval(I) when is_integer(I) andalso I >= 10 -> +put_resource_retry_interval(I) when is_integer(I) andalso I >= 10 -> _ = persistent_term:put({?MODULE, resource_restart_interval}, I), ok. - -erase_retry_interval() -> - _ = persistent_term:erase({?MODULE, resource_restart_interval}), +put_rule_retry_interval(I) when is_integer(I) andalso I >= 10 -> + _ = persistent_term:put({?MODULE, rule_restart_interval}, I), ok. -get_retry_interval() -> - persistent_term:get({?MODULE, resource_restart_interval}, ?T_RETRY). +erase_resource_retry_interval() -> + _ = persistent_term:erase({?MODULE, resource_restart_interval}), + ok. +erase_rule_retry_interval() -> + _ = persistent_term:erase({?MODULE, rule_restart_interval}), + ok. + +get_resource_retry_interval() -> + persistent_term:get({?MODULE, resource_restart_interval}, ?T_RESOURCE_RETRY). +get_rule_retry_interval() -> + persistent_term:get({?MODULE, rule_restart_interval}, ?T_RULE_RETRY). async_refresh_resources_rules() -> gen_server:cast(?MODULE, async_refresh). ensure_resource_retrier(ResId) -> - Interval = get_retry_interval(), - gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}). + gen_server:cast(?MODULE, {create_restart_handler, resource, ResId}). + +ensure_rule_retrier(RuleId) -> + gen_server:cast(?MODULE, {create_restart_handler, rule, RuleId}). handle_call(_Msg, _From, State) -> {reply, ok, State}. @@ -82,12 +97,12 @@ handle_cast(async_refresh, State) -> Pid = spawn_link(fun do_async_refresh/0), {noreply, State#{boot_refresh_pid => Pid}}; -handle_cast({create_restart_handler, Tag, Obj, Interval}, State) -> +handle_cast({create_restart_handler, Tag, Obj}, State) -> Objects = maps:get(Tag, State, #{}), NewState = case maps:find(Obj, Objects) of error -> update_object(Tag, Obj, - create_restart_handler(Tag, Obj, Interval), State); + create_restart_handler(Tag, Obj), State); {ok, _Pid} -> State end, @@ -130,13 +145,17 @@ update_object(Tag, Obj, Retryer, State) -> retryers => Retryers#{Retryer => {Tag, Obj}} }. -create_restart_handler(Tag, Obj, Interval) -> - ?LOG(info, "starting_a_retry_loop for ~p ~p, with delay interval: ~p", [Tag, Obj, Interval]), +create_restart_handler(Tag, Obj) -> + ?LOG(warning, "starting_a_retry_loop for ~p ~p", [Tag, Obj]), %% spawn a dedicated process to handle the restarting asynchronously - spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]). + spawn_link(?MODULE, retry_loop, [Tag, Obj]). -retry_loop(resource, ResId, Interval) -> - timer:sleep(Interval), +%% retry_loop/3 is to avoid crashes during relup +retry_loop(Tag, ResId, _Interval) -> + retry_loop(Tag, ResId). + +retry_loop(resource, ResId) -> + timer:sleep(get_resource_retry_interval()), case emqx_rule_registry:find_resource(ResId) of {ok, #resource{type = Type, config = Config}} -> try @@ -154,10 +173,30 @@ retry_loop(resource, ResId, Interval) -> end, ?LOG_SENSITIVE(warning, "init_resource_retry_failed ~p, ~0p", [ResId, LogContext]), %% keep looping - ?MODULE:retry_loop(resource, ResId, Interval) + ?MODULE:retry_loop(resource, ResId) end; not_found -> ok + end; + +retry_loop(rule, RuleId) -> + timer:sleep(get_rule_retry_interval()), + case emqx_rule_registry:get_rule(RuleId) of + {ok, #rule{enabled = false, state = refresh_failed_at_bootup} = Rule} -> + try + emqx_rule_engine:refresh_rule(Rule), + emqx_rule_registry:add_rule(Rule#rule{enabled = true, state = normal}), + ?LOG(warning, "rule ~s has been refreshed and re-enabled", [RuleId]) + catch + Err:Reason:ST -> + ?LOG(warning, "init_rule failed: ~p, ~0p", + [{Err, Reason}, ST]), + ?MODULE:retry_loop(rule, RuleId) + end; + {ok, #rule{enabled = false, state = State}} when State =/= refresh_failed_at_bootup -> + ?LOG(warning, "rule ~s was disabled by the user, won't re-enable it", [RuleId]); + _ -> + ok end. do_async_refresh() -> @@ -171,6 +210,6 @@ refresh_and_enable_rules_of_resource(ResId) -> fun (#rule{id = Id, enabled = false, state = refresh_failed_at_bootup} = Rule) -> emqx_rule_engine:refresh_rule(Rule), emqx_rule_registry:add_rule(Rule#rule{enabled = true, state = normal}), - ?LOG(info, "rule ~s is refreshed and re-enabled", [Id]); + ?LOG(warning, "rule ~s is refreshed and re-enabled", [Id]); (_) -> ok end, emqx_rule_registry:find_rules_depends_on_resource(ResId)). diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index 2bab6c4d8..1c478fb63 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -48,7 +48,7 @@ end_per_suite(_Config) -> ok. init_per_testcase(t_restart_resource, Config) -> - emqx_rule_monitor:put_retry_interval(100), + emqx_rule_monitor:put_resource_retry_interval(100), Opts = [public, named_table, set, {read_concurrency, true}], _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), ets:new(t_restart_resource, [named_table, public]), @@ -95,7 +95,7 @@ common_init_per_testcase() -> common_end_per_testcases() -> ok = emqx_alarm:stop(), - emqx_rule_monitor:erase_retry_interval(), + emqx_rule_monitor:erase_resource_retry_interval(), emqx_rule_monitor:stop(). t_restart_resource(_) ->