From 1175008a747201785247040363dc4e4a9052aabb Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Thu, 1 Sep 2022 10:56:12 +0800 Subject: [PATCH] feat: cluster-rpc failed fast when some nodes is down --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 40 ++++++++++++++++--------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index ddc4eccc5..ed6f32508 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -72,6 +72,7 @@ -define(TIMEOUT, timer:minutes(1)). -define(APPLY_KIND_REPLICATE, replicate). -define(APPLY_KIND_INITIATE, initiate). +-define(IS_ACTION(_A_), (_A_ =:= peers_lagging orelse _A_ =:= stopped_nodes)). -type tnx_id() :: pos_integer(). @@ -123,13 +124,13 @@ start_link(Node, Name, RetryMs) -> %% the result is expected to be `ok | {ok, _}' to indicate success, %% and `{error, _}' to indicate failure. %% -%% The excpetion of the MFA evaluation is captured and translated +%% The exception of the MFA evaluation is captured and translated %% into an `{error, _}' tuple. %% This call tries to wait for all peer nodes to be in-sync before %% returning the result. %% %% In case of partial success, an `error' level log is emitted -%% but the initial localy apply result is returned. +%% but the initial locally apply result is returned. -spec multicall(module(), atom(), list()) -> term(). multicall(M, F, A) -> multicall(M, F, A, all, timer:minutes(2)). @@ -141,11 +142,12 @@ multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse Req Result; {init_failure, Error} -> Error; - {peers_lagging, TnxId, Res, Nodes} -> + {Action, TnxId, Res, Nodes} when ?IS_ACTION(Action) -> %% The init MFA return ok, but some other nodes failed. ?SLOG(error, #{ msg => "cluster_rpc_peers_lagging", - lagging_nodes => Nodes, + action => Action, + nodes => Nodes, tnx_id => TnxId }), Res @@ -193,9 +195,9 @@ do_multicall(M, F, A, RequiredSyncs, Timeout) -> InitRes; {init_failure, Error0} -> {init_failure, Error0}; - {peers_lagging, Nodes} -> + {Action, Nodes} when ?IS_ACTION(Action) -> {ok, TnxId0, MFARes} = InitRes, - {peers_lagging, TnxId0, MFARes, Nodes} + {Action, TnxId0, MFARes, Nodes} end. -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. @@ -509,14 +511,18 @@ do_alarm(Fun, Res, #{tnx_id := Id} = Meta) -> emqx_alarm:Fun(cluster_rpc_apply_failed, Meta#{result => ?TO_BIN(Res)}, AlarmMsg). wait_for_all_nodes_commit(TnxId, Delay, Remain) -> - case lagging_node(TnxId) of + Lagging = lagging_node(TnxId), + Stopped = stopped_nodes(), + case Lagging -- Stopped of + [] when Stopped =:= [] -> + ok; + [] -> + {stopped_nodes, Stopped}; [_ | _] when Remain > 0 -> ok = timer:sleep(Delay), wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay); - [] -> - ok; - Nodes -> - {peers_lagging, Nodes} + [_ | _] -> + {peers_lagging, Lagging} end. wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) -> @@ -527,10 +533,13 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) -> false when Remain > 0 -> wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay); false -> - case lagging_node(TnxId) of + Lagging = lagging_node(TnxId), + Stopped = stopped_nodes(), + case Lagging -- Stopped of %% All commit but The succeedNum > length(nodes()). - [] -> ok; - Nodes -> {peers_lagging, Nodes} + [] when Stopped =:= [] -> ok; + [] -> {stopped_nodes, Stopped}; + [_ | _] -> {peers_lagging, Lagging} end end. @@ -548,6 +557,9 @@ commit_status_trans(Operator, TnxId) -> Result = '$2', mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]). +stopped_nodes() -> + ekka_cluster:info(stopped_nodes). + get_retry_ms() -> emqx_conf:get([node, cluster_call, retry_interval], timer:minutes(1)).