fix: occasional stale view when updating configurations on different nodes concurrently

zhongwencool 2023-12-07 16:06:04 +08:00
parent 17e6703ba2
commit 45c4fb33a8
3 changed files with 137 additions and 30 deletions

View File

@@ -343,13 +343,8 @@ handle_call(reset, _From, State) ->
_ = mria:clear_table(?CLUSTER_COMMIT),
_ = mria:clear_table(?CLUSTER_MFA),
{reply, ok, State, {continue, ?CATCH_UP}};
handle_call(?INITIATE(MFA), _From, State = #{node := Node}) ->
case transaction(fun ?MODULE:init_mfa/2, [Node, MFA]) of
{atomic, {ok, TnxId, Result}} ->
{reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}};
{aborted, Error} ->
{reply, {init_failure, Error}, State, {continue, ?CATCH_UP}}
end;
handle_call(?INITIATE(MFA), _From, State) ->
do_initiate(MFA, State, 1, #{});
handle_call(skip_failed_commit, _From, State = #{node := Node}) ->
Timeout = catch_up(State, true),
{atomic, LatestId} = transaction(fun ?MODULE:get_node_tnx_id/1, [Node]),
@@ -465,11 +460,40 @@ get_oldest_mfa_id() ->
Id -> Id
end.
do_initiate(_MFA, State, Count, Failure) when Count > 10 ->
%% Refuse to initiate the cluster call from this node:
%% the caller is most likely acting on a stale view,
%% and the view is still stale after 10 retries.
Error = stale_view_of_cluster_msg(Failure, Count),
{reply, {init_failure, Error}, State, {continue, ?CATCH_UP}};
do_initiate(MFA, State = #{node := Node}, Count, Failure0) ->
case transaction(fun ?MODULE:init_mfa/2, [Node, MFA]) of
{atomic, {ok, TnxId, Result}} ->
{reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}};
{atomic, {retry, Failure1}} when Failure0 =:= Failure1 ->
%% No progress since the last attempt; retrying again is useless, so return early.
Error = stale_view_of_cluster_msg(Failure0, Count),
{reply, {init_failure, Error}, State, {continue, ?CATCH_UP}};
{atomic, {retry, Failure1}} ->
catch_up(State),
do_initiate(MFA, State, Count + 1, Failure1);
{aborted, Error} ->
{reply, {init_failure, Error}, State, {continue, ?CATCH_UP}}
end.
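The bounded retry above can be read as a standalone model. Here is a minimal sketch with illustrative names only (Fun stands in for one init_mfa transaction attempt; the catch_up between attempts is elided); it is entered as retry_loop(Fun, 1, #{}), mirroring do_initiate(MFA, State, 1, #{}):

%% Standalone model of the bounded retry in do_initiate/4 above
%% (illustrative, not this module's code).
retry_loop(_Fun, Count, PrevFailure) when Count > 10 ->
    {error, PrevFailure};
retry_loop(Fun, Count, PrevFailure) ->
    case Fun() of
        {ok, Result} ->
            {ok, Result};
        {retry, Failure} when Failure =:= PrevFailure ->
            %% Same failure metadata twice in a row: catching up made
            %% no progress, so further retries are useless.
            {error, Failure};
        {retry, Failure} ->
            retry_loop(Fun, Count + 1, Failure)
    end.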
stale_view_of_cluster_msg(Meta, Count) ->
Reason = Meta#{
msg => stale_view_of_cluster_state,
retry_times => Count
},
?SLOG(warning, Reason),
Reason.
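For reference, the map built above is what the caller ultimately receives inside {init_failure, Reason} once retries are exhausted; with the values asserted in the test case further below, its shape is:

%% Example shape of the returned error (values from the test suite below):
#{
    msg => stale_view_of_cluster_state,
    retry_times => 2,
    cluster_tnx_id => 2,
    node_tnx_id => 1
}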
%% The entry point of a config change transaction.
init_mfa(Node, MFA) ->
mnesia:write_lock_table(?CLUSTER_MFA),
LatestId = get_cluster_tnx_id(),
MyTnxId = get_node_tnx_id(node()),
MyTnxId = get_node_tnx_id(Node),
case MyTnxId =:= LatestId of
true ->
TnxId = LatestId + 1,
@@ -486,16 +510,8 @@ init_mfa(Node, MFA) ->
{false, Error} -> mnesia:abort(Error)
end;
false ->
%% refuse to initiate cluster call from this node
%% because it's likely that the caller is based on
%% a stale view.
Reason = #{
msg => stale_view_of_cluster_state,
cluster_tnx_id => LatestId,
node_tnx_id => MyTnxId
},
?SLOG(warning, Reason),
mnesia:abort({error, Reason})
Meta = #{cluster_tnx_id => LatestId, node_tnx_id => MyTnxId},
{retry, Meta}
end.
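The check above enforces a single invariant, sketched below with illustrative names: a node may claim the next transaction id only if it has already applied every committed transaction. Otherwise its view is stale, and the caller now gets {retry, Meta} (so the server can catch up and re-enter the transaction) instead of the outright abort used before this change:

%% Sketch of the stale-view check in init_mfa/2 (illustrative names):
%% only a fully caught-up node may claim the next transaction id.
maybe_claim_next_tnx_id(NodeTnxId, ClusterTnxId) when NodeTnxId =:= ClusterTnxId ->
    {ok, ClusterTnxId + 1};
maybe_claim_next_tnx_id(NodeTnxId, ClusterTnxId) ->
    {retry, #{cluster_tnx_id => ClusterTnxId, node_tnx_id => NodeTnxId}}.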
transaction(Func, Args) ->

View File

@@ -35,9 +35,10 @@ all() ->
t_commit_ok_apply_fail_on_other_node_then_recover,
t_del_stale_mfa,
t_skip_failed_commit,
t_fast_forward_commit
t_fast_forward_commit,
t_commit_concurrency
].
suite() -> [{timetrap, {minutes, 3}}].
suite() -> [{timetrap, {minutes, 5}}].
groups() -> [].
init_per_suite(Config) ->
@@ -63,6 +64,7 @@ end_per_suite(_Config) ->
ok.
init_per_testcase(_TestCase, Config) ->
stop(),
start(),
Config.
@@ -119,17 +121,101 @@ t_commit_crash_test(_Config) ->
t_commit_ok_but_apply_fail_on_other_node(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
Pid = self(),
{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]},
{ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA),
?assertEqual(ok, receive_msg(3, test)),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, _, ok} = multicall(M, F, A, 1, 1000),
{atomic, [Status]} = emqx_cluster_rpc:status(),
?assertEqual(MFA, maps:get(mfa, Status)),
?assertEqual(node(), maps:get(node, Status)),
{atomic, AllStatus} = emqx_cluster_rpc:status(),
Node = node(),
?assertEqual(
[
{1, {Node, emqx_cluster_rpc2}},
{1, {Node, emqx_cluster_rpc3}},
{2, Node}
],
lists:sort([{T, N} || #{tnx_id := T, node := N} <- AllStatus])
),
erlang:send(?NODE2, test),
Call = emqx_cluster_rpc:make_initiate_call_req(M, F, A),
Res = gen_server:call(?NODE2, Call),
?assertEqual({init_failure, "MFA return not ok"}, Res),
Res1 = gen_server:call(?NODE2, Call),
Res2 = gen_server:call(?NODE3, Call),
%% Node2 is retrying on tnx_id 1 and should not run the next MFA.
?assertEqual(
{init_failure, #{
msg => stale_view_of_cluster_state,
retry_times => 2,
cluster_tnx_id => 2,
node_tnx_id => 1
}},
Res1
),
?assertEqual(Res1, Res2),
ok.
t_commit_concurrency(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
Pid = self(),
{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]},
{ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA),
?assertEqual(ok, receive_msg(3, test)),
%% Call concurrently; no stale tnx_id errors are expected.
Workers = lists:seq(1, 256),
lists:foreach(
fun(Seq) ->
{EchoM, EchoF, EchoA} = {?MODULE, echo_delay, [Pid, Seq]},
Call = emqx_cluster_rpc:make_initiate_call_req(EchoM, EchoF, EchoA),
spawn_link(fun() ->
?assertMatch({ok, _, ok}, gen_server:call(?NODE1, Call, infinity))
end),
spawn_link(fun() ->
?assertMatch({ok, _, ok}, gen_server:call(?NODE2, Call, infinity))
end),
spawn_link(fun() ->
?assertMatch({ok, _, ok}, gen_server:call(?NODE3, Call, infinity))
end)
end,
Workers
),
%% Collect every seq msg; arrival order is not guaranteed, so sort first.
List = lists:sort(receive_seq_msg([])),
?assertEqual(256 * 3 * 3, length(List), List),
{atomic, Status} = emqx_cluster_rpc:status(),
lists:map(
fun(#{tnx_id := TnxId} = S) ->
?assertEqual(256 * 3 + 1, TnxId, S)
end,
Status
),
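%% Each Seq in 1..256 must appear exactly 9 times: 3 initiating calls
%% per Seq, each committed MFA applied on all 3 nodes. Deleting one
%% occurrence per expected index must leave nothing behind.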
AllMsgIndex = lists:flatten(lists:duplicate(9, Workers)),
Result =
lists:foldl(
fun(Index, Acc) ->
?assertEqual(true, lists:keymember(Index, 1, Acc), {Index, Acc}),
lists:keydelete(Index, 1, Acc)
end,
List,
AllMsgIndex
),
?assertEqual([], Result),
receive
Unknown -> throw({receive_unknown_msg, Unknown})
after 1000 -> ok
end,
ok.
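The counts asserted above follow from simple arithmetic, assuming every call commits; a quick check in the Erlang shell:

1> 256 * 3 * 3. %% echo messages: 256 seqs, 3 initiators each, applied on 3 nodes
2304
2> 256 * 3 + 1. %% final tnx_id: 768 new commits after the initial echo commit
769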
receive_seq_msg(Acc) ->
receive
{msg, Seq, Time, Pid} ->
receive_seq_msg([{Seq, Time, Pid} | Acc])
after 3000 ->
Acc
end.
t_catch_up_status_handle_next_commit(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
@@ -296,9 +382,8 @@ stop() ->
erlang:exit(P, kill)
end
end
|| N <- [?NODE1, ?NODE2, ?NODE3]
],
gen_server:stop(emqx_cluster_rpc_cleaner, normal, 5000).
|| N <- [?NODE1, ?NODE2, ?NODE3, emqx_cluster_rpc_cleaner]
].
receive_msg(0, _Msg) ->
ok;
@@ -306,7 +391,7 @@ receive_msg(Count, Msg) when Count > 0 ->
receive
Msg ->
receive_msg(Count - 1, Msg)
after 800 ->
after 1000 ->
timeout
end.
@@ -314,6 +399,11 @@ echo(Pid, Msg) ->
erlang:send(Pid, Msg),
ok.
echo_delay(Pid, Msg) ->
timer:sleep(rand:uniform(150)),
erlang:send(Pid, {msg, Msg, erlang:system_time(), self()}),
ok.
failed_on_node(Pid) ->
case Pid =:= self() of
true -> ok;

View File

@@ -0,0 +1 @@
Fixed an issue where updating configurations concurrently on different nodes could occasionally return a stale view.