chore(cluster-call): more clear function/var name
This commit is contained in:
parent
ee2fccac02
commit
24aaa5349b
|
@ -85,7 +85,7 @@ multicall(M, F, A) ->
|
||||||
TnxId :: pos_integer(),
|
TnxId :: pos_integer(),
|
||||||
Timeout :: timeout(),
|
Timeout :: timeout(),
|
||||||
Reason :: string().
|
Reason :: string().
|
||||||
multicall(M, F, A, SucceedNum, Timeout) ->
|
multicall(M, F, A, RequireNum, Timeout)when RequireNum >= 1 ->
|
||||||
MFA = {initiate, {M, F, A}},
|
MFA = {initiate, {M, F, A}},
|
||||||
Begin = erlang:monotonic_time(),
|
Begin = erlang:monotonic_time(),
|
||||||
InitRes =
|
InitRes =
|
||||||
|
@ -101,9 +101,23 @@ multicall(M, F, A, SucceedNum, Timeout) ->
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
End = erlang:monotonic_time(),
|
End = erlang:monotonic_time(),
|
||||||
Gap = erlang:convert_time_unit(Begin - End, native, millisecond) + 50,
|
MinDelay = erlang:convert_time_unit(Begin - End, native, millisecond) + 50,
|
||||||
RetryMs = get_retry_ms(),
|
%% Failed after 3 attempts.
|
||||||
confirm_commit(InitRes, SucceedNum, Gap, 3 * max(Gap, RetryMs)).
|
RetryTimeout = 3 * max(MinDelay, get_retry_ms()),
|
||||||
|
OkOrFailed =
|
||||||
|
case InitRes of
|
||||||
|
{ok, _TnxId, _} when RequireNum =:= 1 ->
|
||||||
|
ok;
|
||||||
|
{ok, TnxId, _} when RequireNum =:= all ->
|
||||||
|
wait_for_all_nodes_commit(TnxId, MinDelay, RetryTimeout);
|
||||||
|
{ok, TnxId, _} when is_integer(RequireNum) ->
|
||||||
|
wait_for_nodes_commit(RequireNum, TnxId, MinDelay, RetryTimeout);
|
||||||
|
Error -> Error
|
||||||
|
end,
|
||||||
|
case OkOrFailed of
|
||||||
|
ok -> InitRes;
|
||||||
|
_ -> OkOrFailed
|
||||||
|
end.
|
||||||
|
|
||||||
-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
|
-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
|
||||||
query(TnxId) ->
|
query(TnxId) ->
|
||||||
|
@ -294,83 +308,63 @@ trans_query(TnxId) ->
|
||||||
|
|
||||||
-define(TO_BIN(_B_), iolist_to_binary(io_lib:format("~p", [_B_]))).
|
-define(TO_BIN(_B_), iolist_to_binary(io_lib:format("~p", [_B_]))).
|
||||||
|
|
||||||
apply_mfa(TnxId, {M, F, A} = MFA) ->
|
apply_mfa(TnxId, {M, F, A}) ->
|
||||||
Result =
|
Res =
|
||||||
try
|
try erlang:apply(M, F, A)
|
||||||
Meta0 = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)},
|
|
||||||
Res = erlang:apply(M, F, A),
|
|
||||||
Succeed =
|
|
||||||
case Res of
|
|
||||||
ok ->
|
|
||||||
OkMeta = Meta0#{msg => <<"succeeded to apply MFA">>, result => Res},
|
|
||||||
?SLOG(notice, OkMeta),
|
|
||||||
{true, OkMeta};
|
|
||||||
{ok, _} ->
|
|
||||||
OkMeta1 = Meta0#{msg => <<"succeeded to apply MFA">>, result => ?TO_BIN(Res)},
|
|
||||||
?SLOG(notice, OkMeta1),
|
|
||||||
{true, OkMeta1};
|
|
||||||
_ ->
|
|
||||||
NotOkMeta = Meta0#{msg => <<"failed to apply MFA">>, result => ?TO_BIN(Res)},
|
|
||||||
?SLOG(error, NotOkMeta),
|
|
||||||
{false, NotOkMeta}
|
|
||||||
end,
|
|
||||||
{Succeed, Res}
|
|
||||||
catch
|
catch
|
||||||
C : E ->
|
C : E -> {crash, C, E}
|
||||||
CrashMeta = #{msg => <<"crash to apply MFA">>, tnx_id => TnxId, exception => C, reason => ?TO_BIN(E),
|
end,
|
||||||
module => M, function => F, args => ?TO_BIN(A)},
|
Meta = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)},
|
||||||
?SLOG(critical, CrashMeta),
|
log_and_alarm(Res, Meta),
|
||||||
{{false, CrashMeta}, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))}
|
Succeed = (Res =:= ok orelse (is_tuple(Res) andalso Res =/= {} andalso element(1, Res) =:= ok)),
|
||||||
end,
|
{Succeed, Res}.
|
||||||
case Result of
|
|
||||||
{{true, Meta}, OkRes} ->
|
log_and_alarm(ok, Meta) ->
|
||||||
emqx_alarm:deactivate(cluster_rpc_apply_failed, Meta),
|
OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => <<"ok">>},
|
||||||
{true, OkRes};
|
?SLOG(notice, OkMeta),
|
||||||
{{false, Meta}, NotOkRes} ->
|
emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta);
|
||||||
emqx_alarm:activate(cluster_rpc_apply_failed, Meta),
|
log_and_alarm({ok, _} = Res, Meta) ->
|
||||||
{false, NotOkRes}
|
OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => ?TO_BIN(Res)},
|
||||||
|
?SLOG(notice, OkMeta),
|
||||||
|
emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta);
|
||||||
|
log_and_alarm({crash, C, E}, Meta) ->
|
||||||
|
CrashMeta = Meta#{msg => <<"crash to apply MFA">>, exception => C, reason => ?TO_BIN(E)},
|
||||||
|
?SLOG(critical, CrashMeta),
|
||||||
|
emqx_alarm:activate(cluster_rpc_apply_failed, CrashMeta);
|
||||||
|
log_and_alarm(Res, Meta) ->
|
||||||
|
NotOkMeta = Meta#{msg => <<"failed to apply MFA">>, result => ?TO_BIN(Res)},
|
||||||
|
?SLOG(error, NotOkMeta),
|
||||||
|
emqx_alarm:activate(cluster_rpc_apply_failed, NotOkMeta).
|
||||||
|
|
||||||
|
wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
|
||||||
|
ok = timer:sleep(Delay),
|
||||||
|
case legging_node(TnxId) of
|
||||||
|
[_|_] when Remain > 0 ->
|
||||||
|
wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay);
|
||||||
|
[] -> ok;
|
||||||
|
Nodes -> {error, Nodes}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
confirm_commit({ok, TnxId, _Res} = Init, all, _Gap, Timeout) when Timeout =< 0 ->
|
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) ->
|
||||||
case node_not_commit(TnxId) of
|
ok = timer:sleep(Delay),
|
||||||
ok -> Init;
|
case length(synced_nodes(TnxId)) >= RequiredNum of
|
||||||
Error -> Error
|
true -> ok;
|
||||||
end;
|
false when Remain > 0 ->
|
||||||
confirm_commit({ok, TnxId, _Res} = Init, all, Gap, Timeout) ->
|
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay);
|
||||||
timer:sleep(Gap),
|
false ->
|
||||||
case node_not_commit(TnxId) of
|
case legging_node(TnxId) of
|
||||||
ok -> Init;
|
[] -> ok; %% All commit but The succeedNum > length(nodes()).
|
||||||
_Error -> confirm_commit(Init, all, Gap, Timeout - Gap)
|
Nodes -> {error, Nodes}
|
||||||
end;
|
|
||||||
confirm_commit(Init, 1, _Gap, _Timeout) -> Init;
|
|
||||||
confirm_commit({ok, TnxId, _Res} = Init, SucceedNum, _Gap, Timeout) when Timeout =< 0 ->
|
|
||||||
case node_already_commit(TnxId, SucceedNum) of
|
|
||||||
ok -> Init;
|
|
||||||
_Error ->
|
|
||||||
case node_not_commit(TnxId) of
|
|
||||||
ok -> Init; %% All commit but The succeedNum is greater than the length(nodes()).
|
|
||||||
Error -> Error
|
|
||||||
end
|
end
|
||||||
end;
|
|
||||||
confirm_commit({ok, TnxId, _Res} = Init, SucceedNum, Gap, Timeout) ->
|
|
||||||
timer:sleep(Gap),
|
|
||||||
case node_already_commit(TnxId, SucceedNum) of
|
|
||||||
ok -> Init;
|
|
||||||
_Error -> confirm_commit(Init, SucceedNum, Gap, Timeout - Gap)
|
|
||||||
end;
|
|
||||||
confirm_commit(Error, _SucceedNum, _Gap, _Timeout) -> Error.
|
|
||||||
|
|
||||||
node_not_commit(TnxId) ->
|
|
||||||
case transaction(fun commit_status_trans/2, ['<', TnxId]) of
|
|
||||||
{atomic, []} -> ok;
|
|
||||||
{atomic, Nodes} -> {error, Nodes}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
node_already_commit(TnxId, SucceedNum) ->
|
legging_node(TnxId) ->
|
||||||
case transaction(fun commit_status_trans/2, ['>=', TnxId]) of
|
{atomic, Nodes} = transaction(fun commit_status_trans/2, ['<', TnxId]),
|
||||||
{atomic, Nodes} when length(Nodes) >= SucceedNum -> ok;
|
Nodes.
|
||||||
{atomic, Nodes} -> {error, Nodes}
|
|
||||||
end.
|
synced_nodes(TnxId) ->
|
||||||
|
{atomic, Nodes} = transaction(fun commit_status_trans/2, ['>=', TnxId]),
|
||||||
|
Nodes.
|
||||||
|
|
||||||
commit_status_trans(Operator, TnxId) ->
|
commit_status_trans(Operator, TnxId) ->
|
||||||
MatchHead = #cluster_rpc_commit{tnx_id = '$1', node = '$2', _ = '_'},
|
MatchHead = #cluster_rpc_commit{tnx_id = '$1', node = '$2', _ = '_'},
|
||||||
|
|
|
@ -94,7 +94,7 @@ t_commit_crash_test(_Config) ->
|
||||||
{atomic, []} = emqx_cluster_rpc:status(),
|
{atomic, []} = emqx_cluster_rpc:status(),
|
||||||
{M, F, A} = {?MODULE, no_exist_function, []},
|
{M, F, A} = {?MODULE, no_exist_function, []},
|
||||||
Error = emqx_cluster_rpc:multicall(M, F, A),
|
Error = emqx_cluster_rpc:multicall(M, F, A),
|
||||||
?assertEqual({error, "TnxId(1) apply MFA({emqx_cluster_rpc_SUITE,no_exist_function,[]}) crash"}, Error),
|
?assertEqual({error, {crash,error,undef}}, Error),
|
||||||
?assertEqual({atomic, []}, emqx_cluster_rpc:status()),
|
?assertEqual({atomic, []}, emqx_cluster_rpc:status()),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue