diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index c6d8fb2b4..234e11c3f 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -176,10 +176,12 @@ end end()). +-define(RPC_TIMEOUT, 30000). + -define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)). -define(CLUSTER_CALL(Func, Args, ResParttern), - fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 30000) of + fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, ?RPC_TIMEOUT) of {ResL, []} -> case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of [] -> ResL; @@ -192,6 +194,37 @@ throw({Func, {failed_on_nodes, BadNodes}}) end end()). +%% like CLUSTER_CALL/3, but recall the remote node using FallbackFunc if Func is undefined +-define(CLUSTER_CALL(Func, Args, ResParttern, FallbackFunc, FallbackArgs), + fun() -> + RNodes = ekka_mnesia:running_nodes(), + ResL = erpc:multicall(RNodes, ?MODULE, Func, Args, ?RPC_TIMEOUT), + Res = lists:zip(RNodes, ResL), + BadRes = lists:filtermap(fun + ({_Node, {ok, ResParttern}}) -> + false; + ({Node, {error, {exception, undef, _}}}) -> + try erpc:call(Node, ?MODULE, FallbackFunc, FallbackArgs, ?RPC_TIMEOUT) of + ResParttern -> + false; + OtherRes -> + {true, #{rpc_type => call, func => FallbackFunc, + result => OtherRes, node => Node}} + catch + Err:Reason -> + {true, #{rpc_type => call, func => FallbackFunc, + exception => {Err, Reason}, node => Node}} + end; + ({Node, OtherRes}) -> + {true, #{rpc_type => multicall, func => FallbackFunc, + result => OtherRes, node => Node}} + end, Res), + case BadRes of + [] -> Res; + _ -> throw(BadRes) + end + end()). + %% Tables -define(RULE_TAB, emqx_rule). -define(ACTION_TAB, emqx_rule_action). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index cbd15a5eb..db60b666c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -3,26 +3,62 @@ {VSN, [{"4.3.16", [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.15", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -30,9 +66,16 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -40,9 +83,15 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -51,9 +100,15 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -62,9 +117,14 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -76,9 +136,13 @@ {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.8", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, @@ -91,9 +155,13 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]}, {"4.3.7", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -106,9 +174,13 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -121,9 +193,13 @@ {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -136,9 +212,13 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -151,9 +231,13 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -166,9 +250,13 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -182,9 +270,13 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -198,9 +290,13 @@ {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -214,30 +310,68 @@ {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.16", [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.15", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.14", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.3.13", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -245,9 +379,16 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -255,9 +396,15 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -266,9 +413,15 @@ {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -277,9 +430,14 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -291,9 +449,13 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.8", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -306,9 +468,13 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.7", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -321,9 +487,13 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.6", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -336,9 +506,13 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.5", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -351,9 +525,13 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.4", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -366,9 +544,13 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.3", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -381,9 +563,13 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.2", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -397,9 +583,13 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.1", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -413,9 +603,13 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.0", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -429,5 +623,7 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index ac0ccaf4f..70e9e940a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -24,7 +24,7 @@ , refresh_resources/0 , refresh_resource/1 , refresh_rule/1 - , refresh_rules/0 + , refresh_rules_when_boot/0 , refresh_actions/1 , refresh_actions/2 , refresh_resource_status/0 @@ -47,6 +47,7 @@ ]). -export([ init_resource/4 + , init_resource_with_retrier/4 , init_action/4 , clear_resource/4 , clear_rule/1 @@ -255,15 +256,20 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) -> created_at = erlang:system_time(millisecond) }, ok = emqx_rule_registry:add_resource(Resource), + InitArgs = [M, F, ResId, Config], case Retry of with_retry -> %% Note that we will return OK in case of resource creation failure, %% A timer is started to re-start the resource later. - _ = (catch (?CLUSTER_CALL(init_resource, [M, F, ResId, Config]))), + _ = try ?CLUSTER_CALL(init_resource_with_retrier, InitArgs, ok, + init_resource, InitArgs) + catch throw : Reason -> + ?LOG(error, "create_resource failed: ~0p", [Reason]) + end, {ok, Resource}; no_retry -> try - _ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]), + _ = ?CLUSTER_CALL(init_resource, InitArgs), {ok, Resource} catch throw : Reason -> {error, Reason} @@ -327,7 +333,7 @@ start_resource(ResId) -> {ok, #resource_type{on_create = {Mod, Create}}} = emqx_rule_registry:find_resource_type(ResType), try - init_resource(Mod, Create, ResId, Config), + init_resource_with_retrier(Mod, Create, ResId, Config), refresh_actions_of_a_resource(ResId) catch throw:Reason -> {error, Reason} @@ -476,20 +482,22 @@ refresh_resource(Type) when is_atom(Type) -> emqx_rule_registry:get_resources_by_type(Type)); refresh_resource(#resource{id = ResId, type = Type, config = Config}) -> - try - {ok, #resource_type{on_create = {M, F}}} = - emqx_rule_registry:find_resource_type(Type), - ok = emqx_rule_engine:init_resource(M, F, ResId, Config) - catch _:_ -> - emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY) - end. + {ok, #resource_type{on_create = {M, F}}} = + emqx_rule_registry:find_resource_type(Type), + ok = emqx_rule_engine:init_resource_with_retrier(M, F, ResId, Config). --spec(refresh_rules() -> ok). -refresh_rules() -> +-spec(refresh_rules_when_boot() -> ok). +refresh_rules_when_boot() -> lists:foreach(fun (#rule{enabled = true} = Rule) -> try refresh_rule(Rule) catch _:_ -> + %% We set the enable = false when rule init failed to avoid bad rules running + %% without actions created properly. + %% The init failure might be caused by a disconnected resource, in this case the + %% actions can not be created, so the rules won't work. + %% After the user fixed the problem he can enable it manually, + %% doing so will also recreate the actions. emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup}) end; (_) -> ok @@ -655,6 +663,19 @@ init_resource(Module, OnCreate, ResId, Config) -> status = #{is_alive => true}}, emqx_rule_registry:add_resource_params(ResParams). +init_resource_with_retrier(Module, OnCreate, ResId, Config) -> + try + Params = Module:OnCreate(ResId, Config), + ResParams = #resource_params{id = ResId, + params = Params, + status = #{is_alive => true}}, + emqx_rule_registry:add_resource_params(ResParams) + catch Class:Reason:ST -> + Interval = persistent_term:get({emqx_rule_engine, resource_restart_interval}, ?T_RETRY), + emqx_rule_monitor:ensure_resource_retrier(ResId, Interval), + erlang:raise(Class, {init_resource, Reason}, ST) + end. + init_action(Module, OnCreate, ActionInstId, Params) -> ok = emqx_rule_metrics:create_metrics(ActionInstId), case ?RAISE(Module:OnCreate(ActionInstId, Params), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 202c21d1a..217d2bf2b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -28,8 +28,7 @@ start(_Type, _Args) -> {ok, Sup} = emqx_rule_engine_sup:start_link(), _ = emqx_rule_engine_sup:start_locker(), ok = emqx_rule_engine:load_providers(), - ok = emqx_rule_engine:refresh_resources(), - ok = emqx_rule_engine:refresh_rules(), + ok = emqx_rule_monitor:async_refresh_resources_rules(), ok = emqx_rule_engine_cli:load(), {ok, Sup}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl index afff14c40..8af8aa7ff 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl @@ -31,6 +31,7 @@ -export([ start_link/0 , stop/0 + , async_refresh_resources_rules/0 , ensure_resource_retrier/2 , retry_loop/3 ]). @@ -45,12 +46,22 @@ init([]) -> _ = erlang:process_flag(trap_exit, true), {ok, #{retryers => #{}}}. +async_refresh_resources_rules() -> + gen_server:cast(?MODULE, async_refresh). + ensure_resource_retrier(ResId, Interval) -> gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}). handle_call(_Msg, _From, State) -> {reply, ok, State}. +handle_cast(async_refresh, #{boot_refresh_pid := Pid} = State) when is_pid(Pid) -> + %% the refresh task is already in progress, we discard the duplication + {noreply, State}; +handle_cast(async_refresh, State) -> + Pid = spawn_link(fun do_async_refresh/0), + {noreply, State#{boot_refresh_pid => Pid}}; + handle_cast({create_restart_handler, Tag, Obj, Interval}, State) -> Objects = maps:get(Tag, State, #{}), NewState = case maps:find(Obj, Objects) of @@ -65,7 +76,13 @@ handle_cast({create_restart_handler, Tag, Obj, Interval}, State) -> handle_cast(_Msg, State) -> {noreply, State}. + +handle_info({'EXIT', Pid, _Reason}, State = #{boot_refresh_pid := Pid}) -> + {noreply, State#{boot_refresh_pid => undefined}}; handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) -> + %% We won't try to restart the 'retryers' event if the 'EXIT' Reason is not 'normal'. + %% Instead we rely on the user to trigger a manual retry for the resources, and then enable + %% the rules after resources are connected. case maps:take(Pid, Retryers) of {{Tag, Obj}, Retryers2} -> Objects = maps:get(Tag, State, #{}), @@ -117,6 +134,12 @@ retry_loop(resource, ResId, Interval) -> ok end. +do_async_refresh() -> + %% NOTE: the order matters. + %% We should always refresh the resources first and then the rules. + ok = emqx_rule_engine:refresh_resources(), + ok = emqx_rule_engine:refresh_rules_when_boot(). + refresh_and_enable_rules_of_resource(ResId) -> lists:foreach( fun (#rule{id = Id, enabled = false, state = refresh_failed_at_bootup} = Rule) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index 121970342..6389c1a2e 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -35,6 +35,7 @@ suite() -> groups() -> [{resource, [sequence], [ t_restart_resource + , t_refresh_resources_rules ]} ]. @@ -47,24 +48,53 @@ end_per_suite(_Config) -> ok. init_per_testcase(t_restart_resource, Config) -> + persistent_term:put({emqx_rule_engine, resource_restart_interval}, 100), Opts = [public, named_table, set, {read_concurrency, true}], _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), ets:new(t_restart_resource, [named_table, public]), ets:insert(t_restart_resource, {failed_count, 0}), ets:insert(t_restart_resource, {succ_count, 0}), + common_init_per_testcase(), + Config; +init_per_testcase(t_refresh_resources_rules, Config) -> + meck:unload(), + ets:new(t_refresh_resources_rules, [named_table, public]), + ok = meck:new(emqx_rule_engine, [no_link, passthrough]), + meck:expect(emqx_rule_engine, refresh_resources, fun() -> + timer:sleep(500), + ets:update_counter(t_refresh_resources_rules, refresh_resources, 1, {refresh_resources, 0}), + ok + end), + meck:expect(emqx_rule_engine, refresh_rules_when_boot, fun() -> + timer:sleep(500), + ets:update_counter(t_refresh_resources_rules, refresh_rules, 1, {refresh_rules, 0}), + ok + end), + common_init_per_testcase(), Config; - init_per_testcase(_, Config) -> + common_init_per_testcase(), Config. end_per_testcase(t_restart_resource, Config) -> + persistent_term:put({emqx_rule_engine, resource_restart_interval}, 60000), ets:delete(t_restart_resource), + common_end_per_testcases(), + Config; +end_per_testcase(t_refresh_resources_rules, Config) -> + meck:unload(), + common_end_per_testcases(), Config; end_per_testcase(_, Config) -> + common_end_per_testcases(), Config. +common_init_per_testcase() -> + {ok, _} = emqx_rule_monitor:start_link(). +common_end_per_testcases() -> + emqx_rule_monitor:stop(). + t_restart_resource(_) -> - {ok, _} = emqx_rule_monitor:start_link(), ok = emqx_rule_registry:register_resource_types( [#resource_type{ name = test_res_1, @@ -79,11 +109,12 @@ t_restart_resource(_) -> {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( #{type => test_res_1, config => #{}, + restart_interval => 100, description => <<"debug resource">>}), - [{_, 1}] = ets:lookup(t_restart_resource, failed_count), - [{_, 0}] = ets:lookup(t_restart_resource, succ_count), + ?assertMatch([{_, 0}], ets:lookup(t_restart_resource, succ_count)), + ?assertMatch([{_, N}] when N == 1 orelse N == 2 orelse N == 3, + ets:lookup(t_restart_resource, failed_count)), ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]), - emqx_rule_monitor:ensure_resource_retrier(ResId, 100), timer:sleep(1000), [{_, 5}] = ets:lookup(t_restart_resource, failed_count), [{_, 1}] = ets:lookup(t_restart_resource, succ_count), @@ -91,9 +122,21 @@ t_restart_resource(_) -> ?assertEqual(0, map_size(Pids)), ok = emqx_rule_engine:unload_providers(), emqx_rule_registry:remove_resource(ResId), - emqx_rule_monitor:stop(), ok. +t_refresh_resources_rules(_) -> + ok = emqx_rule_monitor:async_refresh_resources_rules(), + ok = emqx_rule_monitor:async_refresh_resources_rules(), + %% there should be only one refresh handler at the same time + ?assertMatch(#{boot_refresh_pid := Pid} when is_pid(Pid), sys:get_state(whereis(emqx_rule_monitor))), + timer:sleep(1200), + ?assertEqual([{refresh_resources, 1}], ets:lookup(t_refresh_resources_rules, refresh_resources)), + ?assertEqual([{refresh_rules, 1}], ets:lookup(t_refresh_resources_rules, refresh_rules)), + ok = emqx_rule_monitor:async_refresh_resources_rules(), + timer:sleep(1200), + ?assertEqual([{refresh_resources, 2}], ets:lookup(t_refresh_resources_rules, refresh_resources)), + ?assertEqual([{refresh_rules, 2}], ets:lookup(t_refresh_resources_rules, refresh_rules)). + on_resource_create(Id, _) -> case ets:lookup(t_restart_resource, failed_count) of [{_, 5}] -> diff --git a/changes/v4.3.22-en.md b/changes/v4.3.22-en.md index 2bf731b8c..6a7e7b56a 100644 --- a/changes/v4.3.22-en.md +++ b/changes/v4.3.22-en.md @@ -1,5 +1,8 @@ ### Enhancements +- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199). + This is to avoid slowing down the boot if some resources spend long time establishing the connection. + - Add a warning log if the ACL check failed for subscription [#9124](https://github.com/emqx/emqx/pull/9124). This is to make the ACL deny logging for subscription behave the same as for publish. diff --git a/changes/v4.3.22-zh.md b/changes/v4.3.22-zh.md index 065fa6419..0ada59e35 100644 --- a/changes/v4.3.22-zh.md +++ b/changes/v4.3.22-zh.md @@ -1,5 +1,8 @@ ### 增强 +- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。 + 这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。 + - 订阅时,如果 ACL 检查不通过,打印一个警告日志 [#9124](https://github.com/emqx/emqx/pull/9124)。 该行为的改变主要是为了跟发布失败时的行为保持一致。