fix(cluster_rpc): Detect stopped replicant nodes

This commit is contained in:
ieQu1 2022-12-14 10:39:44 +01:00 committed by Zaiming (Stone) Shi
parent 7985cd3536
commit 2f208c3ab9
1 changed files with 4 additions and 6 deletions

View File

@ -512,7 +512,7 @@ do_alarm(Fun, Res, #{tnx_id := Id} = Meta) ->
wait_for_all_nodes_commit(TnxId, Delay, Remain) -> wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
Lagging = lagging_nodes(TnxId), Lagging = lagging_nodes(TnxId),
Stopped = stopped_nodes(), Stopped = Lagging -- mria_mnesia:running_nodes(),
case Lagging -- Stopped of case Lagging -- Stopped of
[] when Stopped =:= [] -> [] when Stopped =:= [] ->
ok; ok;
@ -537,9 +537,10 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
[] -> [] ->
ok; ok;
Lagging -> Lagging ->
case stopped_nodes() of Stopped = Lagging -- mria_mnesia:running_nodes(),
case Stopped of
[] -> {peers_lagging, Lagging}; [] -> {peers_lagging, Lagging};
Stopped -> {stopped_nodes, Stopped} _ -> {stopped_nodes, Stopped}
end end
end end
end. end.
@ -558,9 +559,6 @@ commit_status_trans(Operator, TnxId) ->
Result = '$2', Result = '$2',
mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]). mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]).
stopped_nodes() ->
ekka_cluster:info(stopped_nodes).
get_retry_ms() -> get_retry_ms() ->
emqx_conf:get([node, cluster_call, retry_interval], timer:minutes(1)). emqx_conf:get([node, cluster_call, retry_interval], timer:minutes(1)).