diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index e5225af3e..0a387df50 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -6,6 +6,7 @@ * Fix JWT plugin don't support non-integer timestamp claims. [#8867](https://github.com/emqx/emqx/pull/8867) * Avoid publishing will message when client fails to auhtenticate. [#8887](https://github.com/emqx/emqx/pull/8887) * Speed up dispatching of shared subscription messages in a cluster [#8893](https://github.com/emqx/emqx/pull/8893) +* Speed up updating the configuration, When some nodes in the cluster are down. [#8857](https://github.com/emqx/emqx/pull/8857) ## Enhancements diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index ddc4eccc5..4b3886798 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -72,6 +72,7 @@ -define(TIMEOUT, timer:minutes(1)). -define(APPLY_KIND_REPLICATE, replicate). -define(APPLY_KIND_INITIATE, initiate). +-define(IS_STATUS(_A_), (_A_ =:= peers_lagging orelse _A_ =:= stopped_nodes)). -type tnx_id() :: pos_integer(). @@ -123,13 +124,13 @@ start_link(Node, Name, RetryMs) -> %% the result is expected to be `ok | {ok, _}' to indicate success, %% and `{error, _}' to indicate failure. %% -%% The excpetion of the MFA evaluation is captured and translated +%% The exception of the MFA evaluation is captured and translated %% into an `{error, _}' tuple. %% This call tries to wait for all peer nodes to be in-sync before %% returning the result. %% %% In case of partial success, an `error' level log is emitted -%% but the initial localy apply result is returned. +%% but the initial local apply result is returned. -spec multicall(module(), atom(), list()) -> term(). multicall(M, F, A) -> multicall(M, F, A, all, timer:minutes(2)). @@ -141,11 +142,12 @@ multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse Req Result; {init_failure, Error} -> Error; - {peers_lagging, TnxId, Res, Nodes} -> + {Status, TnxId, Res, Nodes} when ?IS_STATUS(Status) -> %% The init MFA return ok, but some other nodes failed. ?SLOG(error, #{ msg => "cluster_rpc_peers_lagging", - lagging_nodes => Nodes, + status => Status, + nodes => Nodes, tnx_id => TnxId }), Res @@ -193,9 +195,9 @@ do_multicall(M, F, A, RequiredSyncs, Timeout) -> InitRes; {init_failure, Error0} -> {init_failure, Error0}; - {peers_lagging, Nodes} -> + {Status, Nodes} when ?IS_STATUS(Status) -> {ok, TnxId0, MFARes} = InitRes, - {peers_lagging, TnxId0, MFARes, Nodes} + {Status, TnxId0, MFARes, Nodes} end. -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. @@ -509,14 +511,18 @@ do_alarm(Fun, Res, #{tnx_id := Id} = Meta) -> emqx_alarm:Fun(cluster_rpc_apply_failed, Meta#{result => ?TO_BIN(Res)}, AlarmMsg). wait_for_all_nodes_commit(TnxId, Delay, Remain) -> - case lagging_node(TnxId) of + Lagging = lagging_nodes(TnxId), + Stopped = stopped_nodes(), + case Lagging -- Stopped of + [] when Stopped =:= [] -> + ok; + [] -> + {stopped_nodes, Stopped}; [_ | _] when Remain > 0 -> ok = timer:sleep(Delay), wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay); - [] -> - ok; - Nodes -> - {peers_lagging, Nodes} + [_ | _] -> + {peers_lagging, Lagging} end. wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) -> @@ -527,14 +533,18 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) -> false when Remain > 0 -> wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay); false -> - case lagging_node(TnxId) of - %% All commit but The succeedNum > length(nodes()). - [] -> ok; - Nodes -> {peers_lagging, Nodes} + case lagging_nodes(TnxId) of + [] -> + ok; + Lagging -> + case stopped_nodes() of + [] -> {peers_lagging, Lagging}; + Stopped -> {stopped_nodes, Stopped} + end end end. -lagging_node(TnxId) -> +lagging_nodes(TnxId) -> {atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['<', TnxId]), Nodes. @@ -548,6 +558,9 @@ commit_status_trans(Operator, TnxId) -> Result = '$2', mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]). +stopped_nodes() -> + ekka_cluster:info(stopped_nodes). + get_retry_ms() -> emqx_conf:get([node, cluster_call, retry_interval], timer:minutes(1)).