From e35a6c73504c096c1ed08797e16543664151d3c5 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 27 Aug 2021 11:15:46 +0800 Subject: [PATCH] chore: cluster_call early aborted --- apps/emqx_machine/src/emqx_cluster_rpc.erl | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index c2b2d086d..346ca5025 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -156,7 +156,7 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== catch_up(#{node := Node, retry_interval := RetryMs} = State) -> - case transaction(fun get_next_mfa/1, [Node]) of + case transaction(fun read_next_mfa/1, [Node]) of {atomic, caught_up} -> ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> {Succeed, _} = apply_mfa(NextId, MFA), @@ -166,22 +166,19 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State) -> {atomic, ok} -> catch_up(State); Error -> ?SLOG(error, #{ - msg => "mnesia write transaction failed", - node => Node, - nextId => NextId, + msg => "failed to commit applied call", + applied_id => NextId, error => Error}), RetryMs end; false -> RetryMs end; {aborted, Reason} -> - ?SLOG(error, #{ - msg => "get_next_mfa transaction failed", - node => Node, error => Reason}), + ?SLOG(error, #{msg => "read_next_mfa transaction failed", error => Reason}), RetryMs end. -get_next_mfa(Node) -> +read_next_mfa(Node) -> NextId = case mnesia:wread({?CLUSTER_COMMIT, Node}) of [] -> @@ -219,10 +216,9 @@ do_catch_up(ToTnxId, Node) -> ?SLOG(error, #{ msg => "catch up failed!", last_applied_id => LastAppliedId, - node => Node, to_tnx_id => ToTnxId }), - {error, Reason} + mnesia:abort(Reason) end. commit(Node, TnxId) -> @@ -250,8 +246,7 @@ init_mfa(Node, MFA) -> do_catch_up_in_one_trans(LatestId, Node) -> case do_catch_up(LatestId, Node) of caught_up -> ok; - ok -> do_catch_up_in_one_trans(LatestId, Node); - {error, Reason} -> mnesia:abort(Reason) + ok -> do_catch_up_in_one_trans(LatestId, Node) end. transaction(Func, Args) ->