From 6bc7378e63f7b20933ec56fdcbac4754aca9e38a Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 10 Sep 2021 09:40:30 +0800 Subject: [PATCH] fix(cluster-call): fix typo and add is_success/1 help function --- apps/emqx_machine/src/emqx_cluster_rpc.erl | 51 ++++++++++------------ 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index d5721c8c2..c747f7654 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -85,7 +85,7 @@ multicall(M, F, A) -> TnxId :: pos_integer(), Timeout :: timeout(), Reason :: string(). -multicall(M, F, A, RequireNum, Timeout)when RequireNum >= 1 -> +multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 -> MFA = {initiate, {M, F, A}}, Begin = erlang:monotonic_time(), InitRes = @@ -102,7 +102,7 @@ multicall(M, F, A, RequireNum, Timeout)when RequireNum >= 1 -> end, End = erlang:monotonic_time(), MinDelay = erlang:convert_time_unit(Begin - End, native, millisecond) + 50, - %% Failed after 3 attempts. + %% Fail after 3 attempts. RetryTimeout = 3 * max(MinDelay, get_retry_ms()), OkOrFailed = case InitRes of @@ -191,7 +191,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> {atomic, caught_up} -> ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> {Succeed, _} = apply_mfa(NextId, MFA), - case Succeed orelse SkipResult of + case Succeed orelse SkipResult of true -> case transaction(fun commit/2, [Node, NextId]) of {atomic, ok} -> catch_up(State, false); @@ -312,34 +312,31 @@ apply_mfa(TnxId, {M, F, A}) -> Res = try erlang:apply(M, F, A) catch - C : E -> {crash, C, E} + Class:Reason:Stacktrace -> + {error, #{exception => Class, reason => Reason, stacktrace => Stacktrace}} end, Meta = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)}, - log_and_alarm(Res, Meta), - Succeed = (Res =:= ok orelse (is_tuple(Res) andalso Res =/= {} andalso element(1, Res) =:= ok)), - {Succeed, Res}. + IsSuccess = is_success(Res), + log_and_alarm(IsSuccess, Res, Meta), + {IsSuccess, Res}. -log_and_alarm(ok, Meta) -> - OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => <<"ok">>}, - ?SLOG(notice, OkMeta), - emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta); -log_and_alarm({ok, _} = Res, Meta) -> - OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => ?TO_BIN(Res)}, - ?SLOG(notice, OkMeta), - emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta); -log_and_alarm({crash, C, E}, Meta) -> - CrashMeta = Meta#{msg => <<"crash to apply MFA">>, exception => C, reason => ?TO_BIN(E)}, - ?SLOG(critical, CrashMeta), - emqx_alarm:activate(cluster_rpc_apply_failed, CrashMeta); -log_and_alarm(Res, Meta) -> - NotOkMeta = Meta#{msg => <<"failed to apply MFA">>, result => ?TO_BIN(Res)}, +is_success(ok) -> true; +is_success({ok, _}) -> true; +is_success(_) -> false. + +log_and_alarm(true, Res, Meta) -> + OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => Res}, + ?SLOG(debug, OkMeta), + emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta#{result => ?TO_BIN(Res)}); +log_and_alarm(false, Res, Meta) -> + NotOkMeta = Meta#{msg => <<"failed to apply MFA">>, result => Res}, ?SLOG(error, NotOkMeta), - emqx_alarm:activate(cluster_rpc_apply_failed, NotOkMeta). + emqx_alarm:activate(cluster_rpc_apply_failed, NotOkMeta#{result => ?TO_BIN(Res)}). wait_for_all_nodes_commit(TnxId, Delay, Remain) -> ok = timer:sleep(Delay), - case legging_node(TnxId) of - [_|_] when Remain > 0 -> + case lagging_node(TnxId) of + [_ | _] when Remain > 0 -> wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay); [] -> ok; Nodes -> {error, Nodes} @@ -352,18 +349,18 @@ wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) -> false when Remain > 0 -> wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay); false -> - case legging_node(TnxId) of + case lagging_node(TnxId) of [] -> ok; %% All commit but The succeedNum > length(nodes()). Nodes -> {error, Nodes} end end. -legging_node(TnxId) -> +lagging_node(TnxId) -> {atomic, Nodes} = transaction(fun commit_status_trans/2, ['<', TnxId]), Nodes. synced_nodes(TnxId) -> - {atomic, Nodes} = transaction(fun commit_status_trans/2, ['>=', TnxId]), + {atomic, Nodes} = transaction(fun commit_status_trans/2, ['>=', TnxId]), Nodes. commit_status_trans(Operator, TnxId) ->