From 3a008c8b4c7051992327ab53b4632c4f98cde1af Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 16 Feb 2024 11:15:04 +0100 Subject: [PATCH 1/8] refactor: bump ekka to 0.19.0 w/o mnesia boot phase --- apps/emqx/rebar.config | 2 +- mix.exs | 2 +- rebar.config | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 9953dd3fc..8a2abb8c5 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -28,7 +28,7 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.18.4"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.0"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.41.0"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, diff --git a/mix.exs b/mix.exs index 598b770b8..6c55ffa7e 100644 --- a/mix.exs +++ b/mix.exs @@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.11.1", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.18.4", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.19.0", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.15", override: true}, diff --git a/rebar.config b/rebar.config index 88b32483e..137ffdfa4 100644 --- a/rebar.config +++ b/rebar.config @@ -83,7 +83,7 @@ {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}}, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.18.4"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.0"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.15"}}}, From d60ff1e616e58bfe821e42d352bb9666a1e3a8d8 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 16 Feb 2024 13:32:58 +0100 Subject: [PATCH 2/8] test(plugins): avoid using same workdir for different nodes --- apps/emqx_plugins/test/emqx_plugins_SUITE.erl | 53 ++++++++----------- 1 file changed, 21 insertions(+), 32 deletions(-) diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl index 8fd1c2103..0f66e20dc 100644 --- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl @@ -604,12 +604,9 @@ t_load_config_from_cli(Config) when is_list(Config) -> ok. group_t_copy_plugin_to_a_new_node({init, Config}) -> - WorkDir = proplists:get_value(install_dir, Config), - FromInstallDir = filename:join(WorkDir, atom_to_list(plugins_copy_from)), - file:del_dir_r(FromInstallDir), + FromInstallDir = filename:join(emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), from), ok = filelib:ensure_path(FromInstallDir), - ToInstallDir = filename:join(WorkDir, atom_to_list(plugins_copy_to)), - file:del_dir_r(ToInstallDir), + ToInstallDir = filename:join(emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), to), ok = filelib:ensure_path(ToInstallDir), #{package := Package, release_name := PluginName} = get_demo_plugin_package(FromInstallDir), Apps = [ @@ -697,8 +694,7 @@ group_t_copy_plugin_to_a_new_node(Config) -> %% checks that we can start a cluster with a lone node. group_t_copy_plugin_to_a_new_node_single_node({init, Config}) -> - WorkDir = ?config(install_dir, Config), - ToInstallDir = filename:join(WorkDir, "plugins_copy_to"), + ToInstallDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), file:del_dir_r(ToInstallDir), ok = filelib:ensure_path(ToInstallDir), #{package := Package, release_name := PluginName} = get_demo_plugin_package(ToInstallDir), @@ -718,9 +714,7 @@ group_t_copy_plugin_to_a_new_node_single_node({init, Config}) -> ], [CopyToNode] = emqx_cth_cluster:start( [{plugins_copy_to, #{role => core, apps => Apps}}], - #{ - work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config) - } + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} ), [ {to_install_dir, ToInstallDir}, @@ -752,36 +746,31 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) -> ok. group_t_cluster_leave({init, Config}) -> - WorkDir = ?config(install_dir, Config), - ToInstallDir = filename:join(WorkDir, "plugins_copy_to"), - file:del_dir_r(ToInstallDir), - ok = filelib:ensure_path(ToInstallDir), - #{package := Package, release_name := PluginName} = get_demo_plugin_package(ToInstallDir), + Specs = emqx_cth_cluster:mk_nodespecs( + [ + {group_t_cluster_leave1, #{role => core, apps => [emqx, emqx_conf, emqx_ctl]}}, + {group_t_cluster_leave2, #{role => core, apps => [emqx, emqx_conf, emqx_ctl]}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + Nodes = emqx_cth_cluster:start(Specs), + InstallRelDir = "plugins_copy_to", + InstallDirs = [filename:join(WD, InstallRelDir) || #{work_dir := WD} <- Specs], + ok = lists:foreach(fun filelib:ensure_path/1, InstallDirs), + #{package := Package, release_name := PluginName} = get_demo_plugin_package(hd(InstallDirs)), NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX), - Apps = [ - emqx, - emqx_conf, - emqx_ctl, - {emqx_plugins, #{ + [{ok, _}, {ok, _}] = erpc:multicall(Nodes, emqx_cth_suite, start_app, [ + emqx_plugins, + #{ config => #{ plugins => #{ - install_dir => ToInstallDir, + install_dir => InstallRelDir, states => [#{name_vsn => NameVsn, enable => true}] } } - }} - ], - Nodes = emqx_cth_cluster:start( - [ - {group_t_cluster_leave1, #{role => core, apps => Apps}}, - {group_t_cluster_leave2, #{role => core, apps => Apps}} - ], - #{ - work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config) } - ), + ]), [ - {to_install_dir, ToInstallDir}, {nodes, Nodes}, {name_vsn, NameVsn}, {plugin_name, PluginName} From 56eefe34d55e9b2375af606f7e9439573fed8ff1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 16 Feb 2024 13:33:59 +0100 Subject: [PATCH 3/8] feat(cth-cluster): use workdir as cwd on each node --- apps/emqx/test/emqx_cth_cluster.erl | 18 ++++++++++------ apps/emqx/test/emqx_cth_peer.erl | 33 ++++++++++------------------- 2 files changed, 23 insertions(+), 28 deletions(-) diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index d02719c7e..28ee1f30f 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -332,11 +332,12 @@ allocate_listener_ports(Types, Spec) -> start_nodes_init(Specs, Timeout) -> Names = lists:map(fun(#{name := Name}) -> Name end, Specs), - Nodes = start_bare_nodes(Names, Timeout), - lists:foreach(fun node_init/1, Nodes). + _Nodes = start_bare_nodes(Names, Timeout), + lists:foreach(fun node_init/1, Specs). start_bare_nodes(Names) -> start_bare_nodes(Names, ?TIMEOUT_NODE_START_MS). + start_bare_nodes(Names, Timeout) -> Args = erl_flags(), Envs = [], @@ -355,7 +356,7 @@ start_bare_nodes(Names, Timeout) -> Nodes. deadline(Timeout) -> - erlang:monotonic_time() + erlang:convert_time_unit(Timeout, millisecond, nanosecond). + erlang:monotonic_time() + erlang:convert_time_unit(Timeout, millisecond, native). is_overdue(Deadline) -> erlang:monotonic_time() > Deadline. @@ -379,10 +380,15 @@ wait_boot_complete(Waits, Deadline) -> wait_boot_complete(Waits, Deadline) end. -node_init(Node) -> - % Make it possible to call `ct:pal` and friends (if running under rebar3) +node_init(#{name := Node, work_dir := WorkDir}) -> + %% Create exclusive current directory for the node. Some configurations, like plugin + %% installation directory, are the same for the whole cluster, and nodes on the same + %% machine will step on each other's toes... + ok = filelib:ensure_path(WorkDir), + ok = erpc:call(Node, file, set_cwd, [WorkDir]), + %% Make it possible to call `ct:pal` and friends (if running under rebar3) _ = share_load_module(Node, cthr), - % Enable snabbkaffe trace forwarding + %% Enable snabbkaffe trace forwarding ok = snabbkaffe:forward_trace(Node), when_cover_enabled(fun() -> {ok, _} = cover:start([Node]) end), ok. diff --git a/apps/emqx/test/emqx_cth_peer.erl b/apps/emqx/test/emqx_cth_peer.erl index 815b0a712..ca66b42b7 100644 --- a/apps/emqx/test/emqx_cth_peer.erl +++ b/apps/emqx/test/emqx_cth_peer.erl @@ -43,28 +43,17 @@ start_link(Name, Args, Envs, Timeout) when is_atom(Name) -> do_start(Name0, Args, Envs, Timeout, Func) when is_atom(Name0) -> {Name, Host} = parse_node_name(Name0), - %% Create exclusive current directory for the node. Some configurations, like plugin - %% installation directory, are the same for the whole cluster, and nodes on the same - %% machine will step on each other's toes... - {ok, Cwd} = file:get_cwd(), - NodeCwd = filename:join([Cwd, Name]), - ok = filelib:ensure_dir(filename:join([NodeCwd, "dummy"])), - try - file:set_cwd(NodeCwd), - {ok, Pid, Node} = peer:Func(#{ - name => Name, - host => Host, - args => Args, - env => Envs, - wait_boot => Timeout, - longnames => true, - shutdown => {halt, 1000} - }), - true = register(Node, Pid), - {ok, Node} - after - file:set_cwd(Cwd) - end. + {ok, Pid, Node} = peer:Func(#{ + name => Name, + host => Host, + args => Args, + env => Envs, + wait_boot => Timeout, + longnames => true, + shutdown => {halt, 1000} + }), + true = register(Node, Pid), + {ok, Node}. stop(Node) when is_atom(Node) -> Pid = whereis(Node), From ae79516fd2d4a4d053425cc4cde7c3b58c66437e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 16 Feb 2024 18:50:23 +0100 Subject: [PATCH 4/8] test(cluster-rpc): ensure testsuite is side-effects-free --- .../emqx_conf/test/emqx_cluster_rpc_SUITE.erl | 50 ++++++++----------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index 48da67195..33ff86ff7 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -19,9 +19,9 @@ -compile(export_all). -compile(nowarn_export_all). --include("emqx_conf.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). + -define(NODE1, emqx_cluster_rpc). -define(NODE2, emqx_cluster_rpc2). -define(NODE3, emqx_cluster_rpc3). @@ -42,20 +42,25 @@ suite() -> [{timetrap, {minutes, 5}}]. groups() -> []. init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([]), - ok = mria:wait_for_tables(emqx_cluster_rpc:create_tables()), - ok = emqx_config:put([node, cluster_call, retry_interval], 1000), - meck:new(emqx_alarm, [non_strict, passthrough, no_link]), - meck:expect(emqx_alarm, activate, 3, ok), - meck:expect(emqx_alarm, deactivate, 3, ok), + Apps = emqx_cth_suite:start( + [ + emqx, + {emqx_conf, + "node.cluster_call {" + "\n retry_interval = 1s" + "\n max_history = 100" + "\n cleanup_interval = 500ms" + "\n}"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), meck:new(mria, [non_strict, passthrough, no_link]), meck:expect(mria, running_nodes, 0, [?NODE1, {node(), ?NODE2}, {node(), ?NODE3}]), - Config. + [{suite_apps, Apps} | Config]. -end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps([]), - meck:unload(emqx_alarm), - ok. +end_per_suite(Config) -> + ok = meck:unload(mria), + ok = emqx_cth_suite:stop(?config(suite_apps, Config)). init_per_testcase(_TestCase, Config) -> stop(), @@ -67,7 +72,6 @@ end_per_testcase(_Config) -> ok. t_base_test(_Config) -> - emqx_cluster_rpc:reset(), ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}), Pid = self(), MFA = {M, F, A} = {?MODULE, echo, [Pid, test]}, @@ -94,7 +98,6 @@ t_base_test(_Config) -> ok. t_commit_fail_test(_Config) -> - emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE2)]}, {init_failure, "MFA return not ok"} = multicall(M, F, A), @@ -102,7 +105,6 @@ t_commit_fail_test(_Config) -> ok. t_commit_crash_test(_Config) -> - emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, no_exist_function, []}, {init_failure, {error, Meta}} = multicall(M, F, A), @@ -150,7 +152,6 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> ok. t_commit_concurrency(_Config) -> - emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), Pid = self(), {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]}, @@ -211,7 +212,6 @@ receive_seq_msg(Acc) -> end. t_catch_up_status_handle_next_commit(_Config) -> - emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, {ok, 1, ok} = multicall(M, F, A, 1, 1000), @@ -220,7 +220,6 @@ t_catch_up_status_handle_next_commit(_Config) -> ok. t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> - emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), ets:new(test, [named_table, public]), ets:insert(test, {other_mfa_result, failed}), @@ -247,7 +246,6 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> ok. t_del_stale_mfa(_Config) -> - emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), MFA = {M, F, A} = {io, format, ["test"]}, Keys = lists:seq(1, 50), @@ -289,7 +287,6 @@ t_del_stale_mfa(_Config) -> ok. t_skip_failed_commit(_Config) -> - emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000), ct:sleep(180), @@ -310,7 +307,6 @@ t_skip_failed_commit(_Config) -> ok. t_fast_forward_commit(_Config) -> - emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000), ct:sleep(180), @@ -358,12 +354,10 @@ tnx_ids(Status) -> ). start() -> - {ok, Pid1} = emqx_cluster_rpc:start_link(), - {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500), - {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500), - {ok, Pid4} = emqx_cluster_rpc_cleaner:start_link(100, 500), - true = erlang:register(emqx_cluster_rpc_cleaner, Pid4), - {ok, [Pid1, Pid2, Pid3, Pid4]}. + {ok, _Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500), + {ok, _Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500), + ok = emqx_cluster_rpc:reset(), + ok. stop() -> [ @@ -376,7 +370,7 @@ stop() -> erlang:exit(P, kill) end end - || N <- [?NODE1, ?NODE2, ?NODE3, emqx_cluster_rpc_cleaner] + || N <- [?NODE2, ?NODE3] ]. receive_msg(0, _Msg) -> From 6ea9d2a6d92e59f3f250a9e245a6ba952e69d19f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 16 Feb 2024 19:06:42 +0100 Subject: [PATCH 5/8] test(conf): simplify confsync test suite --- apps/emqx_conf/test/emqx_conf_app_SUITE.erl | 44 ++++++++++----------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl index 5c22b5eaf..4760cbed3 100644 --- a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl +++ b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl @@ -74,10 +74,7 @@ t_copy_new_data_dir(Config) -> {[ok, ok, ok], []} = rpc:multicall(Nodes, ?MODULE, set_data_dir_env, []), ok = rpc:call(First, application, start, [emqx_conf]), {[ok, ok], []} = rpc:multicall(Rest, application, start, [emqx_conf]), - - assert_data_copy_done(Nodes, File), - stop_cluster(Nodes), - ok + ok = assert_data_copy_done(Nodes, File) after stop_cluster(Nodes) end. @@ -101,10 +98,7 @@ t_copy_deprecated_data_dir(Config) -> {[ok, ok, ok], []} = rpc:multicall(Nodes, ?MODULE, set_data_dir_env, []), ok = rpc:call(First, application, start, [emqx_conf]), {[ok, ok], []} = rpc:multicall(Rest, application, start, [emqx_conf]), - - assert_data_copy_done(Nodes, File), - stop_cluster(Nodes), - ok + ok = assert_data_copy_done(Nodes, File) after stop_cluster(Nodes) end. @@ -133,9 +127,7 @@ t_no_copy_from_newer_version_node(Config) -> ]), ok = rpc:call(First, application, start, [emqx_conf]), {[ok, ok], []} = rpc:multicall(Rest, application, start, [emqx_conf]), - ok = assert_no_cluster_conf_copied(Rest, File), - stop_cluster(Nodes), - ok + ok = assert_no_cluster_conf_copied(Rest, File) after stop_cluster(Nodes) end. @@ -155,26 +147,30 @@ create_data_dir(File) -> set_data_dir_env() -> NodeDataDir = emqx:data_dir(), - NodeStr = atom_to_list(node()), + NodeConfigDir = filename:join(NodeDataDir, "configs"), %% will create certs and authz dir - ok = filelib:ensure_dir(NodeDataDir ++ "/configs/"), - {ok, [ConfigFile]} = application:get_env(emqx, config_files), - NewConfigFile = ConfigFile ++ "." ++ NodeStr, - ok = filelib:ensure_dir(NewConfigFile), - {ok, _} = file:copy(ConfigFile, NewConfigFile), - Bin = iolist_to_binary(io_lib:format("node.config_files = [~p]~n", [NewConfigFile])), - ok = file:write_file(NewConfigFile, Bin, [append]), - DataDir = iolist_to_binary(io_lib:format("node.data_dir = ~p~n", [NodeDataDir])), - ok = file:write_file(NewConfigFile, DataDir, [append]), - application:set_env(emqx, config_files, [NewConfigFile]), + ok = filelib:ensure_path(NodeConfigDir), + ConfigFile = filename:join(NodeConfigDir, "emqx.conf"), + ok = append_format(ConfigFile, "node.config_files = [~p]~n", [ConfigFile]), + ok = append_format(ConfigFile, "node.data_dir = ~p~n", [NodeDataDir]), + application:set_env(emqx, config_files, [ConfigFile]), %% application:set_env(emqx, data_dir, Node), %% We set env both cluster.hocon and cluster-override.conf, but only one will be used - application:set_env(emqx, cluster_hocon_file, NodeDataDir ++ "/configs/cluster.hocon"), application:set_env( - emqx, cluster_override_conf_file, NodeDataDir ++ "/configs/cluster-override.conf" + emqx, + cluster_hocon_file, + filename:join([NodeDataDir, "configs", "cluster.hocon"]) + ), + application:set_env( + emqx, + cluster_override_conf_file, + filename:join([NodeDataDir, "configs", "cluster-override.conf"]) ), ok. +append_format(Filename, Fmt, Args) -> + ok = file:write_file(Filename, io_lib:format(Fmt, Args), [append]). + assert_data_copy_done([_First | Rest], File) -> FirstDataDir = filename:dirname(filename:dirname(File)), {ok, FakeCertFile} = file:read_file(FirstDataDir ++ "/certs/fake-cert"), From 367ffa8e801c03c0e77bf154b2eb66734cbbfb70 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 27 Feb 2024 23:41:50 +0100 Subject: [PATCH 6/8] feat(utils-fs): add function to compute relpaths naively --- apps/emqx_utils/src/emqx_utils_fs.erl | 23 ++++++++++++ apps/emqx_utils/test/emqx_utils_fs_SUITE.erl | 38 ++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/apps/emqx_utils/src/emqx_utils_fs.erl b/apps/emqx_utils/src/emqx_utils_fs.erl index 6d1bfe831..5c8b5ded0 100644 --- a/apps/emqx_utils/src/emqx_utils_fs.erl +++ b/apps/emqx_utils/src/emqx_utils_fs.erl @@ -20,6 +20,7 @@ -export([traverse_dir/3]). -export([read_info/1]). +-export([find_relpath/2]). -export([canonicalize/1]). -type fileinfo() :: #file_info{}. @@ -62,6 +63,28 @@ traverse_dir(FoldFun, Acc, AbsPath, {error, Reason}) -> read_info(AbsPath) -> file:read_link_info(AbsPath, [{time, posix}, raw]). +-spec find_relpath(file:name(), file:name()) -> + file:name(). +find_relpath(Path, RelativeTo) -> + case + filename:pathtype(Path) =:= filename:pathtype(RelativeTo) andalso + drop_path_prefix(filename:split(Path), filename:split(RelativeTo)) + of + false -> + Path; + [] -> + "."; + RelativePath -> + filename:join(RelativePath) + end. + +drop_path_prefix([Name | T1], [Name | T2]) -> + drop_path_prefix(T1, T2); +drop_path_prefix(Path, []) -> + Path; +drop_path_prefix(_Path, _To) -> + false. + %% @doc Canonicalize a file path. %% Removes stray slashes and converts to a string. -spec canonicalize(file:name()) -> diff --git a/apps/emqx_utils/test/emqx_utils_fs_SUITE.erl b/apps/emqx_utils/test/emqx_utils_fs_SUITE.erl index 2a36f6902..704d0af24 100644 --- a/apps/emqx_utils/test/emqx_utils_fs_SUITE.erl +++ b/apps/emqx_utils/test/emqx_utils_fs_SUITE.erl @@ -143,6 +143,44 @@ t_canonicalize_non_utf8(_) -> emqx_utils_fs:canonicalize(<<128, 128, 128>>) ). +%% + +t_find_relpath(_) -> + ?assertEqual( + "d1/1", + emqx_utils_fs:find_relpath("/usr/local/nonempty/d1/1", "/usr/local/nonempty") + ). + +t_find_relpath_same(_) -> + ?assertEqual( + ".", + emqx_utils_fs:find_relpath("/usr/local/bin", "/usr/local/bin/") + ), + ?assertEqual( + ".", + emqx_utils_fs:find_relpath("/usr/local/bin/.", "/usr/local/bin") + ). + +t_find_relpath_no_prefix(_) -> + ?assertEqual( + "/usr/lib/erlang/lib", + emqx_utils_fs:find_relpath("/usr/lib/erlang/lib", "/usr/local/bin") + ). + +t_find_relpath_both_relative(_) -> + ?assertEqual( + "1/2/3", + emqx_utils_fs:find_relpath("local/nonempty/1/2/3", "local/nonempty") + ). + +t_find_relpath_different_types(_) -> + ?assertEqual( + "local/nonempty/1/2/3", + emqx_utils_fs:find_relpath("local/nonempty/1/2/3", "/usr/local/nonempty") + ). + +%% + chmod_file(File, Mode) -> {ok, FileInfo} = file:read_file_info(File), ok = file:write_file_info(File, FileInfo#file_info{mode = Mode}). From 28d664bae231dd980a94019fdcae1a2932b4b3dc Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 27 Feb 2024 23:51:54 +0100 Subject: [PATCH 7/8] test(pulsar): simplify the test suite --- ...emqx_bridge_pulsar_impl_producer_SUITE.erl | 108 ++++++------------ 1 file changed, 34 insertions(+), 74 deletions(-) diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 704de0745..dfc5af3a7 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -14,7 +14,7 @@ -import(emqx_common_test_helpers, [on_exit/1]). -define(BRIDGE_TYPE_BIN, <<"pulsar_producer">>). --define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]). +-define(APPS, [emqx_conf, emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]). -define(RULE_TOPIC, "mqtt/rule"). -define(RULE_TOPIC_BIN, <>). @@ -52,14 +52,27 @@ only_once_tests() -> ]. init_per_suite(Config) -> - Config. + %% Ensure enterprise bridge module is loaded + _ = emqx_bridge_enterprise:module_info(), + %% TODO + %% This is needed to ensure that filenames generated deep inside pulsar/replayq + %% will not exceed 256 characters, because replayq eventually turns them into atoms. + %% The downside is increased risk of accidental name clashes / testsuite interference. + {ok, Cwd} = file:get_cwd(), + PrivDir = ?config(priv_dir, Config), + WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd), + Apps = emqx_cth_suite:start( + lists:flatten([ + ?APPS, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ]), + #{work_dir => WorkDir} + ), + [{suite_apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)), - _ = application:stop(emqx_connector), - ok. +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(suite_apps, Config)). init_per_group(plain = Type, Config) -> PulsarHost = os:getenv("PULSAR_PLAIN_HOST", "toxiproxy"), @@ -123,13 +136,6 @@ common_init_per_group() -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - %% Ensure enterprise bridge module is loaded - ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_common_test_helpers:start_apps(?APPS), - {ok, _} = application:ensure_all_started(pulsar), - _ = emqx_bridge_enterprise:module_info(), - {ok, _} = application:ensure_all_started(emqx_connector), - emqx_mgmt_api_test_util:init_suite(), UniqueNum = integer_to_binary(erlang:unique_integer()), MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>, [ @@ -210,9 +216,7 @@ pulsar_config(TestCase, _PulsarType, Config) -> PulsarTopic = ?config(pulsar_topic, Config), AuthType = proplists:get_value(sasl_auth_mechanism, Config, none), UseTLS = proplists:get_value(use_tls, Config, false), - Name = << - (atom_to_binary(TestCase))/binary, UniqueNum/binary - >>, + Name = atom_to_binary(TestCase), MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>), Prefix = case UseTLS of @@ -508,51 +512,18 @@ try_decode_json(Payload) -> end. cluster(Config) -> - PrivDataDir = ?config(priv_dir, Config), - Cluster = emqx_common_test_helpers:emqx_cluster( - [core, core], + Apps = [ + {emqx, #{override_env => [{boot_modules, [broker]}]}} + | ?APPS + ], + Nodes = emqx_cth_cluster:start( [ - {apps, [emqx_conf] ++ ?APPS ++ [pulsar]}, - {listener_ports, []}, - {priv_data_dir, PrivDataDir}, - {load_schema, true}, - {start_autocluster, true}, - {schema_mod, emqx_enterprise_schema}, - {env_handler, fun - (emqx) -> - application:set_env(emqx, boot_modules, [broker]), - ok; - (emqx_conf) -> - ok; - (_) -> - ok - end} - ] - ), - ct:pal("cluster: ~p", [Cluster]), - Cluster. - -start_cluster(Cluster) -> - Nodes = - [ - emqx_common_test_helpers:start_peer(Name, Opts) - || {Name, Opts} <- Cluster + {emqx_bridge_pulsar_impl_producer1, #{apps => Apps}}, + {emqx_bridge_pulsar_impl_producer2, #{apps => Apps}} ], - NumNodes = length(Nodes), - on_exit(fun() -> - emqx_utils:pmap( - fun(N) -> - ct:pal("stopping ~p", [N]), - ok = emqx_common_test_helpers:stop_peer(N) - end, - Nodes - ) - end), - {ok, _} = snabbkaffe:block_until( - %% -1 because only those that join the first node will emit the event. - ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}), - 30_000 + #{work_dir => emqx_cth_suite:work_dir(Config)} ), + ok = on_exit(fun() -> emqx_cth_cluster:stop(Nodes) end), Nodes. kill_resource_managers() -> @@ -1105,24 +1076,13 @@ do_t_cluster(Config) -> begin MQTTTopic = ?config(mqtt_topic, Config), ResourceId = resource_id(Config), - Cluster = cluster(Config), + Nodes = [N1, N2 | _] = cluster(Config), ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), QoS = 0, Payload = emqx_guid:to_hexstr(emqx_guid:gen()), - NumNodes = length(Cluster), - {ok, SRef0} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := emqx_bridge_app_started}), - NumNodes, - 25_000 - ), - Nodes = [N1, N2 | _] = start_cluster(Cluster), - %% wait until bridge app supervisor is up; by that point, - %% `emqx_config_handler:add_handler' has been called and the node should be - %% ready to create bridges. - {ok, _} = snabbkaffe:receive_events(SRef0), {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := pulsar_producer_bridge_started}), - NumNodes, + length(Nodes), 25_000 ), {ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end), @@ -1130,7 +1090,7 @@ do_t_cluster(Config) -> erpc:multicall(Nodes, fun wait_until_producer_connected/0), {ok, _} = snabbkaffe:block_until( ?match_n_events( - NumNodes, + length(Nodes), #{?snk_kind := bridge_post_config_update_done} ), 25_000 From 20c4029b3350bd1e364bad8338ab68cb1c59da19 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 28 Feb 2024 14:03:59 +0100 Subject: [PATCH 8/8] test(cluster-rpc): unload any mocks in `end_per_suite/1` Co-authored-by: Thales Macedo Garitezi --- apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index 33ff86ff7..b054988be 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -59,7 +59,7 @@ init_per_suite(Config) -> [{suite_apps, Apps} | Config]. end_per_suite(Config) -> - ok = meck:unload(mria), + _ = meck:unload(), ok = emqx_cth_suite:stop(?config(suite_apps, Config)). init_per_testcase(_TestCase, Config) ->