From 1175008a747201785247040363dc4e4a9052aabb Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Thu, 1 Sep 2022 10:56:12 +0800 Subject: [PATCH 1/4] feat: cluster-rpc failed fast when some nodes is down --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 40 ++++++++++++++++--------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index ddc4eccc5..ed6f32508 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -72,6 +72,7 @@ -define(TIMEOUT, timer:minutes(1)). -define(APPLY_KIND_REPLICATE, replicate). -define(APPLY_KIND_INITIATE, initiate). +-define(IS_ACTION(_A_), (_A_ =:= peers_lagging orelse _A_ =:= stopped_nodes)). -type tnx_id() :: pos_integer(). @@ -123,13 +124,13 @@ start_link(Node, Name, RetryMs) -> %% the result is expected to be `ok | {ok, _}' to indicate success, %% and `{error, _}' to indicate failure. %% -%% The excpetion of the MFA evaluation is captured and translated +%% The exception of the MFA evaluation is captured and translated %% into an `{error, _}' tuple. %% This call tries to wait for all peer nodes to be in-sync before %% returning the result. %% %% In case of partial success, an `error' level log is emitted -%% but the initial localy apply result is returned. +%% but the initial locally apply result is returned. -spec multicall(module(), atom(), list()) -> term(). multicall(M, F, A) -> multicall(M, F, A, all, timer:minutes(2)). @@ -141,11 +142,12 @@ multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse Req Result; {init_failure, Error} -> Error; - {peers_lagging, TnxId, Res, Nodes} -> + {Action, TnxId, Res, Nodes} when ?IS_ACTION(Action) -> %% The init MFA return ok, but some other nodes failed. ?SLOG(error, #{ msg => "cluster_rpc_peers_lagging", - lagging_nodes => Nodes, + action => Action, + nodes => Nodes, tnx_id => TnxId }), Res @@ -193,9 +195,9 @@ do_multicall(M, F, A, RequiredSyncs, Timeout) -> InitRes; {init_failure, Error0} -> {init_failure, Error0}; - {peers_lagging, Nodes} -> + {Action, Nodes} when ?IS_ACTION(Action) -> {ok, TnxId0, MFARes} = InitRes, - {peers_lagging, TnxId0, MFARes, Nodes} + {Action, TnxId0, MFARes, Nodes} end. -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. @@ -509,14 +511,18 @@ do_alarm(Fun, Res, #{tnx_id := Id} = Meta) -> emqx_alarm:Fun(cluster_rpc_apply_failed, Meta#{result => ?TO_BIN(Res)}, AlarmMsg). wait_for_all_nodes_commit(TnxId, Delay, Remain) -> - case lagging_node(TnxId) of + Lagging = lagging_node(TnxId), + Stopped = stopped_nodes(), + case Lagging -- Stopped of + [] when Stopped =:= [] -> + ok; + [] -> + {stopped_nodes, Stopped}; [_ | _] when Remain > 0 -> ok = timer:sleep(Delay), wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay); - [] -> - ok; - Nodes -> - {peers_lagging, Nodes} + [_ | _] -> + {peers_lagging, Lagging} end. wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) -> @@ -527,10 +533,13 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) -> false when Remain > 0 -> wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay); false -> - case lagging_node(TnxId) of + Lagging = lagging_node(TnxId), + Stopped = stopped_nodes(), + case Lagging -- Stopped of %% All commit but The succeedNum > length(nodes()). - [] -> ok; - Nodes -> {peers_lagging, Nodes} + [] when Stopped =:= [] -> ok; + [] -> {stopped_nodes, Stopped}; + [_ | _] -> {peers_lagging, Lagging} end end. @@ -548,6 +557,9 @@ commit_status_trans(Operator, TnxId) -> Result = '$2', mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]). +stopped_nodes() -> + ekka_cluster:info(stopped_nodes). + get_retry_ms() -> emqx_conf:get([node, cluster_call, retry_interval], timer:minutes(1)). From 758b1979ab7912cc205c65926dfcd09c30265fc5 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 6 Sep 2022 16:09:08 +0800 Subject: [PATCH 2/4] chore: Apply suggestions from code review Co-authored-by: Zaiming (Stone) Shi --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index ed6f32508..5de2932f9 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -72,7 +72,7 @@ -define(TIMEOUT, timer:minutes(1)). -define(APPLY_KIND_REPLICATE, replicate). -define(APPLY_KIND_INITIATE, initiate). --define(IS_ACTION(_A_), (_A_ =:= peers_lagging orelse _A_ =:= stopped_nodes)). +-define(IS_STATUS(_A_), (_A_ =:= peers_lagging orelse _A_ =:= stopped_nodes)). -type tnx_id() :: pos_integer(). @@ -130,7 +130,7 @@ start_link(Node, Name, RetryMs) -> %% returning the result. %% %% In case of partial success, an `error' level log is emitted -%% but the initial locally apply result is returned. +%% but the initial local apply result is returned. -spec multicall(module(), atom(), list()) -> term(). multicall(M, F, A) -> multicall(M, F, A, all, timer:minutes(2)). @@ -142,11 +142,11 @@ multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse Req Result; {init_failure, Error} -> Error; - {Action, TnxId, Res, Nodes} when ?IS_ACTION(Action) -> + {Status, TnxId, Res, Nodes} when ?IS_STATUS(Status) -> %% The init MFA return ok, but some other nodes failed. ?SLOG(error, #{ msg => "cluster_rpc_peers_lagging", - action => Action, + status=> Status, nodes => Nodes, tnx_id => TnxId }), @@ -195,9 +195,9 @@ do_multicall(M, F, A, RequiredSyncs, Timeout) -> InitRes; {init_failure, Error0} -> {init_failure, Error0}; - {Action, Nodes} when ?IS_ACTION(Action) -> + {Status, Nodes} when ?IS_STATUS(Status) -> {ok, TnxId0, MFARes} = InitRes, - {Action, TnxId0, MFARes, Nodes} + {Status, TnxId0, MFARes, Nodes} end. -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. From 33341011d8bdbb2c3ee9373e6eb3c3b646ebf6ee Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Tue, 6 Sep 2022 16:17:51 +0800 Subject: [PATCH 3/4] chore: improve wait_for_nodes_commit/4 function --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 5de2932f9..4b3886798 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -146,7 +146,7 @@ multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse Req %% The init MFA return ok, but some other nodes failed. ?SLOG(error, #{ msg => "cluster_rpc_peers_lagging", - status=> Status, + status => Status, nodes => Nodes, tnx_id => TnxId }), @@ -511,7 +511,7 @@ do_alarm(Fun, Res, #{tnx_id := Id} = Meta) -> emqx_alarm:Fun(cluster_rpc_apply_failed, Meta#{result => ?TO_BIN(Res)}, AlarmMsg). wait_for_all_nodes_commit(TnxId, Delay, Remain) -> - Lagging = lagging_node(TnxId), + Lagging = lagging_nodes(TnxId), Stopped = stopped_nodes(), case Lagging -- Stopped of [] when Stopped =:= [] -> @@ -533,17 +533,18 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) -> false when Remain > 0 -> wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay); false -> - Lagging = lagging_node(TnxId), - Stopped = stopped_nodes(), - case Lagging -- Stopped of - %% All commit but The succeedNum > length(nodes()). - [] when Stopped =:= [] -> ok; - [] -> {stopped_nodes, Stopped}; - [_ | _] -> {peers_lagging, Lagging} + case lagging_nodes(TnxId) of + [] -> + ok; + Lagging -> + case stopped_nodes() of + [] -> {peers_lagging, Lagging}; + Stopped -> {stopped_nodes, Stopped} + end end end. -lagging_node(TnxId) -> +lagging_nodes(TnxId) -> {atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['<', TnxId]), Nodes. From fdbf8c1c27d8ac358eb84f39b20ee328d9cafbad Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 7 Sep 2022 10:26:26 +0800 Subject: [PATCH 4/4] chore: update changelog --- CHANGES-5.0.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index 82d907458..a9e694081 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -6,6 +6,7 @@ * Fix JWT plugin don't support non-integer timestamp claims. [#8867](https://github.com/emqx/emqx/pull/8867) * Avoid publishing will message when client fails to auhtenticate. [#8887](https://github.com/emqx/emqx/pull/8887) * Speed up dispatching of shared subscription messages in a cluster [#8893](https://github.com/emqx/emqx/pull/8893) +* Speed up updating the configuration, When some nodes in the cluster are down. [#8857](https://github.com/emqx/emqx/pull/8857) ## Enhancements