fix: rolling upgrade failed on undef funcs
This commit is contained in:
parent
15248eb069
commit
006d2e5f29
|
@ -176,10 +176,12 @@
|
|||
end
|
||||
end()).
|
||||
|
||||
-define(RPC_TIMEOUT, 30000).
|
||||
|
||||
-define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).
|
||||
|
||||
-define(CLUSTER_CALL(Func, Args, ResParttern),
|
||||
fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 30000) of
|
||||
fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, ?RPC_TIMEOUT) of
|
||||
{ResL, []} ->
|
||||
case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of
|
||||
[] -> ResL;
|
||||
|
@ -192,6 +194,37 @@
|
|||
throw({Func, {failed_on_nodes, BadNodes}})
|
||||
end end()).
|
||||
|
||||
%% like CLUSTER_CALL/3, but recall the remote node using FallbackFunc if Func is undefined
|
||||
-define(CLUSTER_CALL(Func, Args, ResParttern, FallbackFunc, FallbackArgs),
|
||||
fun() ->
|
||||
RNodes = ekka_mnesia:running_nodes(),
|
||||
ResL = erpc:multicall(RNodes, ?MODULE, Func, Args, ?RPC_TIMEOUT),
|
||||
Res = lists:zip(RNodes, ResL),
|
||||
BadRes = lists:filtermap(fun
|
||||
({_Node, {ok, ResParttern}}) ->
|
||||
false;
|
||||
({Node, {error, {exception, undef, _}}}) ->
|
||||
try erpc:call(Node, ?MODULE, FallbackFunc, FallbackArgs, ?RPC_TIMEOUT) of
|
||||
ResParttern ->
|
||||
false;
|
||||
OtherRes ->
|
||||
{true, #{rpc_type => call, func => FallbackFunc,
|
||||
result => OtherRes, node => Node}}
|
||||
catch
|
||||
Err:Reason ->
|
||||
{true, #{rpc_type => call, func => FallbackFunc,
|
||||
exception => {Err, Reason}, node => Node}}
|
||||
end;
|
||||
({Node, OtherRes}) ->
|
||||
{true, #{rpc_type => multicall, func => FallbackFunc,
|
||||
result => OtherRes, node => Node}}
|
||||
end, Res),
|
||||
case BadRes of
|
||||
[] -> Res;
|
||||
_ -> throw(BadRes)
|
||||
end
|
||||
end()).
|
||||
|
||||
%% Tables
|
||||
-define(RULE_TAB, emqx_rule).
|
||||
-define(ACTION_TAB, emqx_rule_action).
|
||||
|
|
|
@ -3,6 +3,15 @@
|
|||
{VSN,
|
||||
[{"4.3.16",
|
||||
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
|
@ -11,6 +20,14 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.15",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
|
@ -20,6 +37,13 @@
|
|||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.14",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
|
@ -30,6 +54,11 @@
|
|||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.13",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
|
@ -42,6 +71,11 @@
|
|||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.12",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
|
@ -54,6 +88,10 @@
|
|||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.11",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
|
@ -67,6 +105,10 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.10",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
|
@ -80,6 +122,9 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.9",
|
||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
|
@ -96,6 +141,8 @@
|
|||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.8",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_rule_date},
|
||||
|
@ -113,6 +160,8 @@
|
|||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.7",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_rule_date},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
|
@ -130,6 +179,8 @@
|
|||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.6",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_rule_date},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
|
@ -147,6 +198,8 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.5",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_rule_date},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
|
@ -164,6 +217,8 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.4",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_rule_date},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
|
@ -181,6 +236,8 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.3",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_rule_date},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
|
@ -198,6 +255,8 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.2",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_rule_date},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
|
@ -216,6 +275,8 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.1",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_rule_date},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
|
@ -234,6 +295,8 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.0",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_rule_date},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
|
@ -253,6 +316,15 @@
|
|||
{<<".*">>,[]}],
|
||||
[{"4.3.16",
|
||||
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
|
@ -261,6 +333,14 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.15",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
|
@ -270,6 +350,13 @@
|
|||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.14",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
|
@ -280,6 +367,11 @@
|
|||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.13",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
|
@ -292,6 +384,11 @@
|
|||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.12",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
|
@ -304,6 +401,10 @@
|
|||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.11",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
|
@ -317,6 +418,10 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.10",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
|
@ -330,6 +435,9 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.9",
|
||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
|
@ -346,6 +454,8 @@
|
|||
{delete_module,emqx_rule_date}]},
|
||||
{"4.3.8",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
|
@ -363,6 +473,8 @@
|
|||
{delete_module,emqx_rule_date}]},
|
||||
{"4.3.7",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
|
@ -380,6 +492,8 @@
|
|||
{delete_module,emqx_rule_date}]},
|
||||
{"4.3.6",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
|
@ -397,6 +511,8 @@
|
|||
{delete_module,emqx_rule_date}]},
|
||||
{"4.3.5",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
|
@ -414,6 +530,8 @@
|
|||
{delete_module,emqx_rule_date}]},
|
||||
{"4.3.4",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
|
@ -431,6 +549,8 @@
|
|||
{delete_module,emqx_rule_date}]},
|
||||
{"4.3.3",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
|
@ -448,6 +568,8 @@
|
|||
{delete_module,emqx_rule_date}]},
|
||||
{"4.3.2",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
|
@ -466,6 +588,8 @@
|
|||
{delete_module,emqx_rule_date}]},
|
||||
{"4.3.1",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
|
@ -484,6 +608,8 @@
|
|||
{delete_module,emqx_rule_date}]},
|
||||
{"4.3.0",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqlparser,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
, refresh_resources/0
|
||||
, refresh_resource/1
|
||||
, refresh_rule/1
|
||||
, refresh_rules/0
|
||||
, refresh_rules_when_boot/0
|
||||
, refresh_actions/1
|
||||
, refresh_actions/2
|
||||
, refresh_resource_status/0
|
||||
|
@ -256,15 +256,20 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) ->
|
|||
created_at = erlang:system_time(millisecond)
|
||||
},
|
||||
ok = emqx_rule_registry:add_resource(Resource),
|
||||
InitArgs = [M, F, ResId, Config],
|
||||
case Retry of
|
||||
with_retry ->
|
||||
%% Note that we will return OK in case of resource creation failure,
|
||||
%% A timer is started to re-start the resource later.
|
||||
_ = (catch (?CLUSTER_CALL(init_resource_with_retrier, [M, F, ResId, Config]))),
|
||||
_ = try ?CLUSTER_CALL(init_resource_with_retrier, InitArgs, ok,
|
||||
init_resource, InitArgs)
|
||||
catch throw : Reason ->
|
||||
?LOG(error, "create_resource failed: ~0p", [Reason])
|
||||
end,
|
||||
{ok, Resource};
|
||||
no_retry ->
|
||||
try
|
||||
_ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]),
|
||||
_ = ?CLUSTER_CALL(init_resource, InitArgs),
|
||||
{ok, Resource}
|
||||
catch throw : Reason ->
|
||||
{error, Reason}
|
||||
|
@ -481,8 +486,8 @@ refresh_resource(#resource{id = ResId, type = Type, config = Config}) ->
|
|||
emqx_rule_registry:find_resource_type(Type),
|
||||
ok = emqx_rule_engine:init_resource_with_retrier(M, F, ResId, Config).
|
||||
|
||||
-spec(refresh_rules() -> ok).
|
||||
refresh_rules() ->
|
||||
-spec(refresh_rules_when_boot() -> ok).
|
||||
refresh_rules_when_boot() ->
|
||||
lists:foreach(fun
|
||||
(#rule{enabled = true} = Rule) ->
|
||||
try refresh_rule(Rule)
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
, stop/0
|
||||
, async_refresh_resources_rules/0
|
||||
, ensure_resource_retrier/2
|
||||
, handler/3
|
||||
, retry_loop/3
|
||||
]).
|
||||
|
||||
start_link() ->
|
||||
|
@ -47,20 +47,27 @@ init([]) ->
|
|||
{ok, #{retryers => #{}}}.
|
||||
|
||||
async_refresh_resources_rules() ->
|
||||
gen_server:cast(?MODULE, {create_handler, refresh, resources_and_rules, #{}}).
|
||||
gen_server:cast(?MODULE, async_refresh).
|
||||
|
||||
ensure_resource_retrier(ResId, Interval) ->
|
||||
gen_server:cast(?MODULE, {create_handler, resource, ResId, Interval}).
|
||||
gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}).
|
||||
|
||||
handle_call(_Msg, _From, State) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
handle_cast({create_handler, Tag, Obj, Args}, State) ->
|
||||
handle_cast(async_refresh, #{boot_refresh_pid := Pid} = State) when is_pid(Pid) ->
|
||||
%% the refresh task is already in progress, we discard the duplication
|
||||
{noreply, State};
|
||||
handle_cast(async_refresh, State) ->
|
||||
Pid = spawn_link(fun do_async_refresh/0),
|
||||
{noreply, State#{boot_refresh_pid => Pid}};
|
||||
|
||||
handle_cast({create_restart_handler, Tag, Obj, Interval}, State) ->
|
||||
Objects = maps:get(Tag, State, #{}),
|
||||
NewState = case maps:find(Obj, Objects) of
|
||||
error ->
|
||||
update_object(Tag, Obj,
|
||||
create_handler(Tag, Obj, Args), State);
|
||||
create_restart_handler(Tag, Obj, Interval), State);
|
||||
{ok, _Pid} ->
|
||||
State
|
||||
end,
|
||||
|
@ -69,7 +76,13 @@ handle_cast({create_handler, Tag, Obj, Args}, State) ->
|
|||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
|
||||
handle_info({'EXIT', Pid, _Reason}, State = #{boot_refresh_pid := Pid}) ->
|
||||
{noreply, State#{boot_refresh_pid => undefined}};
|
||||
handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) ->
|
||||
%% We won't try to restart the 'retryers' event if the 'EXIT' Reason is not 'normal'.
|
||||
%% Instead we rely on the user to trigger a manual retry for the resources, and then enable
|
||||
%% the rules after resources are connected.
|
||||
case maps:take(Pid, Retryers) of
|
||||
{{Tag, Obj}, Retryers2} ->
|
||||
Objects = maps:get(Tag, State, #{}),
|
||||
|
@ -97,12 +110,12 @@ update_object(Tag, Obj, Retryer, State) ->
|
|||
retryers => Retryers#{Retryer => {Tag, Obj}}
|
||||
}.
|
||||
|
||||
create_handler(Tag, Obj, Args) ->
|
||||
?LOG(info, "create monitor handler for ~p ~p, args: ~p", [Tag, Obj, Args]),
|
||||
%% spawn a dedicated process to handle the task asynchronously
|
||||
spawn_link(?MODULE, handler, [Tag, Obj, Args]).
|
||||
create_restart_handler(Tag, Obj, Interval) ->
|
||||
?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]),
|
||||
%% spawn a dedicated process to handle the restarting asynchronously
|
||||
spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]).
|
||||
|
||||
handler(resource, ResId, Interval) ->
|
||||
retry_loop(resource, ResId, Interval) ->
|
||||
case emqx_rule_registry:find_resource(ResId) of
|
||||
{ok, #resource{type = Type, config = Config}} ->
|
||||
try
|
||||
|
@ -115,17 +128,17 @@ handler(resource, ResId, Interval) ->
|
|||
?LOG(warning, "init_resource failed: ~p, ~0p",
|
||||
[{Err, Reason}, ST]),
|
||||
timer:sleep(Interval),
|
||||
?MODULE:handler(resource, ResId, Interval)
|
||||
?MODULE:retry_loop(resource, ResId, Interval)
|
||||
end;
|
||||
not_found ->
|
||||
ok
|
||||
end;
|
||||
end.
|
||||
|
||||
handler(refresh, resources_and_rules, _) ->
|
||||
do_async_refresh() ->
|
||||
%% NOTE: the order matters.
|
||||
%% We should always refresh the resources first and then the rules.
|
||||
ok = emqx_rule_engine:refresh_resources(),
|
||||
ok = emqx_rule_engine:refresh_rules().
|
||||
ok = emqx_rule_engine:refresh_rules_when_boot().
|
||||
|
||||
refresh_and_enable_rules_of_resource(ResId) ->
|
||||
lists:foreach(
|
||||
|
|
|
@ -65,7 +65,7 @@ init_per_testcase(t_refresh_resources_rules, Config) ->
|
|||
ets:update_counter(t_refresh_resources_rules, refresh_resources, 1, {refresh_resources, 0}),
|
||||
ok
|
||||
end),
|
||||
meck:expect(emqx_rule_engine, refresh_rules, fun() ->
|
||||
meck:expect(emqx_rule_engine, refresh_rules_when_boot, fun() ->
|
||||
timer:sleep(500),
|
||||
ets:update_counter(t_refresh_resources_rules, refresh_rules, 1, {refresh_rules, 0}),
|
||||
ok
|
||||
|
@ -128,7 +128,7 @@ t_refresh_resources_rules(_) ->
|
|||
ok = emqx_rule_monitor:async_refresh_resources_rules(),
|
||||
ok = emqx_rule_monitor:async_refresh_resources_rules(),
|
||||
%% there should be only one refresh handler at the same time
|
||||
?assertMatch(#{retryers := Pids} when map_size(Pids) =:= 1, sys:get_state(whereis(emqx_rule_monitor))),
|
||||
?assertMatch(#{boot_refresh_pid := Pid} when is_pid(Pid), sys:get_state(whereis(emqx_rule_monitor))),
|
||||
timer:sleep(1200),
|
||||
?assertEqual([{refresh_resources, 1}], ets:lookup(t_refresh_resources_rules, refresh_resources)),
|
||||
?assertEqual([{refresh_rules, 1}], ets:lookup(t_refresh_resources_rules, refresh_rules)),
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
### Enhancements
|
||||
|
||||
- Asynchronously refresh the resources and rules when emqx bootup. [#9199](https://github.com/emqx/emqx/pull/9199).
|
||||
This is to avoid blocking the start of the emqx_rule_engine app if some of the resources spend too long time on establishing the connection.
|
||||
- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).
|
||||
This is to avoid slowing down the boot if some resources spend long time establishing the connection.
|
||||
|
||||
- Add a warning log if the ACL check failed for subscription [#9124](https://github.com/emqx/emqx/pull/9124).
|
||||
This is to make the ACL deny logging for subscription behave the same as for publish.
|
||||
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
### 增强
|
||||
|
||||
- 在 emqx 启动时,异步地刷新资源和规则。[#9199](https://github.com/emqx/emqx/pull/9199).
|
||||
这个改动是为了避免因为一些资源连接建立过慢,从而阻塞 emqx_rule_engine 的启动。
|
||||
- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。
|
||||
这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。
|
||||
|
||||
- 订阅时,如果 ACL 检查不通过,打印一个警告日志 [#9124](https://github.com/emqx/emqx/pull/9124)。
|
||||
该行为的改变主要是为了跟发布失败时的行为保持一致。
|
||||
|
||||
|
|
Loading…
Reference in New Issue