Merge pull request #9088 from zmstone/1003-fix-more-flaky-test

test: fix more flaky test in share sub SUITE
Zaiming (Stone) Shi 2022-10-03 18:15:50 +02:00 committed by GitHub
commit bea046d5b0
2 changed files with 97 additions and 44 deletions


@@ -18,7 +18,7 @@
-include_lib("eunit/include/eunit.hrl").
-define(SLAVE_START_APPS, [emqx]).
-define(SLAVE_START_APPS, [emqx, emqx_modules]).
-export([start_slave/1,
start_slave/2,
@@ -30,23 +30,44 @@ start_slave(Name) ->
start_slave(Name, #{}).
start_slave(Name, Opts) ->
{ok, Node} = ct_slave:start(list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
[{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 10000},
{startup_timeout, 10000},
{erl_flags, ebin_path()}]),
Node = make_node_name(Name),
case ct_slave:start(Node, [{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 10000},
{startup_timeout, 10000},
{erl_flags, ebin_path()}]) of
{ok, _} ->
ok;
{error, started_not_connected, _} ->
ok
end,
pong = net_adm:ping(Node),
setup_node(Node, Opts),
Node.
stop_slave(Node) ->
rpc:call(Node, ekka, leave, []),
ct_slave:stop(Node).
make_node_name(Name) ->
case string:tokens(atom_to_list(Name), "@") of
[_Name, _Host] ->
%% the name already has an @
Name;
_ ->
list_to_atom(atom_to_list(Name) ++ "@" ++ host())
end.
stop_slave(Node0) ->
Node = make_node_name(Node0),
case rpc:call(Node, ekka, leave, []) of
ok -> ok;
{badrpc, nodedown} -> ok
end,
case ct_slave:stop(Node) of
{ok, _} -> ok;
{error, not_started, _} -> ok
end.
host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
[_, Host] = string:tokens(atom_to_list(node()), "@"),
Host.
ebin_path() ->
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
@@ -73,7 +94,7 @@ setup_node(Node, #{} = Opts) ->
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]],
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]),
rpc:call(Node, ekka, join, [node()]),
ok = rpc:call(Node, ekka, join, [node()]),
%% Sanity check. Assert that `gen_rpc' is set up correctly:
?assertEqual( Node
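For illustration, a minimal sketch of how a suite might drive the reworked helpers (the wrapper with_slave/1 and the node name example_slave_1 are hypothetical; emqx_node_helpers is the module the shared-sub suite below calls). start_slave/1 accepts either a short name or an already qualified 'name@host' atom, and stop_slave/1 returns ok even when the node has already left the cluster or was never started:

with_slave(Fun) ->
    %% short name; make_node_name/1 appends "@Host" only when the name is not qualified
    Node = emqx_node_helpers:start_slave(example_slave_1),
    try
        Fun(Node)
    after
        %% tolerant cleanup: ok even if the node already left or never started
        ok = emqx_node_helpers:stop_slave(example_slave_1)
    end.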


@@ -40,6 +40,9 @@ init_per_suite(Config) ->
PortDiscovery = application:get_env(gen_rpc, port_discovery),
application:set_env(gen_rpc, port_discovery, stateless),
application:ensure_all_started(gen_rpc),
%% ensure emqx_modules' app modules are loaded
%% so the mnesia tables are created
ok = load_app(emqx_modules),
emqx_ct_helpers:start_apps([]),
[{port_discovery, PortDiscovery} | Config].
@@ -50,32 +53,45 @@ end_per_suite(Config) ->
_ -> ok
end.
t_is_ack_required(_) ->
init_per_testcase(Case, Config) ->
try
?MODULE:Case({'init', Config})
catch
error : function_clause ->
Config
end.
end_per_testcase(Case, Config) ->
try
?MODULE:Case({'end', Config})
catch
error : function_clause ->
ok
end.
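To make the dispatch above concrete, here is a sketch with a hypothetical test case name: a case that needs per-testcase setup adds clauses for {'init', Config} and {'end', Config}, while cases without such clauses raise function_clause, which init_per_testcase/2 and end_per_testcase/2 deliberately swallow. t_local and t_local_fallback further down are real instances of this shape:

t_example({'init', Config}) ->
    [{prepared, true} | Config];
t_example({'end', _Config}) ->
    ok;
t_example(Config) when is_list(Config) ->
    true = proplists:get_value(prepared, Config).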
t_is_ack_required(Config) when is_list(Config) ->
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
t_maybe_nack_dropped(_) ->
t_maybe_nack_dropped(Config) when is_list(Config) ->
?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(Msg)),
?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
t_nack_no_connection(_) ->
t_nack_no_connection(Config) when is_list(Config) ->
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok
after 100 -> timeout end).
t_maybe_ack(_) ->
t_maybe_ack(Config) when is_list(Config) ->
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}},
emqx_shared_sub:maybe_ack(Msg)),
?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
% t_subscribers(_) ->
% error('TODO').
t_random_basic(_) ->
t_random_basic(Config) when is_list(Config) ->
ok = ensure_config(random),
ClientId = <<"ClientId">>,
Topic = <<"foo">>,
@@ -105,7 +121,7 @@ t_random_basic(_) ->
%% After the connection for the 2nd session is also closed,
%% i.e. when all clients are offline, the following message(s)
%% should be delivered randomly.
t_no_connection_nack(_) ->
t_no_connection_nack(Config) when is_list(Config) ->
ok = ensure_config(sticky),
Publisher = <<"publisher">>,
Subscriber1 = <<"Subscriber1">>,
@@ -171,27 +187,27 @@ t_no_connection_nack(_) ->
% emqx_sm:close_session(SPid2),
ok.
t_random(_) ->
t_random(Config) when is_list(Config) ->
ok = ensure_config(random, true),
test_two_messages(random).
t_round_robin(_) ->
t_round_robin(Config) when is_list(Config) ->
ok = ensure_config(round_robin, true),
test_two_messages(round_robin).
t_sticky(_) ->
t_sticky(Config) when is_list(Config) ->
ok = ensure_config(sticky, true),
test_two_messages(sticky).
t_hash(_) ->
t_hash(Config) when is_list(Config) ->
ok = ensure_config(hash, false),
test_two_messages(hash).
t_hash_clinetid(_) ->
t_hash_clinetid(Config) when is_list(Config) ->
ok = ensure_config(hash_clientid, false),
test_two_messages(hash_clientid).
t_hash_topic(_) ->
t_hash_topic(Config) when is_list(Config) ->
ok = ensure_config(hash_topic, false),
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
@@ -230,7 +246,7 @@ t_hash_topic(_) ->
ok.
%% if the original subscriber dies, switch to another live one
t_not_so_sticky(_) ->
t_not_so_sticky(Config) when is_list(Config) ->
ok = ensure_config(sticky),
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
@@ -303,7 +319,7 @@ last_message(ExpectedPayload, Pids, Timeout) ->
<<"not yet?">>
end.
t_dispatch(_) ->
t_dispatch(Config) when is_list(Config) ->
ok = ensure_config(random),
Topic = <<"foo">>,
?assertEqual({error, no_subscribers},
@@ -312,18 +328,13 @@ t_dispatch(_) ->
?assertEqual({ok, 1},
emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})).
% t_unsubscribe(_) ->
% error('TODO').
% t_subscribe(_) ->
% error('TODO').
t_uncovered_func(_) ->
t_uncovered_func(Config) when is_list(Config) ->
ignored = gen_server:call(emqx_shared_sub, ignored),
ok = gen_server:cast(emqx_shared_sub, ignored),
ignored = emqx_shared_sub ! ignored,
{mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}.
t_per_group_config(_) ->
t_per_group_config(Config) when is_list(Config) ->
ok = ensure_group_config(#{
<<"local_group_fallback">> => local,
<<"local_group">> => local,
@@ -342,8 +353,8 @@ t_per_group_config(_) ->
test_two_messages(round_robin, <<"round_robin_group">>),
test_two_messages(round_robin, <<"round_robin_group">>).
t_local(_) ->
Node = start_slave('local_shared_sub_test19', 21884),
t_local({'init', Config}) ->
Node = start_slave(local_shared_sub_test19, 21884),
GroupConfig = #{
<<"local_group_fallback">> => local,
<<"local_group">> => local,
@@ -352,7 +363,11 @@ t_local(_) ->
},
ok = ensure_group_config(Node, GroupConfig),
ok = ensure_group_config(GroupConfig),
[{slave_node, Node} | Config];
t_local({'end', _Config}) ->
ok = stop_slave(local_shared_sub_test19);
t_local(Config) when is_list(Config) ->
Node = proplists:get_value(slave_node, Config),
Topic = <<"local_foo1/bar">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
@@ -388,7 +403,7 @@ t_local(_) ->
?assertNotEqual(UsedSubPid1, UsedSubPid2),
ok.
t_local_fallback(_) ->
t_local_fallback({'init', Config}) ->
ok = ensure_group_config(#{
<<"local_group_fallback">> => local,
<<"local_group">> => local,
@@ -396,10 +411,15 @@ t_local_fallback(_) ->
<<"sticky_group">> => sticky
}),
Node = start_slave(local_fallback_shared_sub_test19, 11885),
[{slave_node, Node} | Config];
t_local_fallback({'end', _}) ->
ok = stop_slave(local_fallback_shared_sub_test19);
t_local_fallback(Config) when is_list(Config) ->
Topic = <<"local_foo2/bar">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
Node = start_slave('local_fallback_shared_sub_test19', 11885),
Node = proplists:get_value(slave_node, Config),
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
{ok, _} = emqtt:connect(ConnPid1),
@@ -423,7 +443,7 @@ t_local_fallback(_) ->
%% This one tests that the broker tries to select another shared subscriber
%% if the first one doesn't return an ACK.
t_redispatch(_) ->
t_redispatch(Config) when is_list(Config) ->
ok = ensure_config(sticky, true),
application:set_env(emqx, shared_dispatch_ack_enabled, true),
@@ -454,7 +474,7 @@ t_redispatch(_) ->
emqtt:stop(UsedSubPid2),
ok.
t_dispatch_when_inflights_are_full(_) ->
t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
ok = ensure_config(round_robin, true),
Topic = <<"foo/bar">>,
ClientId1 = <<"ClientId1">>,
@@ -537,8 +557,20 @@ recv_msgs(Count, Msgs) ->
end.
start_slave(Name, Port) ->
ok = emqx_ct_helpers:start_apps([emqx_modules]),
Listeners = [#{listen_on => {{127,0,0,1}, Port},
start_apps => [emqx, emqx_modules],
name => "internal",
opts => [{zone,internal}],
proto => tcp}],
emqx_node_helpers:start_slave(Name, #{listeners => Listeners}).
stop_slave(Name) ->
emqx_node_helpers:stop_slave(Name).
load_app(App) ->
case application:load(App) of
ok -> ok;
{error, {already_loaded, _}} -> ok;
{error, Reason} -> error({failed_to_load_app, App, Reason})
end.