diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index 1a83056cb..e24600181 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -20,7 +20,8 @@ -export([stop/1, stop_node/1]). -export([share_load_module/2]). --export([node_name/1]). +-export([node_name/1, mk_nodespecs/2]). +-export([start_apps/2, set_node_opts/2]). -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]). @@ -80,12 +81,7 @@ when %% Working directory %% Everything a test produces should go here. Each node's stuff should go in its %% own directory. - work_dir := file:name(), - %% Usually, we want to ensure the node / test suite starts from a clean slate. - %% However, sometimes, we may want to test restarting a node. For such - %% situations, we need to disable this check to allow resuming from an existing - %% state. - skip_clean_suite_state_check => boolean() + work_dir := file:name() }. start(Nodes, ClusterOpts) -> NodeSpecs = mk_nodespecs(Nodes, ClusterOpts), @@ -129,14 +125,12 @@ mk_init_nodespec(N, Name, NodeOpts, ClusterOpts) -> Node = node_name(Name), BasePort = base_port(N), WorkDir = maps:get(work_dir, ClusterOpts), - SkipCleanSuiteStateCheck = maps:get(skip_clean_suite_state_check, ClusterOpts, false), Defaults = #{ name => Node, role => core, apps => [], base_port => BasePort, work_dir => filename:join([WorkDir, Node]), - skip_clean_suite_state_check => SkipCleanSuiteStateCheck, driver => ct_slave }, maps:merge(Defaults, NodeOpts). @@ -307,7 +301,7 @@ start_apps(Node, #{apps := Apps} = Spec) -> ok. suite_opts(Spec) -> - maps:with([work_dir, skip_clean_suite_state_check], Spec). + maps:with([work_dir], Spec). maybe_join_cluster(_Node, #{role := replicant}) -> ok; diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index dbe9423da..9b3e58da4 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -358,8 +358,6 @@ stop_apps(Apps) -> %% -verify_clean_suite_state(#{skip_clean_suite_state_check := true}) -> - ok; verify_clean_suite_state(#{work_dir := WorkDir}) -> {ok, []} = file:list_dir(WorkDir), none = persistent_term:get(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY, none), diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index c79856fc7..842782e35 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -20,9 +20,6 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - %% avoid inter-suite flakiness... - application:stop(emqx), - application:stop(emqx_durable_storage), TCApps = emqx_cth_suite:start( app_specs(), #{work_dir => ?config(priv_dir, Config)} @@ -36,8 +33,16 @@ end_per_suite(Config) -> init_per_testcase(t_session_subscription_idempotency, Config) -> Cluster = cluster(#{n => 1}), - Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}), - [{cluster, Cluster}, {nodes, Nodes} | Config]; + ClusterOpts = #{work_dir => ?config(priv_dir, Config)}, + NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts), + Nodes = emqx_cth_cluster:start(Cluster, ClusterOpts), + [ + {cluster, Cluster}, + {node_specs, NodeSpecs}, + {cluster_opts, ClusterOpts}, + {nodes, Nodes} + | Config + ]; init_per_testcase(_TestCase, Config) -> Config. @@ -92,12 +97,28 @@ get_all_iterator_ids(Node) -> emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, []) end). +wait_nodeup(Node) -> + ?retry( + _Sleep0 = 500, + _Attempts0 = 50, + pong = net_adm:ping(Node) + ). + +wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) -> + #{override_env := Env} = proplists:get_value(gen_rpc, Apps), + Port = proplists:get_value(tcp_server_port, Env), + ?retry( + _Sleep0 = 500, + _Attempts0 = 50, + false = emqx_common_test_helpers:is_tcp_server_available("127.0.0.1", Port) + ). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ t_session_subscription_idempotency(Config) -> - Cluster = ?config(cluster, Config), + [Node1Spec | _] = ?config(node_specs, Config), [Node1] = ?config(nodes, Config), Port = get_mqtt_port(Node1, tcp), SubTopicFilter = <<"t/+">>, @@ -119,13 +140,25 @@ t_session_subscription_idempotency(Config) -> spawn_link(fun() -> ?tp(will_restart_node, #{}), - ct:pal("stopping node ~p", [Node1]), - ok = emqx_cth_cluster:stop_node(Node1), - ct:pal("stopped node ~p; restarting...", [Node1]), - [Node1] = emqx_cth_cluster:start(Cluster, #{ - work_dir => ?config(priv_dir, Config), - skip_clean_suite_state_check => true - }), + ct:pal("restarting node ~p", [Node1]), + true = monitor_node(Node1, true), + ok = erpc:call(Node1, init, restart, []), + receive + {nodedown, Node1} -> + ok + after 10_000 -> + ct:fail("node ~p didn't stop", [Node1]) + end, + ct:pal("waiting for nodeup ~p", [Node1]), + wait_nodeup(Node1), + wait_gen_rpc_down(Node1Spec), + ct:pal("restarting apps on ~p", [Node1]), + Apps = maps:get(apps, Node1Spec), + ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]), + _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]), + %% have to re-inject this so that we may stop the node succesfully at the + %% end.... + ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec), ct:pal("node ~p restarted", [Node1]), ?tp(restarted_node, #{}), ok