diff --git a/.gitignore b/.gitignore index 91183b48b..eecb62570 100644 --- a/.gitignore +++ b/.gitignore @@ -12,16 +12,12 @@ ebin !ebin/.placeholder .concrete/DEV_MODE .rebar -test/ebin/*.beam .exrc -plugins/*/ebin *.swp *.so .erlang.mk/ cover/ -eunit.coverdata test/ct.cover.spec -ct.coverdata .idea/ _build .rebar3 diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 33ed815fd..0278a1b1d 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -29,7 +29,7 @@ {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, - {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.10"}}}, + {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.11"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index 1fb2feb87..ffb4e3d1e 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -24,17 +24,14 @@ stop/1, get_description/0, get_release/0, - set_init_config_load_done/0, - get_init_config_load_done/0, + set_config_loader/1, + get_config_loader/0, set_init_tnx_id/1, get_init_tnx_id/0 ]). --include("emqx.hrl"). -include("logger.hrl"). --define(APP, emqx). - %%-------------------------------------------------------------------- %% Application callbacks %%-------------------------------------------------------------------- @@ -62,11 +59,11 @@ stop(_State) -> ok. %% @doc Call this function to make emqx boot without loading config, %% in case we want to delegate the config load to a higher level app %% which manages emqx app. -set_init_config_load_done() -> - application:set_env(emqx, init_config_load_done, true). +set_config_loader(Module) when is_atom(Module) -> + application:set_env(emqx, config_loader, Module). -get_init_config_load_done() -> - application:get_env(emqx, init_config_load_done, false). +get_config_loader() -> + application:get_env(emqx, config_loader, emqx). %% @doc Set the transaction id from which this node should start applying after boot. %% The transaction ID is received from the core node which we just copied the latest @@ -79,9 +76,15 @@ get_init_tnx_id() -> application:get_env(emqx, cluster_rpc_init_tnx_id, -1). maybe_load_config() -> - case get_init_config_load_done() of - true -> ok; - false -> emqx_config:init_load(emqx_schema) + case get_config_loader() of + emqx -> + emqx_config:init_load(emqx_schema); + Module -> + ?SLOG(debug, #{ + msg => "skip_init_config_load", + reason => "Some application has set another config loader", + loader => Module + }) end. maybe_start_listeners() -> diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 1c21d1d2e..aaee3b64e 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -17,8 +17,6 @@ %% @doc Start/Stop MQTT listeners. -module(emqx_listeners). --elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 10000}}]). - -include("emqx_mqtt.hrl"). -include("emqx_schema.hrl"). -include("logger.hrl"). @@ -98,7 +96,7 @@ format_list(Listener) -> do_list_raw() -> %% GET /listeners from other nodes returns [] when init config is not loaded. - case emqx_app:get_init_config_load_done() of + case emqx_app:get_config_loader() =/= emqx of true -> Key = <<"listeners">>, Raw = emqx_config:get_raw([Key], #{}), diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 498af53e6..b004b139e 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -67,6 +67,7 @@ -export([ emqx_cluster/1, emqx_cluster/2, + start_ekka/0, start_epmd/0, start_slave/2, stop_slave/1, @@ -348,7 +349,7 @@ stop_apps(Apps, Opts) -> [application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]], ok = mria_mnesia:delete_schema(), %% to avoid inter-suite flakiness - application:unset_env(emqx, init_config_load_done), + application:unset_env(emqx, config_loader), application:unset_env(emqx, boot_modules), persistent_term:erase(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY), case Opts of @@ -910,7 +911,7 @@ setup_node(Node, Opts) when is_map(Opts) -> set_env_once("EMQX_NODE__DATA_DIR", NodeDataDir), set_env_once("EMQX_NODE__COOKIE", Cookie), emqx_config:init_load(SchemaMod), - application:set_env(emqx, init_config_load_done, true) + emqx_app:set_config_loader(emqx_conf) end, %% Need to set this otherwise listeners will conflict between each other diff --git a/apps/emqx/test/emqx_common_test_http.erl b/apps/emqx/test/emqx_common_test_http.erl index e9064715d..7f50db92b 100644 --- a/apps/emqx/test/emqx_common_test_http.erl +++ b/apps/emqx/test/emqx_common_test_http.erl @@ -26,6 +26,7 @@ create_default_app/0, delete_default_app/0, default_auth_header/0, + auth_header/1, auth_header/2 ]). @@ -72,6 +73,9 @@ do_request_api(Method, Request, HttpOpts) -> get_http_data(ResponseBody) -> emqx_utils_json:decode(ResponseBody, [return_maps]). +auth_header(#{api_key := ApiKey, api_secret := Secret}) -> + auth_header(binary_to_list(ApiKey), binary_to_list(Secret)). + auth_header(User, Pass) -> Encoded = base64:encode_to_string(lists:append([User, ":", Pass])), {"Authorization", "Basic " ++ Encoded}. @@ -87,8 +91,7 @@ create_default_app() -> ExpiredAt = Now + timer:minutes(10), emqx_mgmt_auth:create( ?DEFAULT_APP_ID, ?DEFAULT_APP_SECRET, true, ExpiredAt, <<"default app key for test">> - ), - ok. + ). delete_default_app() -> emqx_mgmt_auth:delete(?DEFAULT_APP_ID). diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl new file mode 100644 index 000000000..caae62f4a --- /dev/null +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -0,0 +1,417 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cth_cluster). + +-export([start/2]). +-export([stop/1]). + +-export([share_load_module/2]). + +-define(APPS_CLUSTERING, [gen_rpc, mria, ekka]). + +-define(TIMEOUT_NODE_START_MS, 15000). +-define(TIMEOUT_APPS_START_MS, 30000). +-define(TIMEOUT_NODE_STOP_S, 15). + +%% + +-type nodespec() :: {_ShortName :: atom(), #{ + % DB Role + % Default: `core` + role => core | replicant, + + % DB Backend + % Default: `mnesia` if there are no replicants in cluster, otherwise `rlog` + % + % NOTE + % Default are chosen with the intention of lowering the chance of observing + % inconsistencies due to data races (i.e. missing mria shards on nodes where some + % application hasn't been started yet). + db_backend => mnesia | rlog, + + % Applications to start on the node + % Default: only applications needed for clustering are started + % + % NOTES + % 1. Apps needed for clustering started unconditionally. + % * It's not possible to redefine their startup order. + % * It's possible to add `{ekka, #{start => false}}` appspec though. + % 2. There are defaults applied to some appspecs if they present. + % * We try to keep `emqx_conf` config consistent with default configuration of + % clustering applications. + apps => [emqx_cth_suite:appspec()], + + base_port => inet:port_number(), + + % Node to join to in clustering phase + % If set to `undefined` this node won't try to join the cluster + % Default: no (first core node is used to join to by default) + join_to => node() | undefined, + + %% Working directory + %% If this directory is not empty, starting up the node applications will fail + %% Default: "${ClusterOpts.work_dir}/${nodename}" + work_dir => file:name(), + + % Tooling to manage nodes + % Default: `ct_slave`. + driver => ct_slave | slave +}}. + +-spec start([nodespec()], ClusterOpts) -> + [node()] +when + ClusterOpts :: #{ + %% Working directory + %% Everything a test produces should go here. Each node's stuff should go in its + %% own directory. + work_dir := file:name() + }. +start(Nodes, ClusterOpts) -> + NodeSpecs = mk_nodespecs(Nodes, ClusterOpts), + ct:pal("Starting cluster: ~p", [NodeSpecs]), + % 1. Start bare nodes with only basic applications running + _ = emqx_utils:pmap(fun start_node_init/1, NodeSpecs, ?TIMEOUT_NODE_START_MS), + % 2. Start applications needed to enable clustering + % Generally, this causes some applications to restart, but we deliberately don't + % start them yet. + _ = lists:foreach(fun run_node_phase_cluster/1, NodeSpecs), + % 3. Start applications after cluster is formed + % Cluster-joins are complete, so they shouldn't restart in the background anymore. + _ = emqx_utils:pmap(fun run_node_phase_apps/1, NodeSpecs, ?TIMEOUT_APPS_START_MS), + [Node || #{name := Node} <- NodeSpecs]. + +mk_nodespecs(Nodes, ClusterOpts) -> + NodeSpecs = lists:zipwith( + fun(N, {Name, Opts}) -> mk_init_nodespec(N, Name, Opts, ClusterOpts) end, + lists:seq(1, length(Nodes)), + Nodes + ), + CoreNodes = [Node || #{name := Node, role := core} <- NodeSpecs], + Backend = + case length(CoreNodes) of + L when L == length(NodeSpecs) -> + mnesia; + _ -> + rlog + end, + lists:map( + fun(Spec0) -> + Spec1 = maps:merge(#{core_nodes => CoreNodes, db_backend => Backend}, Spec0), + Spec2 = merge_default_appspecs(Spec1, NodeSpecs), + Spec3 = merge_clustering_appspecs(Spec2, NodeSpecs), + Spec3 + end, + NodeSpecs + ). + +mk_init_nodespec(N, Name, NodeOpts, ClusterOpts) -> + Node = node_name(Name), + BasePort = base_port(N), + WorkDir = maps:get(work_dir, ClusterOpts), + Defaults = #{ + name => Node, + role => core, + apps => [], + base_port => BasePort, + work_dir => filename:join([WorkDir, Node]), + driver => ct_slave + }, + maps:merge(Defaults, NodeOpts). + +merge_default_appspecs(#{apps := Apps} = Spec, NodeSpecs) -> + Spec#{apps => [mk_node_appspec(App, Spec, NodeSpecs) || App <- Apps]}. + +merge_clustering_appspecs(#{apps := Apps} = Spec, NodeSpecs) -> + AppsClustering = lists:map( + fun(App) -> + case lists:keyfind(App, 1, Apps) of + AppSpec = {App, _} -> + AppSpec; + false -> + {App, default_appspec(App, Spec, NodeSpecs)} + end + end, + ?APPS_CLUSTERING + ), + AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)], + Spec#{apps => AppsClustering ++ AppsRest}. + +mk_node_appspec({App, Opts}, Spec, NodeSpecs) -> + {App, emqx_cth_suite:merge_appspec(default_appspec(App, Spec, NodeSpecs), Opts)}; +mk_node_appspec(App, Spec, NodeSpecs) -> + {App, default_appspec(App, Spec, NodeSpecs)}. + +default_appspec(gen_rpc, #{name := Node}, NodeSpecs) -> + NodePorts = lists:foldl( + fun(#{name := CNode, base_port := Port}, Acc) -> + Acc#{CNode => {tcp, gen_rpc_port(Port)}} + end, + #{}, + NodeSpecs + ), + {tcp, Port} = maps:get(Node, NodePorts), + #{ + override_env => [ + % NOTE + % This is needed to make sure `gen_rpc` peers will find each other. + {port_discovery, manual}, + {tcp_server_port, Port}, + {client_config_per_node, {internal, NodePorts}} + ] + }; +default_appspec(mria, #{role := Role, db_backend := Backend}, _NodeSpecs) -> + #{ + override_env => [ + {node_role, Role}, + {db_backend, Backend} + ] + }; +default_appspec(ekka, Spec, _NodeSpecs) -> + Overrides = + case get_cluster_seeds(Spec) of + [_ | _] = Seeds -> + % NOTE + % Presumably, this is needed for replicants to find core nodes. + [{cluster_discovery, {static, [{seeds, Seeds}]}}]; + [] -> + [] + end, + #{ + override_env => Overrides + }; +default_appspec(emqx_conf, Spec, _NodeSpecs) -> + % NOTE + % This usually sets up a lot of `gen_rpc` / `mria` / `ekka` application envs in + % `emqx_config:init_load/2` during configuration mapping, so we need to keep them + % in sync with the values we set up here. + #{ + name := Node, + role := Role, + db_backend := Backend, + base_port := BasePort, + work_dir := WorkDir + } = Spec, + Listeners = [ + #{Type => #{default => #{bind => format("127.0.0.1:~p", [Port])}}} + || Type <- [tcp, ssl, ws, wss], + Port <- [listener_port(BasePort, Type)] + ], + Cluster = + case get_cluster_seeds(Spec) of + [_ | _] = Seeds -> + % NOTE + % Presumably, this is needed for replicants to find core nodes. + #{discovery_strategy => static, static => #{seeds => Seeds}}; + [] -> + #{} + end, + #{ + config => #{ + node => #{ + name => Node, + role => Role, + cookie => erlang:get_cookie(), + % TODO: will it be synced to the same value eventually? + data_dir => unicode:characters_to_binary(WorkDir), + db_backend => Backend + }, + cluster => Cluster, + rpc => #{ + % NOTE + % This (along with `gen_rpc` env overrides) is needed to make sure `gen_rpc` + % peers will find each other. + protocol => tcp, + tcp_server_port => gen_rpc_port(BasePort), + port_discovery => manual + }, + listeners => lists:foldl(fun maps:merge/2, #{}, Listeners) + } + }; +default_appspec(_App, _, _) -> + #{}. + +get_cluster_seeds(#{join_to := undefined}) -> + []; +get_cluster_seeds(#{join_to := Node}) -> + [Node]; +get_cluster_seeds(#{core_nodes := CoreNodes}) -> + CoreNodes. + +start_node_init(Spec = #{name := Node}) -> + Node = start_bare_node(Node, Spec), + pong = net_adm:ping(Node), + % Preserve node spec right on the remote node + ok = set_node_opts(Node, Spec), + % Make it possible to call `ct:pal` and friends (if running under rebar3) + _ = share_load_module(Node, cthr), + % Enable snabbkaffe trace forwarding + ok = snabbkaffe:forward_trace(Node), + ok. + +run_node_phase_cluster(Spec = #{name := Node}) -> + ok = load_apps(Node, Spec), + ok = start_apps_clustering(Node, Spec), + ok = maybe_join_cluster(Node, Spec), + ok. + +run_node_phase_apps(Spec = #{name := Node}) -> + ok = start_apps(Node, Spec), + ok. + +set_node_opts(Node, Spec) -> + erpc:call(Node, persistent_term, put, [{?MODULE, opts}, Spec]). + +get_node_opts(Node) -> + erpc:call(Node, persistent_term, get, [{?MODULE, opts}]). + +load_apps(Node, #{apps := Apps}) -> + erpc:call(Node, emqx_cth_suite, load_apps, [Apps]). + +start_apps_clustering(Node, #{apps := Apps} = Spec) -> + SuiteOpts = maps:with([work_dir], Spec), + AppsClustering = [lists:keyfind(App, 1, Apps) || App <- ?APPS_CLUSTERING], + _Started = erpc:call(Node, emqx_cth_suite, start, [AppsClustering, SuiteOpts]), + ok. + +start_apps(Node, #{apps := Apps} = Spec) -> + SuiteOpts = maps:with([work_dir], Spec), + AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)], + _Started = erpc:call(Node, emqx_cth_suite, start_apps, [AppsRest, SuiteOpts]), + ok. + +maybe_join_cluster(_Node, #{role := replicant}) -> + ok; +maybe_join_cluster(Node, Spec) -> + case get_cluster_seeds(Spec) of + [JoinTo | _] -> + ok = join_cluster(Node, JoinTo); + [] -> + ok + end. + +join_cluster(Node, JoinTo) -> + case erpc:call(Node, ekka, join, [JoinTo]) of + ok -> + ok; + ignore -> + ok; + Error -> + error({failed_to_join_cluster, #{node => Node, error => Error}}) + end. + +%% + +stop(Nodes) -> + _ = emqx_utils:pmap(fun stop_node/1, Nodes, ?TIMEOUT_NODE_STOP_S * 1000), + ok. + +stop_node(Name) -> + Node = node_name(Name), + try get_node_opts(Node) of + Opts -> + stop_node(Name, Opts) + catch + error:{erpc, _} -> + ok + end. + +stop_node(Node, #{driver := ct_slave}) -> + case ct_slave:stop(Node, [{stop_timeout, ?TIMEOUT_NODE_STOP_S}]) of + {ok, _} -> + ok; + {error, Reason, _} when Reason == not_connected; Reason == not_started -> + ok + end; +stop_node(Node, #{driver := slave}) -> + slave:stop(Node). + +%% Ports + +base_port(Number) -> + 10000 + Number * 100. + +gen_rpc_port(BasePort) -> + BasePort - 1. + +listener_port(BasePort, tcp) -> + BasePort; +listener_port(BasePort, ssl) -> + BasePort + 1; +listener_port(BasePort, quic) -> + BasePort + 2; +listener_port(BasePort, ws) -> + BasePort + 3; +listener_port(BasePort, wss) -> + BasePort + 4. + +%% + +start_bare_node(Name, #{driver := ct_slave}) -> + {ok, Node} = ct_slave:start( + node_name(Name), + [ + {kill_if_fail, true}, + {monitor_master, true}, + {init_timeout, 20_000}, + {startup_timeout, 20_000}, + {erl_flags, erl_flags()}, + {env, []} + ] + ), + Node; +start_bare_node(Name, #{driver := slave}) -> + {ok, Node} = slave:start_link(host(), Name, ebin_path()), + Node. + +erl_flags() -> + %% One core and redirecting logs to master + "+S 1:1 -master " ++ atom_to_list(node()) ++ " " ++ ebin_path(). + +ebin_path() -> + string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). + +is_lib(Path) -> + string:prefix(Path, code:lib_dir()) =:= nomatch andalso + string:str(Path, "_build/default/plugins") =:= 0. + +share_load_module(Node, Module) -> + case code:get_object_code(Module) of + {Module, Code, Filename} -> + {module, Module} = erpc:call(Node, code, load_binary, [Module, Filename, Code]), + ok; + error -> + error + end. + +node_name(Name) -> + case string:tokens(atom_to_list(Name), "@") of + [_Name, _Host] -> + %% the name already has a @ + Name; + _ -> + list_to_atom(atom_to_list(Name) ++ "@" ++ host()) + end. + +host() -> + [_, Host] = string:tokens(atom_to_list(node()), "@"), + Host. + +%% + +format(Format, Args) -> + unicode:characters_to_binary(io_lib:format(Format, Args)). diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl new file mode 100644 index 000000000..aef0fc5e5 --- /dev/null +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -0,0 +1,374 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cth_suite). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("emqx/include/emqx_authentication.hrl"). + +-export([start/2]). +-export([stop/1]). + +-export([load_apps/1]). +-export([start_apps/2]). +-export([start_app/2]). +-export([start_app/3]). +-export([stop_apps/1]). + +-export([merge_appspec/2]). + +-export_type([appspec/0]). +-export_type([appspec_opts/0]). + +-define(NOW, + (calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}])) +). + +-define(PAL(IMPORTANCE, FMT, ARGS), + case erlang:whereis(ct_logs) of + undefined -> + io:format("*** " ?MODULE_STRING " ~s @ ~p ***~n" ++ FMT ++ "~n", [?NOW, node() | ARGS]); + _ -> + ct:pal(?MODULE, IMPORTANCE, FMT, ARGS, [{heading, ?MODULE_STRING}]) + end +). + +%% + +-type appname() :: atom(). +-type appspec() :: {appname(), appspec_opts()}. + +%% Config structure serializable into HOCON document. +-type config() :: #{atom() => scalar() | [scalar()] | config() | [config()]}. +-type scalar() :: atom() | number() | string() | binary(). + +-type appspec_opts() :: #{ + %% 1. Enable loading application config + %% If not defined or set to `false`, this step will be skipped. + %% If application is missing a schema module, this step will fail. + %% Merging amounts to appending, unless `false` is used, then merge result is also `false`. + config => iodata() | config() | emqx_config:raw_config() | false, + + %% 2. Override the application environment + %% If not defined or set to `false`, this step will be skipped. + %% Merging amounts to appending, unless `false` is used, then merge result is `[]`. + override_env => [{atom(), term()}] | false, + + %% 3. Perform anything right before starting the application + %% If not defined or set to `false`, this step will be skipped. + %% Merging amounts to redefining. + before_start => fun(() -> _) | fun((appname()) -> _) | false, + + %% 4. Starting the application + %% If not defined or set to `true`, `application:ensure_all_started/1` is used. + %% If custom function is used, it should return list of all applications that were started. + %% If set to `false`, application will not be started. + %% Merging amounts to redefining. + start => fun(() -> {ok, [appname()]}) | fun((appname()) -> {ok, [appname()]}) | boolean(), + + %% 5. Perform anything right after starting the application + %% If not defined or set to `false`, this step will be skipped. + %% Merging amounts to redefining. + after_start => fun(() -> _) | fun((appname()) -> _) | false +}. + +%% @doc Start applications with a clean slate. +%% Provided appspecs will be merged with defaults defined in `default_appspec/1`. +-spec start([appname() | appspec()], SuiteOpts) -> + StartedApps :: [appname()] +when + SuiteOpts :: #{ + %% Working directory + %% Everything a test produces should go here. If this directory is not empty, + %% function will raise an error. + work_dir := file:name() + }. +start(Apps, SuiteOpts = #{work_dir := WorkDir}) -> + % 1. Prepare appspec instructions + AppSpecs = [mk_appspec(App, SuiteOpts) || App <- Apps], + % 2. Load every app so that stuff scanning attributes of loaded modules works + ok = lists:foreach(fun load_appspec/1, AppSpecs), + % 3. Verify that we're running with a clean state. + ok = filelib:ensure_dir(filename:join(WorkDir, foo)), + ok = verify_clean_suite_state(SuiteOpts), + % 4. Setup isolated mnesia directory + ok = emqx_common_test_helpers:load(mnesia), + ok = application:set_env(mnesia, dir, filename:join([WorkDir, mnesia])), + % 5. Start ekka separately. + % For some reason it's designed to be started in non-regular way, so we have to track + % applications started in the process manually. + EkkaSpecs = [{App, proplists:get_value(App, AppSpecs, #{})} || App <- [gen_rpc, mria, ekka]], + EkkaApps = start_apps(EkkaSpecs, SuiteOpts), + % 6. Start apps following instructions. + RestSpecs = [AppSpec || AppSpec <- AppSpecs, not lists:member(AppSpec, EkkaSpecs)], + EkkaApps ++ start_appspecs(RestSpecs). + +load_apps(Apps) -> + lists:foreach(fun load_appspec/1, [mk_appspec(App, #{}) || App <- Apps]). + +load_appspec({App, _Opts}) -> + ok = emqx_common_test_helpers:load(App), + load_app_deps(App). + +load_app_deps(App) -> + AlreadyLoaded = [A || {A, _, _} <- application:loaded_applications()], + case application:get_key(App, applications) of + {ok, Deps} -> + Apps = Deps -- AlreadyLoaded, + ok = lists:foreach(fun emqx_common_test_helpers:load/1, Apps), + ok = lists:foreach(fun load_app_deps/1, Apps); + undefined -> + ok + end. + +start_apps(Apps, SuiteOpts) -> + start_appspecs([mk_appspec(App, SuiteOpts) || App <- Apps]). + +start_app(App, StartOpts) -> + start_app(App, StartOpts, #{}). + +start_app(App, StartOpts, SuiteOpts) -> + start_appspecs([mk_appspec({App, StartOpts}, SuiteOpts)]). + +start_appspecs(AppSpecs) -> + lists:flatmap( + fun({App, Spec}) -> start_appspec(App, Spec) end, + AppSpecs + ). + +mk_appspec({App, Opts}, SuiteOpts) -> + Defaults = default_appspec(App, SuiteOpts), + {App, merge_appspec(Defaults, init_spec(Opts))}; +mk_appspec(App, SuiteOpts) -> + Defaults = default_appspec(App, SuiteOpts), + {App, Defaults}. + +init_spec(Opts = #{}) -> + Opts; +init_spec(Config) when is_list(Config); is_binary(Config) -> + #{config => [Config, "\n"]}. + +start_appspec(App, StartOpts) -> + _ = log_appspec(App, StartOpts), + _ = maybe_configure_app(App, StartOpts), + _ = maybe_override_env(App, StartOpts), + _ = maybe_before_start(App, StartOpts), + case maybe_start(App, StartOpts) of + {ok, Started} -> + ?PAL(?STD_IMPORTANCE, "Started applications: ~0p", [Started]), + _ = maybe_after_start(App, StartOpts), + Started; + {error, Reason} -> + error({failed_to_start_app, App, Reason}) + end. + +log_appspec(App, StartOpts) when map_size(StartOpts) > 0 -> + Fmt = lists:flatmap( + fun(Opt) -> "~n * ~p: " ++ spec_fmt(fc, Opt) end, + maps:keys(StartOpts) + ), + Args = lists:flatmap( + fun({Opt, V}) -> [Opt, spec_fmt(ffun, {Opt, V})] end, + maps:to_list(StartOpts) + ), + ?PAL(?STD_IMPORTANCE, "Starting ~p with:" ++ Fmt, [App | Args]); +log_appspec(App, #{}) -> + ?PAL(?STD_IMPORTANCE, "Starting ~p", [App]). + +spec_fmt(fc, config) -> "~n~ts"; +spec_fmt(fc, _) -> "~p"; +spec_fmt(ffun, {config, C}) -> render_config(C); +spec_fmt(ffun, {_, X}) -> X. + +maybe_configure_app(_App, #{config := false}) -> + ok; +maybe_configure_app(App, #{config := Config}) -> + case app_schema(App) of + {ok, SchemaModule} -> + configure_app(SchemaModule, Config); + {error, Reason} -> + error({failed_to_configure_app, App, Reason}) + end; +maybe_configure_app(_App, #{}) -> + ok. + +configure_app(SchemaModule, Config) -> + ok = emqx_config:init_load(SchemaModule, render_config(Config)), + ok. + +maybe_override_env(App, #{override_env := Env = [{_, _} | _]}) -> + ok = application:set_env([{App, Env}]); +maybe_override_env(_App, #{}) -> + ok. + +maybe_before_start(App, #{before_start := Fun}) when is_function(Fun, 1) -> + Fun(App); +maybe_before_start(_App, #{before_start := Fun}) when is_function(Fun, 0) -> + Fun(); +maybe_before_start(_App, #{}) -> + ok. + +maybe_start(_App, #{start := false}) -> + {ok, []}; +maybe_start(_App, #{start := Fun}) when is_function(Fun, 0) -> + Fun(); +maybe_start(App, #{start := Fun}) when is_function(Fun, 1) -> + Fun(App); +maybe_start(App, #{}) -> + application:ensure_all_started(App). + +maybe_after_start(App, #{after_start := Fun}) when is_function(Fun, 1) -> + Fun(App); +maybe_after_start(_App, #{after_start := Fun}) when is_function(Fun, 0) -> + Fun(); +maybe_after_start(_App, #{}) -> + ok. + +-spec merge_appspec(appspec_opts(), appspec_opts()) -> + appspec_opts(). +merge_appspec(Opts1, Opts2) -> + maps:merge_with( + fun + (config, C1, C2) -> merge_config(C1, C2); + (override_env, E1, E2) -> merge_envs(E1, E2); + (_Opt, _Val1, Val2) -> Val2 + end, + init_spec(Opts1), + init_spec(Opts2) + ). + +merge_envs(false, E2) -> + E2; +merge_envs(_E, false) -> + []; +merge_envs(E1, E2) -> + E1 ++ E2. + +merge_config(false, C2) -> + C2; +merge_config(_C, false) -> + false; +merge_config(C1, C2) -> + [render_config(C1), "\n", render_config(C2)]. + +default_appspec(ekka, _SuiteOpts) -> + #{ + start => fun start_ekka/0 + }; +default_appspec(emqx, SuiteOpts) -> + #{ + override_env => [{data_dir, maps:get(work_dir, SuiteOpts, "data")}] + }; +default_appspec(emqx_authz, _SuiteOpts) -> + #{ + config => #{ + % NOTE + % Disable default authorization sources (i.e. acl.conf file rules). + authorization => #{sources => []} + } + }; +default_appspec(emqx_conf, SuiteOpts) -> + Config = #{ + node => #{ + name => node(), + cookie => erlang:get_cookie(), + % FIXME + data_dir => unicode:characters_to_binary(maps:get(work_dir, SuiteOpts, "data")) + } + }, + % NOTE + % Since `emqx_conf_schema` manages config for a lot of applications, it's good to include + % their defaults as well. + SharedConfig = lists:foldl( + fun(App, Acc) -> + emqx_utils_maps:deep_merge(Acc, default_config(App, SuiteOpts)) + end, + Config, + [ + emqx, + emqx_authz + ] + ), + #{ + config => SharedConfig, + % NOTE + % We inform `emqx` of our config loader before starting `emqx_conf` sothat it won't + % overwrite everything with a default configuration. + before_start => fun() -> + emqx_app:set_config_loader(?MODULE) + end + }; +default_appspec(emqx_dashboard, _SuiteOpts) -> + #{ + after_start => fun() -> + true = emqx_dashboard_listener:is_ready(infinity) + end + }; +default_appspec(_, _) -> + #{}. + +default_config(App, SuiteOpts) -> + maps:get(config, default_appspec(App, SuiteOpts), #{}). + +%% + +start_ekka() -> + ok = emqx_common_test_helpers:start_ekka(), + {ok, [mnesia, ekka]}. + +%% + +-spec stop(_StartedApps :: [appname()]) -> + ok. +stop(Apps) -> + ok = stop_apps(Apps), + clean_suite_state(). + +-spec stop_apps(_StartedApps :: [appname()]) -> + ok. +stop_apps(Apps) -> + ok = lists:foreach(fun application:stop/1, lists:reverse(Apps)), + ok = lists:foreach(fun application:unload/1, Apps). + +%% + +verify_clean_suite_state(#{work_dir := WorkDir}) -> + {ok, []} = file:list_dir(WorkDir), + none = persistent_term:get(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY, none), + [] = emqx_config:get_root_names(), + ok. + +clean_suite_state() -> + _ = persistent_term:erase(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY), + _ = emqx_config:erase_all(), + ok. + +%% + +app_schema(App) -> + Mod = list_to_atom(atom_to_list(App) ++ "_schema"), + try is_list(Mod:roots()) of + true -> {ok, Mod}; + false -> {error, schema_no_roots} + catch + error:undef -> + {error, schema_not_found} + end. + +render_config(Config = #{}) -> + unicode:characters_to_binary(hocon_pp:do(Config, #{})); +render_config(Config) -> + unicode:characters_to_binary(Config). diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index d4bb9bbea..e280f4fe5 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -1114,7 +1114,7 @@ setup_node(Node, Port) -> %% We load configuration, and than set the special enviroment variable %% which says that emqx shouldn't load configuration at startup emqx_config:init_load(emqx_schema), - application:set_env(emqx, init_config_load_done, true), + emqx_app:set_config_loader(?MODULE), ok = emqx_config:put([listeners, tcp, default, bind], {{127, 0, 0, 1}, Port}), ok = emqx_config:put([listeners, ssl, default, bind], {{127, 0, 0, 1}, Port + 1}), diff --git a/apps/emqx_authz/test/emqx_authz_file_SUITE.erl b/apps/emqx_authz/test/emqx_authz_file_SUITE.erl index be8907feb..ec96522a5 100644 --- a/apps/emqx_authz/test/emqx_authz_file_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_file_SUITE.erl @@ -39,33 +39,21 @@ groups() -> []. init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps( - [emqx_conf, emqx_authz], - fun set_special_configs/1 - ), - %% meck after authz started - meck:expect( - emqx_authz, - acl_conf_file, - fun() -> - emqx_common_test_helpers:deps_path(emqx_authz, "etc/acl.conf") - end - ), Config. end_per_suite(_Config) -> - ok = emqx_authz_test_lib:restore_authorizers(), - ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_authz]). - -init_per_testcase(_TestCase, Config) -> - ok = emqx_authz_test_lib:reset_authorizers(), - Config. - -set_special_configs(emqx_authz) -> - ok = emqx_authz_test_lib:reset_authorizers(); -set_special_configs(_) -> ok. +init_per_testcase(TestCase, Config) -> + Apps = emqx_cth_suite:start( + [{emqx_conf, "authorization.no_match = deny"}, emqx_authz], + #{work_dir => filename:join(?config(priv_dir, Config), TestCase)} + ), + [{tc_apps, Apps} | Config]. + +end_per_testcase(_TestCase, Config) -> + emqx_cth_suite:stop(?config(tc_apps, Config)). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 25452eb77..bc27afda2 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -24,8 +24,6 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/test_macros.hrl"). --define(SUITE_APPS, [emqx_conf, emqx_authn, emqx_management, emqx_rule_engine, emqx_bridge]). - -define(BRIDGE_TYPE_HTTP, <<"webhook">>). -define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))). -define(URL(PORT, PATH), @@ -74,6 +72,19 @@ }). -define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)). +-define(APPSPECS, [ + emqx_conf, + emqx, + emqx_authn, + emqx_management, + {emqx_rule_engine, "rule_engine { rules {} }"}, + {emqx_bridge, "bridges {}"} +]). + +-define(APPSPEC_DASHBOARD, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} +). + all() -> [ {group, single}, @@ -104,105 +115,43 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. -init_per_group(cluster, Config) -> - Cluster = mk_cluster_specs(Config), - ct:pal("Starting ~p", [Cluster]), - Nodes = [ - emqx_common_test_helpers:start_slave(Name, Opts) - || {Name, Opts} <- Cluster - ], - [NodePrimary | NodesRest] = Nodes, - ok = erpc:call(NodePrimary, fun() -> init_node(primary) end), - _ = [ok = erpc:call(Node, fun() -> init_node(regular) end) || Node <- NodesRest], - [{group, cluster}, {cluster_nodes, Nodes}, {api_node, NodePrimary} | Config]; -init_per_group(cluster_later_join, Config) -> - Cluster = mk_cluster_specs(Config, #{join_to => undefined}), - ct:pal("Starting ~p", [Cluster]), - Nodes = [ - emqx_common_test_helpers:start_slave(Name, Opts) - || {Name, Opts} <- Cluster - ], - [NodePrimary | NodesRest] = Nodes, - ok = erpc:call(NodePrimary, fun() -> init_node(primary) end), - _ = [ok = erpc:call(Node, fun() -> init_node(regular) end) || Node <- NodesRest], - [{group, cluster_later_join}, {cluster_nodes, Nodes}, {api_node, NodePrimary} | Config]; -init_per_group(_, Config) -> - ok = emqx_mgmt_api_test_util:init_suite(?SUITE_APPS), - ok = load_suite_config(emqx_rule_engine), - ok = load_suite_config(emqx_bridge), - [{group, single}, {api_node, node()} | Config]. +init_per_group(cluster = Name, Config) -> + Nodes = [NodePrimary | _] = mk_cluster(Name, Config), + init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); +init_per_group(cluster_later_join = Name, Config) -> + Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}), + init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); +init_per_group(Name, Config) -> + WorkDir = filename:join(?config(priv_dir, Config), Name), + Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}), + init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]). -mk_cluster_specs(Config) -> - mk_cluster_specs(Config, #{}). +init_api(Config) -> + APINode = ?config(node, Config), + {ok, App} = erpc:call(APINode, emqx_common_test_http, create_default_app, []), + [{api, App} | Config]. -mk_cluster_specs(Config, Opts) -> - Specs = [ - {core, emqx_bridge_api_SUITE1, #{}}, - {core, emqx_bridge_api_SUITE2, #{}} - ], - CommonOpts = Opts#{ - env => [{emqx, boot_modules, [broker]}], - apps => [], - % NOTE - % We need to start all those apps _after_ the cluster becomes stable, in the - % `init_node/1`. This is because usual order is broken in very subtle way: - % 1. Node starts apps including `mria` and `emqx_conf` which starts `emqx_cluster_rpc`. - % 2. The `emqx_cluster_rpc` sets up a mnesia table subscription during initialization. - % 3. In the meantime `mria` joins the cluster and notices it should restart. - % 4. Mnesia subscription becomes lost during restarts (god knows why). - % Yet we need to load them before, so that mria / mnesia will know which tables - % should be created in the cluster. - % TODO - % We probably should hide these intricacies behind the `emqx_common_test_helpers`. - load_apps => ?SUITE_APPS ++ [emqx_dashboard], - env_handler => fun load_suite_config/1, - load_schema => false, - priv_data_dir => ?config(priv_dir, Config) - }, - emqx_common_test_helpers:emqx_cluster(Specs, CommonOpts). +mk_cluster(Name, Config) -> + mk_cluster(Name, Config, #{}). -init_node(Type) -> - ok = emqx_common_test_helpers:start_apps(?SUITE_APPS, fun load_suite_config/1), - case Type of - primary -> - ok = emqx_dashboard_desc_cache:init(), - ok = emqx_config:put( - [dashboard, listeners], - #{http => #{bind => 18083, proxy_header => false}} - ), - ok = emqx_dashboard:start_listeners(), - ready = emqx_dashboard_listener:regenerate_minirest_dispatch(), - emqx_common_test_http:create_default_app(); - regular -> - ok - end. - -load_suite_config(emqx_rule_engine) -> - ok = emqx_common_test_helpers:load_config( - emqx_rule_engine_schema, - <<"rule_engine { rules {} }">> - ); -load_suite_config(emqx_bridge) -> - ok = emqx_common_test_helpers:load_config( - emqx_bridge_schema, - <<"bridges {}">> - ); -load_suite_config(_) -> - ok. +mk_cluster(Name, Config, Opts) -> + Node1Apps = ?APPSPECS ++ [?APPSPEC_DASHBOARD], + Node2Apps = ?APPSPECS, + emqx_cth_cluster:start( + [ + {emqx_bridge_api_SUITE1, Opts#{role => core, apps => Node1Apps}}, + {emqx_bridge_api_SUITE2, Opts#{role => core, apps => Node2Apps}} + ], + #{work_dir => filename:join(?config(priv_dir, Config), Name)} + ). end_per_group(Group, Config) when Group =:= cluster; Group =:= cluster_later_join -> - ok = lists:foreach( - fun(Node) -> - _ = erpc:call(Node, emqx_common_test_helpers, stop_apps, [?SUITE_APPS]), - emqx_common_test_helpers:stop_slave(Node) - end, - ?config(cluster_nodes, Config) - ); -end_per_group(_, _Config) -> - emqx_mgmt_api_test_util:end_suite(?SUITE_APPS), + ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)); +end_per_group(_, Config) -> + emqx_cth_suite:stop(?config(group_apps, Config)), ok. init_per_testcase(t_broken_bpapi_vsn, Config) -> @@ -228,7 +177,7 @@ end_per_testcase(t_old_bpapi_vsn, Config) -> end_per_testcase(_, Config) -> Sock = ?config(sock, Config), Acceptor = ?config(acceptor, Config), - Node = ?config(api_node, Config), + Node = ?config(node, Config), ok = emqx_common_test_helpers:call_janitor(), ok = stop_http_server(Sock, Acceptor), ok = erpc:call(Node, fun clear_resources/0), @@ -900,7 +849,7 @@ t_start_stop_inconsistent_bridge_cluster(Config) -> start_stop_inconsistent_bridge(Type, Config) -> Port = ?config(port, Config), URL = ?URL(Port, "abc"), - Node = ?config(api_node, Config), + Node = ?config(node, Config), erpc:call(Node, fun() -> meck:new(emqx_bridge_resource, [passthrough, no_link]), @@ -1351,9 +1300,7 @@ t_inconsistent_webhook_request_timeouts(Config) -> t_cluster_later_join_metrics(Config) -> Port = ?config(port, Config), - APINode = ?config(api_node, Config), - ClusterNodes = ?config(cluster_nodes, Config), - [OtherNode | _] = ClusterNodes -- [APINode], + [PrimaryNode, OtherNode | _] = ?config(cluster_nodes, Config), URL1 = ?URL(Port, "path1"), Name = ?BRIDGE_NAME, BridgeParams = ?HTTP_BRIDGE(URL1, Name), @@ -1371,7 +1318,7 @@ t_cluster_later_join_metrics(Config) -> request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) ), %% Now join the other node join with the api node. - ok = erpc:call(OtherNode, ekka, join, [APINode]), + ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]), %% Check metrics; shouldn't crash even if the bridge is not %% ready on the node that just joined the cluster. ?assertMatch( @@ -1419,8 +1366,9 @@ request(Method, {operation, Type, Op, BridgeID}, Body, Config) -> URL = operation_path(Type, Op, BridgeID, Config), request(Method, URL, Body, Config); request(Method, URL, Body, Config) -> + AuthHeader = emqx_common_test_http:auth_header(?config(api, Config)), Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]}, - emqx_mgmt_api_test_util:request_api(Method, URL, [], auth_header(Config), Body, Opts). + emqx_mgmt_api_test_util:request_api(Method, URL, [], AuthHeader, Body, Opts). request(Method, URL, Body, Decoder, Config) -> case request(Method, URL, Body, Config) of @@ -1436,11 +1384,8 @@ request_json(Method, URLLike, Config) -> request_json(Method, URLLike, Body, Config) -> request(Method, URLLike, Body, fun json/1, Config). -auth_header(Config) -> - erpc:call(?config(api_node, Config), emqx_common_test_http, default_auth_header, []). - operation_path(node, Oper, BridgeID, Config) -> - uri(["nodes", ?config(api_node, Config), "bridges", BridgeID, Oper]); + uri(["nodes", ?config(node, Config), "bridges", BridgeID, Oper]); operation_path(cluster, Oper, BridgeID, _Config) -> uri(["bridges", BridgeID, Oper]). @@ -1448,23 +1393,23 @@ enable_path(Enable, BridgeID) -> uri(["bridges", BridgeID, "enable", Enable]). publish_message(Topic, Body, Config) -> - Node = ?config(api_node, Config), + Node = ?config(node, Config), erpc:call(Node, emqx, publish, [emqx_message:make(Topic, Body)]). update_config(Path, Value, Config) -> - Node = ?config(api_node, Config), + Node = ?config(node, Config), erpc:call(Node, emqx, update_config, [Path, Value]). get_raw_config(Path, Config) -> - Node = ?config(api_node, Config), + Node = ?config(node, Config), erpc:call(Node, emqx, get_raw_config, [Path]). add_user_auth(Chain, AuthenticatorID, User, Config) -> - Node = ?config(api_node, Config), + Node = ?config(node, Config), erpc:call(Node, emqx_authentication, add_user, [Chain, AuthenticatorID, User]). delete_user_auth(Chain, AuthenticatorID, User, Config) -> - Node = ?config(api_node, Config), + Node = ?config(node, Config), erpc:call(Node, emqx_authentication, delete_user, [Chain, AuthenticatorID, User]). str(S) when is_list(S) -> S; diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 459e13676..c92c28971 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -22,6 +22,9 @@ -export([get_override_config_file/0]). -export([sync_data_from_node/0]). +%% Test purposes +-export([init_load_done/0]). + -include_lib("emqx/include/logger.hrl"). -include("emqx_conf.hrl"). @@ -46,7 +49,7 @@ stop(_State) -> %% This function is named 'override' due to historical reasons. get_override_config_file() -> Node = node(), - case emqx_app:get_init_config_load_done() of + case init_load_done() of false -> {error, #{node => Node, msg => "init_conf_load_not_done"}}; true -> @@ -91,7 +94,22 @@ sync_data_from_node() -> %% ------------------------------------------------------------------------------ init_load() -> - emqx_config:init_load(emqx_conf:schema_module()). + case emqx_app:get_config_loader() of + Module when Module == emqx; Module == emqx_conf -> + ok = emqx_config:init_load(emqx_conf:schema_module()), + ok = emqx_app:set_config_loader(emqx_conf), + ok; + Module -> + ?SLOG(debug, #{ + msg => "skip_init_config_load", + reason => "Some application has set another config loader", + loader => Module + }) + end. + +init_load_done() -> + % NOTE: Either us or some higher level (i.e. tests) code loaded config. + emqx_app:get_config_loader() =/= emqx. init_conf() -> %% Workaround for https://github.com/emqx/mria/issues/94: @@ -99,8 +117,7 @@ init_conf() -> _ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]), {ok, TnxId} = sync_cluster_conf(), _ = emqx_app:set_init_tnx_id(TnxId), - ok = init_load(), - ok = emqx_app:set_init_config_load_done(). + ok = init_load(). cluster_nodes() -> mria:cluster_nodes(cores) -- [node()]. diff --git a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl index 34bf5c702..2e3b40b87 100644 --- a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl +++ b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl @@ -215,7 +215,7 @@ assert_no_cluster_conf_copied([Node | Nodes], File) -> assert_config_load_done(Nodes) -> lists:foreach( fun(Node) -> - Done = rpc:call(Node, emqx_app, get_init_config_load_done, []), + Done = rpc:call(Node, emqx_conf_app, init_load_done, []), ?assert(Done, #{node => Node}) end, Nodes @@ -240,7 +240,6 @@ start_cluster_async(Specs) -> cluster(Specs, Config) -> PrivDataDir = ?config(priv_dir, Config), Env = [ - {emqx, init_config_load_done, false}, {emqx, boot_modules, []} ], emqx_common_test_helpers:emqx_cluster(Specs, [ diff --git a/apps/emqx_ft/src/emqx_ft.app.src b/apps/emqx_ft/src/emqx_ft.app.src index 713774409..8c37c77a8 100644 --- a/apps/emqx_ft/src/emqx_ft.app.src +++ b/apps/emqx_ft/src/emqx_ft.app.src @@ -7,6 +7,7 @@ kernel, stdlib, gproc, + emqx, emqx_s3 ]}, {env, []}, diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 89cf3827e..290cda333 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -65,31 +65,25 @@ suite() -> [{timetrap, {seconds, 90}}]. init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([emqx_ft], set_special_configs(Config)), - Config. + % NOTE + % Inhibit local fs GC to simulate it isn't fast enough to collect + % complete transfers. + Storage = emqx_utils_maps:deep_merge( + emqx_ft_test_helpers:local_storage(Config), + #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}} + ), + Apps = emqx_cth_suite:start( + [ + {emqx_ft, #{config => emqx_ft_test_helpers:config(Storage)}} + ], + #{work_dir => ?config(priv_dir, Config)} + ), + [{suite_apps, Apps} | Config]. -end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps([emqx_ft]), +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(suite_apps, Config)), ok. -set_special_configs(Config) -> - fun - (emqx_ft) -> - % NOTE - % Inhibit local fs GC to simulate it isn't fast enough to collect - % complete transfers. - Storage = emqx_utils_maps:deep_merge( - emqx_ft_test_helpers:local_storage(Config), - #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => <<"0s">>}}}} - ), - emqx_ft_test_helpers:load_config(#{ - <<"enable">> => true, - <<"storage">> => Storage - }); - (_) -> - ok - end. - init_per_testcase(Case, Config) -> ClientId = atom_to_binary(Case), case ?config(group, Config) of @@ -105,39 +99,32 @@ end_per_testcase(_Case, Config) -> ok. init_per_group(Group = cluster, Config) -> + WorkDir = ?config(priv_dir, Config), Cluster = mk_cluster_specs(Config), - ct:pal("Starting ~p", [Cluster]), - Nodes = [ - emqx_common_test_helpers:start_slave(Name, Opts#{join_to => node()}) - || {Name, Opts} <- Cluster - ], + Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}), [{group, Group}, {cluster_nodes, Nodes} | Config]; init_per_group(Group, Config) -> [{group, Group} | Config]. end_per_group(cluster, Config) -> - ok = lists:foreach( - fun emqx_ft_test_helpers:stop_additional_node/1, - ?config(cluster_nodes, Config) - ); + ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)); end_per_group(_Group, _Config) -> ok. -mk_cluster_specs(Config) -> - Specs = [ - {core, emqx_ft_SUITE1, #{listener_ports => [{tcp, 2883}]}}, - {core, emqx_ft_SUITE2, #{listener_ports => [{tcp, 3883}]}} - ], - CommOpts = [ - {env, [{emqx, boot_modules, [broker, listeners]}]}, - {apps, [emqx_ft]}, - {conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]}, - {env_handler, set_special_configs(Config)} - ], - emqx_common_test_helpers:emqx_cluster( - Specs, - CommOpts - ). +mk_cluster_specs(_Config) -> + CommonOpts = #{ + role => core, + join_to => node(), + apps => [ + {emqx_conf, #{start => false}}, + {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}}, + {emqx_ft, "file_transfer { enable = true }"} + ] + }, + [ + {emqx_ft_SUITE1, CommonOpts}, + {emqx_ft_SUITE2, CommonOpts} + ]. %%-------------------------------------------------------------------- %% Tests diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl index 39d4bb7c3..25ad42d75 100644 --- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl @@ -24,8 +24,6 @@ -import(emqx_dashboard_api_test_helpers, [host/0, uri/1]). --define(SUITE_APPS, [emqx_conf, emqx_ft]). - all() -> [ {group, single}, @@ -35,62 +33,76 @@ all() -> groups() -> [ {single, [], emqx_common_test_helpers:all(?MODULE)}, - {cluster, [], emqx_common_test_helpers:all(?MODULE)} + {cluster, [], emqx_common_test_helpers:all(?MODULE) -- [t_ft_disabled]} ]. suite() -> [{timetrap, {seconds, 90}}]. init_per_suite(Config) -> - ok = emqx_mgmt_api_test_util:init_suite( - [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config) - ), - {ok, _} = emqx:update_config([rpc, port_discovery], manual), Config. + end_per_suite(_Config) -> - ok = emqx_mgmt_api_test_util:end_suite([emqx_ft, emqx_conf]), ok. +init_per_group(Group = single, Config) -> + WorkDir = ?config(priv_dir, Config), + Apps = emqx_cth_suite:start( + [ + {emqx, #{}}, + {emqx_ft, "file_transfer { enable = true }"}, + {emqx_management, #{}}, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => WorkDir} + ), + {ok, App} = emqx_common_test_http:create_default_app(), + [{group, Group}, {group_apps, Apps}, {api, App} | Config]; init_per_group(Group = cluster, Config) -> + WorkDir = ?config(priv_dir, Config), Cluster = mk_cluster_specs(Config), - ct:pal("Starting ~p", [Cluster]), - Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster], - InitResult = erpc:multicall(Nodes, fun() -> init_node(Config) end), - [] = [{Node, Error} || {Node, {R, Error}} <- lists:zip(Nodes, InitResult), R /= ok], - [{group, Group}, {cluster_nodes, Nodes} | Config]; -init_per_group(Group, Config) -> - [{group, Group} | Config]. + Nodes = [Node1 | _] = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}), + {ok, App} = erpc:call(Node1, emqx_common_test_http, create_default_app, []), + [{group, Group}, {cluster_nodes, Nodes}, {api, App} | Config]. +end_per_group(single, Config) -> + {ok, _} = emqx_common_test_http:delete_default_app(), + ok = emqx_cth_suite:stop(?config(group_apps, Config)); end_per_group(cluster, Config) -> - ok = lists:foreach( - fun emqx_ft_test_helpers:stop_additional_node/1, - ?config(cluster_nodes, Config) - ); + ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)); end_per_group(_Group, _Config) -> ok. mk_cluster_specs(_Config) -> - Specs = [ - {core, emqx_ft_api_SUITE1, #{listener_ports => [{tcp, 2883}]}}, - {core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}}, - {replicant, emqx_ft_api_SUITE3, #{listener_ports => [{tcp, 4883}]}} + Apps = [ + {emqx_conf, #{start => false}}, + {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}}, + {emqx_ft, "file_transfer { enable = true }"}, + {emqx_management, #{}} ], - CommOpts = #{ - env => [ - {mria, db_backend, rlog}, - {emqx, boot_modules, [broker, listeners]} - ], - apps => [], - load_apps => ?SUITE_APPS, - conf => [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]] - }, - emqx_common_test_helpers:emqx_cluster( - Specs, - CommOpts - ). - -init_node(Config) -> - ok = emqx_common_test_helpers:start_apps(?SUITE_APPS, emqx_ft_test_helpers:env_handler(Config)). + DashboardConfig = + "dashboard { \n" + " listeners.http { enable = true, bind = 0 } \n" + " default_username = \"\" \n" + " default_password = \"\" \n" + "}\n", + [ + {emqx_ft_api_SUITE1, #{ + role => core, + apps => Apps ++ + [ + {emqx_dashboard, DashboardConfig ++ "dashboard.listeners.http.bind = 18083"} + ] + }}, + {emqx_ft_api_SUITE2, #{ + role => core, + apps => Apps ++ [{emqx_dashboard, DashboardConfig}] + }}, + {emqx_ft_api_SUITE3, #{ + role => replicant, + apps => Apps ++ [{emqx_dashboard, DashboardConfig}] + }} + ]. init_per_testcase(Case, Config) -> [{tc, Case} | Config]. @@ -111,7 +123,7 @@ t_list_files(Config) -> ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node), {ok, 200, #{<<"files">> := Files}} = - request_json(get, uri(["file_transfer", "files"])), + request_json(get, uri(["file_transfer", "files"]), Config), ?assertMatch( [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}], @@ -119,7 +131,7 @@ t_list_files(Config) -> ), {ok, 200, #{<<"files">> := FilesTransfer}} = - request_json(get, uri(["file_transfer", "files", ClientId, FileId])), + request_json(get, uri(["file_transfer", "files", ClientId, FileId]), Config), ?assertMatch( [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}], @@ -128,21 +140,23 @@ t_list_files(Config) -> ?assertMatch( {ok, 404, #{<<"code">> := <<"FILES_NOT_FOUND">>}}, - request_json(get, uri(["file_transfer", "files", ClientId, <<"no-such-file">>])) + request_json(get, uri(["file_transfer", "files", ClientId, <<"no-such-file">>]), Config) ). t_download_transfer(Config) -> ClientId = client_id(Config), FileId = <<"f1">>, - Node = lists:last(test_nodes(Config)), - ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node), + Nodes = [Node | _] = test_nodes(Config), + NodeUpload = lists:last(Nodes), + ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, NodeUpload), ?assertMatch( {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, request_json( get, - uri(["file_transfer", "file"]) ++ query(#{fileref => FileId}) + uri(["file_transfer", "file"]) ++ query(#{fileref => FileId}), + Config ) ), @@ -151,7 +165,8 @@ t_download_transfer(Config) -> request( get, uri(["file_transfer", "file"]) ++ - query(#{fileref => FileId, node => <<"nonode@nohost">>}) + query(#{fileref => FileId, node => <<"nonode@nohost">>}), + Config ) ), @@ -160,7 +175,8 @@ t_download_transfer(Config) -> request( get, uri(["file_transfer", "file"]) ++ - query(#{fileref => <<"unknown_file">>, node => node()}) + query(#{fileref => <<"unknown_file">>, node => Node}), + Config ) ), @@ -169,7 +185,8 @@ t_download_transfer(Config) -> request_json( get, uri(["file_transfer", "file"]) ++ - query(#{fileref => <<>>, node => node()}) + query(#{fileref => <<>>, node => Node}), + Config ) ), @@ -178,14 +195,15 @@ t_download_transfer(Config) -> request_json( get, uri(["file_transfer", "file"]) ++ - query(#{fileref => <<"/etc/passwd">>, node => node()}) + query(#{fileref => <<"/etc/passwd">>, node => Node}), + Config ) ), {ok, 200, #{<<"files">> := [File]}} = - request_json(get, uri(["file_transfer", "files", ClientId, FileId])), + request_json(get, uri(["file_transfer", "files", ClientId, FileId]), Config), - {ok, 200, Response} = request(get, host() ++ maps:get(<<"uri">>, File)), + {ok, 200, Response} = request(get, host() ++ maps:get(<<"uri">>, File), Config), ?assertEqual( <<"data">>, @@ -209,44 +227,47 @@ t_list_files_paging(Config) -> ?assertMatch( {ok, 200, #{<<"files">> := [_, _, _], <<"cursor">> := _}}, - request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 3})) + request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 3}), Config) ), {ok, 200, #{<<"files">> := Files}} = - request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100})), + request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100}), Config), ?assert(length(Files) >= NFiles), ?assertNotMatch( {ok, 200, #{<<"cursor">> := _}}, - request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100})) + request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100}), Config) ), ?assertMatch( {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, - request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0})) + request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0}), Config) ), ?assertMatch( {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, - request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<>>})) + request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<>>}), Config) ), ?assertMatch( {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, - request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<"{\"\":}">>})) + request_json( + get, uri(["file_transfer", "files"]) ++ query(#{following => <<"{\"\":}">>}), Config + ) ), ?assertMatch( {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, request_json( get, - uri(["file_transfer", "files"]) ++ query(#{following => <<"whatsthat!?">>}) + uri(["file_transfer", "files"]) ++ query(#{following => <<"whatsthat!?">>}), + Config ) ), PageThrough = fun PageThrough(Query, Acc) -> - case request_json(get, uri(["file_transfer", "files"]) ++ query(Query)) of + case request_json(get, uri(["file_transfer", "files"]) ++ query(Query), Config) of {ok, 200, #{<<"files">> := FilesPage, <<"cursor">> := Cursor}} -> PageThrough(Query#{following => Cursor}, Acc ++ FilesPage); {ok, 200, #{<<"files">> := FilesPage}} -> @@ -258,17 +279,18 @@ t_list_files_paging(Config) -> ?assertEqual(Files, PageThrough(#{limit => 8}, [])), ?assertEqual(Files, PageThrough(#{limit => NFiles}, [])). -t_ft_disabled(_Config) -> +t_ft_disabled(Config) -> ?assertMatch( {ok, 200, _}, - request_json(get, uri(["file_transfer", "files"])) + request_json(get, uri(["file_transfer", "files"]), Config) ), ?assertMatch( {ok, 400, _}, request_json( get, - uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>}) + uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>}), + Config ) ), @@ -276,14 +298,15 @@ t_ft_disabled(_Config) -> ?assertMatch( {ok, 503, _}, - request_json(get, uri(["file_transfer", "files"])) + request_json(get, uri(["file_transfer", "files"]), Config) ), ?assertMatch( {ok, 503, _}, request_json( get, - uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>, node => node()}) + uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>, node => node()}), + Config ) ). @@ -308,11 +331,12 @@ mk_file_id(Prefix, N) -> mk_file_name(N) -> "file." ++ integer_to_list(N). -request(Method, Url) -> - emqx_mgmt_api_test_util:request(Method, Url, []). +request(Method, Url, Config) -> + Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]}, + emqx_mgmt_api_test_util:request_api(Method, Url, [], auth_header(Config), [], Opts). -request_json(Method, Url) -> - case emqx_mgmt_api_test_util:request(Method, Url, []) of +request_json(Method, Url, Config) -> + case request(Method, Url, Config) of {ok, Code, Body} -> {ok, Code, json(Body)}; Otherwise -> @@ -326,6 +350,10 @@ query(Params) -> KVs = lists:map(fun({K, V}) -> uri_encode(K) ++ "=" ++ uri_encode(V) end, maps:to_list(Params)), "?" ++ string:join(KVs, "&"). +auth_header(Config) -> + #{api_key := ApiKey, api_secret := Secret} = ?config(api, Config), + emqx_common_test_http:auth_header(binary_to_list(ApiKey), binary_to_list(Secret)). + uri_encode(T) -> emqx_http_lib:uri_encode(to_list(T)). diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index 50d8579fc..aff0741d2 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -36,11 +36,11 @@ all() -> ]. init_per_suite(Config) -> - Apps = application:ensure_all_started(gproc), + {ok, Apps} = application:ensure_all_started(gproc), [{suite_apps, Apps} | Config]. -end_per_suite(_Config) -> - ok. +end_per_suite(Config) -> + emqx_cth_suite:stop_apps(?config(suite_apps, Config)). init_per_testcase(TC, Config) -> ok = snabbkaffe:start_trace(), diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl index 0f949855f..bc0adf416 100644 --- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl @@ -31,22 +31,20 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. -init_per_testcase(_Case, Config) -> - _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema), - ok = emqx_common_test_helpers:start_apps( - [emqx_conf, emqx_ft], fun - (emqx_ft) -> - emqx_ft_test_helpers:load_config(#{}); - (_) -> - ok - end +init_per_testcase(Case, Config) -> + WorkDir = filename:join(?config(priv_dir, Config), Case), + Apps = emqx_cth_suite:start( + [ + {emqx_conf, #{}}, + {emqx_ft, #{config => "file_transfer {}"}} + ], + #{work_dir => WorkDir} ), - {ok, _} = emqx:update_config([rpc, port_discovery], manual), - Config. + [{suite_apps, Apps} | Config]. -end_per_testcase(_Case, _Config) -> - ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]), - ok = emqx_config:erase(file_transfer). +end_per_testcase(_Case, Config) -> + ok = emqx_cth_suite:stop(?config(suite_apps, Config)), + ok. %%-------------------------------------------------------------------- %% Tests diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl index e008bb2b1..b14fc7edd 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl @@ -22,45 +22,42 @@ -include_lib("emqx_ft/include/emqx_ft_storage_fs.hrl"). -include_lib("stdlib/include/assert.hrl"). -include_lib("snabbkaffe/include/test_macros.hrl"). +-include_lib("common_test/include/ct.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - _ = application:load(emqx_ft), - ok = emqx_common_test_helpers:start_apps([]), - Config. + Apps = emqx_cth_suite:start([emqx], #{work_dir => ?config(priv_dir, Config)}), + [{suite_apps, Apps} | Config]. -end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps([]), +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(suite_apps, Config)), ok. init_per_testcase(TC, Config) -> SegmentsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, segments]), ExportsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, exports]), - ok = emqx_common_test_helpers:start_app( + Started = emqx_cth_suite:start_app( emqx_ft, - fun(emqx_ft) -> - emqx_ft_test_helpers:load_config(#{ - <<"enable">> => true, - <<"storage">> => #{ - <<"local">> => #{ - <<"enable">> => true, - <<"segments">> => #{<<"root">> => SegmentsRoot}, - <<"exporter">> => #{ - <<"local">> => #{<<"enable">> => true, <<"root">> => ExportsRoot} - } + #{ + config => emqx_ft_test_helpers:config(#{ + <<"local">> => #{ + <<"enable">> => true, + <<"segments">> => #{<<"root">> => SegmentsRoot}, + <<"exporter">> => #{ + <<"local">> => #{<<"enable">> => true, <<"root">> => ExportsRoot} } } }) - end + } ), ok = snabbkaffe:start_trace(), - Config. + [{tc_apps, Started} | Config]. -end_per_testcase(_TC, _Config) -> +end_per_testcase(_TC, Config) -> ok = snabbkaffe:stop(), - ok = application:stop(emqx_ft), + ok = emqx_cth_suite:stop_apps(?config(tc_apps, Config)), ok. %% diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl index 1b952bdd7..fbb3e7d6f 100644 --- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl +++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl @@ -49,6 +49,9 @@ env_handler(Config) -> ok end. +config(Storage) -> + #{<<"file_transfer">> => #{<<"enable">> => true, <<"storage">> => Storage}}. + local_storage(Config) -> local_storage(Config, #{exporter => local}). diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index e86a76e1c..862d81ab8 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -200,7 +200,7 @@ t_api_listeners_list_not_ready(Config) when is_list(Config) -> L1 = get_tcp_listeners(Node1), %% test init_config not ready. - _ = rpc:call(Node1, application, set_env, [emqx, init_config_load_done, false]), + _ = rpc:call(Node1, emqx_app, set_config_loader, [emqx]), assert_config_load_not_done(Node1), L2 = get_tcp_listeners(Node1), @@ -283,12 +283,11 @@ get_tcp_listeners(Node) -> NodeStatus. assert_config_load_not_done(Node) -> - Done = rpc:call(Node, emqx_app, get_init_config_load_done, []), - ?assertNot(Done, #{node => Node}). + Prio = rpc:call(Node, emqx_app, get_config_loader, []), + ?assertEqual(emqx, Prio, #{node => Node}). cluster(Specs) -> Env = [ - {emqx, init_config_load_done, false}, {emqx, boot_modules, []} ], emqx_common_test_helpers:emqx_cluster(Specs, [ @@ -299,7 +298,7 @@ cluster(Specs) -> (emqx) -> application:set_env(emqx, boot_modules, []), %% test init_config not ready. - application:set_env(emqx, init_config_load_done, false), + emqx_app:set_config_loader(emqx), ok; (_) -> ok diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index bbd17dd5b..997715e24 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -32,7 +32,8 @@ init_suite(Apps, SetConfigs) when is_function(SetConfigs) -> init_suite(Apps, SetConfigs, Opts) -> application:load(emqx_management), emqx_common_test_helpers:start_apps(Apps ++ [emqx_dashboard], SetConfigs, Opts), - emqx_common_test_http:create_default_app(). + _ = emqx_common_test_http:create_default_app(), + ok. end_suite() -> end_suite([]). diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl index 0325ab030..f9b9ef766 100644 --- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl @@ -28,13 +28,12 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - [application:load(App) || App <- apps_to_start() ++ apps_to_load()], Config. end_per_suite(_Config) -> ok. -init_per_testcase(t_import_on_cluster, Config) -> +init_per_testcase(TC = t_import_on_cluster, Config0) -> %% Don't import listeners to avoid port conflicts %% when the same conf will be imported to another cluster meck:new(emqx_mgmt_listeners_conf, [passthrough]), @@ -51,23 +50,25 @@ init_per_testcase(t_import_on_cluster, Config) -> 1, {ok, #{changed => [], root_key => gateway}} ), + Config = [{tc_name, TC} | Config0], [{cluster, cluster(Config)} | setup(Config)]; -init_per_testcase(t_verify_imported_mnesia_tab_on_cluster, Config) -> +init_per_testcase(TC = t_verify_imported_mnesia_tab_on_cluster, Config0) -> + Config = [{tc_name, TC} | Config0], [{cluster, cluster(Config)} | setup(Config)]; init_per_testcase(t_mnesia_bad_tab_schema, Config) -> meck:new(emqx_mgmt_data_backup, [passthrough]), - meck:expect(emqx_mgmt_data_backup, mnesia_tabs_to_backup, 0, [data_backup_test]), - setup(Config); -init_per_testcase(_TestCase, Config) -> - setup(Config). + meck:expect(TC = emqx_mgmt_data_backup, mnesia_tabs_to_backup, 0, [data_backup_test]), + setup([{tc_name, TC} | Config]); +init_per_testcase(TC, Config) -> + setup([{tc_name, TC} | Config]). end_per_testcase(t_import_on_cluster, Config) -> - cleanup_cluster(?config(cluster, Config)), + emqx_cth_cluster:stop(?config(cluster, Config)), cleanup(Config), meck:unload(emqx_mgmt_listeners_conf), meck:unload(emqx_gateway_conf); end_per_testcase(t_verify_imported_mnesia_tab_on_cluster, Config) -> - cleanup_cluster(?config(cluster, Config)), + emqx_cth_cluster:stop(?config(cluster, Config)), cleanup(Config); end_per_testcase(t_mnesia_bad_tab_schema, Config) -> cleanup(Config), @@ -356,8 +357,6 @@ t_mnesia_bad_tab_schema(_Config) -> t_read_files(_Config) -> DataDir = emqx:data_dir(), - %% Relative "data" path is set in init_per_testcase/2, asserting it must be safe - ?assertEqual("data", DataDir), {ok, Cwd} = file:get_cwd(), AbsDataDir = filename:join(Cwd, DataDir), FileBaseName = "t_read_files_tmp_file", @@ -388,30 +387,12 @@ t_read_files(_Config) -> %%------------------------------------------------------------------------------ setup(Config) -> - %% avoid port conflicts if the cluster is started - AppHandler = fun - (emqx_dashboard) -> - ok = emqx_config:put([dashboard, listeners, http, bind], 0); - (_) -> - ok - end, - ok = emqx_common_test_helpers:start_apps(apps_to_start(), AppHandler), - PrevDataDir = application:get_env(emqx, data_dir), - application:set_env(emqx, data_dir, "data"), - [{previous_emqx_data_dir, PrevDataDir} | Config]. + WorkDir = filename:join(work_dir(Config), local), + Started = emqx_cth_suite:start(apps_to_start(), #{work_dir => WorkDir}), + [{suite_apps, Started} | Config]. cleanup(Config) -> - emqx_common_test_helpers:stop_apps(apps_to_start()), - case ?config(previous_emqx_data_dir, Config) of - undefined -> - application:unset_env(emqx, data_dir); - {ok, Val} -> - application:set_env(emqx, data_dir, Val) - end. - -cleanup_cluster(ClusterNodes) -> - [rpc:call(N, ekka, leave, []) || N <- lists:reverse(ClusterNodes)], - [emqx_common_test_helpers:stop_slave(N) || N <- ClusterNodes]. + emqx_cth_suite:stop(?config(suite_apps, Config)). users(Prefix) -> [ @@ -428,50 +409,18 @@ recompose_version(MajorInt, MinorInt, Patch) -> ). cluster(Config) -> - PrivDataDir = ?config(priv_dir, Config), - [{Core1, Core1Opts}, {Core2, Core2Opts}, {Replicant, ReplOpts}] = - emqx_common_test_helpers:emqx_cluster( - [ - {core, data_backup_core1}, - {core, data_backup_core2}, - {replicant, data_backup_replicant} - ], - #{ - priv_data_dir => PrivDataDir, - schema_mod => emqx_conf_schema, - apps => apps_to_start(), - load_apps => apps_to_start() ++ apps_to_load(), - env => [{mria, db_backend, rlog}], - load_schema => true, - start_autocluster => true, - listener_ports => [], - conf => [{[dashboard, listeners, http, bind], 0}], - env_handler => - fun(_) -> - application:set_env(emqx, boot_modules, [broker, router]) - end - } - ), - Node1 = emqx_common_test_helpers:start_slave(Core1, Core1Opts), - Node2 = emqx_common_test_helpers:start_slave(Core2, Core2Opts), - #{conf := _ReplConf, env := ReplEnv} = ReplOpts, - ClusterDiscovery = {static, [{seeds, [Node1, Node2]}]}, - ReplOpts1 = maps:remove( - join_to, - ReplOpts#{ - env => [{ekka, cluster_discovery, ClusterDiscovery} | ReplEnv], - env_handler => fun(_) -> - application:set_env(emqx, boot_modules, [broker, router]), - application:set_env( - ekka, - cluster_discovery, - ClusterDiscovery - ) - end - } + Nodes = emqx_cth_cluster:start( + [ + {data_backup_core1, #{role => core, apps => apps_to_start()}}, + {data_backup_core2, #{role => core, apps => apps_to_start()}}, + {data_backup_replicant, #{role => replicant, apps => apps_to_start()}} + ], + #{work_dir => work_dir(Config)} ), - ReplNode = emqx_common_test_helpers:start_slave(Replicant, ReplOpts1), - [Node1, Node2, ReplNode]. + Nodes. + +work_dir(Config) -> + filename:join(?config(priv_dir, Config), ?config(tc_name, Config)). create_test_tab(Attributes) -> ok = mria:create_table(data_backup_test, [ @@ -491,8 +440,8 @@ create_test_tab(Attributes) -> apps_to_start() -> [ - emqx, - emqx_conf, + {emqx_conf, "dashboard.listeners.http.bind = 0"}, + {emqx, #{override_env => [{boot_modules, [broker, router]}]}}, emqx_psk, emqx_management, emqx_dashboard, @@ -505,11 +454,9 @@ apps_to_start() -> emqx_gateway, emqx_exhook, emqx_bridge, - emqx_auto_subscribe - ]. + emqx_auto_subscribe, -apps_to_load() -> - [ + % loaded only emqx_gateway_lwm2m, emqx_gateway_coap, emqx_gateway_exproto, diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl index 8d168ec8b..d6dee2c1e 100644 --- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl @@ -523,7 +523,6 @@ group_t_copy_plugin_to_a_new_node({init, Config}) -> #{ apps => [emqx_conf, emqx_plugins], env => [ - {emqx, init_config_load_done, false}, {emqx, boot_modules, []} ], load_schema => false @@ -621,7 +620,6 @@ group_t_copy_plugin_to_a_new_node_single_node({init, Config}) -> #{ apps => [emqx_conf, emqx_plugins], env => [ - {emqx, init_config_load_done, false}, {emqx, boot_modules, []} ], env_handler => fun @@ -690,7 +688,6 @@ group_t_cluster_leave({init, Config}) -> #{ apps => [emqx_conf, emqx_plugins], env => [ - {emqx, init_config_load_done, false}, {emqx, boot_modules, []} ], env_handler => fun diff --git a/mix.exs b/mix.exs index d5a55d6e7..1e18ef8ef 100644 --- a/mix.exs +++ b/mix.exs @@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true}, - {:hocon, github: "emqx/hocon", tag: "0.39.10", override: true}, + {:hocon, github: "emqx/hocon", tag: "0.39.11", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, @@ -314,7 +314,8 @@ defmodule EMQXUmbrella.MixProject do :emqx_prometheus, :emqx_auto_subscribe, :emqx_slow_subs, - :emqx_plugins + :emqx_plugins, + :emqx_ft ], steps: steps, strip_beams: false diff --git a/rebar.config b/rebar.config index 94a33ec82..ff55d2e70 100644 --- a/rebar.config +++ b/rebar.config @@ -75,7 +75,7 @@ , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.10"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.11"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}