test: refactor emqx_takeover_SUITE

This commit is contained in:
Zaiming (Stone) Shi 2022-11-02 21:04:22 +01:00
parent 7d3c7e1cc3
commit 0ab5ac95bb
2 changed files with 66 additions and 25 deletions

View File

@ -51,7 +51,7 @@ init_per_suite(Config) ->
PortDiscovery = application:get_env(gen_rpc, port_discovery), PortDiscovery = application:get_env(gen_rpc, port_discovery),
application:set_env(gen_rpc, port_discovery, stateless), application:set_env(gen_rpc, port_discovery, stateless),
application:ensure_all_started(gen_rpc), application:ensure_all_started(gen_rpc),
%% ensure emqx_moduels' app modules are loaded %% ensure emqx_modules app modules are loaded
%% so the mnesia tables are created %% so the mnesia tables are created
ok = load_app(emqx_modules), ok = load_app(emqx_modules),
emqx_ct_helpers:start_apps([]), emqx_ct_helpers:start_apps([]),

View File

@ -32,29 +32,43 @@
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx]), emqx_ct_helpers:start_apps([]),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx]), emqx_ct_helpers:stop_apps([]),
ok. ok.
init_per_testcase(Case, Config) ->
?MODULE:Case({'init', Config}).
end_per_testcase(Case, Config) ->
?MODULE:Case({'end', Config}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Testcases %% Testcases
t_takeover(_) -> t_takeover({init, Config}) when is_list(Config) ->
process_flag(trap_exit, true), Config;
t_takeover({'end', Config}) when is_list(Config) ->
ok;
t_takeover(Config) when is_list(Config) ->
AllMsgs = messages(?CNT), AllMsgs = messages(?CNT),
Pos = rand:uniform(?CNT), Pos = rand:uniform(?CNT),
ClientId = <<"clientid">>, ClientId = random_clientid(),
ClientOpts = [{clientid, ClientId},
{clean_start, false},
{host, "127.0.0.1"},
{port, 1883}
],
C1 = C1 =
with_retry( with_retry(
fun() -> fun() ->
{ok, C} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]), {ok, C} = emqtt:start_link(ClientOpts),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),
C C
end, 5), end, 5),
emqtt:subscribe(C1, <<"t">>, 1), emqtt:subscribe(C1, <<"t">>, 1),
spawn(fun() -> spawn(fun() ->
[begin [begin
emqx:publish(lists:nth(I, AllMsgs)), emqx:publish(lists:nth(I, AllMsgs)),
@ -63,28 +77,55 @@ t_takeover(_) ->
end), end),
emqtt:pause(C1), emqtt:pause(C1),
timer:sleep(?CNT*10), timer:sleep(?CNT*10),
load_meck(ClientId), load_meck(ClientId),
spawn(fun() -> try
[begin spawn(fun() ->
emqx:publish(lists:nth(I, AllMsgs)), [begin
timer:sleep(rand:uniform(10)) emqx:publish(lists:nth(I, AllMsgs)),
end || I <- lists:seq(Pos+1, ?CNT)] timer:sleep(rand:uniform(10))
end), end || I <- lists:seq(Pos+1, ?CNT)]
{ok, C2} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]), end),
{ok, _} = emqtt:connect(C2), {ok, C2} = emqtt:start_link(ClientOpts),
%% C1 is going down, unlink it so the test can continue to run
Received = all_received_publishs(), _ = monitor(process, C1),
ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]), ?assert(erlang:is_process_alive(C1)),
assert_messages_missed(AllMsgs, Received), unlink(C1),
assert_messages_order(AllMsgs, Received), {ok, _} = emqtt:connect(C2),
receive
emqtt:disconnect(C2), {'DOWN', _, process, C1, _} ->
unload_meck(ClientId). ok
after 1000 ->
ct:fail("timedout_waiting_for_old_connection_shutdown")
end,
Received = all_received_publishs(),
ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]),
assert_messages_missed(AllMsgs, Received),
assert_messages_order(AllMsgs, Received),
kill_process(C2, fun emqtt:stop/1)
after
unload_meck(ClientId)
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helpers %% Helpers
random_clientid() ->
iolist_to_binary(["clientid", "-", integer_to_list(erlang:system_time())]).
kill_process(Pid, WithFun) ->
_ = unlink(Pid),
_ = monitor(process, Pid),
try WithFun(Pid)
catch _:_ -> ok
end,
receive
{'DOWN', _, process, Pid, _} ->
ok
after 10_000 ->
exit(Pid, kill),
error(timeout)
end.
with_retry(Fun, 1) -> Fun(); with_retry(Fun, 1) -> Fun();
with_retry(Fun, N) when N > 1 -> with_retry(Fun, N) when N > 1 ->
try try