fix(cluster-call): fix typo and add is_success/1 helper function

This commit is contained in:
zhongwencool 2021-09-10 09:40:30 +08:00
parent 24aaa5349b
commit 6bc7378e63
1 changed files with 24 additions and 27 deletions

View File

@ -85,7 +85,7 @@ multicall(M, F, A) ->
TnxId :: pos_integer(), TnxId :: pos_integer(),
Timeout :: timeout(), Timeout :: timeout(),
Reason :: string(). Reason :: string().
multicall(M, F, A, RequireNum, Timeout)when RequireNum >= 1 -> multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 ->
MFA = {initiate, {M, F, A}}, MFA = {initiate, {M, F, A}},
Begin = erlang:monotonic_time(), Begin = erlang:monotonic_time(),
InitRes = InitRes =
@ -102,7 +102,7 @@ multicall(M, F, A, RequireNum, Timeout)when RequireNum >= 1 ->
end, end,
End = erlang:monotonic_time(), End = erlang:monotonic_time(),
MinDelay = erlang:convert_time_unit(Begin - End, native, millisecond) + 50, MinDelay = erlang:convert_time_unit(Begin - End, native, millisecond) + 50,
%% Failed after 3 attempts. %% Fail after 3 attempts.
RetryTimeout = 3 * max(MinDelay, get_retry_ms()), RetryTimeout = 3 * max(MinDelay, get_retry_ms()),
OkOrFailed = OkOrFailed =
case InitRes of case InitRes of
@ -191,7 +191,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
{atomic, caught_up} -> ?TIMEOUT; {atomic, caught_up} -> ?TIMEOUT;
{atomic, {still_lagging, NextId, MFA}} -> {atomic, {still_lagging, NextId, MFA}} ->
{Succeed, _} = apply_mfa(NextId, MFA), {Succeed, _} = apply_mfa(NextId, MFA),
case Succeed orelse SkipResult of case Succeed orelse SkipResult of
true -> true ->
case transaction(fun commit/2, [Node, NextId]) of case transaction(fun commit/2, [Node, NextId]) of
{atomic, ok} -> catch_up(State, false); {atomic, ok} -> catch_up(State, false);
@ -312,34 +312,31 @@ apply_mfa(TnxId, {M, F, A}) ->
Res = Res =
try erlang:apply(M, F, A) try erlang:apply(M, F, A)
catch catch
C : E -> {crash, C, E} Class:Reason:Stacktrace ->
{error, #{exception => Class, reason => Reason, stacktrace => Stacktrace}}
end, end,
Meta = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)}, Meta = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)},
log_and_alarm(Res, Meta), IsSuccess = is_success(Res),
Succeed = (Res =:= ok orelse (is_tuple(Res) andalso Res =/= {} andalso element(1, Res) =:= ok)), log_and_alarm(IsSuccess, Res, Meta),
{Succeed, Res}. {IsSuccess, Res}.
log_and_alarm(ok, Meta) -> is_success(ok) -> true;
OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => <<"ok">>}, is_success({ok, _}) -> true;
?SLOG(notice, OkMeta), is_success(_) -> false.
emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta);
log_and_alarm({ok, _} = Res, Meta) -> log_and_alarm(true, Res, Meta) ->
OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => ?TO_BIN(Res)}, OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => Res},
?SLOG(notice, OkMeta), ?SLOG(debug, OkMeta),
emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta); emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta#{result => ?TO_BIN(Res)});
log_and_alarm({crash, C, E}, Meta) -> log_and_alarm(false, Res, Meta) ->
CrashMeta = Meta#{msg => <<"crash to apply MFA">>, exception => C, reason => ?TO_BIN(E)}, NotOkMeta = Meta#{msg => <<"failed to apply MFA">>, result => Res},
?SLOG(critical, CrashMeta),
emqx_alarm:activate(cluster_rpc_apply_failed, CrashMeta);
log_and_alarm(Res, Meta) ->
NotOkMeta = Meta#{msg => <<"failed to apply MFA">>, result => ?TO_BIN(Res)},
?SLOG(error, NotOkMeta), ?SLOG(error, NotOkMeta),
emqx_alarm:activate(cluster_rpc_apply_failed, NotOkMeta). emqx_alarm:activate(cluster_rpc_apply_failed, NotOkMeta#{result => ?TO_BIN(Res)}).
wait_for_all_nodes_commit(TnxId, Delay, Remain) -> wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
ok = timer:sleep(Delay), ok = timer:sleep(Delay),
case legging_node(TnxId) of case lagging_node(TnxId) of
[_|_] when Remain > 0 -> [_ | _] when Remain > 0 ->
wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay); wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay);
[] -> ok; [] -> ok;
Nodes -> {error, Nodes} Nodes -> {error, Nodes}
@ -352,18 +349,18 @@ wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) ->
false when Remain > 0 -> false when Remain > 0 ->
wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay); wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay);
false -> false ->
case legging_node(TnxId) of case lagging_node(TnxId) of
[] -> ok; %% All commit but The succeedNum > length(nodes()). [] -> ok; %% All commit but The succeedNum > length(nodes()).
Nodes -> {error, Nodes} Nodes -> {error, Nodes}
end end
end. end.
legging_node(TnxId) -> lagging_node(TnxId) ->
{atomic, Nodes} = transaction(fun commit_status_trans/2, ['<', TnxId]), {atomic, Nodes} = transaction(fun commit_status_trans/2, ['<', TnxId]),
Nodes. Nodes.
synced_nodes(TnxId) -> synced_nodes(TnxId) ->
{atomic, Nodes} = transaction(fun commit_status_trans/2, ['>=', TnxId]), {atomic, Nodes} = transaction(fun commit_status_trans/2, ['>=', TnxId]),
Nodes. Nodes.
commit_status_trans(Operator, TnxId) -> commit_status_trans(Operator, TnxId) ->