fix: add retry for rules

This commit is contained in:
Shawn 2023-02-04 12:48:12 +08:00
parent 6a7b0bd1f8
commit c22e2a0d18
3 changed files with 82 additions and 35 deletions

View File

@ -529,19 +529,27 @@ refresh_resource(#resource{id = ResId, type = Type, config = Config}) ->
refresh_rules_when_boot() ->
lists:foreach(fun
(#rule{enabled = true} = Rule) ->
try refresh_rule(Rule)
catch _:_ ->
%% We set the enable = false when rule init failed to avoid bad rules running
%% without actions created properly.
%% The init failure might be caused by a disconnected resource, in this case the
%% actions can not be created, so the rules won't work.
%% After the user fixed the problem he can enable it manually,
%% doing so will also recreate the actions.
emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup})
end;
(_) -> ok
ensure_rule_retrier(Rule);
(#rule{enabled = false, state = refresh_failed_at_bootup} = Rule) ->
%% the rule was previously disabled by emqx so we need to retry it
ensure_rule_retrier(Rule);
(#rule{enabled = false, id = RuleId}) ->
?LOG(warning, "rule ~s was disabled by the user, won't re-enable it", [RuleId])
end, emqx_rule_registry:get_rules()).
%% Try to refresh (initialize) a rule. On any failure the rule is stored as
%% disabled with state `refresh_failed_at_bootup' and its id is handed to
%% emqx_rule_monitor:ensure_rule_retrier/1 so the refresh can be retried later.
%% Returns whatever refresh_rule/1 returns on success, otherwise the result of
%% emqx_rule_monitor:ensure_rule_retrier/1.
ensure_rule_retrier(#rule{id = RuleId} = Rule) ->
    try
        refresh_rule(Rule)
    catch
        Class:Reason ->
            %% Do not drop the failure silently: record which rule failed and why,
            %% otherwise the rule just appears as disabled with no explanation.
            ?LOG(warning, "rule ~s failed to refresh: ~0p, it is disabled and will be retried",
                 [RuleId, {Class, Reason}]),
            %% We set the enable = false when rule init failed to avoid bad rules running
            %% without actions created properly.
            %% The init failure might be caused by a disconnected resource, in this case the
            %% actions can not be created, so the rules won't work.
            %% The rule is registered for retry below; the user can also fix the
            %% problem and enable the rule manually, which recreates the actions.
            emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup}),
            emqx_rule_monitor:ensure_rule_retrier(RuleId)
    end.
refresh_rule(#rule{id = RuleId, for = Topics, actions = Actions}) ->
ok = emqx_rule_metrics:create_rule_metrics(RuleId),
lists:foreach(fun emqx_rule_events:load/1, Topics),

View File

@ -33,16 +33,21 @@
, stop/0
, async_refresh_resources_rules/0
, ensure_resource_retrier/1
, ensure_rule_retrier/1
, retry_loop/2
, retry_loop/3
]).
%% for test
-export([ put_retry_interval/1
, get_retry_interval/0
, erase_retry_interval/0
-export([ put_resource_retry_interval/1
, put_rule_retry_interval/1
, get_resource_retry_interval/0
, get_rule_retry_interval/0
, erase_resource_retry_interval/0
, erase_rule_retry_interval/0
]).
-define(T_RETRY, 60000).
%% Default delay, in milliseconds, between retry attempts for a resource.
%% Used as the fallback when no interval is stored in persistent_term.
-define(T_RESOURCE_RETRY, 15000).
%% Default delay, in milliseconds, between retry attempts for a rule.
%% Used as the fallback when no interval is stored in persistent_term.
-define(T_RULE_RETRY, 20000).
%% Start the monitor as a gen_server linked to the caller and registered
%% locally under this module's name. init/1 receives an empty list.
start_link() ->
    ServerName = {local, ?MODULE},
    InitArgs = [],
    StartOpts = [],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, StartOpts).
@ -54,23 +59,33 @@ init([]) ->
_ = erlang:process_flag(trap_exit, true),
{ok, #{retryers => #{}}}.
put_retry_interval(I) when is_integer(I) andalso I >= 10 ->
put_resource_retry_interval(I) when is_integer(I) andalso I >= 10 ->
_ = persistent_term:put({?MODULE, resource_restart_interval}, I),
ok.
erase_retry_interval() ->
_ = persistent_term:erase({?MODULE, resource_restart_interval}),
put_rule_retry_interval(I) when is_integer(I) andalso I >= 10 ->
_ = persistent_term:put({?MODULE, rule_restart_interval}, I),
ok.
get_retry_interval() ->
persistent_term:get({?MODULE, resource_restart_interval}, ?T_RETRY).
%% Remove the resource retry interval override from persistent_term.
%% Always returns ok, whether or not an override was stored.
erase_resource_retry_interval() ->
    Key = {?MODULE, resource_restart_interval},
    _Erased = persistent_term:erase(Key),
    ok.
%% Remove the rule retry interval override from persistent_term.
%% Always returns ok, whether or not an override was stored.
erase_rule_retry_interval() ->
    Key = {?MODULE, rule_restart_interval},
    _Erased = persistent_term:erase(Key),
    ok.
%% Return the resource retry interval stored in persistent_term, or the
%% ?T_RESOURCE_RETRY default when no override has been stored.
get_resource_retry_interval() ->
    Key = {?MODULE, resource_restart_interval},
    Default = ?T_RESOURCE_RETRY,
    persistent_term:get(Key, Default).
%% Return the rule retry interval stored in persistent_term, or the
%% ?T_RULE_RETRY default when no override has been stored.
get_rule_retry_interval() ->
    Key = {?MODULE, rule_restart_interval},
    Default = ?T_RULE_RETRY,
    persistent_term:get(Key, Default).
%% Send the `async_refresh' cast to the locally registered monitor process.
%% Fire-and-forget: the call returns without waiting for the refresh.
async_refresh_resources_rules() ->
    Request = async_refresh,
    gen_server:cast(?MODULE, Request).
ensure_resource_retrier(ResId) ->
Interval = get_retry_interval(),
gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}).
gen_server:cast(?MODULE, {create_restart_handler, resource, ResId}).
%% Ask the monitor process (asynchronously) to create a restart handler
%% for the rule identified by RuleId.
ensure_rule_retrier(RuleId) ->
    Request = {create_restart_handler, rule, RuleId},
    gen_server:cast(?MODULE, Request).
%% gen_server call callback: every request is answered with `ok' and the
%% server state is returned unchanged.
handle_call(_Request, _Caller, St) ->
    Reply = ok,
    {reply, Reply, St}.
@ -82,12 +97,12 @@ handle_cast(async_refresh, State) ->
Pid = spawn_link(fun do_async_refresh/0),
{noreply, State#{boot_refresh_pid => Pid}};
handle_cast({create_restart_handler, Tag, Obj, Interval}, State) ->
handle_cast({create_restart_handler, Tag, Obj}, State) ->
Objects = maps:get(Tag, State, #{}),
NewState = case maps:find(Obj, Objects) of
error ->
update_object(Tag, Obj,
create_restart_handler(Tag, Obj, Interval), State);
create_restart_handler(Tag, Obj), State);
{ok, _Pid} ->
State
end,
@ -130,13 +145,17 @@ update_object(Tag, Obj, Retryer, State) ->
retryers => Retryers#{Retryer => {Tag, Obj}}
}.
create_restart_handler(Tag, Obj, Interval) ->
?LOG(info, "starting_a_retry_loop for ~p ~p, with delay interval: ~p", [Tag, Obj, Interval]),
create_restart_handler(Tag, Obj) ->
?LOG(warning, "starting_a_retry_loop for ~p ~p", [Tag, Obj]),
%% spawn a dedicated process to handle the restarting asynchronously
spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]).
spawn_link(?MODULE, retry_loop, [Tag, Obj]).
retry_loop(resource, ResId, Interval) ->
timer:sleep(Interval),
%% retry_loop/3 is to avoid crashes during relup
retry_loop(Tag, ResId, _Interval) ->
retry_loop(Tag, ResId).
retry_loop(resource, ResId) ->
timer:sleep(get_resource_retry_interval()),
case emqx_rule_registry:find_resource(ResId) of
{ok, #resource{type = Type, config = Config}} ->
try
@ -154,10 +173,30 @@ retry_loop(resource, ResId, Interval) ->
end,
?LOG_SENSITIVE(warning, "init_resource_retry_failed ~p, ~0p", [ResId, LogContext]),
%% keep looping
?MODULE:retry_loop(resource, ResId, Interval)
?MODULE:retry_loop(resource, ResId)
end;
not_found ->
ok
end;
%% Rule retry clause: sleep for the configured rule retry interval, then look
%% the rule up again.
%%  - still disabled with state `refresh_failed_at_bootup': refresh it and store
%%    it as enabled/normal; on any exception log the failure and loop again.
%%  - disabled with any other state: log that it is left alone and stop.
%%  - anything else returned by the registry: stop and return ok.
retry_loop(rule, RuleId) ->
    timer:sleep(get_rule_retry_interval()),
    case emqx_rule_registry:get_rule(RuleId) of
        {ok, #rule{enabled = false, state = refresh_failed_at_bootup} = Rule} ->
            try
                emqx_rule_engine:refresh_rule(Rule),
                emqx_rule_registry:add_rule(Rule#rule{enabled = true, state = normal}),
                ?LOG(warning, "rule ~s has been refreshed and re-enabled", [RuleId])
            catch
                Err:Reason:ST ->
                    %% Include the rule id: several retry loops may run at the
                    %% same time and the failure must be attributable to a rule.
                    ?LOG(warning, "init_rule ~s failed: ~p, ~0p",
                         [RuleId, {Err, Reason}, ST]),
                    %% keep looping; the fully qualified call picks up the
                    %% latest loaded version of this module
                    ?MODULE:retry_loop(rule, RuleId)
            end;
        {ok, #rule{enabled = false, state = State}} when State =/= refresh_failed_at_bootup ->
            ?LOG(warning, "rule ~s was disabled by the user, won't re-enable it", [RuleId]);
        _ ->
            ok
    end.
do_async_refresh() ->
@ -171,6 +210,6 @@ refresh_and_enable_rules_of_resource(ResId) ->
fun (#rule{id = Id, enabled = false, state = refresh_failed_at_bootup} = Rule) ->
emqx_rule_engine:refresh_rule(Rule),
emqx_rule_registry:add_rule(Rule#rule{enabled = true, state = normal}),
?LOG(info, "rule ~s is refreshed and re-enabled", [Id]);
?LOG(warning, "rule ~s is refreshed and re-enabled", [Id]);
(_) -> ok
end, emqx_rule_registry:find_rules_depends_on_resource(ResId)).

View File

@ -48,7 +48,7 @@ end_per_suite(_Config) ->
ok.
init_per_testcase(t_restart_resource, Config) ->
emqx_rule_monitor:put_retry_interval(100),
emqx_rule_monitor:put_resource_retry_interval(100),
Opts = [public, named_table, set, {read_concurrency, true}],
_ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
ets:new(t_restart_resource, [named_table, public]),
@ -95,7 +95,7 @@ common_init_per_testcase() ->
common_end_per_testcases() ->
ok = emqx_alarm:stop(),
emqx_rule_monitor:erase_retry_interval(),
emqx_rule_monitor:erase_resource_retry_interval(),
emqx_rule_monitor:stop().
t_restart_resource(_) ->