From 2e28d5e73ed0f8b051413aaf3bd8f0cff60865fc Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 3 Oct 2022 13:00:41 +0200 Subject: [PATCH 1/2] test: fix more flaky test in share sub SUITE --- test/emqx_node_helpers.erl | 47 ++++++++++++----- test/emqx_shared_sub_SUITE.erl | 94 +++++++++++++++++++++++----------- 2 files changed, 97 insertions(+), 44 deletions(-) diff --git a/test/emqx_node_helpers.erl b/test/emqx_node_helpers.erl index b516e3752..d68fee62e 100644 --- a/test/emqx_node_helpers.erl +++ b/test/emqx_node_helpers.erl @@ -30,23 +30,44 @@ start_slave(Name) -> start_slave(Name, #{}). start_slave(Name, Opts) -> - {ok, Node} = ct_slave:start(list_to_atom(atom_to_list(Name) ++ "@" ++ host()), - [{kill_if_fail, true}, - {monitor_master, true}, - {init_timeout, 10000}, - {startup_timeout, 10000}, - {erl_flags, ebin_path()}]), - + Node = make_node_name(Name), + case ct_slave:start(Node, [{kill_if_fail, true}, + {monitor_master, true}, + {init_timeout, 10000}, + {startup_timeout, 10000}, + {erl_flags, ebin_path()}]) of + {ok, _} -> + ok; + {error, started_not_connected, _} -> + ok + end, pong = net_adm:ping(Node), setup_node(Node, Opts), Node. -stop_slave(Node) -> - rpc:call(Node, ekka, leave, []), - ct_slave:stop(Node). +make_node_name(Name) -> + case string:tokens(atom_to_list(Name), "@") of + [_Name, _Host] -> + %% the name already has a @ + Name; + _ -> + list_to_atom(atom_to_list(Name) ++ "@" ++ host()) + end. + +stop_slave(Node0) -> + Node = make_node_name(Node0), + case rpc:call(Node, ekka, leave, []) of + ok -> ok; + {badrpc, nodedown} -> ok + end, + case ct_slave:stop(Node) of + {ok, _} -> ok; + {error, not_started, _} -> ok + end. host() -> - [_, Host] = string:tokens(atom_to_list(node()), "@"), Host. + [_, Host] = string:tokens(atom_to_list(node()), "@"), + Host. ebin_path() -> string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). @@ -70,10 +91,10 @@ setup_node(Node, #{} = Opts) -> end, EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler), - [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]], + [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx_modules, emqx]], ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]), - rpc:call(Node, ekka, join, [node()]), + ok = rpc:call(Node, ekka, join, [node()]), %% Sanity check. Assert that `gen_rpc' is set up correctly: ?assertEqual( Node diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 2bf0cf048..9ffce523d 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -40,6 +40,9 @@ init_per_suite(Config) -> PortDiscovery = application:get_env(gen_rpc, port_discovery), application:set_env(gen_rpc, port_discovery, stateless), application:ensure_all_started(gen_rpc), + %% ensure emqx_moduels' app modules are loaded + %% so the mnesia tables are created + ok = load_app(emqx_modules), emqx_ct_helpers:start_apps([]), [{port_discovery, PortDiscovery} | Config]. @@ -50,32 +53,45 @@ end_per_suite(Config) -> _ -> ok end. -t_is_ack_required(_) -> +init_per_testcase(Case, Config) -> + try + ?MODULE:Case({'init', Config}) + catch + error : function_clause -> + Config + end. + +end_per_testcase(Case, Config) -> + try + ?MODULE:Case({'end', Config}) + catch + error : function_clause -> + ok + end. + +t_is_ack_required(Config) when is_list(Config) -> ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})). -t_maybe_nack_dropped(_) -> +t_maybe_nack_dropped(Config) when is_list(Config) -> ?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, ?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end). -t_nack_no_connection(_) -> +t_nack_no_connection(Config) when is_list(Config) -> Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok after 100 -> timeout end). -t_maybe_ack(_) -> +t_maybe_ack(Config) when is_list(Config) -> ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, ?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}}, emqx_shared_sub:maybe_ack(Msg)), ?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end). -% t_subscribers(_) -> -% error('TODO'). - -t_random_basic(_) -> +t_random_basic(Config) when is_list(Config) -> ok = ensure_config(random), ClientId = <<"ClientId">>, Topic = <<"foo">>, @@ -105,7 +121,7 @@ t_random_basic(_) -> %% After the connection for the 2nd session is also closed, %% i.e. when all clients are offline, the following message(s) %% should be delivered randomly. -t_no_connection_nack(_) -> +t_no_connection_nack(Config) when is_list(Config) -> ok = ensure_config(sticky), Publisher = <<"publisher">>, Subscriber1 = <<"Subscriber1">>, @@ -171,27 +187,27 @@ t_no_connection_nack(_) -> % emqx_sm:close_session(SPid2), ok. -t_random(_) -> +t_random(Config) when is_list(Config) -> ok = ensure_config(random, true), test_two_messages(random). -t_round_robin(_) -> +t_round_robin(Config) when is_list(Config) -> ok = ensure_config(round_robin, true), test_two_messages(round_robin). -t_sticky(_) -> +t_sticky(Config) when is_list(Config) -> ok = ensure_config(sticky, true), test_two_messages(sticky). -t_hash(_) -> +t_hash(Config) when is_list(Config) -> ok = ensure_config(hash, false), test_two_messages(hash). -t_hash_clinetid(_) -> +t_hash_clinetid(Config) when is_list(Config) -> ok = ensure_config(hash_clientid, false), test_two_messages(hash_clientid). -t_hash_topic(_) -> +t_hash_topic(Config) when is_list(Config) -> ok = ensure_config(hash_topic, false), ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, @@ -230,7 +246,7 @@ t_hash_topic(_) -> ok. %% if the original subscriber dies, change to another one alive -t_not_so_sticky(_) -> +t_not_so_sticky(Config) when is_list(Config) -> ok = ensure_config(sticky), ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, @@ -303,7 +319,7 @@ last_message(ExpectedPayload, Pids, Timeout) -> <<"not yet?">> end. -t_dispatch(_) -> +t_dispatch(Config) when is_list(Config) -> ok = ensure_config(random), Topic = <<"foo">>, ?assertEqual({error, no_subscribers}, @@ -312,18 +328,13 @@ t_dispatch(_) -> ?assertEqual({ok, 1}, emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})). -% t_unsubscribe(_) -> -% error('TODO'). - -% t_subscribe(_) -> -% error('TODO'). -t_uncovered_func(_) -> +t_uncovered_func(Config) when is_list(Config) -> ignored = gen_server:call(emqx_shared_sub, ignored), ok = gen_server:cast(emqx_shared_sub, ignored), ignored = emqx_shared_sub ! ignored, {mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}. -t_per_group_config(_) -> +t_per_group_config(Config) when is_list(Config) -> ok = ensure_group_config(#{ <<"local_group_fallback">> => local, <<"local_group">> => local, @@ -342,8 +353,8 @@ t_per_group_config(_) -> test_two_messages(round_robin, <<"round_robin_group">>), test_two_messages(round_robin, <<"round_robin_group">>). -t_local(_) -> - Node = start_slave('local_shared_sub_test19', 21884), +t_local({'init', Config}) -> + Node = start_slave(local_shared_sub_test19, 21884), GroupConfig = #{ <<"local_group_fallback">> => local, <<"local_group">> => local, @@ -352,7 +363,11 @@ t_local(_) -> }, ok = ensure_group_config(Node, GroupConfig), ok = ensure_group_config(GroupConfig), - + [{slave_node, Node} | Config]; +t_local({'end', _Config}) -> + ok = stop_slave(local_shared_sub_test19); +t_local(Config) when is_list(Config) -> + Node = proplists:get_value(slave_node, Config), Topic = <<"local_foo1/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, @@ -388,7 +403,7 @@ t_local(_) -> ?assertNotEqual(UsedSubPid1, UsedSubPid2), ok. -t_local_fallback(_) -> +t_local_fallback({'init', Config}) -> ok = ensure_group_config(#{ <<"local_group_fallback">> => local, <<"local_group">> => local, @@ -396,10 +411,15 @@ t_local_fallback(_) -> <<"sticky_group">> => sticky }), + Node = start_slave(local_fallback_shared_sub_test19, 11885), + [{slave_node, Node} | Config]; +t_local_fallback({'end', _}) -> + ok = stop_slave(local_fallback_shared_sub_test19); +t_local_fallback(Config) when is_list(Config) -> Topic = <<"local_foo2/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, - Node = start_slave('local_fallback_shared_sub_test19', 11885), + Node = proplists:get_value(slave_node, Config), {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), {ok, _} = emqtt:connect(ConnPid1), @@ -423,7 +443,7 @@ t_local_fallback(_) -> %% This one tests that broker tries to select another shared subscriber %% If the first one doesn't return an ACK -t_redispatch(_) -> +t_redispatch(Config) when is_list(Config) -> ok = ensure_config(sticky, true), application:set_env(emqx, shared_dispatch_ack_enabled, true), @@ -454,7 +474,7 @@ t_redispatch(_) -> emqtt:stop(UsedSubPid2), ok. -t_dispatch_when_inflights_are_full(_) -> +t_dispatch_when_inflights_are_full(Config) when is_list(Config) -> ok = ensure_config(round_robin, true), Topic = <<"foo/bar">>, ClientId1 = <<"ClientId1">>, @@ -537,8 +557,20 @@ recv_msgs(Count, Msgs) -> end. start_slave(Name, Port) -> + ok = emqx_ct_helpers:start_apps([emqx_modules]), Listeners = [#{listen_on => {{127,0,0,1}, Port}, + start_apps => [emqx, emqx_modules], name => "internal", opts => [{zone,internal}], proto => tcp}], emqx_node_helpers:start_slave(Name, #{listeners => Listeners}). + +stop_slave(Name) -> + emqx_node_helpers:stop_slave(Name). + +load_app(App) -> + case application:load(App) of + ok -> ok; + {error, {already_loaded, _}} -> ok; + {error, Reason} -> error({failed_to_load_app, App, Reason}) + end. From 7423646191aefa90def937f560f9a1977c045f73 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 3 Oct 2022 17:05:35 +0200 Subject: [PATCH 2/2] test: ensure emqx_modules app is started in ct-slave node --- test/emqx_node_helpers.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/emqx_node_helpers.erl b/test/emqx_node_helpers.erl index d68fee62e..f7c048d51 100644 --- a/test/emqx_node_helpers.erl +++ b/test/emqx_node_helpers.erl @@ -18,7 +18,7 @@ -include_lib("eunit/include/eunit.hrl"). --define(SLAVE_START_APPS, [emqx]). +-define(SLAVE_START_APPS, [emqx, emqx_modules]). -export([start_slave/1, start_slave/2, @@ -91,7 +91,7 @@ setup_node(Node, #{} = Opts) -> end, EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler), - [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx_modules, emqx]], + [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]], ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]), ok = rpc:call(Node, ekka, join, [node()]),