Merge pull request #8857 from zhongwencool/cluster-rpc-when-node-down

feat: cluster-rpc failed fast when some nodes is down
This commit is contained in:
zhongwencool 2022-09-07 14:34:36 +08:00 committed by GitHub
commit 2943cbc261
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 16 deletions

View File

@ -6,6 +6,7 @@
* Fix JWT plugin don't support non-integer timestamp claims. [#8867](https://github.com/emqx/emqx/pull/8867) * Fix JWT plugin don't support non-integer timestamp claims. [#8867](https://github.com/emqx/emqx/pull/8867)
* Avoid publishing will message when client fails to auhtenticate. [#8887](https://github.com/emqx/emqx/pull/8887) * Avoid publishing will message when client fails to auhtenticate. [#8887](https://github.com/emqx/emqx/pull/8887)
* Speed up dispatching of shared subscription messages in a cluster [#8893](https://github.com/emqx/emqx/pull/8893) * Speed up dispatching of shared subscription messages in a cluster [#8893](https://github.com/emqx/emqx/pull/8893)
* Speed up updating the configuration, When some nodes in the cluster are down. [#8857](https://github.com/emqx/emqx/pull/8857)
## Enhancements ## Enhancements

View File

@ -72,6 +72,7 @@
-define(TIMEOUT, timer:minutes(1)). -define(TIMEOUT, timer:minutes(1)).
-define(APPLY_KIND_REPLICATE, replicate). -define(APPLY_KIND_REPLICATE, replicate).
-define(APPLY_KIND_INITIATE, initiate). -define(APPLY_KIND_INITIATE, initiate).
-define(IS_STATUS(_A_), (_A_ =:= peers_lagging orelse _A_ =:= stopped_nodes)).
-type tnx_id() :: pos_integer(). -type tnx_id() :: pos_integer().
@ -123,13 +124,13 @@ start_link(Node, Name, RetryMs) ->
%% the result is expected to be `ok | {ok, _}' to indicate success, %% the result is expected to be `ok | {ok, _}' to indicate success,
%% and `{error, _}' to indicate failure. %% and `{error, _}' to indicate failure.
%% %%
%% The excpetion of the MFA evaluation is captured and translated %% The exception of the MFA evaluation is captured and translated
%% into an `{error, _}' tuple. %% into an `{error, _}' tuple.
%% This call tries to wait for all peer nodes to be in-sync before %% This call tries to wait for all peer nodes to be in-sync before
%% returning the result. %% returning the result.
%% %%
%% In case of partial success, an `error' level log is emitted %% In case of partial success, an `error' level log is emitted
%% but the initial localy apply result is returned. %% but the initial local apply result is returned.
-spec multicall(module(), atom(), list()) -> term(). -spec multicall(module(), atom(), list()) -> term().
multicall(M, F, A) -> multicall(M, F, A) ->
multicall(M, F, A, all, timer:minutes(2)). multicall(M, F, A, all, timer:minutes(2)).
@ -141,11 +142,12 @@ multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse Req
Result; Result;
{init_failure, Error} -> {init_failure, Error} ->
Error; Error;
{peers_lagging, TnxId, Res, Nodes} -> {Status, TnxId, Res, Nodes} when ?IS_STATUS(Status) ->
%% The init MFA return ok, but some other nodes failed. %% The init MFA return ok, but some other nodes failed.
?SLOG(error, #{ ?SLOG(error, #{
msg => "cluster_rpc_peers_lagging", msg => "cluster_rpc_peers_lagging",
lagging_nodes => Nodes, status => Status,
nodes => Nodes,
tnx_id => TnxId tnx_id => TnxId
}), }),
Res Res
@ -193,9 +195,9 @@ do_multicall(M, F, A, RequiredSyncs, Timeout) ->
InitRes; InitRes;
{init_failure, Error0} -> {init_failure, Error0} ->
{init_failure, Error0}; {init_failure, Error0};
{peers_lagging, Nodes} -> {Status, Nodes} when ?IS_STATUS(Status) ->
{ok, TnxId0, MFARes} = InitRes, {ok, TnxId0, MFARes} = InitRes,
{peers_lagging, TnxId0, MFARes, Nodes} {Status, TnxId0, MFARes, Nodes}
end. end.
-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
@ -509,14 +511,18 @@ do_alarm(Fun, Res, #{tnx_id := Id} = Meta) ->
emqx_alarm:Fun(cluster_rpc_apply_failed, Meta#{result => ?TO_BIN(Res)}, AlarmMsg). emqx_alarm:Fun(cluster_rpc_apply_failed, Meta#{result => ?TO_BIN(Res)}, AlarmMsg).
wait_for_all_nodes_commit(TnxId, Delay, Remain) -> wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
case lagging_node(TnxId) of Lagging = lagging_nodes(TnxId),
Stopped = stopped_nodes(),
case Lagging -- Stopped of
[] when Stopped =:= [] ->
ok;
[] ->
{stopped_nodes, Stopped};
[_ | _] when Remain > 0 -> [_ | _] when Remain > 0 ->
ok = timer:sleep(Delay), ok = timer:sleep(Delay),
wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay); wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay);
[] -> [_ | _] ->
ok; {peers_lagging, Lagging}
Nodes ->
{peers_lagging, Nodes}
end. end.
wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) -> wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
@ -527,14 +533,18 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
false when Remain > 0 -> false when Remain > 0 ->
wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay); wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay);
false -> false ->
case lagging_node(TnxId) of case lagging_nodes(TnxId) of
%% All commit but The succeedNum > length(nodes()). [] ->
[] -> ok; ok;
Nodes -> {peers_lagging, Nodes} Lagging ->
case stopped_nodes() of
[] -> {peers_lagging, Lagging};
Stopped -> {stopped_nodes, Stopped}
end
end end
end. end.
lagging_node(TnxId) -> lagging_nodes(TnxId) ->
{atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['<', TnxId]), {atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['<', TnxId]),
Nodes. Nodes.
@ -548,6 +558,9 @@ commit_status_trans(Operator, TnxId) ->
Result = '$2', Result = '$2',
mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]). mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]).
stopped_nodes() ->
ekka_cluster:info(stopped_nodes).
get_retry_ms() -> get_retry_ms() ->
emqx_conf:get([node, cluster_call, retry_interval], timer:minutes(1)). emqx_conf:get([node, cluster_call, retry_interval], timer:minutes(1)).