From 006d2e5f29b7ef6490392e5ac7ad255ae077b441 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 24 Oct 2022 10:29:15 +0800 Subject: [PATCH] fix: rolling upgrade failed on undef funcs --- apps/emqx_rule_engine/include/rule_engine.hrl | 35 ++++- .../src/emqx_rule_engine.appup.src | 126 ++++++++++++++++++ .../emqx_rule_engine/src/emqx_rule_engine.erl | 15 ++- .../src/emqx_rule_monitor.erl | 41 ++++-- .../test/emqx_rule_monitor_SUITE.erl | 4 +- changes/v4.3.22-en.md | 5 +- changes/v4.3.22-zh.md | 5 +- 7 files changed, 205 insertions(+), 26 deletions(-) diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index c6d8fb2b4..234e11c3f 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -176,10 +176,12 @@ end end()). +-define(RPC_TIMEOUT, 30000). + -define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)). -define(CLUSTER_CALL(Func, Args, ResParttern), - fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 30000) of + fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, ?RPC_TIMEOUT) of {ResL, []} -> case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of [] -> ResL; @@ -192,6 +194,37 @@ throw({Func, {failed_on_nodes, BadNodes}}) end end()). +%% like CLUSTER_CALL/3, but recall the remote node using FallbackFunc if Func is undefined +-define(CLUSTER_CALL(Func, Args, ResParttern, FallbackFunc, FallbackArgs), + fun() -> + RNodes = ekka_mnesia:running_nodes(), + ResL = erpc:multicall(RNodes, ?MODULE, Func, Args, ?RPC_TIMEOUT), + Res = lists:zip(RNodes, ResL), + BadRes = lists:filtermap(fun + ({_Node, {ok, ResParttern}}) -> + false; + ({Node, {error, {exception, undef, _}}}) -> + try erpc:call(Node, ?MODULE, FallbackFunc, FallbackArgs, ?RPC_TIMEOUT) of + ResParttern -> + false; + OtherRes -> + {true, #{rpc_type => call, func => FallbackFunc, + result => OtherRes, node => Node}} + catch + Err:Reason -> + {true, #{rpc_type => call, func => FallbackFunc, + exception => {Err, Reason}, node => Node}} + end; + ({Node, OtherRes}) -> + {true, #{rpc_type => multicall, func => FallbackFunc, + result => OtherRes, node => Node}} + end, Res), + case BadRes of + [] -> Res; + _ -> throw(BadRes) + end + end()). + %% Tables -define(RULE_TAB, emqx_rule). -define(ACTION_TAB, emqx_rule_action). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 01c0ec68d..db60b666c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -3,6 +3,15 @@ {VSN, [{"4.3.16", [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -11,6 +20,14 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.15", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -20,6 +37,13 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, @@ -30,6 +54,11 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -42,6 +71,11 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -54,6 +88,10 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -67,6 +105,10 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -80,6 +122,9 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -96,6 +141,8 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.8", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, @@ -113,6 +160,8 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]}, {"4.3.7", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -130,6 +179,8 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -147,6 +198,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -164,6 +217,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -181,6 +236,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -198,6 +255,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -216,6 +275,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -234,6 +295,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -253,6 +316,15 @@ {<<".*">>,[]}], [{"4.3.16", [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -261,6 +333,14 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.15", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -270,6 +350,13 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, @@ -280,6 +367,11 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -292,6 +384,11 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -304,6 +401,10 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -317,6 +418,10 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -330,6 +435,9 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -346,6 +454,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.8", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -363,6 +473,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.7", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -380,6 +492,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.6", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -397,6 +511,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.5", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -414,6 +530,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.4", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -431,6 +549,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.3", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -448,6 +568,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.2", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -466,6 +588,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.1", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -484,6 +608,8 @@ {delete_module,emqx_rule_date}]}, {"4.3.0", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index f3226858e..70e9e940a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -24,7 +24,7 @@ , refresh_resources/0 , refresh_resource/1 , refresh_rule/1 - , refresh_rules/0 + , refresh_rules_when_boot/0 , refresh_actions/1 , refresh_actions/2 , refresh_resource_status/0 @@ -256,15 +256,20 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) -> created_at = erlang:system_time(millisecond) }, ok = emqx_rule_registry:add_resource(Resource), + InitArgs = [M, F, ResId, Config], case Retry of with_retry -> %% Note that we will return OK in case of resource creation failure, %% A timer is started to re-start the resource later. - _ = (catch (?CLUSTER_CALL(init_resource_with_retrier, [M, F, ResId, Config]))), + _ = try ?CLUSTER_CALL(init_resource_with_retrier, InitArgs, ok, + init_resource, InitArgs) + catch throw : Reason -> + ?LOG(error, "create_resource failed: ~0p", [Reason]) + end, {ok, Resource}; no_retry -> try - _ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]), + _ = ?CLUSTER_CALL(init_resource, InitArgs), {ok, Resource} catch throw : Reason -> {error, Reason} @@ -481,8 +486,8 @@ refresh_resource(#resource{id = ResId, type = Type, config = Config}) -> emqx_rule_registry:find_resource_type(Type), ok = emqx_rule_engine:init_resource_with_retrier(M, F, ResId, Config). --spec(refresh_rules() -> ok). -refresh_rules() -> +-spec(refresh_rules_when_boot() -> ok). +refresh_rules_when_boot() -> lists:foreach(fun (#rule{enabled = true} = Rule) -> try refresh_rule(Rule) diff --git a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl index 4e16af5b6..8af8aa7ff 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl @@ -33,7 +33,7 @@ , stop/0 , async_refresh_resources_rules/0 , ensure_resource_retrier/2 - , handler/3 + , retry_loop/3 ]). start_link() -> @@ -47,20 +47,27 @@ init([]) -> {ok, #{retryers => #{}}}. async_refresh_resources_rules() -> - gen_server:cast(?MODULE, {create_handler, refresh, resources_and_rules, #{}}). + gen_server:cast(?MODULE, async_refresh). ensure_resource_retrier(ResId, Interval) -> - gen_server:cast(?MODULE, {create_handler, resource, ResId, Interval}). + gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}). handle_call(_Msg, _From, State) -> {reply, ok, State}. -handle_cast({create_handler, Tag, Obj, Args}, State) -> +handle_cast(async_refresh, #{boot_refresh_pid := Pid} = State) when is_pid(Pid) -> + %% the refresh task is already in progress, we discard the duplication + {noreply, State}; +handle_cast(async_refresh, State) -> + Pid = spawn_link(fun do_async_refresh/0), + {noreply, State#{boot_refresh_pid => Pid}}; + +handle_cast({create_restart_handler, Tag, Obj, Interval}, State) -> Objects = maps:get(Tag, State, #{}), NewState = case maps:find(Obj, Objects) of error -> update_object(Tag, Obj, - create_handler(Tag, Obj, Args), State); + create_restart_handler(Tag, Obj, Interval), State); {ok, _Pid} -> State end, @@ -69,7 +76,13 @@ handle_cast({create_handler, Tag, Obj, Args}, State) -> handle_cast(_Msg, State) -> {noreply, State}. + +handle_info({'EXIT', Pid, _Reason}, State = #{boot_refresh_pid := Pid}) -> + {noreply, State#{boot_refresh_pid => undefined}}; handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) -> + %% We won't try to restart the 'retryers' event if the 'EXIT' Reason is not 'normal'. + %% Instead we rely on the user to trigger a manual retry for the resources, and then enable + %% the rules after resources are connected. case maps:take(Pid, Retryers) of {{Tag, Obj}, Retryers2} -> Objects = maps:get(Tag, State, #{}), @@ -97,12 +110,12 @@ update_object(Tag, Obj, Retryer, State) -> retryers => Retryers#{Retryer => {Tag, Obj}} }. -create_handler(Tag, Obj, Args) -> - ?LOG(info, "create monitor handler for ~p ~p, args: ~p", [Tag, Obj, Args]), - %% spawn a dedicated process to handle the task asynchronously - spawn_link(?MODULE, handler, [Tag, Obj, Args]). +create_restart_handler(Tag, Obj, Interval) -> + ?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]), + %% spawn a dedicated process to handle the restarting asynchronously + spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]). -handler(resource, ResId, Interval) -> +retry_loop(resource, ResId, Interval) -> case emqx_rule_registry:find_resource(ResId) of {ok, #resource{type = Type, config = Config}} -> try @@ -115,17 +128,17 @@ handler(resource, ResId, Interval) -> ?LOG(warning, "init_resource failed: ~p, ~0p", [{Err, Reason}, ST]), timer:sleep(Interval), - ?MODULE:handler(resource, ResId, Interval) + ?MODULE:retry_loop(resource, ResId, Interval) end; not_found -> ok - end; + end. -handler(refresh, resources_and_rules, _) -> +do_async_refresh() -> %% NOTE: the order matters. %% We should always refresh the resources first and then the rules. ok = emqx_rule_engine:refresh_resources(), - ok = emqx_rule_engine:refresh_rules(). + ok = emqx_rule_engine:refresh_rules_when_boot(). refresh_and_enable_rules_of_resource(ResId) -> lists:foreach( diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index d996c7f73..6389c1a2e 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -65,7 +65,7 @@ init_per_testcase(t_refresh_resources_rules, Config) -> ets:update_counter(t_refresh_resources_rules, refresh_resources, 1, {refresh_resources, 0}), ok end), - meck:expect(emqx_rule_engine, refresh_rules, fun() -> + meck:expect(emqx_rule_engine, refresh_rules_when_boot, fun() -> timer:sleep(500), ets:update_counter(t_refresh_resources_rules, refresh_rules, 1, {refresh_rules, 0}), ok @@ -128,7 +128,7 @@ t_refresh_resources_rules(_) -> ok = emqx_rule_monitor:async_refresh_resources_rules(), ok = emqx_rule_monitor:async_refresh_resources_rules(), %% there should be only one refresh handler at the same time - ?assertMatch(#{retryers := Pids} when map_size(Pids) =:= 1, sys:get_state(whereis(emqx_rule_monitor))), + ?assertMatch(#{boot_refresh_pid := Pid} when is_pid(Pid), sys:get_state(whereis(emqx_rule_monitor))), timer:sleep(1200), ?assertEqual([{refresh_resources, 1}], ets:lookup(t_refresh_resources_rules, refresh_resources)), ?assertEqual([{refresh_rules, 1}], ets:lookup(t_refresh_resources_rules, refresh_rules)), diff --git a/changes/v4.3.22-en.md b/changes/v4.3.22-en.md index bb6147e5a..b52ff4532 100644 --- a/changes/v4.3.22-en.md +++ b/changes/v4.3.22-en.md @@ -1,7 +1,8 @@ ### Enhancements -- Asynchronously refresh the resources and rules when emqx bootup. [#9199](https://github.com/emqx/emqx/pull/9199). - This is to avoid blocking the start of the emqx_rule_engine app if some of the resources spend too long time on establishing the connection. +- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199). + This is to avoid slowing down the boot if some resources spend long time establishing the connection. + - Add a warning log if the ACL check failed for subscription [#9124](https://github.com/emqx/emqx/pull/9124). This is to make the ACL deny logging for subscription behave the same as for publish. diff --git a/changes/v4.3.22-zh.md b/changes/v4.3.22-zh.md index b40843dff..63f164aef 100644 --- a/changes/v4.3.22-zh.md +++ b/changes/v4.3.22-zh.md @@ -1,7 +1,8 @@ ### 增强 -- 在 emqx 启动时,异步地刷新资源和规则。[#9199](https://github.com/emqx/emqx/pull/9199). - 这个改动是为了避免因为一些资源连接建立过慢,从而阻塞 emqx_rule_engine 的启动。 +- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。 + 这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。 + - 订阅时,如果 ACL 检查不通过,打印一个警告日志 [#9124](https://github.com/emqx/emqx/pull/9124)。 该行为的改变主要是为了跟发布失败时的行为保持一致。