diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 756a5ec30..373bafdba 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -343,13 +343,8 @@ handle_call(reset, _From, State) -> _ = mria:clear_table(?CLUSTER_COMMIT), _ = mria:clear_table(?CLUSTER_MFA), {reply, ok, State, {continue, ?CATCH_UP}}; -handle_call(?INITIATE(MFA), _From, State = #{node := Node}) -> - case transaction(fun ?MODULE:init_mfa/2, [Node, MFA]) of - {atomic, {ok, TnxId, Result}} -> - {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}}; - {aborted, Error} -> - {reply, {init_failure, Error}, State, {continue, ?CATCH_UP}} - end; +handle_call(?INITIATE(MFA), _From, State) -> + do_initiate(MFA, State, 1, #{}); handle_call(skip_failed_commit, _From, State = #{node := Node}) -> Timeout = catch_up(State, true), {atomic, LatestId} = transaction(fun ?MODULE:get_node_tnx_id/1, [Node]), @@ -465,11 +460,40 @@ get_oldest_mfa_id() -> Id -> Id end. +do_initiate(_MFA, State, Count, Failure) when Count > 10 -> + %% refuse to initiate cluster call from this node + %% because it's likely that the caller is based on + %% a stale view event we retry 10 time. + Error = stale_view_of_cluster_msg(Failure, Count), + {reply, {init_failure, Error}, State, {continue, ?CATCH_UP}}; +do_initiate(MFA, State = #{node := Node}, Count, Failure0) -> + case transaction(fun ?MODULE:init_mfa/2, [Node, MFA]) of + {atomic, {ok, TnxId, Result}} -> + {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}}; + {atomic, {retry, Failure1}} when Failure0 =:= Failure1 -> + %% Useless retry, so we return early. + Error = stale_view_of_cluster_msg(Failure0, Count), + {reply, {init_failure, Error}, State, {continue, ?CATCH_UP}}; + {atomic, {retry, Failure1}} -> + catch_up(State), + do_initiate(MFA, State, Count + 1, Failure1); + {aborted, Error} -> + {reply, {init_failure, Error}, State, {continue, ?CATCH_UP}} + end. + +stale_view_of_cluster_msg(Meta, Count) -> + Reason = Meta#{ + msg => stale_view_of_cluster_state, + retry_times => Count + }, + ?SLOG(warning, Reason), + Reason. + %% The entry point of a config change transaction. init_mfa(Node, MFA) -> mnesia:write_lock_table(?CLUSTER_MFA), LatestId = get_cluster_tnx_id(), - MyTnxId = get_node_tnx_id(node()), + MyTnxId = get_node_tnx_id(Node), case MyTnxId =:= LatestId of true -> TnxId = LatestId + 1, @@ -486,16 +510,8 @@ init_mfa(Node, MFA) -> {false, Error} -> mnesia:abort(Error) end; false -> - %% refuse to initiate cluster call from this node - %% because it's likely that the caller is based on - %% a stale view. - Reason = #{ - msg => stale_view_of_cluster_state, - cluster_tnx_id => LatestId, - node_tnx_id => MyTnxId - }, - ?SLOG(warning, Reason), - mnesia:abort({error, Reason}) + Meta = #{cluster_tnx_id => LatestId, node_tnx_id => MyTnxId}, + {retry, Meta} end. transaction(Func, Args) -> diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index 8cdfcaeea..99a0766ec 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -35,9 +35,10 @@ all() -> t_commit_ok_apply_fail_on_other_node_then_recover, t_del_stale_mfa, t_skip_failed_commit, - t_fast_forward_commit + t_fast_forward_commit, + t_commit_concurrency ]. -suite() -> [{timetrap, {minutes, 3}}]. +suite() -> [{timetrap, {minutes, 5}}]. groups() -> []. init_per_suite(Config) -> @@ -63,6 +64,7 @@ end_per_suite(_Config) -> ok. init_per_testcase(_TestCase, Config) -> + stop(), start(), Config. @@ -119,17 +121,101 @@ t_commit_crash_test(_Config) -> t_commit_ok_but_apply_fail_on_other_node(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), - MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, + Pid = self(), + {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]}, + {ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA), + ?assertEqual(ok, receive_msg(3, test)), + + {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, {ok, _, ok} = multicall(M, F, A, 1, 1000), - {atomic, [Status]} = emqx_cluster_rpc:status(), - ?assertEqual(MFA, maps:get(mfa, Status)), - ?assertEqual(node(), maps:get(node, Status)), + {atomic, AllStatus} = emqx_cluster_rpc:status(), + Node = node(), + ?assertEqual( + [ + {1, {Node, emqx_cluster_rpc2}}, + {1, {Node, emqx_cluster_rpc3}}, + {2, Node} + ], + lists:sort([{T, N} || #{tnx_id := T, node := N} <- AllStatus]) + ), erlang:send(?NODE2, test), Call = emqx_cluster_rpc:make_initiate_call_req(M, F, A), - Res = gen_server:call(?NODE2, Call), - ?assertEqual({init_failure, "MFA return not ok"}, Res), + Res1 = gen_server:call(?NODE2, Call), + Res2 = gen_server:call(?NODE3, Call), + %% Node2 is retry on tnx_id 1, and should not run Next MFA. + ?assertEqual( + {init_failure, #{ + msg => stale_view_of_cluster_state, + retry_times => 2, + cluster_tnx_id => 2, + node_tnx_id => 1 + }}, + Res1 + ), + ?assertEqual(Res1, Res2), ok. +t_commit_concurrency(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + Pid = self(), + {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]}, + {ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA), + ?assertEqual(ok, receive_msg(3, test)), + + %% call concurrently without stale tnx_id error + Workers = lists:seq(1, 256), + lists:foreach( + fun(Seq) -> + {EchoM, EchoF, EchoA} = {?MODULE, echo_delay, [Pid, Seq]}, + Call = emqx_cluster_rpc:make_initiate_call_req(EchoM, EchoF, EchoA), + spawn_link(fun() -> + ?assertMatch({ok, _, ok}, gen_server:call(?NODE1, Call, infinity)) + end), + spawn_link(fun() -> + ?assertMatch({ok, _, ok}, gen_server:call(?NODE2, Call, infinity)) + end), + spawn_link(fun() -> + ?assertMatch({ok, _, ok}, gen_server:call(?NODE3, Call, infinity)) + end) + end, + Workers + ), + %% receive seq msg in order + List = lists:sort(receive_seq_msg([])), + ?assertEqual(256 * 3 * 3, length(List), List), + {atomic, Status} = emqx_cluster_rpc:status(), + lists:map( + fun(#{tnx_id := TnxId} = S) -> + ?assertEqual(256 * 3 + 1, TnxId, S) + end, + Status + ), + AllMsgIndex = lists:flatten(lists:duplicate(9, Workers)), + Result = + lists:foldl( + fun(Index, Acc) -> + ?assertEqual(true, lists:keymember(Index, 1, Acc), {Index, Acc}), + lists:keydelete(Index, 1, Acc) + end, + List, + AllMsgIndex + ), + ?assertEqual([], Result), + receive + Unknown -> throw({receive_unknown_msg, Unknown}) + after 1000 -> ok + end, + ok. + +receive_seq_msg(Acc) -> + receive + {msg, Seq, Time, Pid} -> + receive_seq_msg([{Seq, Time, Pid} | Acc]) + after 3000 -> + Acc + end. + t_catch_up_status_handle_next_commit(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), @@ -296,9 +382,8 @@ stop() -> erlang:exit(P, kill) end end - || N <- [?NODE1, ?NODE2, ?NODE3] - ], - gen_server:stop(emqx_cluster_rpc_cleaner, normal, 5000). + || N <- [?NODE1, ?NODE2, ?NODE3, emqx_cluster_rpc_cleaner] + ]. receive_msg(0, _Msg) -> ok; @@ -306,7 +391,7 @@ receive_msg(Count, Msg) when Count > 0 -> receive Msg -> receive_msg(Count - 1, Msg) - after 800 -> + after 1000 -> timeout end. @@ -314,6 +399,11 @@ echo(Pid, Msg) -> erlang:send(Pid, Msg), ok. +echo_delay(Pid, Msg) -> + timer:sleep(rand:uniform(150)), + erlang:send(Pid, {msg, Msg, erlang:system_time(), self()}), + ok. + failed_on_node(Pid) -> case Pid =:= self() of true -> ok; diff --git a/changes/ce/fix-12121.en.md b/changes/ce/fix-12121.en.md new file mode 100644 index 000000000..3f0782826 --- /dev/null +++ b/changes/ce/fix-12121.en.md @@ -0,0 +1 @@ +Fixed occasionally return stale view when updating configurations on different nodes concurrently