diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index cbd15a5eb..9674c8c67 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -5,6 +5,7 @@ [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.15", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -12,6 +13,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -20,6 +22,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -30,6 +33,7 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -40,6 +44,7 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, 
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -51,6 +56,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -62,6 +68,7 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -76,6 +83,7 @@ {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.8", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -91,6 +99,7 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]}, {"4.3.7", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -106,6 +115,7 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, 
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -121,6 +131,7 @@ {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -136,6 +147,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -151,6 +163,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -166,6 +179,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -182,6 +196,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, 
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -198,6 +213,7 @@ {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -214,12 +230,14 @@ {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.16", [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.15", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -227,6 +245,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -235,6 +254,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + 
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -245,6 +265,7 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -255,6 +276,7 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -266,6 +288,7 @@ {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -277,6 +300,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -291,6 +315,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + 
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.8", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -306,6 +331,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.7", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -321,6 +347,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.6", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -336,6 +363,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.5", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -351,6 +379,7 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.4", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -366,6 +395,7 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.3", 
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -381,6 +411,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.2", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -397,6 +428,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.1", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -413,6 +445,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.0", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -429,5 +462,6 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index ac0ccaf4f..f3226858e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -47,6 +47,7 @@ ]). 
-export([ init_resource/4 + , init_resource_with_retrier/4 , init_action/4 , clear_resource/4 , clear_rule/1 @@ -259,7 +260,7 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) -> with_retry -> %% Note that we will return OK in case of resource creation failure, %% A timer is started to re-start the resource later. - _ = (catch (?CLUSTER_CALL(init_resource, [M, F, ResId, Config]))), + _ = (catch (?CLUSTER_CALL(init_resource_with_retrier, [M, F, ResId, Config]))), {ok, Resource}; no_retry -> try @@ -327,7 +328,7 @@ start_resource(ResId) -> {ok, #resource_type{on_create = {Mod, Create}}} = emqx_rule_registry:find_resource_type(ResType), try - init_resource(Mod, Create, ResId, Config), + init_resource_with_retrier(Mod, Create, ResId, Config), refresh_actions_of_a_resource(ResId) catch throw:Reason -> {error, Reason} @@ -476,13 +477,9 @@ refresh_resource(Type) when is_atom(Type) -> emqx_rule_registry:get_resources_by_type(Type)); refresh_resource(#resource{id = ResId, type = Type, config = Config}) -> - try - {ok, #resource_type{on_create = {M, F}}} = - emqx_rule_registry:find_resource_type(Type), - ok = emqx_rule_engine:init_resource(M, F, ResId, Config) - catch _:_ -> - emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY) - end. + {ok, #resource_type{on_create = {M, F}}} = + emqx_rule_registry:find_resource_type(Type), + ok = emqx_rule_engine:init_resource_with_retrier(M, F, ResId, Config). -spec(refresh_rules() -> ok). refresh_rules() -> @@ -490,6 +487,12 @@ refresh_rules() -> (#rule{enabled = true} = Rule) -> try refresh_rule(Rule) catch _:_ -> + %% We set enabled = false when rule initialization fails, to avoid bad rules running + %% without their actions created properly. + %% The init failure might be caused by a disconnected resource; in that case the + %% actions cannot be created, so the rules won't work. + %% After fixing the problem, the user can re-enable the rule manually; + %% doing so will also recreate the actions. 
emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup}) end; (_) -> ok @@ -655,6 +658,19 @@ init_resource(Module, OnCreate, ResId, Config) -> status = #{is_alive => true}}, emqx_rule_registry:add_resource_params(ResParams). +init_resource_with_retrier(Module, OnCreate, ResId, Config) -> + try + Params = Module:OnCreate(ResId, Config), + ResParams = #resource_params{id = ResId, + params = Params, + status = #{is_alive => true}}, + emqx_rule_registry:add_resource_params(ResParams) + catch Class:Reason:ST -> + Interval = persistent_term:get({emqx_rule_engine, resource_restart_interval}, ?T_RETRY), + emqx_rule_monitor:ensure_resource_retrier(ResId, Interval), + erlang:raise(Class, {init_resource, Reason}, ST) + end. + init_action(Module, OnCreate, ActionInstId, Params) -> ok = emqx_rule_metrics:create_metrics(ActionInstId), case ?RAISE(Module:OnCreate(ActionInstId, Params), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 202c21d1a..217d2bf2b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -28,8 +28,7 @@ start(_Type, _Args) -> {ok, Sup} = emqx_rule_engine_sup:start_link(), _ = emqx_rule_engine_sup:start_locker(), ok = emqx_rule_engine:load_providers(), - ok = emqx_rule_engine:refresh_resources(), - ok = emqx_rule_engine:refresh_rules(), + ok = emqx_rule_monitor:async_refresh_resources_rules(), ok = emqx_rule_engine_cli:load(), {ok, Sup}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl index afff14c40..4e16af5b6 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl @@ -31,8 +31,9 @@ -export([ start_link/0 , stop/0 + , async_refresh_resources_rules/0 , ensure_resource_retrier/2 - , retry_loop/3 + , handler/3 ]). 
start_link() -> @@ -45,18 +46,21 @@ init([]) -> _ = erlang:process_flag(trap_exit, true), {ok, #{retryers => #{}}}. +async_refresh_resources_rules() -> + gen_server:cast(?MODULE, {create_handler, refresh, resources_and_rules, #{}}). + ensure_resource_retrier(ResId, Interval) -> - gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}). + gen_server:cast(?MODULE, {create_handler, resource, ResId, Interval}). handle_call(_Msg, _From, State) -> {reply, ok, State}. -handle_cast({create_restart_handler, Tag, Obj, Interval}, State) -> +handle_cast({create_handler, Tag, Obj, Args}, State) -> Objects = maps:get(Tag, State, #{}), NewState = case maps:find(Obj, Objects) of error -> update_object(Tag, Obj, - create_restart_handler(Tag, Obj, Interval), State); + create_handler(Tag, Obj, Args), State); {ok, _Pid} -> State end, @@ -93,12 +97,12 @@ update_object(Tag, Obj, Retryer, State) -> retryers => Retryers#{Retryer => {Tag, Obj}} }. -create_restart_handler(Tag, Obj, Interval) -> - ?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]), - %% spawn a dedicated process to handle the restarting asynchronously - spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]). +create_handler(Tag, Obj, Args) -> + ?LOG(info, "create monitor handler for ~p ~p, args: ~p", [Tag, Obj, Args]), + %% spawn a dedicated process to handle the task asynchronously + spawn_link(?MODULE, handler, [Tag, Obj, Args]). -retry_loop(resource, ResId, Interval) -> +handler(resource, ResId, Interval) -> case emqx_rule_registry:find_resource(ResId) of {ok, #resource{type = Type, config = Config}} -> try @@ -111,11 +115,17 @@ retry_loop(resource, ResId, Interval) -> ?LOG(warning, "init_resource failed: ~p, ~0p", [{Err, Reason}, ST]), timer:sleep(Interval), - ?MODULE:retry_loop(resource, ResId, Interval) + ?MODULE:handler(resource, ResId, Interval) end; not_found -> ok - end. + end; + +handler(refresh, resources_and_rules, _) -> + %% NOTE: the order matters. 
+ %% We should always refresh the resources first and then the rules. + ok = emqx_rule_engine:refresh_resources(), + ok = emqx_rule_engine:refresh_rules(). refresh_and_enable_rules_of_resource(ResId) -> lists:foreach( diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index 121970342..d996c7f73 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -35,6 +35,7 @@ suite() -> groups() -> [{resource, [sequence], [ t_restart_resource + , t_refresh_resources_rules ]} ]. @@ -47,24 +48,53 @@ end_per_suite(_Config) -> ok. init_per_testcase(t_restart_resource, Config) -> + persistent_term:put({emqx_rule_engine, resource_restart_interval}, 100), Opts = [public, named_table, set, {read_concurrency, true}], _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), ets:new(t_restart_resource, [named_table, public]), ets:insert(t_restart_resource, {failed_count, 0}), ets:insert(t_restart_resource, {succ_count, 0}), + common_init_per_testcase(), + Config; +init_per_testcase(t_refresh_resources_rules, Config) -> + meck:unload(), + ets:new(t_refresh_resources_rules, [named_table, public]), + ok = meck:new(emqx_rule_engine, [no_link, passthrough]), + meck:expect(emqx_rule_engine, refresh_resources, fun() -> + timer:sleep(500), + ets:update_counter(t_refresh_resources_rules, refresh_resources, 1, {refresh_resources, 0}), + ok + end), + meck:expect(emqx_rule_engine, refresh_rules, fun() -> + timer:sleep(500), + ets:update_counter(t_refresh_resources_rules, refresh_rules, 1, {refresh_rules, 0}), + ok + end), + common_init_per_testcase(), Config; - init_per_testcase(_, Config) -> + common_init_per_testcase(), Config. 
end_per_testcase(t_restart_resource, Config) -> + persistent_term:put({emqx_rule_engine, resource_restart_interval}, 60000), ets:delete(t_restart_resource), + common_end_per_testcases(), + Config; +end_per_testcase(t_refresh_resources_rules, Config) -> + meck:unload(), + common_end_per_testcases(), Config; end_per_testcase(_, Config) -> + common_end_per_testcases(), Config. +common_init_per_testcase() -> + {ok, _} = emqx_rule_monitor:start_link(). +common_end_per_testcases() -> + emqx_rule_monitor:stop(). + t_restart_resource(_) -> - {ok, _} = emqx_rule_monitor:start_link(), ok = emqx_rule_registry:register_resource_types( [#resource_type{ name = test_res_1, @@ -79,11 +109,12 @@ t_restart_resource(_) -> {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( #{type => test_res_1, config => #{}, + restart_interval => 100, description => <<"debug resource">>}), - [{_, 1}] = ets:lookup(t_restart_resource, failed_count), - [{_, 0}] = ets:lookup(t_restart_resource, succ_count), + ?assertMatch([{_, 0}], ets:lookup(t_restart_resource, succ_count)), + ?assertMatch([{_, N}] when N == 1 orelse N == 2 orelse N == 3, + ets:lookup(t_restart_resource, failed_count)), ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]), - emqx_rule_monitor:ensure_resource_retrier(ResId, 100), timer:sleep(1000), [{_, 5}] = ets:lookup(t_restart_resource, failed_count), [{_, 1}] = ets:lookup(t_restart_resource, succ_count), @@ -91,9 +122,21 @@ t_restart_resource(_) -> ?assertEqual(0, map_size(Pids)), ok = emqx_rule_engine:unload_providers(), emqx_rule_registry:remove_resource(ResId), - emqx_rule_monitor:stop(), ok. 
+t_refresh_resources_rules(_) -> + ok = emqx_rule_monitor:async_refresh_resources_rules(), + ok = emqx_rule_monitor:async_refresh_resources_rules(), + %% there should be only one refresh handler at the same time + ?assertMatch(#{retryers := Pids} when map_size(Pids) =:= 1, sys:get_state(whereis(emqx_rule_monitor))), + timer:sleep(1200), + ?assertEqual([{refresh_resources, 1}], ets:lookup(t_refresh_resources_rules, refresh_resources)), + ?assertEqual([{refresh_rules, 1}], ets:lookup(t_refresh_resources_rules, refresh_rules)), + ok = emqx_rule_monitor:async_refresh_resources_rules(), + timer:sleep(1200), + ?assertEqual([{refresh_resources, 2}], ets:lookup(t_refresh_resources_rules, refresh_resources)), + ?assertEqual([{refresh_rules, 2}], ets:lookup(t_refresh_resources_rules, refresh_rules)). + on_resource_create(Id, _) -> case ets:lookup(t_restart_resource, failed_count) of [{_, 5}] ->