Merge pull request #5877 from zmstone/test-fix-flaky-test-case-in-broker-suite
test(emqx_broker): fix flaky tests
This commit is contained in:
commit
0120f8cf45
|
@ -28,8 +28,9 @@ exec docker run \
|
||||||
-v "$TEMPDIR:/relup_test" \
|
-v "$TEMPDIR:/relup_test" \
|
||||||
-w "/relup_test" \
|
-w "/relup_test" \
|
||||||
-e REBAR_COLOR=none \
|
-e REBAR_COLOR=none \
|
||||||
-it savonarola/emqx-relup-env:4.3 \
|
-it emqx/relup-test-env:erl23.2.7.2-emqx-2-ubuntu20.04 \
|
||||||
lux \
|
lux \
|
||||||
|
--progress verbose \
|
||||||
--case_timeout infinity \
|
--case_timeout infinity \
|
||||||
--var PROFILE="$PROFILE" \
|
--var PROFILE="$PROFILE" \
|
||||||
--var PACKAGE_PATH="/relup_test/packages" \
|
--var PACKAGE_PATH="/relup_test/packages" \
|
||||||
|
@ -37,4 +38,3 @@ exec docker run \
|
||||||
--var VSN="$VSN" \
|
--var VSN="$VSN" \
|
||||||
--var OLD_VSN="$OLD_VSN" \
|
--var OLD_VSN="$OLD_VSN" \
|
||||||
relup.lux
|
relup.lux
|
||||||
|
|
||||||
|
|
|
@ -142,11 +142,11 @@
|
||||||
???SH-PROMPT
|
???SH-PROMPT
|
||||||
|
|
||||||
[shell bench]
|
[shell bench]
|
||||||
!curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq --raw-output ".data[0].metrics[] | select(.node==\"emqx@127.0.0.1\").matched"
|
!curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].metrics[] | select(.node==\"emqx@127.0.0.1\").matched"
|
||||||
?300
|
?300
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
|
||||||
!curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq --raw-output ".data[0].actions[0].metrics[] | select(.node==\"emqx@127.0.0.1\").success"
|
!curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].actions[0].metrics[] | select(.node==\"emqx@127.0.0.1\").success"
|
||||||
?300
|
?300
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
|
||||||
|
|
|
@ -308,6 +308,7 @@ jobs:
|
||||||
timeout-minutes: 5
|
timeout-minutes: 5
|
||||||
run: |
|
run: |
|
||||||
lux \
|
lux \
|
||||||
|
--progress verbose \
|
||||||
--case_timeout infinity \
|
--case_timeout infinity \
|
||||||
--var PROFILE=$PROFILE \
|
--var PROFILE=$PROFILE \
|
||||||
--var PACKAGE_PATH=$(pwd)/packages \
|
--var PACKAGE_PATH=$(pwd)/packages \
|
||||||
|
|
|
@ -37,23 +37,35 @@ init_per_suite(Config) ->
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_ct_helpers:stop_apps([]).
|
emqx_ct_helpers:stop_apps([]).
|
||||||
|
|
||||||
|
init_per_testcase(Case, Config) ->
|
||||||
|
?MODULE:Case({init, Config}).
|
||||||
|
|
||||||
|
end_per_testcase(Case, Config) ->
|
||||||
|
?MODULE:Case({'end', Config}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% PubSub Test
|
%% PubSub Test
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_subscribed(_) ->
|
t_subscribed({init, Config}) ->
|
||||||
emqx_broker:subscribe(<<"topic">>),
|
emqx_broker:subscribe(<<"topic">>),
|
||||||
|
Config;
|
||||||
|
t_subscribed(Config) when is_list(Config) ->
|
||||||
?assertEqual(false, emqx_broker:subscribed(undefined, <<"topic">>)),
|
?assertEqual(false, emqx_broker:subscribed(undefined, <<"topic">>)),
|
||||||
?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)),
|
?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>));
|
||||||
|
t_subscribed({'end', _Config}) ->
|
||||||
emqx_broker:unsubscribe(<<"topic">>).
|
emqx_broker:unsubscribe(<<"topic">>).
|
||||||
|
|
||||||
t_subscribed_2(_) ->
|
t_subscribed_2({init, Config}) ->
|
||||||
emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
|
emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
|
||||||
%?assertEqual(true, emqx_broker:subscribed(<<"clientid">>, <<"topic">>)),
|
Config;
|
||||||
?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)),
|
t_subscribed_2(Config) when is_list(Config) ->
|
||||||
|
?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>));
|
||||||
|
t_subscribed_2({'end', _Config}) ->
|
||||||
emqx_broker:unsubscribe(<<"topic">>).
|
emqx_broker:unsubscribe(<<"topic">>).
|
||||||
|
|
||||||
t_subopts(_) ->
|
t_subopts({init, Config}) -> Config;
|
||||||
|
t_subopts(Config) when is_list(Config) ->
|
||||||
?assertEqual(false, emqx_broker:set_subopts(<<"topic">>, #{qos => 1})),
|
?assertEqual(false, emqx_broker:set_subopts(<<"topic">>, #{qos => 1})),
|
||||||
?assertEqual(undefined, emqx_broker:get_subopts(self(), <<"topic">>)),
|
?assertEqual(undefined, emqx_broker:get_subopts(self(), <<"topic">>)),
|
||||||
?assertEqual(undefined, emqx_broker:get_subopts(<<"clientid">>, <<"topic">>)),
|
?assertEqual(undefined, emqx_broker:get_subopts(<<"clientid">>, <<"topic">>)),
|
||||||
|
@ -70,42 +82,54 @@ t_subopts(_) ->
|
||||||
|
|
||||||
?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 0})),
|
?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 0})),
|
||||||
?assertEqual(#{nl => 0, qos => 0, rap => 0, rh => 0, subid => <<"clientid">>},
|
?assertEqual(#{nl => 0, qos => 0, rap => 0, rh => 0, subid => <<"clientid">>},
|
||||||
emqx_broker:get_subopts(self(), <<"topic">>)),
|
emqx_broker:get_subopts(self(), <<"topic">>));
|
||||||
|
t_subopts({'end', _Config}) ->
|
||||||
emqx_broker:unsubscribe(<<"topic">>).
|
emqx_broker:unsubscribe(<<"topic">>).
|
||||||
|
|
||||||
t_topics(_) ->
|
t_topics({init, Config}) ->
|
||||||
Topics = [<<"topic">>, <<"topic/1">>, <<"topic/2">>],
|
Topics = [<<"topic">>, <<"topic/1">>, <<"topic/2">>],
|
||||||
ok = emqx_broker:subscribe(lists:nth(1, Topics), <<"clientId">>),
|
[{topics, Topics} | Config];
|
||||||
ok = emqx_broker:subscribe(lists:nth(2, Topics), <<"clientId">>),
|
t_topics(Config) when is_list(Config) ->
|
||||||
ok = emqx_broker:subscribe(lists:nth(3, Topics), <<"clientId">>),
|
Topics = [T1, T2, T3] = proplists:get_value(topics, Config),
|
||||||
|
ok = emqx_broker:subscribe(T1, <<"clientId">>),
|
||||||
|
ok = emqx_broker:subscribe(T2, <<"clientId">>),
|
||||||
|
ok = emqx_broker:subscribe(T3, <<"clientId">>),
|
||||||
Topics1 = emqx_broker:topics(),
|
Topics1 = emqx_broker:topics(),
|
||||||
?assertEqual(true, lists:foldl(fun(Topic, Acc) ->
|
?assertEqual(true, lists:foldl(fun(Topic, Acc) ->
|
||||||
case lists:member(Topic, Topics1) of
|
case lists:member(Topic, Topics1) of
|
||||||
true -> Acc;
|
true -> Acc;
|
||||||
false -> false
|
false -> false
|
||||||
end
|
end
|
||||||
end, true, Topics)),
|
end, true, Topics));
|
||||||
emqx_broker:unsubscribe(lists:nth(1, Topics)),
|
t_topics({'end', Config}) ->
|
||||||
emqx_broker:unsubscribe(lists:nth(2, Topics)),
|
Topics = proplists:get_value(topics, Config),
|
||||||
emqx_broker:unsubscribe(lists:nth(3, Topics)).
|
lists:foreach(fun(T) -> emqx_broker:unsubscribe(T) end, Topics).
|
||||||
|
|
||||||
t_subscribers(_) ->
|
t_subscribers({init, Config}) ->
|
||||||
emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
|
emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
|
||||||
?assertEqual([self()], emqx_broker:subscribers(<<"topic">>)),
|
Config;
|
||||||
|
t_subscribers(Config) when is_list(Config) ->
|
||||||
|
?assertEqual([self()], emqx_broker:subscribers(<<"topic">>));
|
||||||
|
t_subscribers({'end', _Config}) ->
|
||||||
emqx_broker:unsubscribe(<<"topic">>).
|
emqx_broker:unsubscribe(<<"topic">>).
|
||||||
|
|
||||||
t_subscriptions(_) ->
|
t_subscriptions({init, Config}) ->
|
||||||
emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}),
|
emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}),
|
||||||
ok = timer:sleep(100),
|
Config;
|
||||||
|
t_subscriptions(Config) when is_list(Config) ->
|
||||||
|
ct:sleep(100),
|
||||||
?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
|
?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
|
||||||
proplists:get_value(<<"topic">>, emqx_broker:subscriptions(self()))),
|
proplists:get_value(<<"topic">>, emqx_broker:subscriptions(self()))),
|
||||||
?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
|
?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>},
|
||||||
proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>))),
|
proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>)));
|
||||||
|
t_subscriptions({'end', _Config}) ->
|
||||||
emqx_broker:unsubscribe(<<"topic">>).
|
emqx_broker:unsubscribe(<<"topic">>).
|
||||||
|
|
||||||
t_sub_pub(_) ->
|
t_sub_pub({init, Config}) ->
|
||||||
ok = emqx_broker:subscribe(<<"topic">>),
|
ok = emqx_broker:subscribe(<<"topic">>),
|
||||||
ct:sleep(10),
|
Config;
|
||||||
|
t_sub_pub(Config) when is_list(Config) ->
|
||||||
|
ct:sleep(100),
|
||||||
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
|
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
|
||||||
?assert(
|
?assert(
|
||||||
receive
|
receive
|
||||||
|
@ -115,16 +139,22 @@ t_sub_pub(_) ->
|
||||||
false
|
false
|
||||||
after 100 ->
|
after 100 ->
|
||||||
false
|
false
|
||||||
end).
|
end);
|
||||||
|
t_sub_pub({'end', _Config}) ->
|
||||||
|
ok = emqx_broker:unsubscribe(<<"topic">>).
|
||||||
|
|
||||||
t_nosub_pub(_) ->
|
t_nosub_pub({init, Config}) -> Config;
|
||||||
|
t_nosub_pub({'end', _Config}) -> ok;
|
||||||
|
t_nosub_pub(Config) when is_list(Config) ->
|
||||||
?assertEqual(0, emqx_metrics:val('messages.dropped')),
|
?assertEqual(0, emqx_metrics:val('messages.dropped')),
|
||||||
emqx_broker:publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
|
emqx_broker:publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
|
||||||
?assertEqual(1, emqx_metrics:val('messages.dropped')).
|
?assertEqual(1, emqx_metrics:val('messages.dropped')).
|
||||||
|
|
||||||
t_shared_subscribe(_) ->
|
t_shared_subscribe({init, Config}) ->
|
||||||
emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}),
|
emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}),
|
||||||
ct:sleep(10),
|
ct:sleep(100),
|
||||||
|
Config;
|
||||||
|
t_shared_subscribe(Config) when is_list(Config) ->
|
||||||
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
|
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
|
||||||
?assert(receive
|
?assert(receive
|
||||||
{deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
|
{deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
|
||||||
|
@ -134,9 +164,12 @@ t_shared_subscribe(_) ->
|
||||||
false
|
false
|
||||||
after 100 ->
|
after 100 ->
|
||||||
false
|
false
|
||||||
end),
|
end);
|
||||||
|
t_shared_subscribe({'end', _Config}) ->
|
||||||
emqx_broker:unsubscribe(<<"$share/group/topic">>).
|
emqx_broker:unsubscribe(<<"$share/group/topic">>).
|
||||||
|
|
||||||
|
t_shared_subscribe_2({init, Config}) -> Config;
|
||||||
|
t_shared_subscribe_2({'end', _Config}) -> ok;
|
||||||
t_shared_subscribe_2(_) ->
|
t_shared_subscribe_2(_) ->
|
||||||
{ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
|
{ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
|
||||||
{ok, _} = emqtt:connect(ConnPid),
|
{ok, _} = emqtt:connect(ConnPid),
|
||||||
|
@ -158,6 +191,8 @@ t_shared_subscribe_2(_) ->
|
||||||
emqtt:disconnect(ConnPid),
|
emqtt:disconnect(ConnPid),
|
||||||
emqtt:disconnect(ConnPid2).
|
emqtt:disconnect(ConnPid2).
|
||||||
|
|
||||||
|
t_shared_subscribe_3({init, Config}) -> Config;
|
||||||
|
t_shared_subscribe_3({'end', _Config}) -> ok;
|
||||||
t_shared_subscribe_3(_) ->
|
t_shared_subscribe_3(_) ->
|
||||||
{ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
|
{ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
|
||||||
{ok, _} = emqtt:connect(ConnPid),
|
{ok, _} = emqtt:connect(ConnPid),
|
||||||
|
@ -174,11 +209,13 @@ t_shared_subscribe_3(_) ->
|
||||||
emqtt:disconnect(ConnPid),
|
emqtt:disconnect(ConnPid),
|
||||||
emqtt:disconnect(ConnPid2).
|
emqtt:disconnect(ConnPid2).
|
||||||
|
|
||||||
t_shard(_) ->
|
t_shard({init, Config}) ->
|
||||||
ok = meck:new(emqx_broker_helper, [passthrough, no_history]),
|
ok = meck:new(emqx_broker_helper, [passthrough, no_history]),
|
||||||
ok = meck:expect(emqx_broker_helper, get_sub_shard, fun(_, _) -> 1 end),
|
ok = meck:expect(emqx_broker_helper, get_sub_shard, fun(_, _) -> 1 end),
|
||||||
emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
|
emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
|
||||||
ct:sleep(10),
|
Config;
|
||||||
|
t_shard(Config) when is_list(Config) ->
|
||||||
|
ct:sleep(100),
|
||||||
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
|
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
|
||||||
?assert(
|
?assert(
|
||||||
receive
|
receive
|
||||||
|
@ -188,23 +225,57 @@ t_shard(_) ->
|
||||||
false
|
false
|
||||||
after 100 ->
|
after 100 ->
|
||||||
false
|
false
|
||||||
end),
|
end);
|
||||||
|
t_shard({'end', _Config}) ->
|
||||||
|
emqx_broker:unsubscribe(<<"topic">>),
|
||||||
ok = meck:unload(emqx_broker_helper).
|
ok = meck:unload(emqx_broker_helper).
|
||||||
|
|
||||||
t_stats_fun(_) ->
|
t_stats_fun({init, Config}) ->
|
||||||
N = emqx_stats:getstat('subscribers.count'),
|
Parent = self(),
|
||||||
N = emqx_stats:getstat('subscriptions.count'),
|
F = fun Loop() ->
|
||||||
N = emqx_stats:getstat('suboptions.count'),
|
N1 = emqx_stats:getstat('subscribers.count'),
|
||||||
|
N2 = emqx_stats:getstat('subscriptions.count'),
|
||||||
|
N3 = emqx_stats:getstat('suboptions.count'),
|
||||||
|
case N1 + N2 + N3 =:= 0 of
|
||||||
|
true ->
|
||||||
|
Parent ! {ready, self()},
|
||||||
|
exit(normal);
|
||||||
|
false ->
|
||||||
|
receive
|
||||||
|
stop ->
|
||||||
|
exit(normal)
|
||||||
|
after
|
||||||
|
100 ->
|
||||||
|
Loop()
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
Pid = spawn_link(F),
|
||||||
|
receive
|
||||||
|
{ready, P} when P =:= Pid->
|
||||||
|
Config
|
||||||
|
after
|
||||||
|
5000 ->
|
||||||
|
Pid ! stop,
|
||||||
|
ct:fail("timedout_waiting_for_sub_stats_to_reach_zero")
|
||||||
|
end;
|
||||||
|
t_stats_fun(Config) when is_list(Config) ->
|
||||||
ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
|
ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
|
||||||
ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>),
|
ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>),
|
||||||
|
%% ensure stats refreshed
|
||||||
emqx_broker:stats_fun(),
|
emqx_broker:stats_fun(),
|
||||||
ct:sleep(10),
|
%% emqx_stats:set_stat is a gen_server cast
|
||||||
?assertEqual(N + 2, emqx_stats:getstat('subscribers.count')),
|
%% make a synced call sync
|
||||||
?assertEqual(N + 2, emqx_stats:getstat('subscribers.max')),
|
ignored = gen_server:call(emqx_stats, call, infinity),
|
||||||
?assertEqual(N + 2, emqx_stats:getstat('subscriptions.count')),
|
?assertEqual(2, emqx_stats:getstat('subscribers.count')),
|
||||||
?assertEqual(N + 2, emqx_stats:getstat('subscriptions.max')),
|
?assertEqual(2, emqx_stats:getstat('subscribers.max')),
|
||||||
?assertEqual(N + 2, emqx_stats:getstat('suboptions.count')),
|
?assertEqual(2, emqx_stats:getstat('subscriptions.count')),
|
||||||
?assertEqual(N + 2, emqx_stats:getstat('suboptions.max')).
|
?assertEqual(2, emqx_stats:getstat('subscriptions.max')),
|
||||||
|
?assertEqual(2, emqx_stats:getstat('suboptions.count')),
|
||||||
|
?assertEqual(2, emqx_stats:getstat('suboptions.max'));
|
||||||
|
t_stats_fun({'end', _Config}) ->
|
||||||
|
ok = emqx_broker:unsubscribe(<<"topic">>),
|
||||||
|
ok = emqx_broker:unsubscribe(<<"topic2">>).
|
||||||
|
|
||||||
recv_msgs(Count) ->
|
recv_msgs(Count) ->
|
||||||
recv_msgs(Count, []).
|
recv_msgs(Count, []).
|
||||||
|
|
Loading…
Reference in New Issue