test: fix more flaky test in share sub SUITE
This commit is contained in:
parent
c39116c7a5
commit
2e28d5e73e
|
@ -30,23 +30,44 @@ start_slave(Name) ->
|
||||||
start_slave(Name, #{}).
|
start_slave(Name, #{}).
|
||||||
|
|
||||||
start_slave(Name, Opts) ->
|
start_slave(Name, Opts) ->
|
||||||
{ok, Node} = ct_slave:start(list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
|
Node = make_node_name(Name),
|
||||||
[{kill_if_fail, true},
|
case ct_slave:start(Node, [{kill_if_fail, true},
|
||||||
{monitor_master, true},
|
{monitor_master, true},
|
||||||
{init_timeout, 10000},
|
{init_timeout, 10000},
|
||||||
{startup_timeout, 10000},
|
{startup_timeout, 10000},
|
||||||
{erl_flags, ebin_path()}]),
|
{erl_flags, ebin_path()}]) of
|
||||||
|
{ok, _} ->
|
||||||
|
ok;
|
||||||
|
{error, started_not_connected, _} ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
pong = net_adm:ping(Node),
|
pong = net_adm:ping(Node),
|
||||||
setup_node(Node, Opts),
|
setup_node(Node, Opts),
|
||||||
Node.
|
Node.
|
||||||
|
|
||||||
stop_slave(Node) ->
|
make_node_name(Name) ->
|
||||||
rpc:call(Node, ekka, leave, []),
|
case string:tokens(atom_to_list(Name), "@") of
|
||||||
ct_slave:stop(Node).
|
[_Name, _Host] ->
|
||||||
|
%% the name already has a @
|
||||||
|
Name;
|
||||||
|
_ ->
|
||||||
|
list_to_atom(atom_to_list(Name) ++ "@" ++ host())
|
||||||
|
end.
|
||||||
|
|
||||||
|
stop_slave(Node0) ->
|
||||||
|
Node = make_node_name(Node0),
|
||||||
|
case rpc:call(Node, ekka, leave, []) of
|
||||||
|
ok -> ok;
|
||||||
|
{badrpc, nodedown} -> ok
|
||||||
|
end,
|
||||||
|
case ct_slave:stop(Node) of
|
||||||
|
{ok, _} -> ok;
|
||||||
|
{error, not_started, _} -> ok
|
||||||
|
end.
|
||||||
|
|
||||||
host() ->
|
host() ->
|
||||||
[_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
|
[_, Host] = string:tokens(atom_to_list(node()), "@"),
|
||||||
|
Host.
|
||||||
|
|
||||||
ebin_path() ->
|
ebin_path() ->
|
||||||
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
|
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
|
||||||
|
@ -70,10 +91,10 @@ setup_node(Node, #{} = Opts) ->
|
||||||
end,
|
end,
|
||||||
EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler),
|
EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler),
|
||||||
|
|
||||||
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]],
|
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx_modules, emqx]],
|
||||||
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]),
|
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]),
|
||||||
|
|
||||||
rpc:call(Node, ekka, join, [node()]),
|
ok = rpc:call(Node, ekka, join, [node()]),
|
||||||
|
|
||||||
%% Sanity check. Assert that `gen_rpc' is set up correctly:
|
%% Sanity check. Assert that `gen_rpc' is set up correctly:
|
||||||
?assertEqual( Node
|
?assertEqual( Node
|
||||||
|
|
|
@ -40,6 +40,9 @@ init_per_suite(Config) ->
|
||||||
PortDiscovery = application:get_env(gen_rpc, port_discovery),
|
PortDiscovery = application:get_env(gen_rpc, port_discovery),
|
||||||
application:set_env(gen_rpc, port_discovery, stateless),
|
application:set_env(gen_rpc, port_discovery, stateless),
|
||||||
application:ensure_all_started(gen_rpc),
|
application:ensure_all_started(gen_rpc),
|
||||||
|
%% ensure emqx_moduels' app modules are loaded
|
||||||
|
%% so the mnesia tables are created
|
||||||
|
ok = load_app(emqx_modules),
|
||||||
emqx_ct_helpers:start_apps([]),
|
emqx_ct_helpers:start_apps([]),
|
||||||
[{port_discovery, PortDiscovery} | Config].
|
[{port_discovery, PortDiscovery} | Config].
|
||||||
|
|
||||||
|
@ -50,32 +53,45 @@ end_per_suite(Config) ->
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
t_is_ack_required(_) ->
|
init_per_testcase(Case, Config) ->
|
||||||
|
try
|
||||||
|
?MODULE:Case({'init', Config})
|
||||||
|
catch
|
||||||
|
error : function_clause ->
|
||||||
|
Config
|
||||||
|
end.
|
||||||
|
|
||||||
|
end_per_testcase(Case, Config) ->
|
||||||
|
try
|
||||||
|
?MODULE:Case({'end', Config})
|
||||||
|
catch
|
||||||
|
error : function_clause ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
t_is_ack_required(Config) when is_list(Config) ->
|
||||||
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
|
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
|
||||||
|
|
||||||
t_maybe_nack_dropped(_) ->
|
t_maybe_nack_dropped(Config) when is_list(Config) ->
|
||||||
?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
|
?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
|
||||||
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
|
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
|
||||||
?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(Msg)),
|
?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(Msg)),
|
||||||
?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
|
?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
|
||||||
|
|
||||||
t_nack_no_connection(_) ->
|
t_nack_no_connection(Config) when is_list(Config) ->
|
||||||
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
|
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
|
||||||
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
|
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
|
||||||
?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok
|
?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok
|
||||||
after 100 -> timeout end).
|
after 100 -> timeout end).
|
||||||
|
|
||||||
t_maybe_ack(_) ->
|
t_maybe_ack(Config) when is_list(Config) ->
|
||||||
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
|
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
|
||||||
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
|
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
|
||||||
?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}},
|
?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}},
|
||||||
emqx_shared_sub:maybe_ack(Msg)),
|
emqx_shared_sub:maybe_ack(Msg)),
|
||||||
?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
|
?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
|
||||||
|
|
||||||
% t_subscribers(_) ->
|
t_random_basic(Config) when is_list(Config) ->
|
||||||
% error('TODO').
|
|
||||||
|
|
||||||
t_random_basic(_) ->
|
|
||||||
ok = ensure_config(random),
|
ok = ensure_config(random),
|
||||||
ClientId = <<"ClientId">>,
|
ClientId = <<"ClientId">>,
|
||||||
Topic = <<"foo">>,
|
Topic = <<"foo">>,
|
||||||
|
@ -105,7 +121,7 @@ t_random_basic(_) ->
|
||||||
%% After the connection for the 2nd session is also closed,
|
%% After the connection for the 2nd session is also closed,
|
||||||
%% i.e. when all clients are offline, the following message(s)
|
%% i.e. when all clients are offline, the following message(s)
|
||||||
%% should be delivered randomly.
|
%% should be delivered randomly.
|
||||||
t_no_connection_nack(_) ->
|
t_no_connection_nack(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(sticky),
|
ok = ensure_config(sticky),
|
||||||
Publisher = <<"publisher">>,
|
Publisher = <<"publisher">>,
|
||||||
Subscriber1 = <<"Subscriber1">>,
|
Subscriber1 = <<"Subscriber1">>,
|
||||||
|
@ -171,27 +187,27 @@ t_no_connection_nack(_) ->
|
||||||
% emqx_sm:close_session(SPid2),
|
% emqx_sm:close_session(SPid2),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_random(_) ->
|
t_random(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(random, true),
|
ok = ensure_config(random, true),
|
||||||
test_two_messages(random).
|
test_two_messages(random).
|
||||||
|
|
||||||
t_round_robin(_) ->
|
t_round_robin(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(round_robin, true),
|
ok = ensure_config(round_robin, true),
|
||||||
test_two_messages(round_robin).
|
test_two_messages(round_robin).
|
||||||
|
|
||||||
t_sticky(_) ->
|
t_sticky(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(sticky, true),
|
ok = ensure_config(sticky, true),
|
||||||
test_two_messages(sticky).
|
test_two_messages(sticky).
|
||||||
|
|
||||||
t_hash(_) ->
|
t_hash(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(hash, false),
|
ok = ensure_config(hash, false),
|
||||||
test_two_messages(hash).
|
test_two_messages(hash).
|
||||||
|
|
||||||
t_hash_clinetid(_) ->
|
t_hash_clinetid(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(hash_clientid, false),
|
ok = ensure_config(hash_clientid, false),
|
||||||
test_two_messages(hash_clientid).
|
test_two_messages(hash_clientid).
|
||||||
|
|
||||||
t_hash_topic(_) ->
|
t_hash_topic(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(hash_topic, false),
|
ok = ensure_config(hash_topic, false),
|
||||||
ClientId1 = <<"ClientId1">>,
|
ClientId1 = <<"ClientId1">>,
|
||||||
ClientId2 = <<"ClientId2">>,
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
@ -230,7 +246,7 @@ t_hash_topic(_) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% if the original subscriber dies, change to another one alive
|
%% if the original subscriber dies, change to another one alive
|
||||||
t_not_so_sticky(_) ->
|
t_not_so_sticky(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(sticky),
|
ok = ensure_config(sticky),
|
||||||
ClientId1 = <<"ClientId1">>,
|
ClientId1 = <<"ClientId1">>,
|
||||||
ClientId2 = <<"ClientId2">>,
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
@ -303,7 +319,7 @@ last_message(ExpectedPayload, Pids, Timeout) ->
|
||||||
<<"not yet?">>
|
<<"not yet?">>
|
||||||
end.
|
end.
|
||||||
|
|
||||||
t_dispatch(_) ->
|
t_dispatch(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(random),
|
ok = ensure_config(random),
|
||||||
Topic = <<"foo">>,
|
Topic = <<"foo">>,
|
||||||
?assertEqual({error, no_subscribers},
|
?assertEqual({error, no_subscribers},
|
||||||
|
@ -312,18 +328,13 @@ t_dispatch(_) ->
|
||||||
?assertEqual({ok, 1},
|
?assertEqual({ok, 1},
|
||||||
emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})).
|
emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})).
|
||||||
|
|
||||||
% t_unsubscribe(_) ->
|
t_uncovered_func(Config) when is_list(Config) ->
|
||||||
% error('TODO').
|
|
||||||
|
|
||||||
% t_subscribe(_) ->
|
|
||||||
% error('TODO').
|
|
||||||
t_uncovered_func(_) ->
|
|
||||||
ignored = gen_server:call(emqx_shared_sub, ignored),
|
ignored = gen_server:call(emqx_shared_sub, ignored),
|
||||||
ok = gen_server:cast(emqx_shared_sub, ignored),
|
ok = gen_server:cast(emqx_shared_sub, ignored),
|
||||||
ignored = emqx_shared_sub ! ignored,
|
ignored = emqx_shared_sub ! ignored,
|
||||||
{mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}.
|
{mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}.
|
||||||
|
|
||||||
t_per_group_config(_) ->
|
t_per_group_config(Config) when is_list(Config) ->
|
||||||
ok = ensure_group_config(#{
|
ok = ensure_group_config(#{
|
||||||
<<"local_group_fallback">> => local,
|
<<"local_group_fallback">> => local,
|
||||||
<<"local_group">> => local,
|
<<"local_group">> => local,
|
||||||
|
@ -342,8 +353,8 @@ t_per_group_config(_) ->
|
||||||
test_two_messages(round_robin, <<"round_robin_group">>),
|
test_two_messages(round_robin, <<"round_robin_group">>),
|
||||||
test_two_messages(round_robin, <<"round_robin_group">>).
|
test_two_messages(round_robin, <<"round_robin_group">>).
|
||||||
|
|
||||||
t_local(_) ->
|
t_local({'init', Config}) ->
|
||||||
Node = start_slave('local_shared_sub_test19', 21884),
|
Node = start_slave(local_shared_sub_test19, 21884),
|
||||||
GroupConfig = #{
|
GroupConfig = #{
|
||||||
<<"local_group_fallback">> => local,
|
<<"local_group_fallback">> => local,
|
||||||
<<"local_group">> => local,
|
<<"local_group">> => local,
|
||||||
|
@ -352,7 +363,11 @@ t_local(_) ->
|
||||||
},
|
},
|
||||||
ok = ensure_group_config(Node, GroupConfig),
|
ok = ensure_group_config(Node, GroupConfig),
|
||||||
ok = ensure_group_config(GroupConfig),
|
ok = ensure_group_config(GroupConfig),
|
||||||
|
[{slave_node, Node} | Config];
|
||||||
|
t_local({'end', _Config}) ->
|
||||||
|
ok = stop_slave(local_shared_sub_test19);
|
||||||
|
t_local(Config) when is_list(Config) ->
|
||||||
|
Node = proplists:get_value(slave_node, Config),
|
||||||
Topic = <<"local_foo1/bar">>,
|
Topic = <<"local_foo1/bar">>,
|
||||||
ClientId1 = <<"ClientId1">>,
|
ClientId1 = <<"ClientId1">>,
|
||||||
ClientId2 = <<"ClientId2">>,
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
@ -388,7 +403,7 @@ t_local(_) ->
|
||||||
?assertNotEqual(UsedSubPid1, UsedSubPid2),
|
?assertNotEqual(UsedSubPid1, UsedSubPid2),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_local_fallback(_) ->
|
t_local_fallback({'init', Config}) ->
|
||||||
ok = ensure_group_config(#{
|
ok = ensure_group_config(#{
|
||||||
<<"local_group_fallback">> => local,
|
<<"local_group_fallback">> => local,
|
||||||
<<"local_group">> => local,
|
<<"local_group">> => local,
|
||||||
|
@ -396,10 +411,15 @@ t_local_fallback(_) ->
|
||||||
<<"sticky_group">> => sticky
|
<<"sticky_group">> => sticky
|
||||||
}),
|
}),
|
||||||
|
|
||||||
|
Node = start_slave(local_fallback_shared_sub_test19, 11885),
|
||||||
|
[{slave_node, Node} | Config];
|
||||||
|
t_local_fallback({'end', _}) ->
|
||||||
|
ok = stop_slave(local_fallback_shared_sub_test19);
|
||||||
|
t_local_fallback(Config) when is_list(Config) ->
|
||||||
Topic = <<"local_foo2/bar">>,
|
Topic = <<"local_foo2/bar">>,
|
||||||
ClientId1 = <<"ClientId1">>,
|
ClientId1 = <<"ClientId1">>,
|
||||||
ClientId2 = <<"ClientId2">>,
|
ClientId2 = <<"ClientId2">>,
|
||||||
Node = start_slave('local_fallback_shared_sub_test19', 11885),
|
Node = proplists:get_value(slave_node, Config),
|
||||||
|
|
||||||
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
|
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
|
||||||
{ok, _} = emqtt:connect(ConnPid1),
|
{ok, _} = emqtt:connect(ConnPid1),
|
||||||
|
@ -423,7 +443,7 @@ t_local_fallback(_) ->
|
||||||
|
|
||||||
%% This one tests that broker tries to select another shared subscriber
|
%% This one tests that broker tries to select another shared subscriber
|
||||||
%% If the first one doesn't return an ACK
|
%% If the first one doesn't return an ACK
|
||||||
t_redispatch(_) ->
|
t_redispatch(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(sticky, true),
|
ok = ensure_config(sticky, true),
|
||||||
application:set_env(emqx, shared_dispatch_ack_enabled, true),
|
application:set_env(emqx, shared_dispatch_ack_enabled, true),
|
||||||
|
|
||||||
|
@ -454,7 +474,7 @@ t_redispatch(_) ->
|
||||||
emqtt:stop(UsedSubPid2),
|
emqtt:stop(UsedSubPid2),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_dispatch_when_inflights_are_full(_) ->
|
t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(round_robin, true),
|
ok = ensure_config(round_robin, true),
|
||||||
Topic = <<"foo/bar">>,
|
Topic = <<"foo/bar">>,
|
||||||
ClientId1 = <<"ClientId1">>,
|
ClientId1 = <<"ClientId1">>,
|
||||||
|
@ -537,8 +557,20 @@ recv_msgs(Count, Msgs) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
start_slave(Name, Port) ->
|
start_slave(Name, Port) ->
|
||||||
|
ok = emqx_ct_helpers:start_apps([emqx_modules]),
|
||||||
Listeners = [#{listen_on => {{127,0,0,1}, Port},
|
Listeners = [#{listen_on => {{127,0,0,1}, Port},
|
||||||
|
start_apps => [emqx, emqx_modules],
|
||||||
name => "internal",
|
name => "internal",
|
||||||
opts => [{zone,internal}],
|
opts => [{zone,internal}],
|
||||||
proto => tcp}],
|
proto => tcp}],
|
||||||
emqx_node_helpers:start_slave(Name, #{listeners => Listeners}).
|
emqx_node_helpers:start_slave(Name, #{listeners => Listeners}).
|
||||||
|
|
||||||
|
stop_slave(Name) ->
|
||||||
|
emqx_node_helpers:stop_slave(Name).
|
||||||
|
|
||||||
|
load_app(App) ->
|
||||||
|
case application:load(App) of
|
||||||
|
ok -> ok;
|
||||||
|
{error, {already_loaded, _}} -> ok;
|
||||||
|
{error, Reason} -> error({failed_to_load_app, App, Reason})
|
||||||
|
end.
|
||||||
|
|
Loading…
Reference in New Issue