diff --git a/apps/emqx/include/asserts.hrl b/apps/emqx/include/asserts.hrl index 4936da1f9..8baa8fee8 100644 --- a/apps/emqx/include/asserts.hrl +++ b/apps/emqx/include/asserts.hrl @@ -59,3 +59,27 @@ end end)() ). + +-define(retrying(CONFIG, NUM_RETRIES, TEST_BODY_FN), begin + __TEST_CASE = ?FUNCTION_NAME, + (fun + __GO(__CONFIG, __N) when __N >= NUM_RETRIES -> + TEST_BODY_FN(__CONFIG); + __GO(__CONFIG, __N) -> + try + TEST_BODY_FN(__CONFIG) + catch + __KIND:__REASON:__STACKTRACE -> + ct:pal("test errored; will retry\n ~p", [ + #{kind => __KIND, reason => __REASON, stacktrace => __STACKTRACE} + ]), + end_per_testcase(__TEST_CASE, __CONFIG), + garbage_collect(), + timer:sleep(1000), + __CONFIG1 = init_per_testcase(__TEST_CASE, __CONFIG), + __GO(__CONFIG1, __N + 1) + end + end)( + CONFIG, 0 + ) +end). diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index df69d4a13..1cfc10f74 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -725,10 +725,17 @@ start_slave(Name, Opts) when is_map(Opts) -> Node = node_name(Name), put_peer_mod(Node, SlaveMod), Cookie = atom_to_list(erlang:get_cookie()), + PrivDataDir = maps:get(priv_data_dir, Opts, "/tmp"), + NodeDataDir = filename:join([ + PrivDataDir, + Node, + integer_to_list(erlang:unique_integer()) + ]), DoStart = fun() -> case SlaveMod of ct_slave -> + ct:pal("~p: node data dir: ~s", [Node, NodeDataDir]), ct_slave:start( Node, [ @@ -739,7 +746,8 @@ start_slave(Name, Opts) when is_map(Opts) -> {erl_flags, erl_flags()}, {env, [ {"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"}, - {"EMQX_NODE__COOKIE", Cookie} + {"EMQX_NODE__COOKIE", Cookie}, + {"EMQX_NODE__DATA_DIR", NodeDataDir} ]} ] ); @@ -844,7 +852,14 @@ setup_node(Node, Opts) when is_map(Opts) -> integer_to_list(erlang:unique_integer()), "mnesia" ]), - erpc:call(Node, application, set_env, [mnesia, dir, MnesiaDataDir]), + case erpc:call(Node, application, get_env, [mnesia, dir, undefined]) of + undefined -> + ct:pal("~p: setting mnesia dir: ~p", [Node, MnesiaDataDir]), + erpc:call(Node, application, set_env, [mnesia, dir, MnesiaDataDir]); + PreviousMnesiaDir -> + ct:pal("~p: mnesia dir already set: ~p", [Node, PreviousMnesiaDir]), + ok + end, %% Needs to be set explicitly because ekka:start() (which calls `gen`) is called without Handler %% in emqx_common_test_helpers:start_apps(...) @@ -877,8 +892,8 @@ setup_node(Node, Opts) when is_map(Opts) -> %% nodes. these variables might not be in the %% config file (e.g.: emqx_enterprise_schema). Cookie = atom_to_list(erlang:get_cookie()), - os:putenv("EMQX_NODE__DATA_DIR", NodeDataDir), - os:putenv("EMQX_NODE__COOKIE", Cookie), + set_env_once("EMQX_NODE__DATA_DIR", NodeDataDir), + set_env_once("EMQX_NODE__COOKIE", Cookie), emqx_config:init_load(SchemaMod), application:set_env(emqx, init_config_load_done, true) end, @@ -930,6 +945,15 @@ setup_node(Node, Opts) when is_map(Opts) -> %% Helpers +set_env_once(Var, Value) -> + case os:getenv(Var) of + false -> + os:putenv(Var, Value); + _OldValue -> + ok + end, + ok. + put_peer_mod(Node, SlaveMod) -> put({?MODULE, Node}, SlaveMod), ok. @@ -1289,6 +1313,7 @@ call_janitor() -> call_janitor(Timeout) -> Janitor = get_or_spawn_janitor(), ok = emqx_test_janitor:stop(Janitor, Timeout), + erase({?MODULE, janitor_proc}), ok. get_or_spawn_janitor() -> diff --git a/apps/emqx/test/emqx_test_janitor.erl b/apps/emqx/test/emqx_test_janitor.erl index 041b03fa7..2ee01e807 100644 --- a/apps/emqx/test/emqx_test_janitor.erl +++ b/apps/emqx/test/emqx_test_janitor.erl @@ -60,13 +60,14 @@ init(Parent) -> {ok, #{callbacks => [], owner => Parent}}. terminate(_Reason, #{callbacks := Callbacks}) -> - do_terminate(Callbacks). + _ = do_terminate(Callbacks), + ok. handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) -> {reply, ok, State#{callbacks := [Callback | Callbacks]}}; handle_call(terminate, _From, State = #{callbacks := Callbacks}) -> - do_terminate(Callbacks), - {stop, normal, ok, State}; + FailedCallbacks = do_terminate(Callbacks), + {stop, normal, ok, State#{callbacks := FailedCallbacks}}; handle_call(_Req, _From, State) -> {reply, error, State}. @@ -83,17 +84,18 @@ handle_info(_Msg, State) -> %%---------------------------------------------------------------------------------- do_terminate(Callbacks) -> - lists:foreach( - fun(Fun) -> + lists:foldl( + fun(Fun, Failed) -> try - Fun() + Fun(), + Failed catch K:E:S -> ct:pal("error executing callback ~p: ~p", [Fun, {K, E}]), ct:pal("stacktrace: ~p", [S]), - ok + [Fun | Failed] end end, + [], Callbacks - ), - ok. + ). diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index ce14eb83d..9f9381c95 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/asserts.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -148,6 +149,7 @@ end_per_testcase(_Testcase, Config) -> true -> ok; false -> + ok = emqx_config:delete_override_conf_files(), ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), @@ -157,6 +159,7 @@ end_per_testcase(_Testcase, Config) -> %% machines struggle with all the containers running... emqx_common_test_helpers:call_janitor(60_000), ok = snabbkaffe:stop(), + flush_consumed(), ok end. @@ -373,7 +376,9 @@ start_consumer(TestCase, Config) -> (integer_to_binary(PulsarPort))/binary>> ), ConnOpts = #{}, - ConsumerClientId = TestCase, + ConsumerClientId = list_to_atom( + atom_to_list(TestCase) ++ integer_to_list(erlang:unique_integer()) + ), CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"), SSLOpts = #{ enable => UseTLS, @@ -393,12 +398,12 @@ start_consumer(TestCase, Config) -> cb_init_args => #{send_to => self()}, cb_module => pulsar_echo_consumer, sub_type => 'Shared', - subscription => atom_to_list(TestCase), + subscription => atom_to_list(TestCase) ++ integer_to_list(erlang:unique_integer()), max_consumer_num => 1, %% Note! This must not coincide with the client %% id, or else weird bugs will happen, like the %% consumer never starts... - name => test_consumer, + name => list_to_atom("test_consumer" ++ integer_to_list(erlang:unique_integer())), consumer_id => 1, conn_opts => ConnOpts }, @@ -440,7 +445,10 @@ wait_until_connected(SupMod, Mod) -> ?retry( _Sleep = 300, _Attempts0 = 20, - lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids) + begin + true = length(Pids) > 0, + lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids) + end ), ok. @@ -483,6 +491,12 @@ receive_consumed(Timeout) -> ct:fail("no message consumed") end. +flush_consumed() -> + receive + {pulsar_message, _} -> flush_consumed() + after 0 -> ok + end. + try_decode_json(Payload) -> case emqx_utils_json:safe_decode(Payload, [return_maps]) of {error, _} -> @@ -1054,31 +1068,44 @@ t_resource_manager_crash_before_producers_started(Config) -> ), ok. -t_cluster(Config) -> - MQTTTopic = ?config(mqtt_topic, Config), - ResourceId = resource_id(Config), - Cluster = cluster(Config), - ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), - QoS = 0, - Payload = emqx_guid:to_hexstr(emqx_guid:gen()), +t_cluster(Config0) -> + ct:timetrap({seconds, 120}), + ?retrying(Config0, 3, fun do_t_cluster/1). + +do_t_cluster(Config) -> ?check_trace( begin + MQTTTopic = ?config(mqtt_topic, Config), + ResourceId = resource_id(Config), + Cluster = cluster(Config), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + QoS = 0, + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + NumNodes = length(Cluster), + {ok, SRef0} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := emqx_bridge_app_started}), + NumNodes, + 25_000 + ), Nodes = [N1, N2 | _] = start_cluster(Cluster), %% wait until bridge app supervisor is up; by that point, %% `emqx_config_handler:add_handler' has been called and the node should be %% ready to create bridges. - NumNodes = length(Nodes), - {ok, _} = snabbkaffe:block_until( - ?match_n_events(NumNodes, #{?snk_kind := emqx_bridge_app_started}), - 15_000 - ), - {ok, SRef0} = snabbkaffe:subscribe( + {ok, _} = snabbkaffe:receive_events(SRef0), + {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := pulsar_producer_bridge_started}), NumNodes, - 15_000 + 25_000 ), {ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end), - {ok, _} = snabbkaffe:receive_events(SRef0), + {ok, _} = snabbkaffe:receive_events(SRef1), + {ok, _} = snabbkaffe:block_until( + ?match_n_events( + NumNodes, + #{?snk_kind := bridge_post_config_update_done} + ), + 25_000 + ), lists:foreach( fun(N) -> ?retry( @@ -1095,6 +1122,7 @@ t_cluster(Config) -> ), erpc:multicall(Nodes, fun wait_until_producer_connected/0), Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload), + ?tp(publishing_message, #{}), erpc:call(N2, emqx, publish, [Message0]), lists:foreach( @@ -1108,10 +1136,7 @@ t_cluster(Config) -> Nodes ), - ok - end, - fun(_Trace) -> - Data0 = receive_consumed(10_000), + Data0 = receive_consumed(30_000), ?assertMatch( [ #{ @@ -1123,7 +1148,9 @@ t_cluster(Config) -> ], Data0 ), + ok - end + end, + [] ), ok. diff --git a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl index 95b4ce697..f1a1383f0 100644 --- a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl +++ b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl @@ -20,14 +20,17 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). -t_copy_conf_override_on_restarts(_Config) -> +t_copy_conf_override_on_restarts(Config) -> ct:timetrap({seconds, 120}), snabbkaffe:fix_ct_logging(), - Cluster = cluster([cluster_spec({core, 1}), cluster_spec({core, 2}), cluster_spec({core, 3})]), + Cluster = cluster( + [cluster_spec({core, 1}), cluster_spec({core, 2}), cluster_spec({core, 3})], Config + ), %% 1. Start all nodes Nodes = start_cluster(Cluster), @@ -50,16 +53,19 @@ t_copy_conf_override_on_restarts(_Config) -> stop_cluster(Nodes) end. -t_copy_new_data_dir(_Config) -> +t_copy_new_data_dir(Config) -> net_kernel:start(['master1@127.0.0.1', longnames]), ct:timetrap({seconds, 120}), snabbkaffe:fix_ct_logging(), - Cluster = cluster([cluster_spec({core, 4}), cluster_spec({core, 5}), cluster_spec({core, 6})]), + Cluster = cluster( + [cluster_spec({core, 4}), cluster_spec({core, 5}), cluster_spec({core, 6})], Config + ), %% 1. Start all nodes [First | Rest] = Nodes = start_cluster(Cluster), try - File = "/configs/cluster.hocon", + NodeDataDir = erpc:call(First, emqx, data_dir, []), + File = NodeDataDir ++ "/configs/cluster.hocon", assert_config_load_done(Nodes), rpc:call(First, ?MODULE, create_data_dir, [File]), {[ok, ok, ok], []} = rpc:multicall(Nodes, application, stop, [emqx_conf]), @@ -74,16 +80,19 @@ t_copy_new_data_dir(_Config) -> stop_cluster(Nodes) end. -t_copy_deprecated_data_dir(_Config) -> +t_copy_deprecated_data_dir(Config) -> net_kernel:start(['master2@127.0.0.1', longnames]), ct:timetrap({seconds, 120}), snabbkaffe:fix_ct_logging(), - Cluster = cluster([cluster_spec({core, 7}), cluster_spec({core, 8}), cluster_spec({core, 9})]), + Cluster = cluster( + [cluster_spec({core, 7}), cluster_spec({core, 8}), cluster_spec({core, 9})], Config + ), %% 1. Start all nodes [First | Rest] = Nodes = start_cluster(Cluster), try - File = "/configs/cluster-override.conf", + NodeDataDir = erpc:call(First, emqx, data_dir, []), + File = NodeDataDir ++ "/configs/cluster-override.conf", assert_config_load_done(Nodes), rpc:call(First, ?MODULE, create_data_dir, [File]), {[ok, ok, ok], []} = rpc:multicall(Nodes, application, stop, [emqx_conf]), @@ -131,56 +140,60 @@ t_no_copy_from_newer_version_node(_Config) -> %%------------------------------------------------------------------------------ create_data_dir(File) -> - Node = atom_to_list(node()), - ok = filelib:ensure_dir(Node ++ "/certs/"), - ok = filelib:ensure_dir(Node ++ "/authz/"), - ok = filelib:ensure_dir(Node ++ "/configs/"), - ok = file:write_file(Node ++ "/certs/fake-cert", list_to_binary(Node)), - ok = file:write_file(Node ++ "/authz/fake-authz", list_to_binary(Node)), + NodeDataDir = emqx:data_dir(), + ok = filelib:ensure_dir(NodeDataDir ++ "/certs/"), + ok = filelib:ensure_dir(NodeDataDir ++ "/authz/"), + ok = filelib:ensure_dir(NodeDataDir ++ "/configs/"), + ok = file:write_file(NodeDataDir ++ "/certs/fake-cert", list_to_binary(NodeDataDir)), + ok = file:write_file(NodeDataDir ++ "/authz/fake-authz", list_to_binary(NodeDataDir)), Telemetry = <<"telemetry.enable = false">>, - ok = file:write_file(Node ++ File, Telemetry). + ok = file:write_file(File, Telemetry). set_data_dir_env() -> - Node = atom_to_list(node()), + NodeDataDir = emqx:data_dir(), + NodeStr = atom_to_list(node()), %% will create certs and authz dir - ok = filelib:ensure_dir(Node ++ "/configs/"), + ok = filelib:ensure_dir(NodeDataDir ++ "/configs/"), {ok, [ConfigFile]} = application:get_env(emqx, config_files), - NewConfigFile = ConfigFile ++ "." ++ Node, + NewConfigFile = ConfigFile ++ "." ++ NodeStr, + ok = filelib:ensure_dir(NewConfigFile), {ok, _} = file:copy(ConfigFile, NewConfigFile), Bin = iolist_to_binary(io_lib:format("node.config_files = [~p]~n", [NewConfigFile])), ok = file:write_file(NewConfigFile, Bin, [append]), - DataDir = iolist_to_binary(io_lib:format("node.data_dir = ~p~n", [Node])), + DataDir = iolist_to_binary(io_lib:format("node.data_dir = ~p~n", [NodeDataDir])), ok = file:write_file(NewConfigFile, DataDir, [append]), application:set_env(emqx, config_files, [NewConfigFile]), - application:set_env(emqx, data_dir, Node), + %% application:set_env(emqx, data_dir, Node), %% We set env both cluster.hocon and cluster-override.conf, but only one will be used - application:set_env(emqx, cluster_hocon_file, Node ++ "/configs/cluster.hocon"), - application:set_env(emqx, cluster_override_conf_file, Node ++ "/configs/cluster-override.conf"), + application:set_env(emqx, cluster_hocon_file, NodeDataDir ++ "/configs/cluster.hocon"), + application:set_env( + emqx, cluster_override_conf_file, NodeDataDir ++ "/configs/cluster-override.conf" + ), ok. -assert_data_copy_done([First0 | Rest], File) -> - First = atom_to_list(First0), - {ok, FakeCertFile} = file:read_file(First ++ "/certs/fake-cert"), - {ok, FakeAuthzFile} = file:read_file(First ++ "/authz/fake-authz"), - {ok, FakeOverrideFile} = file:read_file(First ++ File), +assert_data_copy_done([_First | Rest], File) -> + FirstDataDir = filename:dirname(filename:dirname(File)), + {ok, FakeCertFile} = file:read_file(FirstDataDir ++ "/certs/fake-cert"), + {ok, FakeAuthzFile} = file:read_file(FirstDataDir ++ "/authz/fake-authz"), + {ok, FakeOverrideFile} = file:read_file(File), {ok, ExpectFake} = hocon:binary(FakeOverrideFile), lists:foreach( fun(Node0) -> - Node = atom_to_list(Node0), + NodeDataDir = erpc:call(Node0, emqx, data_dir, []), ?assertEqual( {ok, FakeCertFile}, - file:read_file(Node ++ "/certs/fake-cert"), - #{node => Node} + file:read_file(NodeDataDir ++ "/certs/fake-cert"), + #{node => Node0} ), ?assertEqual( {ok, ExpectFake}, - hocon:files([Node ++ File]), - #{node => Node} + hocon:files([File]), + #{node => Node0} ), ?assertEqual( {ok, FakeAuthzFile}, - file:read_file(Node ++ "/authz/fake-authz"), - #{node => Node} + file:read_file(NodeDataDir ++ "/authz/fake-authz"), + #{node => Node0} ) end, Rest @@ -207,7 +220,7 @@ assert_config_load_done(Nodes) -> ). stop_cluster(Nodes) -> - [emqx_common_test_helpers:stop_slave(Node) || Node <- Nodes]. + emqx_utils:pmap(fun emqx_common_test_helpers:stop_slave/1, Nodes). start_cluster(Specs) -> [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Specs]. @@ -222,7 +235,8 @@ start_cluster_async(Specs) -> || {Name, Opts} <- Specs ]. -cluster(Specs) -> +cluster(Specs, Config) -> + PrivDataDir = ?config(priv_dir, Config), Env = [ {emqx, init_config_load_done, false}, {emqx, boot_modules, []} @@ -232,6 +246,7 @@ cluster(Specs) -> {apps, [emqx_conf]}, {load_schema, false}, {join_to, true}, + {priv_data_dir, PrivDataDir}, {env_handler, fun (emqx) -> application:set_env(emqx, boot_modules, []),