From 679edbc29c5697120cc5502787bbe347e94f89c6 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 2 Sep 2021 10:03:03 +0800 Subject: [PATCH] feat(cluster-call): support confirm success after all mfa apply ok --- apps/emqx/src/emqx_alarm.erl | 2 + .../emqx_machine/include/emqx_cluster_rpc.hrl | 2 +- apps/emqx_machine/src/emqx_cluster_rpc.erl | 164 +++++++++++++----- .../test/emqx_cluster_rpc_SUITE.erl | 38 ++-- 4 files changed, 150 insertions(+), 56 deletions(-) diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 11a2805f3..10bb6facc 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -402,6 +402,8 @@ normalize_message(high_cpu_usage, #{usage := Usage}) -> list_to_binary(io_lib:format("~s cpu usage", [Usage])); normalize_message(too_many_processes, #{usage := Usage}) -> list_to_binary(io_lib:format("~s process usage", [Usage])); +normalize_message(cluster_rpc_apply_failed, #{tnx_id := TnxId}) -> + list_to_binary(io_lib:format("cluster_rpc_apply_failed:~w", [TnxId])); normalize_message(partition, #{occurred := Node}) -> list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> diff --git a/apps/emqx_machine/include/emqx_cluster_rpc.hrl b/apps/emqx_machine/include/emqx_cluster_rpc.hrl index 5c04346b7..046331871 100644 --- a/apps/emqx_machine/include/emqx_cluster_rpc.hrl +++ b/apps/emqx_machine/include/emqx_cluster_rpc.hrl @@ -29,7 +29,7 @@ -record(cluster_rpc_commit, { node :: node(), - tnx_id :: pos_integer() + tnx_id :: pos_integer() | '$1' }). -endif. diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index 346ca5025..a73bd1e63 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -18,7 +18,7 @@ %% API -export([start_link/0, mnesia/1]). --export([multicall/3, multicall/4, query/1, reset/0, status/0]). 
+-export([multicall/3, multicall/5, query/1, reset/0, status/0, skip_failed_commit/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, handle_continue/2, code_change/3]). @@ -63,8 +63,7 @@ mnesia(copy) -> ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). start_link() -> - RetryMs = application:get_env(emqx_machine, cluster_call_retry_interval, 1000), - start_link(node(), ?MODULE, RetryMs). + start_link(node(), ?MODULE, get_retry_ms()). start_link(Node, Name, RetryMs) -> gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). @@ -76,28 +75,35 @@ start_link(Node, Name, RetryMs) -> TnxId :: pos_integer(), Reason :: string(). multicall(M, F, A) -> - multicall(M, F, A, timer:minutes(2)). + multicall(M, F, A, all, timer:minutes(2)). --spec multicall(Module, Function, Args, Timeout) -> {ok, TnxId, term()} |{error, Reason} when +-spec multicall(Module, Function, Args, SucceedNum, Timeout) -> {ok, TnxId, term()} |{error, Reason} when Module :: module(), Function :: atom(), Args :: [term()], + SucceedNum :: pos_integer() | all, TnxId :: pos_integer(), Timeout :: timeout(), Reason :: string(). -multicall(M, F, A, Timeout) -> +multicall(M, F, A, SucceedNum, Timeout) -> MFA = {initiate, {M, F, A}}, - case ekka_rlog:role() of - core -> gen_server:call(?MODULE, MFA, Timeout); - replicant -> - %% the initiate transaction must happened on core node - %% make sure MFA(in the transaction) and the transaction on the same node - %% don't need rpc again inside transaction. - case ekka_rlog_status:upstream_node(?COMMON_SHARD) of - {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout); - disconnected -> {error, disconnected} - end - end. 
+ Begin = erlang:monotonic_time(), + InitRes = + case ekka_rlog:role() of + core -> gen_server:call(?MODULE, MFA, Timeout); + replicant -> + %% the initiate transaction must happened on core node + %% make sure MFA(in the transaction) and the transaction on the same node + %% don't need rpc again inside transaction. + case ekka_rlog_status:upstream_node(?COMMON_SHARD) of + {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout); + disconnected -> {error, disconnected} + end + end, + End = erlang:monotonic_time(), + Gap = erlang:convert_time_unit(End - Begin, native, millisecond) + 50, %% elapsed time of the initiate call plus 50ms slack; must stay positive because confirm_commit sleeps for Gap + RetryMs = get_retry_ms(), + confirm_commit(InitRes, SucceedNum, Gap, 3 * max(Gap, RetryMs)). -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. query(TnxId) -> @@ -110,8 +116,15 @@ reset() -> gen_server:call(?MODULE, reset). status() -> transaction(fun trans_status/0, []). +%% Treat the next pending MFA on Node as succeeded regardless of what it returns, +%% then move on to the next TnxId. +%% If that next TnxId fails too, call this function again to skip it. +-spec skip_failed_commit(node()) -> ok. +skip_failed_commit(Node) -> + gen_server:call({?MODULE, Node}, skip_failed_commit). + %%%=================================================================== -%%% gen_statem callbacks +%%% gen_server callbacks %%%=================================================================== %% @private @@ -135,6 +148,8 @@ handle_call({initiate, MFA}, _From, State = #{node := Node}) -> {aborted, Reason} -> {reply, {error, Reason}, State, {continue, ?CATCH_UP}} end; +handle_call(skip_failed_commit, _From, State) -> + {reply, ok, State, catch_up(State, true)}; handle_call(_, _From, State) -> {reply, ok, State, catch_up(State)}. 
@@ -155,15 +170,17 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -catch_up(#{node := Node, retry_interval := RetryMs} = State) -> +catch_up(State) -> catch_up(State, false). + +catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> case transaction(fun read_next_mfa/1, [Node]) of {atomic, caught_up} -> ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> {Succeed, _} = apply_mfa(NextId, MFA), - case Succeed of + case Succeed orelse SkipResult of true -> case transaction(fun commit/2, [Node, NextId]) of - {atomic, ok} -> catch_up(State); + {atomic, ok} -> catch_up(State, false); Error -> ?SLOG(error, #{ msg => "failed to commit applied call", @@ -275,24 +292,91 @@ trans_query(TnxId) -> #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} end. +-define(TO_BIN(_B_), iolist_to_binary(io_lib:format("~p", [_B_]))). 
+ apply_mfa(TnxId, {M, F, A} = MFA) -> - try - Res = erlang:apply(M, F, A), - Succeed = - case Res of - ok -> - ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}), - true; - {ok, _} -> - ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}), - true; - _ -> - ?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}), - false - end, - {Succeed, Res} - catch - C : E -> - ?SLOG(critical, #{msg => "crash to apply MFA", tnx_id => TnxId, mfa => MFA, exception => C, reason => E}), - {false, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))} + Result = + try + Meta0 = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)}, + Res = erlang:apply(M, F, A), + Succeed = + case Res of + ok -> + OkMeta = Meta0#{msg => <<"succeeded to apply MFA">>, result => Res}, + ?SLOG(notice, OkMeta), + {true, OkMeta}; + {ok, _} -> + OkMeta1 = Meta0#{msg => <<"succeeded to apply MFA">>, result => ?TO_BIN(Res)}, + ?SLOG(notice, OkMeta1), + {true, OkMeta1}; + _ -> + NotOkMeta = Meta0#{msg => <<"failed to apply MFA">>, result => ?TO_BIN(Res)}, + ?SLOG(error, NotOkMeta), + {false, NotOkMeta} + end, + {Succeed, Res} + catch + C : E -> + CrashMeta = #{msg => <<"crash to apply MFA">>, tnx_id => TnxId, exception => C, reason => ?TO_BIN(E), + module => M, function => F, args => ?TO_BIN(A)}, + ?SLOG(critical, CrashMeta), + {{false, CrashMeta}, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))} + end, + case Result of + {{true, Meta}, OkRes} -> + emqx_alarm:deactivate(cluster_rpc_apply_failed, Meta), + {true, OkRes}; + {{false, Meta}, NotOkRes} -> + emqx_alarm:activate(cluster_rpc_apply_failed, Meta), + {false, NotOkRes} end. 
+ +confirm_commit({ok, TnxId, _Res} = Init, all, _Gap, Timeout) when Timeout =< 0 -> + case node_not_commit(TnxId) of + ok -> Init; + Error -> Error + end; +confirm_commit({ok, TnxId, _Res} = Init, all, Gap, Timeout) -> + timer:sleep(Gap), + case node_not_commit(TnxId) of + ok -> Init; + _Error -> confirm_commit(Init, all, Gap, Timeout - Gap) + end; +confirm_commit(Init, 1, _Gap, _Timeout) -> Init; +confirm_commit({ok, TnxId, _Res} = Init, SucceedNum, _Gap, Timeout) when Timeout =< 0 -> + case node_already_commit(TnxId, SucceedNum) of + ok -> Init; + _Error -> + case node_not_commit(TnxId) of + ok -> Init; %% All commit but The succeedNum is greater than the length(nodes()). + Error -> Error + end + end; +confirm_commit({ok, TnxId, _Res} = Init, SucceedNum, Gap, Timeout) -> + timer:sleep(Gap), + case node_already_commit(TnxId, SucceedNum) of + ok -> Init; + _Error -> confirm_commit(Init, SucceedNum, Gap, Timeout - Gap) + end; +confirm_commit(Error, _SucceedNum, _Gap, _Timeout) -> Error. + +node_not_commit(TnxId) -> + case transaction(fun commit_status_trans/2, ['<', TnxId]) of + {atomic, []} -> ok; + {atomic, Nodes} -> {error, Nodes} + end. + +node_already_commit(TnxId, SucceedNum) -> + case transaction(fun commit_status_trans/2, ['>=', TnxId]) of + {atomic, Nodes} when length(Nodes) >= SucceedNum -> ok; + {atomic, Nodes} -> {error, Nodes} + end. + +commit_status_trans(Operator, TnxId) -> + MatchHead = #cluster_rpc_commit{tnx_id = '$1', node = '$2', _ = '_'}, + Guard = {Operator, '$1', TnxId}, + Result = '$2', + mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]). + +get_retry_ms() -> + application:get_env(emqx_machine, cluster_call_retry_interval, 1000). 
diff --git a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl index 26ad28f3e..d790c7069 100644 --- a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl @@ -32,7 +32,8 @@ all() -> [ t_commit_crash_test, t_commit_ok_but_apply_fail_on_other_node, t_commit_ok_apply_fail_on_other_node_then_recover, - t_del_stale_mfa + t_del_stale_mfa, + t_skip_failed_commit ]. suite() -> [{timetrap, {minutes, 3}}]. groups() -> []. @@ -45,19 +46,16 @@ init_per_suite(Config) -> application:set_env(emqx_machine, cluster_call_max_history, 100), application:set_env(emqx_machine, cluster_call_clean_interval, 1000), application:set_env(emqx_machine, cluster_call_retry_interval, 900), - %%dbg:tracer(), - %%dbg:p(all, c), - %%dbg:tpl(emqx_cluster_rpc, cx), - %%dbg:tpl(gen_statem, loop_receive, cx), - %%dbg:tpl(gen_statem, loop_state_callback, cx), - %%dbg:tpl(gen_statem, loop_callback_mode_result, cx), + meck:new(emqx_alarm, [non_strict, passthrough, no_link]), + meck:expect(emqx_alarm, activate, 2, ok), + meck:expect(emqx_alarm, deactivate, 2, ok), Config. end_per_suite(_Config) -> ekka:stop(), ekka_mnesia:ensure_stopped(), ekka_mnesia:delete_schema(), - %%dbg:stop(), + meck:unload(emqx_alarm), ok. 
init_per_testcase(_TestCase, Config) -> @@ -78,7 +76,6 @@ t_base_test(_Config) -> ?assertEqual(node(), maps:get(initiator, Query)), ?assert(maps:is_key(created_at, Query)), ?assertEqual(ok, receive_msg(3, test)), - sleep(400), {atomic, Status} = emqx_cluster_rpc:status(), ?assertEqual(3, length(Status)), ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 1 end, Status)), @@ -105,12 +102,12 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, - {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), + {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), {atomic, [Status]} = emqx_cluster_rpc:status(), ?assertEqual(MFA, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), erlang:send(?NODE2, test), - Res = gen_statem:call(?NODE2, {initiate, {M, F, A}}), + Res = gen_server:call(?NODE2, {initiate, {M, F, A}}), ?assertEqual({error, "MFA return not ok"}, Res), ok. @@ -118,8 +115,8 @@ t_catch_up_status_handle_next_commit(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, - {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), - {ok, 2} = gen_statem:call(?NODE2, {initiate, {M, F, A}}), + {ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + {ok, 2} = gen_server:call(?NODE2, {initiate, {M, F, A}}), ok. 
t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> @@ -127,8 +124,8 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), Now = erlang:system_time(second), {M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]}, - {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), - {ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"]), + {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + {ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000), {atomic, [Status|L]} = emqx_cluster_rpc:status(), ?assertEqual([], L), ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), @@ -177,6 +174,17 @@ t_del_stale_mfa(_Config) -> end || I <- lists:seq(51, 150)], ok. +t_skip_failed_commit(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + {ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000), + {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, + {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + ok = gen_server:call(?NODE2, skip_failed_commit, 5000), + {atomic, List} = emqx_cluster_rpc:status(), + ?assertEqual([1, 2, 2], lists:sort(lists:map(fun(#{tnx_id := TnxId}) -> TnxId end, List))), + ok. + start() -> {ok, Pid1} = emqx_cluster_rpc:start_link(), {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),