test(emqx_cm_SUITE): use one helper function: `emqx_pool:flush_async_tasks/1`

This commit is contained in:
Serge Tupchii 2024-01-16 19:42:37 +02:00 committed by zhongwencool
parent f52cc93d9d
commit a8c6280a5e
1 changed files with 1 additions and 23 deletions

View File

@ -343,31 +343,9 @@ test_stepdown_session(Action, Reason) ->
end, end,
% sync % sync
ignored = gen_server:call(?CM, ignore, infinity), ignored = gen_server:call(?CM, ignore, infinity),
ok = flush_emqx_cm_pool(), ok = emqx_pool:flush_async_tasks(?CM_POOL),
?assertEqual([], emqx_cm:lookup_channels(ClientId)). ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
%% Channel deregistration is delegated to emqx_pool as a sync tasks.
%% The emqx_pool is pool of workers, and there is no way to know
%% which worker was picked for the last deregistration task.
%% This help function creates a large enough number of async tasks
%% to sync with the pool workers.
%% The number of tasks should be large enough to ensure all workers have
%% the chance to work on at least one of the tasks.
flush_emqx_cm_pool() ->
Self = self(),
L = lists:seq(1, 1000),
lists:foreach(
fun(I) -> emqx_pool:async_submit_to_pool(?CM_POOL, fun() -> Self ! {done, I} end, []) end, L
),
lists:foreach(
fun(I) ->
receive
{done, I} -> ok
end
end,
L
).
t_discard_session_race(_) -> t_discard_session_race(_) ->
ClientId = rand_client_id(), ClientId = rand_client_id(),
?check_trace( ?check_trace(