From 7ae45056367e9085359f36dd7a6c9e58d0011dbc Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Tue, 5 Oct 2021 20:55:36 +0200 Subject: [PATCH] test(emqx_broker): fix flaky tests --- apps/emqx/test/emqx_broker_SUITE.erl | 153 ++++++++++++++++++++------- 1 file changed, 112 insertions(+), 41 deletions(-) diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index fe754e9df..fbc374f90 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -37,38 +37,82 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). +init_per_testcase(Case, Config) -> + ?MODULE:Case({init, Config}). + +end_per_testcase(Case, Config) -> + ?MODULE:Case({'end', Config}). + %%-------------------------------------------------------------------- %% PubSub Test %%-------------------------------------------------------------------- -t_stats_fun(_) -> - Subscribers = emqx_stats:getstat('subscribers.count'), - Subscriptions = emqx_stats:getstat('subscriptions.count'), - Subopts = emqx_stats:getstat('suboptions.count'), +t_stats_fun({init, Config}) -> + Parent = self(), + F = fun Loop() -> + N1 = emqx_stats:getstat('subscribers.count'), + N2 = emqx_stats:getstat('subscriptions.count'), + N3 = emqx_stats:getstat('suboptions.count'), + case N1 + N2 + N3 =:= 0 of + true -> + Parent ! {ready, self()}, + exit(normal); + false -> + receive + stop -> + exit(normal) + after + 100 -> + Loop() + end + end + end, + Pid = spawn_link(F), + receive + {ready, P} when P =:= Pid-> + Config + after + 5000 -> + Pid ! stop, + ct:fail("timedout_waiting_for_sub_stats_to_reach_zero") + end; +t_stats_fun(Config) when is_list(Config) -> ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>), ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>), + %% ensure stats refreshed emqx_broker:stats_fun(), - ct:sleep(10), - ?assertEqual(Subscribers + 2, emqx_stats:getstat('subscribers.count')), - ?assertEqual(Subscribers + 2, emqx_stats:getstat('subscribers.max')), - ?assertEqual(Subscriptions + 2, emqx_stats:getstat('subscriptions.count')), - ?assertEqual(Subscriptions + 2, emqx_stats:getstat('subscriptions.max')), - ?assertEqual(Subopts + 2, emqx_stats:getstat('suboptions.count')), - ?assertEqual(Subopts + 2, emqx_stats:getstat('suboptions.max')). + %% emqx_stats:set_stat is a gen_server cast + %% make a synced call sync + ignored = gen_server:call(emqx_stats, call, infinity), + ?assertEqual(2, emqx_stats:getstat('subscribers.count')), + ?assertEqual(2, emqx_stats:getstat('subscribers.max')), + ?assertEqual(2, emqx_stats:getstat('subscriptions.count')), + ?assertEqual(2, emqx_stats:getstat('subscriptions.max')), + ?assertEqual(2, emqx_stats:getstat('suboptions.count')), + ?assertEqual(2, emqx_stats:getstat('suboptions.max')); +t_stats_fun({'end', _Config}) -> + ok = emqx_broker:unsubscribe(<<"topic">>), + ok = emqx_broker:unsubscribe(<<"topic2">>). -t_subscribed(_) -> +t_subscribed({init, Config}) -> emqx_broker:subscribe(<<"topic">>), + Config; +t_subscribed(Config) when is_list(Config) -> ?assertEqual(false, emqx_broker:subscribed(undefined, <<"topic">>)), - ?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)), + ?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)); +t_subscribed({'end', _Config}) -> emqx_broker:unsubscribe(<<"topic">>). -t_subscribed_2(_) -> +t_subscribed_2({init, Config}) -> emqx_broker:subscribe(<<"topic">>, <<"clientid">>), - %?assertEqual(true, emqx_broker:subscribed(<<"clientid">>, <<"topic">>)), - ?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)), + Config; +t_subscribed_2(Config) when is_list(Config) -> + ?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)); +t_subscribed_2({'end', _Config}) -> emqx_broker:unsubscribe(<<"topic">>). -t_subopts(_) -> +t_subopts({init, Config}) -> Config; +t_subopts(Config) when is_list(Config) -> ?assertEqual(false, emqx_broker:set_subopts(<<"topic">>, #{qos => 1})), ?assertEqual(undefined, emqx_broker:get_subopts(self(), <<"topic">>)), ?assertEqual(undefined, emqx_broker:get_subopts(<<"clientid">>, <<"topic">>)), @@ -85,42 +129,54 @@ t_subopts(_) -> ?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 0})), ?assertEqual(#{nl => 0, qos => 0, rap => 0, rh => 0, subid => <<"clientid">>}, - emqx_broker:get_subopts(self(), <<"topic">>)), + emqx_broker:get_subopts(self(), <<"topic">>)); +t_subopts({'end', _Config}) -> emqx_broker:unsubscribe(<<"topic">>). -t_topics(_) -> +t_topics({init, Config}) -> Topics = [<<"topic">>, <<"topic/1">>, <<"topic/2">>], - ok = emqx_broker:subscribe(lists:nth(1, Topics), <<"clientId">>), - ok = emqx_broker:subscribe(lists:nth(2, Topics), <<"clientId">>), - ok = emqx_broker:subscribe(lists:nth(3, Topics), <<"clientId">>), + [{topics, Topics} | Config]; +t_topics(Config) when is_list(Config) -> + Topics = [T1, T2, T3] = proplists:get_value(topics, Config), + ok = emqx_broker:subscribe(T1, <<"clientId">>), + ok = emqx_broker:subscribe(T2, <<"clientId">>), + ok = emqx_broker:subscribe(T3, <<"clientId">>), Topics1 = emqx_broker:topics(), ?assertEqual(true, lists:foldl(fun(Topic, Acc) -> case lists:member(Topic, Topics1) of true -> Acc; false -> false end - end, true, Topics)), - emqx_broker:unsubscribe(lists:nth(1, Topics)), - emqx_broker:unsubscribe(lists:nth(2, Topics)), - emqx_broker:unsubscribe(lists:nth(3, Topics)). + end, true, Topics)); +t_topics({'end', Config}) -> + Topics = proplists:get_value(topics, Config), + lists:foreach(fun(T) -> emqx_broker:unsubscribe(T) end, Topics). -t_subscribers(_) -> +t_subscribers({init, Config}) -> emqx_broker:subscribe(<<"topic">>, <<"clientid">>), - ?assertEqual([self()], emqx_broker:subscribers(<<"topic">>)), + Config; +t_subscribers(Config) when is_list(Config) -> + ?assertEqual([self()], emqx_broker:subscribers(<<"topic">>)); +t_subscribers({'end', _Config}) -> emqx_broker:unsubscribe(<<"topic">>). -t_subscriptions(_) -> +t_subscriptions({init, Config}) -> emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}), - ok = timer:sleep(100), + Config; +t_subscriptions(Config) when is_list(Config) -> + ct:sleep(100), ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>}, proplists:get_value(<<"topic">>, emqx_broker:subscriptions(self()))), ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>}, - proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>))), + proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>))); +t_subscriptions({'end', _Config}) -> emqx_broker:unsubscribe(<<"topic">>). -t_sub_pub(_) -> +t_sub_pub({init, Config}) -> ok = emqx_broker:subscribe(<<"topic">>), - ct:sleep(10), + Config; +t_sub_pub(Config) when is_list(Config) -> + ct:sleep(100), emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), ?assert( receive @@ -130,16 +186,22 @@ t_sub_pub(_) -> false after 100 -> false - end). + end); +t_sub_pub({'end', _Config}) -> + ok = emqx_broker:unsubscribe(<<"topic">>). -t_nosub_pub(_) -> +t_nosub_pub({init, Config}) -> Config; +t_nosub_pub({'end', _Config}) -> ok; +t_nosub_pub(Config) when is_list(Config) -> ?assertEqual(0, emqx_metrics:val('messages.dropped')), emqx_broker:publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), ?assertEqual(1, emqx_metrics:val('messages.dropped')). -t_shared_subscribe(_) -> +t_shared_subscribe({init, Config}) -> emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}), - ct:sleep(10), + ct:sleep(100), + Config; +t_shared_subscribe(Config) when is_list(Config) -> emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), ?assert(receive {deliver, <<"topic">>, #message{payload = <<"hello">>}} -> @@ -149,9 +211,12 @@ t_shared_subscribe(_) -> false after 100 -> false - end), + end); +t_shared_subscribe({'end', _Config}) -> emqx_broker:unsubscribe(<<"$share/group/topic">>). +t_shared_subscribe_2({init, Config}) -> Config; +t_shared_subscribe_2({'end', _Config}) -> ok; t_shared_subscribe_2(_) -> {ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]), {ok, _} = emqtt:connect(ConnPid), @@ -173,6 +238,8 @@ t_shared_subscribe_2(_) -> emqtt:disconnect(ConnPid), emqtt:disconnect(ConnPid2). +t_shared_subscribe_3({init, Config}) -> Config; +t_shared_subscribe_3({'end', _Config}) -> ok; t_shared_subscribe_3(_) -> {ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]), {ok, _} = emqtt:connect(ConnPid), @@ -189,11 +256,13 @@ t_shared_subscribe_3(_) -> emqtt:disconnect(ConnPid), emqtt:disconnect(ConnPid2). -t_shard(_) -> +t_shard({init, Config}) -> ok = meck:new(emqx_broker_helper, [passthrough, no_history]), ok = meck:expect(emqx_broker_helper, get_sub_shard, fun(_, _) -> 1 end), emqx_broker:subscribe(<<"topic">>, <<"clientid">>), - ct:sleep(10), + Config; +t_shard(Config) when is_list(Config) -> + ct:sleep(100), emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), ?assert( receive @@ -203,7 +272,9 @@ t_shard(_) -> false after 100 -> false - end), + end); +t_shard({'end', _Config}) -> + emqx_broker:unsubscribe(<<"topic">>), ok = meck:unload(emqx_broker_helper). recv_msgs(Count) ->