diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index a73bd1e63..d5721c8c2 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -85,7 +85,7 @@ multicall(M, F, A) -> TnxId :: pos_integer(), Timeout :: timeout(), Reason :: string(). -multicall(M, F, A, SucceedNum, Timeout) -> +multicall(M, F, A, RequireNum, Timeout)when RequireNum >= 1 -> MFA = {initiate, {M, F, A}}, Begin = erlang:monotonic_time(), InitRes = @@ -101,9 +101,23 @@ multicall(M, F, A, SucceedNum, Timeout) -> end end, End = erlang:monotonic_time(), - Gap = erlang:convert_time_unit(Begin - End, native, millisecond) + 50, - RetryMs = get_retry_ms(), - confirm_commit(InitRes, SucceedNum, Gap, 3 * max(Gap, RetryMs)). + MinDelay = erlang:convert_time_unit(Begin - End, native, millisecond) + 50, + %% Failed after 3 attempts. + RetryTimeout = 3 * max(MinDelay, get_retry_ms()), + OkOrFailed = + case InitRes of + {ok, _TnxId, _} when RequireNum =:= 1 -> + ok; + {ok, TnxId, _} when RequireNum =:= all -> + wait_for_all_nodes_commit(TnxId, MinDelay, RetryTimeout); + {ok, TnxId, _} when is_integer(RequireNum) -> + wait_for_nodes_commit(RequireNum, TnxId, MinDelay, RetryTimeout); + Error -> Error + end, + case OkOrFailed of + ok -> InitRes; + _ -> OkOrFailed + end. -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. query(TnxId) -> @@ -294,83 +308,63 @@ trans_query(TnxId) -> -define(TO_BIN(_B_), iolist_to_binary(io_lib:format("~p", [_B_]))). -apply_mfa(TnxId, {M, F, A} = MFA) -> - Result = - try - Meta0 = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)}, - Res = erlang:apply(M, F, A), - Succeed = - case Res of - ok -> - OkMeta = Meta0#{msg => <<"succeeded to apply MFA">>, result => Res}, - ?SLOG(notice, OkMeta), - {true, OkMeta}; - {ok, _} -> - OkMeta1 = Meta0#{msg => <<"succeeded to apply MFA">>, result => ?TO_BIN(Res)}, - ?SLOG(notice, OkMeta1), - {true, OkMeta1}; - _ -> - NotOkMeta = Meta0#{msg => <<"failed to apply MFA">>, result => ?TO_BIN(Res)}, - ?SLOG(error, NotOkMeta), - {false, NotOkMeta} - end, - {Succeed, Res} +apply_mfa(TnxId, {M, F, A}) -> + Res = + try erlang:apply(M, F, A) catch - C : E -> - CrashMeta = #{msg => <<"crash to apply MFA">>, tnx_id => TnxId, exception => C, reason => ?TO_BIN(E), - module => M, function => F, args => ?TO_BIN(A)}, - ?SLOG(critical, CrashMeta), - {{false, CrashMeta}, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))} - end, - case Result of - {{true, Meta}, OkRes} -> - emqx_alarm:deactivate(cluster_rpc_apply_failed, Meta), - {true, OkRes}; - {{false, Meta}, NotOkRes} -> - emqx_alarm:activate(cluster_rpc_apply_failed, Meta), - {false, NotOkRes} + C : E -> {crash, C, E} + end, + Meta = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)}, + log_and_alarm(Res, Meta), + Succeed = (Res =:= ok orelse (is_tuple(Res) andalso Res =/= {} andalso element(1, Res) =:= ok)), + {Succeed, Res}. + +log_and_alarm(ok, Meta) -> + OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => <<"ok">>}, + ?SLOG(notice, OkMeta), + emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta); +log_and_alarm({ok, _} = Res, Meta) -> + OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => ?TO_BIN(Res)}, + ?SLOG(notice, OkMeta), + emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta); +log_and_alarm({crash, C, E}, Meta) -> + CrashMeta = Meta#{msg => <<"crash to apply MFA">>, exception => C, reason => ?TO_BIN(E)}, + ?SLOG(critical, CrashMeta), + emqx_alarm:activate(cluster_rpc_apply_failed, CrashMeta); +log_and_alarm(Res, Meta) -> + NotOkMeta = Meta#{msg => <<"failed to apply MFA">>, result => ?TO_BIN(Res)}, + ?SLOG(error, NotOkMeta), + emqx_alarm:activate(cluster_rpc_apply_failed, NotOkMeta). + +wait_for_all_nodes_commit(TnxId, Delay, Remain) -> + ok = timer:sleep(Delay), + case legging_node(TnxId) of + [_|_] when Remain > 0 -> + wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay); + [] -> ok; + Nodes -> {error, Nodes} end. -confirm_commit({ok, TnxId, _Res} = Init, all, _Gap, Timeout) when Timeout =< 0 -> - case node_not_commit(TnxId) of - ok -> Init; - Error -> Error - end; -confirm_commit({ok, TnxId, _Res} = Init, all, Gap, Timeout) -> - timer:sleep(Gap), - case node_not_commit(TnxId) of - ok -> Init; - _Error -> confirm_commit(Init, all, Gap, Timeout - Gap) - end; -confirm_commit(Init, 1, _Gap, _Timeout) -> Init; -confirm_commit({ok, TnxId, _Res} = Init, SucceedNum, _Gap, Timeout) when Timeout =< 0 -> - case node_already_commit(TnxId, SucceedNum) of - ok -> Init; - _Error -> - case node_not_commit(TnxId) of - ok -> Init; %% All commit but The succeedNum is greater than the length(nodes()). - Error -> Error +wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) -> + ok = timer:sleep(Delay), + case length(synced_nodes(TnxId)) >= RequiredNum of + true -> ok; + false when Remain > 0 -> + wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay); + false -> + case legging_node(TnxId) of + [] -> ok; %% All commit but The succeedNum > length(nodes()). + Nodes -> {error, Nodes} end - end; -confirm_commit({ok, TnxId, _Res} = Init, SucceedNum, Gap, Timeout) -> - timer:sleep(Gap), - case node_already_commit(TnxId, SucceedNum) of - ok -> Init; - _Error -> confirm_commit(Init, SucceedNum, Gap, Timeout - Gap) - end; -confirm_commit(Error, _SucceedNum, _Gap, _Timeout) -> Error. - -node_not_commit(TnxId) -> - case transaction(fun commit_status_trans/2, ['<', TnxId]) of - {atomic, []} -> ok; - {atomic, Nodes} -> {error, Nodes} end. -node_already_commit(TnxId, SucceedNum) -> - case transaction(fun commit_status_trans/2, ['>=', TnxId]) of - {atomic, Nodes} when length(Nodes) >= SucceedNum -> ok; - {atomic, Nodes} -> {error, Nodes} - end. +legging_node(TnxId) -> + {atomic, Nodes} = transaction(fun commit_status_trans/2, ['<', TnxId]), + Nodes. + +synced_nodes(TnxId) -> + {atomic, Nodes} = transaction(fun commit_status_trans/2, ['>=', TnxId]), + Nodes. commit_status_trans(Operator, TnxId) -> MatchHead = #cluster_rpc_commit{tnx_id = '$1', node = '$2', _ = '_'}, diff --git a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl index d790c7069..ebb654af6 100644 --- a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl @@ -94,7 +94,7 @@ t_commit_crash_test(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, no_exist_function, []}, Error = emqx_cluster_rpc:multicall(M, F, A), - ?assertEqual({error, "TnxId(1) apply MFA({emqx_cluster_rpc_SUITE,no_exist_function,[]}) crash"}, Error), + ?assertEqual({error, {crash,error,undef}}, Error), ?assertEqual({atomic, []}, emqx_cluster_rpc:status()), ok.