test(emqx_broker): fix flaky tests

This commit is contained in:
Zaiming Shi 2021-10-05 20:55:36 +02:00
parent 5369eb231b
commit 7ae4505636
1 changed files with 112 additions and 41 deletions

View File

@ -37,38 +37,82 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]). emqx_ct_helpers:stop_apps([]).
init_per_testcase(Case, Config) ->
?MODULE:Case({init, Config}).
end_per_testcase(Case, Config) ->
?MODULE:Case({'end', Config}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% PubSub Test %% PubSub Test
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_stats_fun(_) -> t_stats_fun({init, Config}) ->
Subscribers = emqx_stats:getstat('subscribers.count'), Parent = self(),
Subscriptions = emqx_stats:getstat('subscriptions.count'), F = fun Loop() ->
Subopts = emqx_stats:getstat('suboptions.count'), N1 = emqx_stats:getstat('subscribers.count'),
N2 = emqx_stats:getstat('subscriptions.count'),
N3 = emqx_stats:getstat('suboptions.count'),
case N1 + N2 + N3 =:= 0 of
true ->
Parent ! {ready, self()},
exit(normal);
false ->
receive
stop ->
exit(normal)
after
100 ->
Loop()
end
end
end,
Pid = spawn_link(F),
receive
{ready, P} when P =:= Pid->
Config
after
5000 ->
Pid ! stop,
ct:fail("timedout_waiting_for_sub_stats_to_reach_zero")
end;
t_stats_fun(Config) when is_list(Config) ->
ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>), ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>), ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>),
%% ensure stats refreshed
emqx_broker:stats_fun(), emqx_broker:stats_fun(),
ct:sleep(10), %% emqx_stats:set_stat is a gen_server cast
?assertEqual(Subscribers + 2, emqx_stats:getstat('subscribers.count')), %% make a synced call sync
?assertEqual(Subscribers + 2, emqx_stats:getstat('subscribers.max')), ignored = gen_server:call(emqx_stats, call, infinity),
?assertEqual(Subscriptions + 2, emqx_stats:getstat('subscriptions.count')), ?assertEqual(2, emqx_stats:getstat('subscribers.count')),
?assertEqual(Subscriptions + 2, emqx_stats:getstat('subscriptions.max')), ?assertEqual(2, emqx_stats:getstat('subscribers.max')),
?assertEqual(Subopts + 2, emqx_stats:getstat('suboptions.count')), ?assertEqual(2, emqx_stats:getstat('subscriptions.count')),
?assertEqual(Subopts + 2, emqx_stats:getstat('suboptions.max')). ?assertEqual(2, emqx_stats:getstat('subscriptions.max')),
?assertEqual(2, emqx_stats:getstat('suboptions.count')),
?assertEqual(2, emqx_stats:getstat('suboptions.max'));
t_stats_fun({'end', _Config}) ->
ok = emqx_broker:unsubscribe(<<"topic">>),
ok = emqx_broker:unsubscribe(<<"topic2">>).
t_subscribed(_) -> t_subscribed({init, Config}) ->
emqx_broker:subscribe(<<"topic">>), emqx_broker:subscribe(<<"topic">>),
Config;
t_subscribed(Config) when is_list(Config) ->
?assertEqual(false, emqx_broker:subscribed(undefined, <<"topic">>)), ?assertEqual(false, emqx_broker:subscribed(undefined, <<"topic">>)),
?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)), ?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>));
t_subscribed({'end', _Config}) ->
emqx_broker:unsubscribe(<<"topic">>). emqx_broker:unsubscribe(<<"topic">>).
t_subscribed_2(_) -> t_subscribed_2({init, Config}) ->
emqx_broker:subscribe(<<"topic">>, <<"clientid">>), emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
%?assertEqual(true, emqx_broker:subscribed(<<"clientid">>, <<"topic">>)), Config;
?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)), t_subscribed_2(Config) when is_list(Config) ->
?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>));
t_subscribed_2({'end', _Config}) ->
emqx_broker:unsubscribe(<<"topic">>). emqx_broker:unsubscribe(<<"topic">>).
t_subopts(_) -> t_subopts({init, Config}) -> Config;
t_subopts(Config) when is_list(Config) ->
?assertEqual(false, emqx_broker:set_subopts(<<"topic">>, #{qos => 1})), ?assertEqual(false, emqx_broker:set_subopts(<<"topic">>, #{qos => 1})),
?assertEqual(undefined, emqx_broker:get_subopts(self(), <<"topic">>)), ?assertEqual(undefined, emqx_broker:get_subopts(self(), <<"topic">>)),
?assertEqual(undefined, emqx_broker:get_subopts(<<"clientid">>, <<"topic">>)), ?assertEqual(undefined, emqx_broker:get_subopts(<<"clientid">>, <<"topic">>)),
@ -85,42 +129,54 @@ t_subopts(_) ->
?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 0})), ?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 0})),
?assertEqual(#{nl => 0, qos => 0, rap => 0, rh => 0, subid => <<"clientid">>}, ?assertEqual(#{nl => 0, qos => 0, rap => 0, rh => 0, subid => <<"clientid">>},
emqx_broker:get_subopts(self(), <<"topic">>)), emqx_broker:get_subopts(self(), <<"topic">>));
t_subopts({'end', _Config}) ->
emqx_broker:unsubscribe(<<"topic">>). emqx_broker:unsubscribe(<<"topic">>).
t_topics(_) -> t_topics({init, Config}) ->
Topics = [<<"topic">>, <<"topic/1">>, <<"topic/2">>], Topics = [<<"topic">>, <<"topic/1">>, <<"topic/2">>],
ok = emqx_broker:subscribe(lists:nth(1, Topics), <<"clientId">>), [{topics, Topics} | Config];
ok = emqx_broker:subscribe(lists:nth(2, Topics), <<"clientId">>), t_topics(Config) when is_list(Config) ->
ok = emqx_broker:subscribe(lists:nth(3, Topics), <<"clientId">>), Topics = [T1, T2, T3] = proplists:get_value(topics, Config),
ok = emqx_broker:subscribe(T1, <<"clientId">>),
ok = emqx_broker:subscribe(T2, <<"clientId">>),
ok = emqx_broker:subscribe(T3, <<"clientId">>),
Topics1 = emqx_broker:topics(), Topics1 = emqx_broker:topics(),
?assertEqual(true, lists:foldl(fun(Topic, Acc) -> ?assertEqual(true, lists:foldl(fun(Topic, Acc) ->
case lists:member(Topic, Topics1) of case lists:member(Topic, Topics1) of
true -> Acc; true -> Acc;
false -> false false -> false
end end
end, true, Topics)), end, true, Topics));
emqx_broker:unsubscribe(lists:nth(1, Topics)), t_topics({'end', Config}) ->
emqx_broker:unsubscribe(lists:nth(2, Topics)), Topics = proplists:get_value(topics, Config),
emqx_broker:unsubscribe(lists:nth(3, Topics)). lists:foreach(fun(T) -> emqx_broker:unsubscribe(T) end, Topics).
t_subscribers(_) -> t_subscribers({init, Config}) ->
emqx_broker:subscribe(<<"topic">>, <<"clientid">>), emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
?assertEqual([self()], emqx_broker:subscribers(<<"topic">>)), Config;
t_subscribers(Config) when is_list(Config) ->
?assertEqual([self()], emqx_broker:subscribers(<<"topic">>));
t_subscribers({'end', _Config}) ->
emqx_broker:unsubscribe(<<"topic">>). emqx_broker:unsubscribe(<<"topic">>).
t_subscriptions(_) -> t_subscriptions({init, Config}) ->
emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}), emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}),
ok = timer:sleep(100), Config;
t_subscriptions(Config) when is_list(Config) ->
ct:sleep(100),
?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>}, ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
proplists:get_value(<<"topic">>, emqx_broker:subscriptions(self()))), proplists:get_value(<<"topic">>, emqx_broker:subscriptions(self()))),
?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>}, ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>))), proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>)));
t_subscriptions({'end', _Config}) ->
emqx_broker:unsubscribe(<<"topic">>). emqx_broker:unsubscribe(<<"topic">>).
t_sub_pub(_) -> t_sub_pub({init, Config}) ->
ok = emqx_broker:subscribe(<<"topic">>), ok = emqx_broker:subscribe(<<"topic">>),
ct:sleep(10), Config;
t_sub_pub(Config) when is_list(Config) ->
ct:sleep(100),
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
?assert( ?assert(
receive receive
@ -130,16 +186,22 @@ t_sub_pub(_) ->
false false
after 100 -> after 100 ->
false false
end). end);
t_sub_pub({'end', _Config}) ->
ok = emqx_broker:unsubscribe(<<"topic">>).
t_nosub_pub(_) -> t_nosub_pub({init, Config}) -> Config;
t_nosub_pub({'end', _Config}) -> ok;
t_nosub_pub(Config) when is_list(Config) ->
?assertEqual(0, emqx_metrics:val('messages.dropped')), ?assertEqual(0, emqx_metrics:val('messages.dropped')),
emqx_broker:publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), emqx_broker:publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
?assertEqual(1, emqx_metrics:val('messages.dropped')). ?assertEqual(1, emqx_metrics:val('messages.dropped')).
t_shared_subscribe(_) -> t_shared_subscribe({init, Config}) ->
emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}), emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}),
ct:sleep(10), ct:sleep(100),
Config;
t_shared_subscribe(Config) when is_list(Config) ->
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
?assert(receive ?assert(receive
{deliver, <<"topic">>, #message{payload = <<"hello">>}} -> {deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
@ -149,9 +211,12 @@ t_shared_subscribe(_) ->
false false
after 100 -> after 100 ->
false false
end), end);
t_shared_subscribe({'end', _Config}) ->
emqx_broker:unsubscribe(<<"$share/group/topic">>). emqx_broker:unsubscribe(<<"$share/group/topic">>).
t_shared_subscribe_2({init, Config}) -> Config;
t_shared_subscribe_2({'end', _Config}) -> ok;
t_shared_subscribe_2(_) -> t_shared_subscribe_2(_) ->
{ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]), {ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
{ok, _} = emqtt:connect(ConnPid), {ok, _} = emqtt:connect(ConnPid),
@ -173,6 +238,8 @@ t_shared_subscribe_2(_) ->
emqtt:disconnect(ConnPid), emqtt:disconnect(ConnPid),
emqtt:disconnect(ConnPid2). emqtt:disconnect(ConnPid2).
t_shared_subscribe_3({init, Config}) -> Config;
t_shared_subscribe_3({'end', _Config}) -> ok;
t_shared_subscribe_3(_) -> t_shared_subscribe_3(_) ->
{ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]), {ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
{ok, _} = emqtt:connect(ConnPid), {ok, _} = emqtt:connect(ConnPid),
@ -189,11 +256,13 @@ t_shared_subscribe_3(_) ->
emqtt:disconnect(ConnPid), emqtt:disconnect(ConnPid),
emqtt:disconnect(ConnPid2). emqtt:disconnect(ConnPid2).
t_shard(_) -> t_shard({init, Config}) ->
ok = meck:new(emqx_broker_helper, [passthrough, no_history]), ok = meck:new(emqx_broker_helper, [passthrough, no_history]),
ok = meck:expect(emqx_broker_helper, get_sub_shard, fun(_, _) -> 1 end), ok = meck:expect(emqx_broker_helper, get_sub_shard, fun(_, _) -> 1 end),
emqx_broker:subscribe(<<"topic">>, <<"clientid">>), emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
ct:sleep(10), Config;
t_shard(Config) when is_list(Config) ->
ct:sleep(100),
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
?assert( ?assert(
receive receive
@ -203,7 +272,9 @@ t_shard(_) ->
false false
after 100 -> after 100 ->
false false
end), end);
t_shard({'end', _Config}) ->
emqx_broker:unsubscribe(<<"topic">>),
ok = meck:unload(emqx_broker_helper). ok = meck:unload(emqx_broker_helper).
recv_msgs(Count) -> recv_msgs(Count) ->