test(emqx_broker_SUITE): fix flaky test case

This commit is contained in:
Kjell Winblad 2024-03-04 17:24:48 +01:00
parent c2dcb507cf
commit c8e42cf6b1
2 changed files with 60 additions and 30 deletions

View File

@ -721,8 +721,8 @@ do_get_chann_conn_mod(ClientId, ChanPid) ->
end. end.
mark_channel_connected(ChanPid) -> mark_channel_connected(ChanPid) ->
?tp(emqx_cm_connected_client_count_inc, #{chan_pid => ChanPid}),
ets:insert_new(?CHAN_LIVE_TAB, {ChanPid, true}), ets:insert_new(?CHAN_LIVE_TAB, {ChanPid, true}),
?tp(emqx_cm_connected_client_count_inc, #{chan_pid => ChanPid}),
ok. ok.
mark_channel_disconnected(ChanPid) -> mark_channel_disconnected(ChanPid) ->

View File

@ -559,45 +559,70 @@ t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
%% we spawn several clients simultaneously to cause the race %% we spawn several clients simultaneously to cause the race
%% condition for the client id lock %% condition for the client id lock
NumClients = 20, NumClients = 20,
ConnectSuccessCntr = counters:new(1, []),
ConnectFailCntr = counters:new(1, []),
ConnectFun =
fun() ->
process_flag(trap_exit, true),
try
{ok, ConnPid} =
emqtt:start_link([
{clean_start, true},
{clientid, ClientID}
| Config
]),
{ok, _} = emqtt:ConnFun(ConnPid),
counters:add(ConnectSuccessCntr, 1, 1)
catch
_:_ ->
counters:add(ConnectFailCntr, 1, 1)
end
end,
{ok, {ok, [_, _]}} = {ok, {ok, [_, _]}} =
wait_for_events( wait_for_events(
fun() -> fun() ->
lists:foreach( lists:foreach(
fun(_) -> fun(_) ->
spawn( spawn(ConnectFun)
fun() ->
{ok, ConnPid} =
emqtt:start_link([
{clean_start, true},
{clientid, ClientID}
| Config
]),
%% don't assert the result: most of them fail
%% during the race
emqtt:ConnFun(ConnPid),
ok
end
),
ok
end, end,
lists:seq(1, NumClients) lists:seq(1, NumClients)
) ),
ok
end, end,
%% there can be only one channel that wins the race for the %% At least one channel acquires the lock for this client id. We
%% lock for this client id. we also expect a decrement %% also expect a decrement event because the client dies along with
%% event because the client dies along with the ephemeral %% the ephemeral process.
%% process.
[ [
emqx_cm_connected_client_count_inc, emqx_cm_connected_client_count_inc,
emqx_cm_connected_client_count_dec emqx_cm_connected_client_count_dec_done
], ],
1000 _Timeout = 10000
), ),
%% Since more than one pair of inc/dec may be emitted, we need to %% Since more than one pair of inc/dec may be emitted, we need to
%% wait for full stabilization %% wait for full stabilization
timer:sleep(100), ?retry(
%% It must be 0 again because we spawn-linked the clients in _Sleep = 100,
%% ephemeral processes above, and all should be dead now. _Retries = 100,
begin
ConnectSuccessCnt = counters:get(ConnectSuccessCntr, 1),
ConnectFailCnt = counters:get(ConnectFailCntr, 1),
NumClients = ConnectSuccessCnt + ConnectFailCnt
end
),
ConnectSuccessCnt = counters:get(ConnectSuccessCntr, 1),
?assert(ConnectSuccessCnt > 0),
EventsThatShouldHaveHappened = lists:flatten(
lists:duplicate(
ConnectSuccessCnt,
[
emqx_cm_connected_client_count_inc,
emqx_cm_connected_client_count_dec_done
]
)
),
wait_for_events(fun() -> ok end, EventsThatShouldHaveHappened, 10000, infinity),
%% It must be 0 again because we got enough
%% emqx_cm_connected_client_count_dec_done events
?assertEqual(0, emqx_cm:get_connected_client_count()), ?assertEqual(0, emqx_cm:get_connected_client_count()),
%% connecting again %% connecting again
{ok, ConnPid1} = emqtt:start_link([ {ok, ConnPid1} = emqtt:start_link([
@ -608,7 +633,8 @@ t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
{{ok, _}, {ok, [_]}} = {{ok, _}, {ok, [_]}} =
wait_for_events( wait_for_events(
fun() -> emqtt:ConnFun(ConnPid1) end, fun() -> emqtt:ConnFun(ConnPid1) end,
[emqx_cm_connected_client_count_inc] [emqx_cm_connected_client_count_inc],
_Timeout = 10000
), ),
?assertEqual(1, emqx_cm:get_connected_client_count()), ?assertEqual(1, emqx_cm:get_connected_client_count()),
%% abnormal exit of channel process %% abnormal exit of channel process
@ -620,9 +646,10 @@ t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
ok ok
end, end,
[ [
emqx_cm_connected_client_count_dec, emqx_cm_connected_client_count_dec_done,
emqx_cm_process_down emqx_cm_process_down
] ],
_Timeout = 10000
), ),
?assertEqual(0, emqx_cm:get_connected_client_count()), ?assertEqual(0, emqx_cm:get_connected_client_count()),
ok; ok;
@ -735,11 +762,14 @@ wait_for_events(Action, Kinds) ->
wait_for_events(Action, Kinds, 500). wait_for_events(Action, Kinds, 500).
wait_for_events(Action, Kinds, Timeout) -> wait_for_events(Action, Kinds, Timeout) ->
wait_for_events(Action, Kinds, Timeout, 0).
wait_for_events(Action, Kinds, Timeout, BackInTime) ->
Predicate = fun(#{?snk_kind := K}) -> Predicate = fun(#{?snk_kind := K}) ->
lists:member(K, Kinds) lists:member(K, Kinds)
end, end,
N = length(Kinds), N = length(Kinds),
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0), {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, BackInTime),
Res = Action(), Res = Action(),
case snabbkaffe_collector:receive_events(Sub) of case snabbkaffe_collector:receive_events(Sub) of
{timeout, _} -> {timeout, _} ->