test: add more debug msg to flaky cluster_rpc SUITE

This commit is contained in:
zhongwencool 2024-06-21 18:04:24 +08:00
parent acc8bf3405
commit 75a524c916
1 changed files with 33 additions and 15 deletions

View File

@ -74,13 +74,14 @@ end_per_testcase(_Config) ->
t_base_test(_Config) ->
?assertEqual(emqx_cluster_rpc:status(), {atomic, []}),
Pid = self(),
MFA = {M, F, A} = {?MODULE, echo, [Pid, test]},
Msg = ?FUNCTION_NAME,
MFA = {M, F, A} = {?MODULE, echo, [Pid, Msg]},
{ok, TnxId, ok} = multicall(M, F, A),
{atomic, Query} = emqx_cluster_rpc:query(TnxId),
?assertEqual(MFA, maps:get(mfa, Query)),
?assertEqual(node(), maps:get(initiator, Query)),
?assert(maps:is_key(created_at, Query)),
?assertEqual(ok, receive_msg(3, test)),
?assertEqual(ok, receive_msg(3, Msg)),
?assertEqual({ok, 2, ok}, multicall(M, F, A)),
{atomic, Status} = emqx_cluster_rpc:status(),
case length(Status) =:= 3 of
@ -118,9 +119,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
Pid = self(),
{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]},
Msg = ?FUNCTION_NAME,
{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]},
{ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA),
?assertEqual(ok, receive_msg(3, test)),
?assertEqual(ok, receive_msg(3, Msg)),
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
{ok, _, ok} = multicall(M, F, A, 1, 1000),
@ -154,9 +156,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
t_commit_concurrency(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(),
Pid = self(),
{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]},
{ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA),
?assertEqual(ok, receive_msg(3, test)),
Msg = ?FUNCTION_NAME,
{BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]},
?assertEqual({ok, 1, ok}, multicall(BaseM, BaseF, BaseA)),
?assertEqual(ok, receive_msg(3, Msg)),
%% call concurrently without stale tnx_id error
Workers = lists:seq(1, 256),
@ -231,23 +234,24 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
{atomic, [_Status | L]} = emqx_cluster_rpc:status(),
?assertEqual([], L),
ets:insert(test, {other_mfa_result, ok}),
{ok, 2, ok} = multicall(io, format, ["test"], 1, 1000),
{ok, 2, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000),
ct:sleep(1000),
{atomic, NewStatus} = emqx_cluster_rpc:status(),
?assertEqual(3, length(NewStatus)),
Pid = self(),
MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]},
Msg = ?FUNCTION_NAME,
MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, Msg]},
{ok, TnxId, ok} = multicall(M1, F1, A1),
{atomic, Query} = emqx_cluster_rpc:query(TnxId),
?assertEqual(MFAEcho, maps:get(mfa, Query)),
?assertEqual(node(), maps:get(initiator, Query)),
?assert(maps:is_key(created_at, Query)),
?assertEqual(ok, receive_msg(3, test)),
?assertEqual(ok, receive_msg(3, Msg)),
ok.
t_del_stale_mfa(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(),
MFA = {M, F, A} = {io, format, ["test"]},
MFA = {M, F, A} = {io, format, ["format:~p~n", [?FUNCTION_NAME]]},
Keys = lists:seq(1, 50),
Keys2 = lists:seq(51, 150),
Ids =
@ -288,7 +292,7 @@ t_del_stale_mfa(_Config) ->
t_skip_failed_commit(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(),
{ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
{ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(),
Node = node(),
@ -308,7 +312,7 @@ t_skip_failed_commit(_Config) ->
t_fast_forward_commit(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(),
{ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
{ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(),
Node = node(),
@ -356,7 +360,11 @@ tnx_ids(Status) ->
start() ->
{ok, _Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
{ok, _Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
ok = emqx_cluster_rpc:wait_for_cluster_rpc(),
ok = emqx_cluster_rpc:reset(),
%% Ensure all processes are idle status.
ok = gen_server:call(?NODE2, test),
ok = gen_server:call(?NODE3, test),
ok.
stop() ->
@ -366,6 +374,7 @@ stop() ->
undefined ->
ok;
P ->
erlang:unregister(N),
erlang:unlink(P),
erlang:exit(P, kill)
end
@ -379,8 +388,9 @@ receive_msg(Count, Msg) when Count > 0 ->
receive
Msg ->
receive_msg(Count - 1, Msg)
after 1000 ->
timeout
after 1300 ->
Msg = iolist_to_binary(io_lib:format("There's still ~w messages to be received", [Count])),
{Msg, flush_msg([])}
end.
echo(Pid, Msg) ->
@ -425,3 +435,11 @@ multicall(M, F, A, N, T) ->
multicall(M, F, A) ->
multicall(M, F, A, all, timer:minutes(2)).
flush_msg(Acc) ->
receive
Msg ->
flush_msg([Msg | Acc])
after 10 ->
Acc
end.