chore: cluster_call early aborted

This commit is contained in:
zhongwencool 2021-08-27 11:15:46 +08:00
parent c1c24af002
commit e35a6c7350
1 changed files with 7 additions and 12 deletions

View File

@ -156,7 +156,7 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
catch_up(#{node := Node, retry_interval := RetryMs} = State) -> catch_up(#{node := Node, retry_interval := RetryMs} = State) ->
case transaction(fun get_next_mfa/1, [Node]) of case transaction(fun read_next_mfa/1, [Node]) of
{atomic, caught_up} -> ?TIMEOUT; {atomic, caught_up} -> ?TIMEOUT;
{atomic, {still_lagging, NextId, MFA}} -> {atomic, {still_lagging, NextId, MFA}} ->
{Succeed, _} = apply_mfa(NextId, MFA), {Succeed, _} = apply_mfa(NextId, MFA),
@ -166,22 +166,19 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State) ->
{atomic, ok} -> catch_up(State); {atomic, ok} -> catch_up(State);
Error -> Error ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "mnesia write transaction failed", msg => "failed to commit applied call",
node => Node, applied_id => NextId,
nextId => NextId,
error => Error}), error => Error}),
RetryMs RetryMs
end; end;
false -> RetryMs false -> RetryMs
end; end;
{aborted, Reason} -> {aborted, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{msg => "read_next_mfa transaction failed", error => Reason}),
msg => "get_next_mfa transaction failed",
node => Node, error => Reason}),
RetryMs RetryMs
end. end.
get_next_mfa(Node) -> read_next_mfa(Node) ->
NextId = NextId =
case mnesia:wread({?CLUSTER_COMMIT, Node}) of case mnesia:wread({?CLUSTER_COMMIT, Node}) of
[] -> [] ->
@ -219,10 +216,9 @@ do_catch_up(ToTnxId, Node) ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "catch up failed!", msg => "catch up failed!",
last_applied_id => LastAppliedId, last_applied_id => LastAppliedId,
node => Node,
to_tnx_id => ToTnxId to_tnx_id => ToTnxId
}), }),
{error, Reason} mnesia:abort(Reason)
end. end.
commit(Node, TnxId) -> commit(Node, TnxId) ->
@ -250,8 +246,7 @@ init_mfa(Node, MFA) ->
do_catch_up_in_one_trans(LatestId, Node) -> do_catch_up_in_one_trans(LatestId, Node) ->
case do_catch_up(LatestId, Node) of case do_catch_up(LatestId, Node) of
caught_up -> ok; caught_up -> ok;
ok -> do_catch_up_in_one_trans(LatestId, Node); ok -> do_catch_up_in_one_trans(LatestId, Node)
{error, Reason} -> mnesia:abort(Reason)
end. end.
transaction(Func, Args) -> transaction(Func, Args) ->