feat(cluster-call): support confirm success after all mfa apply ok
This commit is contained in:
parent
662f7438fa
commit
679edbc29c
|
@ -402,6 +402,8 @@ normalize_message(high_cpu_usage, #{usage := Usage}) ->
|
|||
list_to_binary(io_lib:format("~s cpu usage", [Usage]));
|
||||
normalize_message(too_many_processes, #{usage := Usage}) ->
|
||||
list_to_binary(io_lib:format("~s process usage", [Usage]));
|
||||
normalize_message(cluster_rpc_apply_failed, #{tnx_id := TnxId}) ->
|
||||
list_to_binary(io_lib:format("cluster_rpc_apply_failed:~w", [TnxId]));
|
||||
normalize_message(partition, #{occurred := Node}) ->
|
||||
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
|
||||
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
|
||||
-record(cluster_rpc_commit, {
|
||||
node :: node(),
|
||||
tnx_id :: pos_integer()
|
||||
tnx_id :: pos_integer() | '$1'
|
||||
}).
|
||||
|
||||
-endif.
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
|
||||
%% API
|
||||
-export([start_link/0, mnesia/1]).
|
||||
-export([multicall/3, multicall/4, query/1, reset/0, status/0]).
|
||||
-export([multicall/3, multicall/5, query/1, reset/0, status/0, skip_failed_commit/1]).
|
||||
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||
handle_continue/2, code_change/3]).
|
||||
|
@ -63,8 +63,7 @@ mnesia(copy) ->
|
|||
ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies).
|
||||
|
||||
start_link() ->
|
||||
RetryMs = application:get_env(emqx_machine, cluster_call_retry_interval, 1000),
|
||||
start_link(node(), ?MODULE, RetryMs).
|
||||
start_link(node(), ?MODULE, get_retry_ms()).
|
||||
|
||||
start_link(Node, Name, RetryMs) ->
|
||||
gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []).
|
||||
|
@ -76,28 +75,35 @@ start_link(Node, Name, RetryMs) ->
|
|||
TnxId :: pos_integer(),
|
||||
Reason :: string().
|
||||
multicall(M, F, A) ->
|
||||
multicall(M, F, A, timer:minutes(2)).
|
||||
multicall(M, F, A, all, timer:minutes(2)).
|
||||
|
||||
-spec multicall(Module, Function, Args, Timeout) -> {ok, TnxId, term()} |{error, Reason} when
|
||||
-spec multicall(Module, Function, Args, SucceedNum, Timeout) -> {ok, TnxId, term()} |{error, Reason} when
|
||||
Module :: module(),
|
||||
Function :: atom(),
|
||||
Args :: [term()],
|
||||
SucceedNum :: pos_integer() | all,
|
||||
TnxId :: pos_integer(),
|
||||
Timeout :: timeout(),
|
||||
Reason :: string().
|
||||
multicall(M, F, A, Timeout) ->
|
||||
multicall(M, F, A, SucceedNum, Timeout) ->
|
||||
MFA = {initiate, {M, F, A}},
|
||||
case ekka_rlog:role() of
|
||||
core -> gen_server:call(?MODULE, MFA, Timeout);
|
||||
replicant ->
|
||||
%% the initiate transaction must happened on core node
|
||||
%% make sure MFA(in the transaction) and the transaction on the same node
|
||||
%% don't need rpc again inside transaction.
|
||||
case ekka_rlog_status:upstream_node(?COMMON_SHARD) of
|
||||
{ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout);
|
||||
disconnected -> {error, disconnected}
|
||||
end
|
||||
end.
|
||||
Begin = erlang:monotonic_time(),
|
||||
InitRes =
|
||||
case ekka_rlog:role() of
|
||||
core -> gen_server:call(?MODULE, MFA, Timeout);
|
||||
replicant ->
|
||||
%% the initiate transaction must happened on core node
|
||||
%% make sure MFA(in the transaction) and the transaction on the same node
|
||||
%% don't need rpc again inside transaction.
|
||||
case ekka_rlog_status:upstream_node(?COMMON_SHARD) of
|
||||
{ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout);
|
||||
disconnected -> {error, disconnected}
|
||||
end
|
||||
end,
|
||||
End = erlang:monotonic_time(),
|
||||
Gap = erlang:convert_time_unit(Begin - End, native, millisecond) + 50,
|
||||
RetryMs = get_retry_ms(),
|
||||
confirm_commit(InitRes, SucceedNum, Gap, 3 * max(Gap, RetryMs)).
|
||||
|
||||
-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
|
||||
query(TnxId) ->
|
||||
|
@ -110,8 +116,15 @@ reset() -> gen_server:call(?MODULE, reset).
|
|||
status() ->
|
||||
transaction(fun trans_status/0, []).
|
||||
|
||||
%% Regardless of what MFA is returned, consider it a success),
|
||||
%% then move to the next tnxId.
|
||||
%% if the next TnxId failed, need call the function again to skip.
|
||||
-spec skip_failed_commit(node()) -> pos_integer().
|
||||
skip_failed_commit(Node) ->
|
||||
gen_server:call({?MODULE, Node}, skip_failed_commit).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_statem callbacks
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
|
@ -135,6 +148,8 @@ handle_call({initiate, MFA}, _From, State = #{node := Node}) ->
|
|||
{aborted, Reason} ->
|
||||
{reply, {error, Reason}, State, {continue, ?CATCH_UP}}
|
||||
end;
|
||||
handle_call(skip_failed_commit, _From, State) ->
|
||||
{reply, ok, State, catch_up(State, true)};
|
||||
handle_call(_, _From, State) ->
|
||||
{reply, ok, State, catch_up(State)}.
|
||||
|
||||
|
@ -155,15 +170,17 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
catch_up(#{node := Node, retry_interval := RetryMs} = State) ->
|
||||
catch_up(State) -> catch_up(State, false).
|
||||
|
||||
catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
|
||||
case transaction(fun read_next_mfa/1, [Node]) of
|
||||
{atomic, caught_up} -> ?TIMEOUT;
|
||||
{atomic, {still_lagging, NextId, MFA}} ->
|
||||
{Succeed, _} = apply_mfa(NextId, MFA),
|
||||
case Succeed of
|
||||
case Succeed orelse SkipResult of
|
||||
true ->
|
||||
case transaction(fun commit/2, [Node, NextId]) of
|
||||
{atomic, ok} -> catch_up(State);
|
||||
{atomic, ok} -> catch_up(State, false);
|
||||
Error ->
|
||||
?SLOG(error, #{
|
||||
msg => "failed to commit applied call",
|
||||
|
@ -275,24 +292,91 @@ trans_query(TnxId) ->
|
|||
#{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt}
|
||||
end.
|
||||
|
||||
-define(TO_BIN(_B_), iolist_to_binary(io_lib:format("~p", [_B_]))).
|
||||
|
||||
apply_mfa(TnxId, {M, F, A} = MFA) ->
|
||||
try
|
||||
Res = erlang:apply(M, F, A),
|
||||
Succeed =
|
||||
case Res of
|
||||
ok ->
|
||||
?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}),
|
||||
true;
|
||||
{ok, _} ->
|
||||
?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}),
|
||||
true;
|
||||
_ ->
|
||||
?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}),
|
||||
false
|
||||
end,
|
||||
{Succeed, Res}
|
||||
catch
|
||||
C : E ->
|
||||
?SLOG(critical, #{msg => "crash to apply MFA", tnx_id => TnxId, mfa => MFA, exception => C, reason => E}),
|
||||
{false, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))}
|
||||
Result =
|
||||
try
|
||||
Meta0 = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)},
|
||||
Res = erlang:apply(M, F, A),
|
||||
Succeed =
|
||||
case Res of
|
||||
ok ->
|
||||
OkMeta = Meta0#{msg => <<"succeeded to apply MFA">>, result => Res},
|
||||
?SLOG(notice, OkMeta),
|
||||
{true, OkMeta};
|
||||
{ok, _} ->
|
||||
OkMeta1 = Meta0#{msg => <<"succeeded to apply MFA">>, result => ?TO_BIN(Res)},
|
||||
?SLOG(notice, OkMeta1),
|
||||
{true, OkMeta1};
|
||||
_ ->
|
||||
NotOkMeta = Meta0#{msg => <<"failed to apply MFA">>, result => ?TO_BIN(Res)},
|
||||
?SLOG(error, NotOkMeta),
|
||||
{false, NotOkMeta}
|
||||
end,
|
||||
{Succeed, Res}
|
||||
catch
|
||||
C : E ->
|
||||
CrashMeta = #{msg => <<"crash to apply MFA">>, tnx_id => TnxId, exception => C, reason => ?TO_BIN(E),
|
||||
module => M, function => F, args => ?TO_BIN(A)},
|
||||
?SLOG(critical, CrashMeta),
|
||||
{{false, CrashMeta}, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))}
|
||||
end,
|
||||
case Result of
|
||||
{{true, Meta}, OkRes} ->
|
||||
emqx_alarm:deactivate(cluster_rpc_apply_failed, Meta),
|
||||
{true, OkRes};
|
||||
{{false, Meta}, NotOkRes} ->
|
||||
emqx_alarm:activate(cluster_rpc_apply_failed, Meta),
|
||||
{false, NotOkRes}
|
||||
end.
|
||||
|
||||
confirm_commit({ok, TnxId, _Res} = Init, all, _Gap, Timeout) when Timeout =< 0 ->
|
||||
case node_not_commit(TnxId) of
|
||||
ok -> Init;
|
||||
Error -> Error
|
||||
end;
|
||||
confirm_commit({ok, TnxId, _Res} = Init, all, Gap, Timeout) ->
|
||||
timer:sleep(Gap),
|
||||
case node_not_commit(TnxId) of
|
||||
ok -> Init;
|
||||
_Error -> confirm_commit(Init, all, Gap, Timeout - Gap)
|
||||
end;
|
||||
confirm_commit(Init, 1, _Gap, _Timeout) -> Init;
|
||||
confirm_commit({ok, TnxId, _Res} = Init, SucceedNum, _Gap, Timeout) when Timeout =< 0 ->
|
||||
case node_already_commit(TnxId, SucceedNum) of
|
||||
ok -> Init;
|
||||
_Error ->
|
||||
case node_not_commit(TnxId) of
|
||||
ok -> Init; %% All commit but The succeedNum is greater than the length(nodes()).
|
||||
Error -> Error
|
||||
end
|
||||
end;
|
||||
confirm_commit({ok, TnxId, _Res} = Init, SucceedNum, Gap, Timeout) ->
|
||||
timer:sleep(Gap),
|
||||
case node_already_commit(TnxId, SucceedNum) of
|
||||
ok -> Init;
|
||||
_Error -> confirm_commit(Init, SucceedNum, Gap, Timeout - Gap)
|
||||
end;
|
||||
confirm_commit(Error, _SucceedNum, _Gap, _Timeout) -> Error.
|
||||
|
||||
node_not_commit(TnxId) ->
|
||||
case transaction(fun commit_status_trans/2, ['<', TnxId]) of
|
||||
{atomic, []} -> ok;
|
||||
{atomic, Nodes} -> {error, Nodes}
|
||||
end.
|
||||
|
||||
node_already_commit(TnxId, SucceedNum) ->
|
||||
case transaction(fun commit_status_trans/2, ['>=', TnxId]) of
|
||||
{atomic, Nodes} when length(Nodes) >= SucceedNum -> ok;
|
||||
{atomic, Nodes} -> {error, Nodes}
|
||||
end.
|
||||
|
||||
commit_status_trans(Operator, TnxId) ->
|
||||
MatchHead = #cluster_rpc_commit{tnx_id = '$1', node = '$2', _ = '_'},
|
||||
Guard = {Operator, '$1', TnxId},
|
||||
Result = '$2',
|
||||
mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]).
|
||||
|
||||
get_retry_ms() ->
|
||||
application:get_env(emqx_machine, cluster_call_retry_interval, 1000).
|
||||
|
|
|
@ -32,7 +32,8 @@ all() -> [
|
|||
t_commit_crash_test,
|
||||
t_commit_ok_but_apply_fail_on_other_node,
|
||||
t_commit_ok_apply_fail_on_other_node_then_recover,
|
||||
t_del_stale_mfa
|
||||
t_del_stale_mfa,
|
||||
t_skip_failed_commit
|
||||
].
|
||||
suite() -> [{timetrap, {minutes, 3}}].
|
||||
groups() -> [].
|
||||
|
@ -45,19 +46,16 @@ init_per_suite(Config) ->
|
|||
application:set_env(emqx_machine, cluster_call_max_history, 100),
|
||||
application:set_env(emqx_machine, cluster_call_clean_interval, 1000),
|
||||
application:set_env(emqx_machine, cluster_call_retry_interval, 900),
|
||||
%%dbg:tracer(),
|
||||
%%dbg:p(all, c),
|
||||
%%dbg:tpl(emqx_cluster_rpc, cx),
|
||||
%%dbg:tpl(gen_statem, loop_receive, cx),
|
||||
%%dbg:tpl(gen_statem, loop_state_callback, cx),
|
||||
%%dbg:tpl(gen_statem, loop_callback_mode_result, cx),
|
||||
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
|
||||
meck:expect(emqx_alarm, activate, 2, ok),
|
||||
meck:expect(emqx_alarm, deactivate, 2, ok),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ekka:stop(),
|
||||
ekka_mnesia:ensure_stopped(),
|
||||
ekka_mnesia:delete_schema(),
|
||||
%%dbg:stop(),
|
||||
meck:unload(emqx_alarm),
|
||||
ok.
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
|
@ -78,7 +76,6 @@ t_base_test(_Config) ->
|
|||
?assertEqual(node(), maps:get(initiator, Query)),
|
||||
?assert(maps:is_key(created_at, Query)),
|
||||
?assertEqual(ok, receive_msg(3, test)),
|
||||
sleep(400),
|
||||
{atomic, Status} = emqx_cluster_rpc:status(),
|
||||
?assertEqual(3, length(Status)),
|
||||
?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 1 end, Status)),
|
||||
|
@ -105,12 +102,12 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
|
|||
emqx_cluster_rpc:reset(),
|
||||
{atomic, []} = emqx_cluster_rpc:status(),
|
||||
MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
|
||||
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
|
||||
{atomic, [Status]} = emqx_cluster_rpc:status(),
|
||||
?assertEqual(MFA, maps:get(mfa, Status)),
|
||||
?assertEqual(node(), maps:get(node, Status)),
|
||||
erlang:send(?NODE2, test),
|
||||
Res = gen_statem:call(?NODE2, {initiate, {M, F, A}}),
|
||||
Res = gen_server:call(?NODE2, {initiate, {M, F, A}}),
|
||||
?assertEqual({error, "MFA return not ok"}, Res),
|
||||
ok.
|
||||
|
||||
|
@ -118,8 +115,8 @@ t_catch_up_status_handle_next_commit(_Config) ->
|
|||
emqx_cluster_rpc:reset(),
|
||||
{atomic, []} = emqx_cluster_rpc:status(),
|
||||
{M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]},
|
||||
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
{ok, 2} = gen_statem:call(?NODE2, {initiate, {M, F, A}}),
|
||||
{ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
|
||||
{ok, 2} = gen_server:call(?NODE2, {initiate, {M, F, A}}),
|
||||
ok.
|
||||
|
||||
t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
|
||||
|
@ -127,8 +124,8 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
|
|||
{atomic, []} = emqx_cluster_rpc:status(),
|
||||
Now = erlang:system_time(second),
|
||||
{M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]},
|
||||
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
{ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"]),
|
||||
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
|
||||
{ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000),
|
||||
{atomic, [Status|L]} = emqx_cluster_rpc:status(),
|
||||
?assertEqual([], L),
|
||||
?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)),
|
||||
|
@ -177,6 +174,17 @@ t_del_stale_mfa(_Config) ->
|
|||
end || I <- lists:seq(51, 150)],
|
||||
ok.
|
||||
|
||||
t_skip_failed_commit(_Config) ->
|
||||
emqx_cluster_rpc:reset(),
|
||||
{atomic, []} = emqx_cluster_rpc:status(),
|
||||
{ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000),
|
||||
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
|
||||
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
|
||||
ok = gen_server:call(?NODE2, skip_failed_commit, 5000),
|
||||
{atomic, List} = emqx_cluster_rpc:status(),
|
||||
?assertEqual([1, 2, 2], lists:sort(lists:map(fun(#{tnx_id := TnxId}) -> TnxId end, List))),
|
||||
ok.
|
||||
|
||||
start() ->
|
||||
{ok, Pid1} = emqx_cluster_rpc:start_link(),
|
||||
{ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
|
||||
|
|
Loading…
Reference in New Issue