Merge pull request #12639 from kjellwinblad/kjell/fix_flaky_test_case/emqx_broker_SUITE.connected_client_count_group.quic.t_connected_client_count_transient_takeover

test(emqx_broker_SUITE): fix flaky test case
This commit is contained in:
Kjell Winblad 2024-03-05 18:02:07 +01:00 committed by GitHub
commit 8cf681ad3b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 60 additions and 30 deletions

View File

@ -721,8 +721,8 @@ do_get_chann_conn_mod(ClientId, ChanPid) ->
end. end.
mark_channel_connected(ChanPid) -> mark_channel_connected(ChanPid) ->
?tp(emqx_cm_connected_client_count_inc, #{chan_pid => ChanPid}),
ets:insert_new(?CHAN_LIVE_TAB, {ChanPid, true}), ets:insert_new(?CHAN_LIVE_TAB, {ChanPid, true}),
?tp(emqx_cm_connected_client_count_inc, #{chan_pid => ChanPid}),
ok. ok.
mark_channel_disconnected(ChanPid) -> mark_channel_disconnected(ChanPid) ->

View File

@ -559,45 +559,70 @@ t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
%% we spawn several clients simultaneously to cause the race %% we spawn several clients simultaneously to cause the race
%% condition for the client id lock %% condition for the client id lock
NumClients = 20, NumClients = 20,
{ok, {ok, [_, _]}} = ConnectSuccessCntr = counters:new(1, []),
wait_for_events( ConnectFailCntr = counters:new(1, []),
fun() -> ConnectFun =
lists:foreach(
fun(_) ->
spawn(
fun() -> fun() ->
process_flag(trap_exit, true),
try
{ok, ConnPid} = {ok, ConnPid} =
emqtt:start_link([ emqtt:start_link([
{clean_start, true}, {clean_start, true},
{clientid, ClientID} {clientid, ClientID}
| Config | Config
]), ]),
%% don't assert the result: most of them fail {ok, _} = emqtt:ConnFun(ConnPid),
%% during the race counters:add(ConnectSuccessCntr, 1, 1)
emqtt:ConnFun(ConnPid), catch
ok _:_ ->
counters:add(ConnectFailCntr, 1, 1)
end end
end,
{ok, {ok, [_, _]}} =
wait_for_events(
fun() ->
lists:foreach(
fun(_) ->
spawn(ConnectFun)
end,
lists:seq(1, NumClients)
), ),
ok ok
end, end,
lists:seq(1, NumClients) %% At least one channel acquires the lock for this client id. We
) %% also expect a decrement event because the client dies along with
end, %% the ephemeral process.
%% there can be only one channel that wins the race for the
%% lock for this client id. we also expect a decrement
%% event because the client dies along with the ephemeral
%% process.
[ [
emqx_cm_connected_client_count_inc, emqx_cm_connected_client_count_inc,
emqx_cm_connected_client_count_dec emqx_cm_connected_client_count_dec_done
], ],
1000 _Timeout = 10000
), ),
%% Since more than one pair of inc/dec may be emitted, we need to %% Since more than one pair of inc/dec may be emitted, we need to
%% wait for full stabilization %% wait for full stabilization
timer:sleep(100), ?retry(
%% It must be 0 again because we spawn-linked the clients in _Sleep = 100,
%% ephemeral processes above, and all should be dead now. _Retries = 100,
begin
ConnectSuccessCnt = counters:get(ConnectSuccessCntr, 1),
ConnectFailCnt = counters:get(ConnectFailCntr, 1),
NumClients = ConnectSuccessCnt + ConnectFailCnt
end
),
ConnectSuccessCnt = counters:get(ConnectSuccessCntr, 1),
?assert(ConnectSuccessCnt > 0),
EventsThatShouldHaveHappened = lists:flatten(
lists:duplicate(
ConnectSuccessCnt,
[
emqx_cm_connected_client_count_inc,
emqx_cm_connected_client_count_dec_done
]
)
),
wait_for_events(fun() -> ok end, EventsThatShouldHaveHappened, 10000, infinity),
%% It must be 0 again because we got enough
%% emqx_cm_connected_client_count_dec_done events
?assertEqual(0, emqx_cm:get_connected_client_count()), ?assertEqual(0, emqx_cm:get_connected_client_count()),
%% connecting again %% connecting again
{ok, ConnPid1} = emqtt:start_link([ {ok, ConnPid1} = emqtt:start_link([
@ -608,7 +633,8 @@ t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
{{ok, _}, {ok, [_]}} = {{ok, _}, {ok, [_]}} =
wait_for_events( wait_for_events(
fun() -> emqtt:ConnFun(ConnPid1) end, fun() -> emqtt:ConnFun(ConnPid1) end,
[emqx_cm_connected_client_count_inc] [emqx_cm_connected_client_count_inc],
_Timeout = 10000
), ),
?assertEqual(1, emqx_cm:get_connected_client_count()), ?assertEqual(1, emqx_cm:get_connected_client_count()),
%% abnormal exit of channel process %% abnormal exit of channel process
@ -620,9 +646,10 @@ t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
ok ok
end, end,
[ [
emqx_cm_connected_client_count_dec, emqx_cm_connected_client_count_dec_done,
emqx_cm_process_down emqx_cm_process_down
] ],
_Timeout = 10000
), ),
?assertEqual(0, emqx_cm:get_connected_client_count()), ?assertEqual(0, emqx_cm:get_connected_client_count()),
ok; ok;
@ -735,11 +762,14 @@ wait_for_events(Action, Kinds) ->
wait_for_events(Action, Kinds, 500). wait_for_events(Action, Kinds, 500).
wait_for_events(Action, Kinds, Timeout) -> wait_for_events(Action, Kinds, Timeout) ->
wait_for_events(Action, Kinds, Timeout, 0).
wait_for_events(Action, Kinds, Timeout, BackInTime) ->
Predicate = fun(#{?snk_kind := K}) -> Predicate = fun(#{?snk_kind := K}) ->
lists:member(K, Kinds) lists:member(K, Kinds)
end, end,
N = length(Kinds), N = length(Kinds),
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0), {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, BackInTime),
Res = Action(), Res = Action(),
case snabbkaffe_collector:receive_events(Sub) of case snabbkaffe_collector:receive_events(Sub) of
{timeout, _} -> {timeout, _} ->