From c8e42cf6b19ff376073b49739a1dbdb26d24e8f1 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 4 Mar 2024 17:24:48 +0100 Subject: [PATCH] test(emqx_broker_SUITE): fix flaky test case --- apps/emqx/src/emqx_cm.erl | 2 +- apps/emqx/test/emqx_broker_SUITE.erl | 88 +++++++++++++++++++--------- 2 files changed, 60 insertions(+), 30 deletions(-) diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 0cf015141..c14058f9a 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -721,8 +721,8 @@ do_get_chann_conn_mod(ClientId, ChanPid) -> end. mark_channel_connected(ChanPid) -> - ?tp(emqx_cm_connected_client_count_inc, #{chan_pid => ChanPid}), ets:insert_new(?CHAN_LIVE_TAB, {ChanPid, true}), + ?tp(emqx_cm_connected_client_count_inc, #{chan_pid => ChanPid}), ok. mark_channel_disconnected(ChanPid) -> diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index aa62a11d4..d4bb9e7fc 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -559,45 +559,70 @@ t_connected_client_count_transient_takeover(Config) when is_list(Config) -> %% we spawn several clients simultaneously to cause the race %% condition for the client id lock NumClients = 20, + ConnectSuccessCntr = counters:new(1, []), + ConnectFailCntr = counters:new(1, []), + ConnectFun = + fun() -> + process_flag(trap_exit, true), + try + {ok, ConnPid} = + emqtt:start_link([ + {clean_start, true}, + {clientid, ClientID} + | Config + ]), + {ok, _} = emqtt:ConnFun(ConnPid), + counters:add(ConnectSuccessCntr, 1, 1) + catch + _:_ -> + counters:add(ConnectFailCntr, 1, 1) + end + end, {ok, {ok, [_, _]}} = wait_for_events( fun() -> lists:foreach( fun(_) -> - spawn( - fun() -> - {ok, ConnPid} = - emqtt:start_link([ - {clean_start, true}, - {clientid, ClientID} - | Config - ]), - %% don't assert the result: most of them fail - %% during the race - emqtt:ConnFun(ConnPid), - ok - end - ), - ok + spawn(ConnectFun) end, lists:seq(1, NumClients) - ) + ), + ok end, - %% there can be only one channel that wins the race for the - %% lock for this client id. we also expect a decrement - %% event because the client dies along with the ephemeral - %% process. + %% At least one channel acquires the lock for this client id. We + %% also expect a decrement event because the client dies along with + %% the ephemeral process. [ emqx_cm_connected_client_count_inc, - emqx_cm_connected_client_count_dec + emqx_cm_connected_client_count_dec_done ], - 1000 + _Timeout = 10000 ), %% Since more than one pair of inc/dec may be emitted, we need to %% wait for full stabilization - timer:sleep(100), - %% It must be 0 again because we spawn-linked the clients in - %% ephemeral processes above, and all should be dead now. + ?retry( + _Sleep = 100, + _Retries = 100, + begin + ConnectSuccessCnt = counters:get(ConnectSuccessCntr, 1), + ConnectFailCnt = counters:get(ConnectFailCntr, 1), + NumClients = ConnectSuccessCnt + ConnectFailCnt + end + ), + ConnectSuccessCnt = counters:get(ConnectSuccessCntr, 1), + ?assert(ConnectSuccessCnt > 0), + EventsThatShouldHaveHappened = lists:flatten( + lists:duplicate( + ConnectSuccessCnt, + [ + emqx_cm_connected_client_count_inc, + emqx_cm_connected_client_count_dec_done + ] + ) + ), + wait_for_events(fun() -> ok end, EventsThatShouldHaveHappened, 10000, infinity), + %% It must be 0 again because we got enough + %% emqx_cm_connected_client_count_dec_done events ?assertEqual(0, emqx_cm:get_connected_client_count()), %% connecting again {ok, ConnPid1} = emqtt:start_link([ @@ -608,7 +633,8 @@ t_connected_client_count_transient_takeover(Config) when is_list(Config) -> {{ok, _}, {ok, [_]}} = wait_for_events( fun() -> emqtt:ConnFun(ConnPid1) end, - [emqx_cm_connected_client_count_inc] + [emqx_cm_connected_client_count_inc], + _Timeout = 10000 ), ?assertEqual(1, emqx_cm:get_connected_client_count()), %% abnormal exit of channel process @@ -620,9 +646,10 @@ t_connected_client_count_transient_takeover(Config) when is_list(Config) -> ok end, [ - emqx_cm_connected_client_count_dec, + emqx_cm_connected_client_count_dec_done, emqx_cm_process_down - ] + ], + _Timeout = 10000 ), ?assertEqual(0, emqx_cm:get_connected_client_count()), ok; @@ -735,11 +762,14 @@ wait_for_events(Action, Kinds) -> wait_for_events(Action, Kinds, 500). wait_for_events(Action, Kinds, Timeout) -> + wait_for_events(Action, Kinds, Timeout, 0). + +wait_for_events(Action, Kinds, Timeout, BackInTime) -> Predicate = fun(#{?snk_kind := K}) -> lists:member(K, Kinds) end, N = length(Kinds), - {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0), + {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, BackInTime), Res = Action(), case snabbkaffe_collector:receive_events(Sub) of {timeout, _} ->