From 2c61f92eec29797e769cd2ebbe506e8db56bb85e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 21 Oct 2022 09:05:04 +0800 Subject: [PATCH 1/3] fix: refresh resources and rules asynchronously --- .../src/emqx_rule_engine.appup.src | 34 ++++++++++++ .../emqx_rule_engine/src/emqx_rule_engine.erl | 34 +++++++++--- .../src/emqx_rule_engine_app.erl | 3 +- .../src/emqx_rule_monitor.erl | 32 +++++++---- .../test/emqx_rule_monitor_SUITE.erl | 55 +++++++++++++++++-- 5 files changed, 130 insertions(+), 28 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index cbd15a5eb..9674c8c67 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -5,6 +5,7 @@ [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.15", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -12,6 +13,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -20,6 +22,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -30,6 +33,7 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -40,6 +44,7 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -51,6 +56,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -62,6 +68,7 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -76,6 +83,7 @@ {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.8", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -91,6 +99,7 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]}, {"4.3.7", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -106,6 +115,7 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -121,6 +131,7 @@ {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -136,6 +147,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -151,6 +163,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -166,6 +179,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -182,6 +196,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -198,6 +213,7 @@ {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -214,12 +230,14 @@ {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.16", [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.15", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -227,6 +245,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -235,6 +254,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -245,6 +265,7 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -255,6 +276,7 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -266,6 +288,7 @@ {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -277,6 +300,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -291,6 +315,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.8", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -306,6 +331,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.7", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -321,6 +347,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.6", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -336,6 +363,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.5", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -351,6 +379,7 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.4", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -366,6 +395,7 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.3", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -381,6 +411,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.2", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -397,6 +428,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.1", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -413,6 +445,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.0", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -429,5 +462,6 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index ac0ccaf4f..f3226858e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -47,6 +47,7 @@ ]). -export([ init_resource/4 + , init_resource_with_retrier/4 , init_action/4 , clear_resource/4 , clear_rule/1 @@ -259,7 +260,7 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) -> with_retry -> %% Note that we will return OK in case of resource creation failure, %% A timer is started to re-start the resource later. - _ = (catch (?CLUSTER_CALL(init_resource, [M, F, ResId, Config]))), + _ = (catch (?CLUSTER_CALL(init_resource_with_retrier, [M, F, ResId, Config]))), {ok, Resource}; no_retry -> try @@ -327,7 +328,7 @@ start_resource(ResId) -> {ok, #resource_type{on_create = {Mod, Create}}} = emqx_rule_registry:find_resource_type(ResType), try - init_resource(Mod, Create, ResId, Config), + init_resource_with_retrier(Mod, Create, ResId, Config), refresh_actions_of_a_resource(ResId) catch throw:Reason -> {error, Reason} @@ -476,13 +477,9 @@ refresh_resource(Type) when is_atom(Type) -> emqx_rule_registry:get_resources_by_type(Type)); refresh_resource(#resource{id = ResId, type = Type, config = Config}) -> - try - {ok, #resource_type{on_create = {M, F}}} = - emqx_rule_registry:find_resource_type(Type), - ok = emqx_rule_engine:init_resource(M, F, ResId, Config) - catch _:_ -> - emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY) - end. + {ok, #resource_type{on_create = {M, F}}} = + emqx_rule_registry:find_resource_type(Type), + ok = emqx_rule_engine:init_resource_with_retrier(M, F, ResId, Config). -spec(refresh_rules() -> ok). refresh_rules() -> @@ -490,6 +487,12 @@ refresh_rules() -> (#rule{enabled = true} = Rule) -> try refresh_rule(Rule) catch _:_ -> + %% We set the enable = false when rule init failed to avoid bad rules running + %% without actions created properly. + %% The init failure might be caused by a disconnected resource, in this case the + %% actions can not be created, so the rules won't work. + %% After the user fixed the problem he can enable it manually, + %% doing so will also recreate the actions. emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup}) end; (_) -> ok @@ -655,6 +658,19 @@ init_resource(Module, OnCreate, ResId, Config) -> status = #{is_alive => true}}, emqx_rule_registry:add_resource_params(ResParams). +init_resource_with_retrier(Module, OnCreate, ResId, Config) -> + try + Params = Module:OnCreate(ResId, Config), + ResParams = #resource_params{id = ResId, + params = Params, + status = #{is_alive => true}}, + emqx_rule_registry:add_resource_params(ResParams) + catch Class:Reason:ST -> + Interval = persistent_term:get({emqx_rule_engine, resource_restart_interval}, ?T_RETRY), + emqx_rule_monitor:ensure_resource_retrier(ResId, Interval), + erlang:raise(Class, {init_resource, Reason}, ST) + end. + init_action(Module, OnCreate, ActionInstId, Params) -> ok = emqx_rule_metrics:create_metrics(ActionInstId), case ?RAISE(Module:OnCreate(ActionInstId, Params), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 202c21d1a..217d2bf2b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -28,8 +28,7 @@ start(_Type, _Args) -> {ok, Sup} = emqx_rule_engine_sup:start_link(), _ = emqx_rule_engine_sup:start_locker(), ok = emqx_rule_engine:load_providers(), - ok = emqx_rule_engine:refresh_resources(), - ok = emqx_rule_engine:refresh_rules(), + ok = emqx_rule_monitor:async_refresh_resources_rules(), ok = emqx_rule_engine_cli:load(), {ok, Sup}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl index afff14c40..4e16af5b6 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl @@ -31,8 +31,9 @@ -export([ start_link/0 , stop/0 + , async_refresh_resources_rules/0 , ensure_resource_retrier/2 - , retry_loop/3 + , handler/3 ]). start_link() -> @@ -45,18 +46,21 @@ init([]) -> _ = erlang:process_flag(trap_exit, true), {ok, #{retryers => #{}}}. +async_refresh_resources_rules() -> + gen_server:cast(?MODULE, {create_handler, refresh, resources_and_rules, #{}}). + ensure_resource_retrier(ResId, Interval) -> - gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}). + gen_server:cast(?MODULE, {create_handler, resource, ResId, Interval}). handle_call(_Msg, _From, State) -> {reply, ok, State}. -handle_cast({create_restart_handler, Tag, Obj, Interval}, State) -> +handle_cast({create_handler, Tag, Obj, Args}, State) -> Objects = maps:get(Tag, State, #{}), NewState = case maps:find(Obj, Objects) of error -> update_object(Tag, Obj, - create_restart_handler(Tag, Obj, Interval), State); + create_handler(Tag, Obj, Args), State); {ok, _Pid} -> State end, @@ -93,12 +97,12 @@ update_object(Tag, Obj, Retryer, State) -> retryers => Retryers#{Retryer => {Tag, Obj}} }. -create_restart_handler(Tag, Obj, Interval) -> - ?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]), - %% spawn a dedicated process to handle the restarting asynchronously - spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]). +create_handler(Tag, Obj, Args) -> + ?LOG(info, "create monitor handler for ~p ~p, args: ~p", [Tag, Obj, Args]), + %% spawn a dedicated process to handle the task asynchronously + spawn_link(?MODULE, handler, [Tag, Obj, Args]). -retry_loop(resource, ResId, Interval) -> +handler(resource, ResId, Interval) -> case emqx_rule_registry:find_resource(ResId) of {ok, #resource{type = Type, config = Config}} -> try @@ -111,11 +115,17 @@ retry_loop(resource, ResId, Interval) -> ?LOG(warning, "init_resource failed: ~p, ~0p", [{Err, Reason}, ST]), timer:sleep(Interval), - ?MODULE:retry_loop(resource, ResId, Interval) + ?MODULE:handler(resource, ResId, Interval) end; not_found -> ok - end. + end; + +handler(refresh, resources_and_rules, _) -> + %% NOTE: the order matters. + %% We should always refresh the resources first and then the rules. + ok = emqx_rule_engine:refresh_resources(), + ok = emqx_rule_engine:refresh_rules(). refresh_and_enable_rules_of_resource(ResId) -> lists:foreach( diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index 121970342..d996c7f73 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -35,6 +35,7 @@ suite() -> groups() -> [{resource, [sequence], [ t_restart_resource + , t_refresh_resources_rules ]} ]. @@ -47,24 +48,53 @@ end_per_suite(_Config) -> ok. init_per_testcase(t_restart_resource, Config) -> + persistent_term:put({emqx_rule_engine, resource_restart_interval}, 100), Opts = [public, named_table, set, {read_concurrency, true}], _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), ets:new(t_restart_resource, [named_table, public]), ets:insert(t_restart_resource, {failed_count, 0}), ets:insert(t_restart_resource, {succ_count, 0}), + common_init_per_testcase(), + Config; +init_per_testcase(t_refresh_resources_rules, Config) -> + meck:unload(), + ets:new(t_refresh_resources_rules, [named_table, public]), + ok = meck:new(emqx_rule_engine, [no_link, passthrough]), + meck:expect(emqx_rule_engine, refresh_resources, fun() -> + timer:sleep(500), + ets:update_counter(t_refresh_resources_rules, refresh_resources, 1, {refresh_resources, 0}), + ok + end), + meck:expect(emqx_rule_engine, refresh_rules, fun() -> + timer:sleep(500), + ets:update_counter(t_refresh_resources_rules, refresh_rules, 1, {refresh_rules, 0}), + ok + end), + common_init_per_testcase(), Config; - init_per_testcase(_, Config) -> + common_init_per_testcase(), Config. end_per_testcase(t_restart_resource, Config) -> + persistent_term:put({emqx_rule_engine, resource_restart_interval}, 60000), ets:delete(t_restart_resource), + common_end_per_testcases(), + Config; +end_per_testcase(t_refresh_resources_rules, Config) -> + meck:unload(), + common_end_per_testcases(), Config; end_per_testcase(_, Config) -> + common_end_per_testcases(), Config. +common_init_per_testcase() -> + {ok, _} = emqx_rule_monitor:start_link(). +common_end_per_testcases() -> + emqx_rule_monitor:stop(). + t_restart_resource(_) -> - {ok, _} = emqx_rule_monitor:start_link(), ok = emqx_rule_registry:register_resource_types( [#resource_type{ name = test_res_1, @@ -79,11 +109,12 @@ t_restart_resource(_) -> {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( #{type => test_res_1, config => #{}, + restart_interval => 100, description => <<"debug resource">>}), - [{_, 1}] = ets:lookup(t_restart_resource, failed_count), - [{_, 0}] = ets:lookup(t_restart_resource, succ_count), + ?assertMatch([{_, 0}], ets:lookup(t_restart_resource, succ_count)), + ?assertMatch([{_, N}] when N == 1 orelse N == 2 orelse N == 3, + ets:lookup(t_restart_resource, failed_count)), ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]), - emqx_rule_monitor:ensure_resource_retrier(ResId, 100), timer:sleep(1000), [{_, 5}] = ets:lookup(t_restart_resource, failed_count), [{_, 1}] = ets:lookup(t_restart_resource, succ_count), @@ -91,9 +122,21 @@ t_restart_resource(_) -> ?assertEqual(0, map_size(Pids)), ok = emqx_rule_engine:unload_providers(), emqx_rule_registry:remove_resource(ResId), - emqx_rule_monitor:stop(), ok. +t_refresh_resources_rules(_) -> + ok = emqx_rule_monitor:async_refresh_resources_rules(), + ok = emqx_rule_monitor:async_refresh_resources_rules(), + %% there should be only one refresh handler at the same time + ?assertMatch(#{retryers := Pids} when map_size(Pids) =:= 1, sys:get_state(whereis(emqx_rule_monitor))), + timer:sleep(1200), + ?assertEqual([{refresh_resources, 1}], ets:lookup(t_refresh_resources_rules, refresh_resources)), + ?assertEqual([{refresh_rules, 1}], ets:lookup(t_refresh_resources_rules, refresh_rules)), + ok = emqx_rule_monitor:async_refresh_resources_rules(), + timer:sleep(1200), + ?assertEqual([{refresh_resources, 2}], ets:lookup(t_refresh_resources_rules, refresh_resources)), + ?assertEqual([{refresh_rules, 2}], ets:lookup(t_refresh_resources_rules, refresh_rules)). + on_resource_create(Id, _) -> case ets:lookup(t_restart_resource, failed_count) of [{_, 5}] -> From 15248eb069d4de2e31b9cbbfa71a7320132b37b1 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 21 Oct 2022 15:32:58 +0800 Subject: [PATCH 2/3] chore: update the change log --- .../src/emqx_rule_engine.appup.src | 36 +++++++++++++++++++ changes/v4.3.22-en.md | 2 ++ changes/v4.3.22-zh.md | 2 ++ 3 files changed, 40 insertions(+) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 9674c8c67..01c0ec68d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -5,6 +5,8 @@ [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.15", @@ -13,6 +15,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", @@ -22,6 +25,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", @@ -33,6 +37,7 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", @@ -44,6 +49,7 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", @@ -56,6 +62,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", @@ -68,6 +75,7 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", @@ -83,6 +91,7 @@ {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.8", @@ -99,6 +108,7 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]}, {"4.3.7", @@ -115,6 +125,7 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", @@ -131,6 +142,7 @@ {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", @@ -147,6 +159,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", @@ -163,6 +176,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", @@ -179,6 +193,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", @@ -196,6 +211,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", @@ -213,6 +229,7 @@ {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", @@ -230,6 +247,7 @@ {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], @@ -237,6 +255,8 @@ [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.15", @@ -245,6 +265,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", @@ -254,6 +275,7 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", @@ -265,6 +287,7 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", @@ -276,6 +299,7 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", @@ -288,6 +312,7 @@ {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", @@ -300,6 +325,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", @@ -315,6 +341,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.8", @@ -331,6 +358,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.7", @@ -347,6 +375,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.6", @@ -363,6 +392,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.5", @@ -379,6 +409,7 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.4", @@ -395,6 +426,7 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.3", @@ -411,6 +443,7 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.2", @@ -428,6 +461,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.1", @@ -445,6 +479,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.0", @@ -462,6 +497,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {<<".*">>,[]}]}. diff --git a/changes/v4.3.22-en.md b/changes/v4.3.22-en.md index 77a197624..bb6147e5a 100644 --- a/changes/v4.3.22-en.md +++ b/changes/v4.3.22-en.md @@ -1,5 +1,7 @@ ### Enhancements +- Asynchronously refresh the resources and rules when emqx bootup. [#9199](https://github.com/emqx/emqx/pull/9199). + This is to avoid blocking the start of the emqx_rule_engine app if some of the resources spend too long time on establishing the connection. - Add a warning log if the ACL check failed for subscription [#9124](https://github.com/emqx/emqx/pull/9124). This is to make the ACL deny logging for subscription behave the same as for publish. diff --git a/changes/v4.3.22-zh.md b/changes/v4.3.22-zh.md index 58c82588d..b40843dff 100644 --- a/changes/v4.3.22-zh.md +++ b/changes/v4.3.22-zh.md @@ -1,5 +1,7 @@ ### 增强 +- 在 emqx 启动时,异步地刷新资源和规则。[#9199](https://github.com/emqx/emqx/pull/9199). + 这个改动是为了避免因为一些资源连接建立过慢,从而阻塞 emqx_rule_engine 的启动。 - 订阅时,如果 ACL 检查不通过,打印一个警告日志 [#9124](https://github.com/emqx/emqx/pull/9124)。 该行为的改变主要是为了跟发布失败时的行为保持一致。 From 006d2e5f29b7ef6490392e5ac7ad255ae077b441 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 24 Oct 2022 10:29:15 +0800 Subject: [PATCH 3/3] fix: rolling upgrade failed on undef funcs --- apps/emqx_rule_engine/include/rule_engine.hrl | 35 ++++- .../src/emqx_rule_engine.appup.src | 126 ++++++++++++++++++ .../emqx_rule_engine/src/emqx_rule_engine.erl | 15 ++- .../src/emqx_rule_monitor.erl | 41 ++++-- .../test/emqx_rule_monitor_SUITE.erl | 4 +- changes/v4.3.22-en.md | 5 +- changes/v4.3.22-zh.md | 5 +- 7 files changed, 205 insertions(+), 26 deletions(-) diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index c6d8fb2b4..234e11c3f 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -176,10 +176,12 @@ end end()). +-define(RPC_TIMEOUT, 30000). + -define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)). -define(CLUSTER_CALL(Func, Args, ResParttern), - fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 30000) of + fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, ?RPC_TIMEOUT) of {ResL, []} -> case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of [] -> ResL; @@ -192,6 +194,37 @@ throw({Func, {failed_on_nodes, BadNodes}}) end end()). +%% like CLUSTER_CALL/3, but recall the remote node using FallbackFunc if Func is undefined +-define(CLUSTER_CALL(Func, Args, ResParttern, FallbackFunc, FallbackArgs), + fun() -> + RNodes = ekka_mnesia:running_nodes(), + ResL = erpc:multicall(RNodes, ?MODULE, Func, Args, ?RPC_TIMEOUT), + Res = lists:zip(RNodes, ResL), + BadRes = lists:filtermap(fun + ({_Node, {ok, ResParttern}}) -> + false; + ({Node, {error, {exception, undef, _}}}) -> + try erpc:call(Node, ?MODULE, FallbackFunc, FallbackArgs, ?RPC_TIMEOUT) of + ResParttern -> + false; + OtherRes -> + {true, #{rpc_type => call, func => FallbackFunc, + result => OtherRes, node => Node}} + catch + Err:Reason -> + {true, #{rpc_type => call, func => FallbackFunc, + exception => {Err, Reason}, node => Node}} + end; + ({Node, OtherRes}) -> + {true, #{rpc_type => multicall, func => FallbackFunc, + result => OtherRes, node => Node}} + end, Res), + case BadRes of + [] -> Res; + _ -> throw(BadRes) + end + end()). + %% Tables -define(RULE_TAB, emqx_rule). -define(ACTION_TAB, emqx_rule_action). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 01c0ec68d..db60b666c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -3,6 +3,15 @@ {VSN, [{"4.3.16", [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -11,6 +20,14 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.15", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -20,6 +37,13 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, @@ -30,6 +54,11 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -42,6 +71,11 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -54,6 +88,10 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -67,6 +105,10 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -80,6 +122,9 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -96,6 +141,8 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.8", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, @@ -113,6 +160,8 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]}, {"4.3.7", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -130,6 +179,8 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -147,6 +198,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -164,6 +217,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -181,6 +236,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -198,6 +255,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -216,6 +275,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -234,6 +295,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -253,6 +316,15 @@ {<<".*">>,[]}], [{"4.3.16", [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -261,6 +333,14 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.15", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -270,6 +350,13 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, @@ -280,6 +367,11 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -292,6 +384,11 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -304,6 +401,10 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -317,6 +418,10 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -330,6 +435,9 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -346,6 +454,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.8", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -363,6 +473,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.7", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -380,6 +492,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.6", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -397,6 +511,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.5", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -414,6 +530,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.4", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -431,6 +549,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.3", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -448,6 +568,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.2", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -466,6 +588,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.1", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -484,6 +608,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.0", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index f3226858e..70e9e940a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -24,7 +24,7 @@ , refresh_resources/0 , refresh_resource/1 , refresh_rule/1 - , refresh_rules/0 + , refresh_rules_when_boot/0 , refresh_actions/1 , refresh_actions/2 , refresh_resource_status/0 @@ -256,15 +256,20 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) -> created_at = erlang:system_time(millisecond) }, ok = emqx_rule_registry:add_resource(Resource), + InitArgs = [M, F, ResId, Config], case Retry of with_retry -> %% Note that we will return OK in case of resource creation failure, %% A timer is started to re-start the resource later. - _ = (catch (?CLUSTER_CALL(init_resource_with_retrier, [M, F, ResId, Config]))), + _ = try ?CLUSTER_CALL(init_resource_with_retrier, InitArgs, ok, + init_resource, InitArgs) + catch throw : Reason -> + ?LOG(error, "create_resource failed: ~0p", [Reason]) + end, {ok, Resource}; no_retry -> try - _ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]), + _ = ?CLUSTER_CALL(init_resource, InitArgs), {ok, Resource} catch throw : Reason -> {error, Reason} @@ -481,8 +486,8 @@ refresh_resource(#resource{id = ResId, type = Type, config = Config}) -> emqx_rule_registry:find_resource_type(Type), ok = emqx_rule_engine:init_resource_with_retrier(M, F, ResId, Config). --spec(refresh_rules() -> ok). -refresh_rules() -> +-spec(refresh_rules_when_boot() -> ok). +refresh_rules_when_boot() -> lists:foreach(fun (#rule{enabled = true} = Rule) -> try refresh_rule(Rule) diff --git a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl index 4e16af5b6..8af8aa7ff 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl @@ -33,7 +33,7 @@ , stop/0 , async_refresh_resources_rules/0 , ensure_resource_retrier/2 - , handler/3 + , retry_loop/3 ]). start_link() -> @@ -47,20 +47,27 @@ init([]) -> {ok, #{retryers => #{}}}. async_refresh_resources_rules() -> - gen_server:cast(?MODULE, {create_handler, refresh, resources_and_rules, #{}}). + gen_server:cast(?MODULE, async_refresh). ensure_resource_retrier(ResId, Interval) -> - gen_server:cast(?MODULE, {create_handler, resource, ResId, Interval}). + gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}). handle_call(_Msg, _From, State) -> {reply, ok, State}. -handle_cast({create_handler, Tag, Obj, Args}, State) -> +handle_cast(async_refresh, #{boot_refresh_pid := Pid} = State) when is_pid(Pid) -> + %% the refresh task is already in progress, we discard the duplication + {noreply, State}; +handle_cast(async_refresh, State) -> + Pid = spawn_link(fun do_async_refresh/0), + {noreply, State#{boot_refresh_pid => Pid}}; + +handle_cast({create_restart_handler, Tag, Obj, Interval}, State) -> Objects = maps:get(Tag, State, #{}), NewState = case maps:find(Obj, Objects) of error -> update_object(Tag, Obj, - create_handler(Tag, Obj, Args), State); + create_restart_handler(Tag, Obj, Interval), State); {ok, _Pid} -> State end, @@ -69,7 +76,13 @@ handle_cast({create_handler, Tag, Obj, Args}, State) -> handle_cast(_Msg, State) -> {noreply, State}. + +handle_info({'EXIT', Pid, _Reason}, State = #{boot_refresh_pid := Pid}) -> + {noreply, State#{boot_refresh_pid => undefined}}; handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) -> + %% We won't try to restart the 'retryers' event if the 'EXIT' Reason is not 'normal'. + %% Instead we rely on the user to trigger a manual retry for the resources, and then enable + %% the rules after resources are connected. case maps:take(Pid, Retryers) of {{Tag, Obj}, Retryers2} -> Objects = maps:get(Tag, State, #{}), @@ -97,12 +110,12 @@ update_object(Tag, Obj, Retryer, State) -> retryers => Retryers#{Retryer => {Tag, Obj}} }. -create_handler(Tag, Obj, Args) -> - ?LOG(info, "create monitor handler for ~p ~p, args: ~p", [Tag, Obj, Args]), - %% spawn a dedicated process to handle the task asynchronously - spawn_link(?MODULE, handler, [Tag, Obj, Args]). +create_restart_handler(Tag, Obj, Interval) -> + ?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]), + %% spawn a dedicated process to handle the restarting asynchronously + spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]). -handler(resource, ResId, Interval) -> +retry_loop(resource, ResId, Interval) -> case emqx_rule_registry:find_resource(ResId) of {ok, #resource{type = Type, config = Config}} -> try @@ -115,17 +128,17 @@ handler(resource, ResId, Interval) -> ?LOG(warning, "init_resource failed: ~p, ~0p", [{Err, Reason}, ST]), timer:sleep(Interval), - ?MODULE:handler(resource, ResId, Interval) + ?MODULE:retry_loop(resource, ResId, Interval) end; not_found -> ok - end; + end. -handler(refresh, resources_and_rules, _) -> +do_async_refresh() -> %% NOTE: the order matters. %% We should always refresh the resources first and then the rules. ok = emqx_rule_engine:refresh_resources(), - ok = emqx_rule_engine:refresh_rules(). + ok = emqx_rule_engine:refresh_rules_when_boot(). refresh_and_enable_rules_of_resource(ResId) -> lists:foreach( diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index d996c7f73..6389c1a2e 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -65,7 +65,7 @@ init_per_testcase(t_refresh_resources_rules, Config) -> ets:update_counter(t_refresh_resources_rules, refresh_resources, 1, {refresh_resources, 0}), ok end), - meck:expect(emqx_rule_engine, refresh_rules, fun() -> + meck:expect(emqx_rule_engine, refresh_rules_when_boot, fun() -> timer:sleep(500), ets:update_counter(t_refresh_resources_rules, refresh_rules, 1, {refresh_rules, 0}), ok @@ -128,7 +128,7 @@ t_refresh_resources_rules(_) -> ok = emqx_rule_monitor:async_refresh_resources_rules(), ok = emqx_rule_monitor:async_refresh_resources_rules(), %% there should be only one refresh handler at the same time - ?assertMatch(#{retryers := Pids} when map_size(Pids) =:= 1, sys:get_state(whereis(emqx_rule_monitor))), + ?assertMatch(#{boot_refresh_pid := Pid} when is_pid(Pid), sys:get_state(whereis(emqx_rule_monitor))), timer:sleep(1200), ?assertEqual([{refresh_resources, 1}], ets:lookup(t_refresh_resources_rules, refresh_resources)), ?assertEqual([{refresh_rules, 1}], ets:lookup(t_refresh_resources_rules, refresh_rules)), diff --git a/changes/v4.3.22-en.md b/changes/v4.3.22-en.md index bb6147e5a..b52ff4532 100644 --- a/changes/v4.3.22-en.md +++ b/changes/v4.3.22-en.md @@ -1,7 +1,8 @@ ### Enhancements -- Asynchronously refresh the resources and rules when emqx bootup. [#9199](https://github.com/emqx/emqx/pull/9199). - This is to avoid blocking the start of the emqx_rule_engine app if some of the resources spend too long time on establishing the connection. +- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199). + This is to avoid slowing down the boot if some resources spend long time establishing the connection. + - Add a warning log if the ACL check failed for subscription [#9124](https://github.com/emqx/emqx/pull/9124). This is to make the ACL deny logging for subscription behave the same as for publish. diff --git a/changes/v4.3.22-zh.md b/changes/v4.3.22-zh.md index b40843dff..63f164aef 100644 --- a/changes/v4.3.22-zh.md +++ b/changes/v4.3.22-zh.md @@ -1,7 +1,8 @@ ### 增强 -- 在 emqx 启动时,异步地刷新资源和规则。[#9199](https://github.com/emqx/emqx/pull/9199). - 这个改动是为了避免因为一些资源连接建立过慢,从而阻塞 emqx_rule_engine 的启动。 +- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。 + 这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。 + - 订阅时,如果 ACL 检查不通过,打印一个警告日志 [#9124](https://github.com/emqx/emqx/pull/9124)。 该行为的改变主要是为了跟发布失败时的行为保持一致。