From 75a524c916744c01fba7e03d08477608c1484cd4 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 21 Jun 2024 18:04:24 +0800 Subject: [PATCH] test: add more debug msg to flaky cluster_rpc SUITE --- .../emqx_conf/test/emqx_cluster_rpc_SUITE.erl | 48 +++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index b054988be..cfdc5820e 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -74,13 +74,14 @@ end_per_testcase(_Config) -> t_base_test(_Config) -> ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}), Pid = self(), - MFA = {M, F, A} = {?MODULE, echo, [Pid, test]}, + Msg = ?FUNCTION_NAME, + MFA = {M, F, A} = {?MODULE, echo, [Pid, Msg]}, {ok, TnxId, ok} = multicall(M, F, A), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFA, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), ?assert(maps:is_key(created_at, Query)), - ?assertEqual(ok, receive_msg(3, test)), + ?assertEqual(ok, receive_msg(3, Msg)), ?assertEqual({ok, 2, ok}, multicall(M, F, A)), {atomic, Status} = emqx_cluster_rpc:status(), case length(Status) =:= 3 of @@ -118,9 +119,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), Pid = self(), - {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]}, + Msg = ?FUNCTION_NAME, + {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]}, {ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA), - ?assertEqual(ok, receive_msg(3, test)), + ?assertEqual(ok, receive_msg(3, Msg)), {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, {ok, _, ok} = multicall(M, F, A, 1, 1000), @@ -154,9 +156,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> t_commit_concurrency(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), Pid = self(), - {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]}, - {ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA), - ?assertEqual(ok, receive_msg(3, test)), + Msg = ?FUNCTION_NAME, + {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]}, + ?assertEqual({ok, 1, ok}, multicall(BaseM, BaseF, BaseA)), + ?assertEqual(ok, receive_msg(3, Msg)), %% call concurrently without stale tnx_id error Workers = lists:seq(1, 256), @@ -231,23 +234,24 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> {atomic, [_Status | L]} = emqx_cluster_rpc:status(), ?assertEqual([], L), ets:insert(test, {other_mfa_result, ok}), - {ok, 2, ok} = multicall(io, format, ["test"], 1, 1000), + {ok, 2, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000), ct:sleep(1000), {atomic, NewStatus} = emqx_cluster_rpc:status(), ?assertEqual(3, length(NewStatus)), Pid = self(), - MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]}, + Msg = ?FUNCTION_NAME, + MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, Msg]}, {ok, TnxId, ok} = multicall(M1, F1, A1), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFAEcho, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), ?assert(maps:is_key(created_at, Query)), - ?assertEqual(ok, receive_msg(3, test)), + ?assertEqual(ok, receive_msg(3, Msg)), ok. t_del_stale_mfa(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), - MFA = {M, F, A} = {io, format, ["test"]}, + MFA = {M, F, A} = {io, format, ["format:~p~n", [?FUNCTION_NAME]]}, Keys = lists:seq(1, 50), Keys2 = lists:seq(51, 150), Ids = @@ -288,7 +292,7 @@ t_del_stale_mfa(_Config) -> t_skip_failed_commit(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), - {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000), + {ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000), ct:sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), @@ -308,7 +312,7 @@ t_skip_failed_commit(_Config) -> t_fast_forward_commit(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), - {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000), + {ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000), ct:sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), @@ -356,7 +360,11 @@ tnx_ids(Status) -> start() -> {ok, _Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500), {ok, _Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500), + ok = emqx_cluster_rpc:wait_for_cluster_rpc(), ok = emqx_cluster_rpc:reset(), + %% Ensure all processes are idle status. + ok = gen_server:call(?NODE2, test), + ok = gen_server:call(?NODE3, test), ok. stop() -> @@ -366,6 +374,7 @@ stop() -> undefined -> ok; P -> + erlang:unregister(N), erlang:unlink(P), erlang:exit(P, kill) end @@ -379,8 +388,9 @@ receive_msg(Count, Msg) when Count > 0 -> receive Msg -> receive_msg(Count - 1, Msg) - after 1000 -> - timeout + after 1300 -> + Msg = iolist_to_binary(io_lib:format("There's still ~w messages to be received", [Count])), + {Msg, flush_msg([])} end. echo(Pid, Msg) -> @@ -425,3 +435,11 @@ multicall(M, F, A, N, T) -> multicall(M, F, A) -> multicall(M, F, A, all, timer:minutes(2)). + +flush_msg(Acc) -> + receive + Msg -> + flush_msg([Msg | Acc]) + after 10 -> + Acc + end.