diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 2112f0b8c..897a59cb6 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -51,7 +51,7 @@ init_per_suite(Config) -> PortDiscovery = application:get_env(gen_rpc, port_discovery), application:set_env(gen_rpc, port_discovery, stateless), application:ensure_all_started(gen_rpc), - %% ensure emqx_moduels' app modules are loaded + %% ensure emqx_modules app modules are loaded %% so the mnesia tables are created ok = load_app(emqx_modules), emqx_ct_helpers:start_apps([]), diff --git a/test/emqx_takeover_SUITE.erl b/test/emqx_takeover_SUITE.erl index 882967ab3..c9e84bc1c 100644 --- a/test/emqx_takeover_SUITE.erl +++ b/test/emqx_takeover_SUITE.erl @@ -32,29 +32,43 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - emqx_ct_helpers:start_apps([emqx]), + emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([emqx]), + emqx_ct_helpers:stop_apps([]), ok. + +init_per_testcase(Case, Config) -> + ?MODULE:Case({'init', Config}). + +end_per_testcase(Case, Config) -> + ?MODULE:Case({'end', Config}). + %%-------------------------------------------------------------------- %% Testcases -t_takeover(_) -> - process_flag(trap_exit, true), +t_takeover({init, Config}) when is_list(Config) -> + Config; +t_takeover({'end', Config}) when is_list(Config) -> + ok; +t_takeover(Config) when is_list(Config) -> AllMsgs = messages(?CNT), Pos = rand:uniform(?CNT), - ClientId = <<"clientid">>, + ClientId = random_clientid(), + ClientOpts = [{clientid, ClientId}, + {clean_start, false}, + {host, "127.0.0.1"}, + {port, 1883} + ], C1 = with_retry( fun() -> - {ok, C} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]), + {ok, C} = emqtt:start_link(ClientOpts), {ok, _} = emqtt:connect(C), C end, 5), emqtt:subscribe(C1, <<"t">>, 1), - spawn(fun() -> [begin emqx:publish(lists:nth(I, AllMsgs)), @@ -63,28 +77,55 @@ t_takeover(_) -> end), emqtt:pause(C1), timer:sleep(?CNT*10), - load_meck(ClientId), - spawn(fun() -> - [begin - emqx:publish(lists:nth(I, AllMsgs)), - timer:sleep(rand:uniform(10)) - end || I <- lists:seq(Pos+1, ?CNT)] - end), - {ok, C2} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]), - {ok, _} = emqtt:connect(C2), - - Received = all_received_publishs(), - ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]), - assert_messages_missed(AllMsgs, Received), - assert_messages_order(AllMsgs, Received), - - emqtt:disconnect(C2), - unload_meck(ClientId). + try + spawn(fun() -> + [begin + emqx:publish(lists:nth(I, AllMsgs)), + timer:sleep(rand:uniform(10)) + end || I <- lists:seq(Pos+1, ?CNT)] + end), + {ok, C2} = emqtt:start_link(ClientOpts), + %% C1 is going down, unlink it so the test can continue to run + _ = monitor(process, C1), + ?assert(erlang:is_process_alive(C1)), + unlink(C1), + {ok, _} = emqtt:connect(C2), + receive + {'DOWN', _, process, C1, _} -> + ok + after 1000 -> + ct:fail("timedout_waiting_for_old_connection_shutdown") + end, + Received = all_received_publishs(), + ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]), + assert_messages_missed(AllMsgs, Received), + assert_messages_order(AllMsgs, Received), + kill_process(C2, fun emqtt:stop/1) + after + unload_meck(ClientId) + end. %%-------------------------------------------------------------------- %% Helpers +random_clientid() -> + iolist_to_binary(["clientid", "-", integer_to_list(erlang:system_time())]). + +kill_process(Pid, WithFun) -> + _ = unlink(Pid), + _ = monitor(process, Pid), + try WithFun(Pid) + catch _:_ -> ok + end, + receive + {'DOWN', _, process, Pid, _} -> + ok + after 10_000 -> + exit(Pid, kill), + error(timeout) + end. + with_retry(Fun, 1) -> Fun(); with_retry(Fun, N) when N > 1 -> try