Merge pull request #11120 from ft/EMQX-10329/tooling
test: add tooling to make testruns more isolated and predictable
This commit is contained in:
commit
8cc7022760
|
@ -12,16 +12,12 @@ ebin
|
||||||
!ebin/.placeholder
|
!ebin/.placeholder
|
||||||
.concrete/DEV_MODE
|
.concrete/DEV_MODE
|
||||||
.rebar
|
.rebar
|
||||||
test/ebin/*.beam
|
|
||||||
.exrc
|
.exrc
|
||||||
plugins/*/ebin
|
|
||||||
*.swp
|
*.swp
|
||||||
*.so
|
*.so
|
||||||
.erlang.mk/
|
.erlang.mk/
|
||||||
cover/
|
cover/
|
||||||
eunit.coverdata
|
|
||||||
test/ct.cover.spec
|
test/ct.cover.spec
|
||||||
ct.coverdata
|
|
||||||
.idea/
|
.idea/
|
||||||
_build
|
_build
|
||||||
.rebar3
|
.rebar3
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
|
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
|
||||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.5"}}},
|
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.5"}}},
|
||||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
|
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
|
||||||
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.10"}}},
|
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.11"}}},
|
||||||
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
|
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
|
||||||
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
|
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
|
||||||
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
|
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
|
||||||
|
|
|
@ -24,17 +24,14 @@
|
||||||
stop/1,
|
stop/1,
|
||||||
get_description/0,
|
get_description/0,
|
||||||
get_release/0,
|
get_release/0,
|
||||||
set_init_config_load_done/0,
|
set_config_loader/1,
|
||||||
get_init_config_load_done/0,
|
get_config_loader/0,
|
||||||
set_init_tnx_id/1,
|
set_init_tnx_id/1,
|
||||||
get_init_tnx_id/0
|
get_init_tnx_id/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
|
|
||||||
-define(APP, emqx).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Application callbacks
|
%% Application callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -62,11 +59,11 @@ stop(_State) -> ok.
|
||||||
%% @doc Call this function to make emqx boot without loading config,
|
%% @doc Call this function to make emqx boot without loading config,
|
||||||
%% in case we want to delegate the config load to a higher level app
|
%% in case we want to delegate the config load to a higher level app
|
||||||
%% which manages emqx app.
|
%% which manages emqx app.
|
||||||
set_init_config_load_done() ->
|
set_config_loader(Module) when is_atom(Module) ->
|
||||||
application:set_env(emqx, init_config_load_done, true).
|
application:set_env(emqx, config_loader, Module).
|
||||||
|
|
||||||
get_init_config_load_done() ->
|
get_config_loader() ->
|
||||||
application:get_env(emqx, init_config_load_done, false).
|
application:get_env(emqx, config_loader, emqx).
|
||||||
|
|
||||||
%% @doc Set the transaction id from which this node should start applying after boot.
|
%% @doc Set the transaction id from which this node should start applying after boot.
|
||||||
%% The transaction ID is received from the core node which we just copied the latest
|
%% The transaction ID is received from the core node which we just copied the latest
|
||||||
|
@ -79,9 +76,15 @@ get_init_tnx_id() ->
|
||||||
application:get_env(emqx, cluster_rpc_init_tnx_id, -1).
|
application:get_env(emqx, cluster_rpc_init_tnx_id, -1).
|
||||||
|
|
||||||
maybe_load_config() ->
|
maybe_load_config() ->
|
||||||
case get_init_config_load_done() of
|
case get_config_loader() of
|
||||||
true -> ok;
|
emqx ->
|
||||||
false -> emqx_config:init_load(emqx_schema)
|
emqx_config:init_load(emqx_schema);
|
||||||
|
Module ->
|
||||||
|
?SLOG(debug, #{
|
||||||
|
msg => "skip_init_config_load",
|
||||||
|
reason => "Some application has set another config loader",
|
||||||
|
loader => Module
|
||||||
|
})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
maybe_start_listeners() ->
|
maybe_start_listeners() ->
|
||||||
|
|
|
@ -17,8 +17,6 @@
|
||||||
%% @doc Start/Stop MQTT listeners.
|
%% @doc Start/Stop MQTT listeners.
|
||||||
-module(emqx_listeners).
|
-module(emqx_listeners).
|
||||||
|
|
||||||
-elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 10000}}]).
|
|
||||||
|
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
-include("emqx_schema.hrl").
|
-include("emqx_schema.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
|
@ -98,7 +96,7 @@ format_list(Listener) ->
|
||||||
|
|
||||||
do_list_raw() ->
|
do_list_raw() ->
|
||||||
%% GET /listeners from other nodes returns [] when init config is not loaded.
|
%% GET /listeners from other nodes returns [] when init config is not loaded.
|
||||||
case emqx_app:get_init_config_load_done() of
|
case emqx_app:get_config_loader() =/= emqx of
|
||||||
true ->
|
true ->
|
||||||
Key = <<"listeners">>,
|
Key = <<"listeners">>,
|
||||||
Raw = emqx_config:get_raw([Key], #{}),
|
Raw = emqx_config:get_raw([Key], #{}),
|
||||||
|
|
|
@ -67,6 +67,7 @@
|
||||||
-export([
|
-export([
|
||||||
emqx_cluster/1,
|
emqx_cluster/1,
|
||||||
emqx_cluster/2,
|
emqx_cluster/2,
|
||||||
|
start_ekka/0,
|
||||||
start_epmd/0,
|
start_epmd/0,
|
||||||
start_slave/2,
|
start_slave/2,
|
||||||
stop_slave/1,
|
stop_slave/1,
|
||||||
|
@ -348,7 +349,7 @@ stop_apps(Apps, Opts) ->
|
||||||
[application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]],
|
[application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]],
|
||||||
ok = mria_mnesia:delete_schema(),
|
ok = mria_mnesia:delete_schema(),
|
||||||
%% to avoid inter-suite flakiness
|
%% to avoid inter-suite flakiness
|
||||||
application:unset_env(emqx, init_config_load_done),
|
application:unset_env(emqx, config_loader),
|
||||||
application:unset_env(emqx, boot_modules),
|
application:unset_env(emqx, boot_modules),
|
||||||
persistent_term:erase(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY),
|
persistent_term:erase(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY),
|
||||||
case Opts of
|
case Opts of
|
||||||
|
@ -910,7 +911,7 @@ setup_node(Node, Opts) when is_map(Opts) ->
|
||||||
set_env_once("EMQX_NODE__DATA_DIR", NodeDataDir),
|
set_env_once("EMQX_NODE__DATA_DIR", NodeDataDir),
|
||||||
set_env_once("EMQX_NODE__COOKIE", Cookie),
|
set_env_once("EMQX_NODE__COOKIE", Cookie),
|
||||||
emqx_config:init_load(SchemaMod),
|
emqx_config:init_load(SchemaMod),
|
||||||
application:set_env(emqx, init_config_load_done, true)
|
emqx_app:set_config_loader(emqx_conf)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
%% Need to set this otherwise listeners will conflict between each other
|
%% Need to set this otherwise listeners will conflict between each other
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
create_default_app/0,
|
create_default_app/0,
|
||||||
delete_default_app/0,
|
delete_default_app/0,
|
||||||
default_auth_header/0,
|
default_auth_header/0,
|
||||||
|
auth_header/1,
|
||||||
auth_header/2
|
auth_header/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -72,6 +73,9 @@ do_request_api(Method, Request, HttpOpts) ->
|
||||||
get_http_data(ResponseBody) ->
|
get_http_data(ResponseBody) ->
|
||||||
emqx_utils_json:decode(ResponseBody, [return_maps]).
|
emqx_utils_json:decode(ResponseBody, [return_maps]).
|
||||||
|
|
||||||
|
auth_header(#{api_key := ApiKey, api_secret := Secret}) ->
|
||||||
|
auth_header(binary_to_list(ApiKey), binary_to_list(Secret)).
|
||||||
|
|
||||||
auth_header(User, Pass) ->
|
auth_header(User, Pass) ->
|
||||||
Encoded = base64:encode_to_string(lists:append([User, ":", Pass])),
|
Encoded = base64:encode_to_string(lists:append([User, ":", Pass])),
|
||||||
{"Authorization", "Basic " ++ Encoded}.
|
{"Authorization", "Basic " ++ Encoded}.
|
||||||
|
@ -87,8 +91,7 @@ create_default_app() ->
|
||||||
ExpiredAt = Now + timer:minutes(10),
|
ExpiredAt = Now + timer:minutes(10),
|
||||||
emqx_mgmt_auth:create(
|
emqx_mgmt_auth:create(
|
||||||
?DEFAULT_APP_ID, ?DEFAULT_APP_SECRET, true, ExpiredAt, <<"default app key for test">>
|
?DEFAULT_APP_ID, ?DEFAULT_APP_SECRET, true, ExpiredAt, <<"default app key for test">>
|
||||||
),
|
).
|
||||||
ok.
|
|
||||||
|
|
||||||
delete_default_app() ->
|
delete_default_app() ->
|
||||||
emqx_mgmt_auth:delete(?DEFAULT_APP_ID).
|
emqx_mgmt_auth:delete(?DEFAULT_APP_ID).
|
||||||
|
|
|
@ -0,0 +1,417 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_cth_cluster).
|
||||||
|
|
||||||
|
-export([start/2]).
|
||||||
|
-export([stop/1]).
|
||||||
|
|
||||||
|
-export([share_load_module/2]).
|
||||||
|
|
||||||
|
-define(APPS_CLUSTERING, [gen_rpc, mria, ekka]).
|
||||||
|
|
||||||
|
-define(TIMEOUT_NODE_START_MS, 15000).
|
||||||
|
-define(TIMEOUT_APPS_START_MS, 30000).
|
||||||
|
-define(TIMEOUT_NODE_STOP_S, 15).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
-type nodespec() :: {_ShortName :: atom(), #{
|
||||||
|
% DB Role
|
||||||
|
% Default: `core`
|
||||||
|
role => core | replicant,
|
||||||
|
|
||||||
|
% DB Backend
|
||||||
|
% Default: `mnesia` if there are no replicants in cluster, otherwise `rlog`
|
||||||
|
%
|
||||||
|
% NOTE
|
||||||
|
% Default are chosen with the intention of lowering the chance of observing
|
||||||
|
% inconsistencies due to data races (i.e. missing mria shards on nodes where some
|
||||||
|
% application hasn't been started yet).
|
||||||
|
db_backend => mnesia | rlog,
|
||||||
|
|
||||||
|
% Applications to start on the node
|
||||||
|
% Default: only applications needed for clustering are started
|
||||||
|
%
|
||||||
|
% NOTES
|
||||||
|
% 1. Apps needed for clustering started unconditionally.
|
||||||
|
% * It's not possible to redefine their startup order.
|
||||||
|
% * It's possible to add `{ekka, #{start => false}}` appspec though.
|
||||||
|
% 2. There are defaults applied to some appspecs if they present.
|
||||||
|
% * We try to keep `emqx_conf` config consistent with default configuration of
|
||||||
|
% clustering applications.
|
||||||
|
apps => [emqx_cth_suite:appspec()],
|
||||||
|
|
||||||
|
base_port => inet:port_number(),
|
||||||
|
|
||||||
|
% Node to join to in clustering phase
|
||||||
|
% If set to `undefined` this node won't try to join the cluster
|
||||||
|
% Default: no (first core node is used to join to by default)
|
||||||
|
join_to => node() | undefined,
|
||||||
|
|
||||||
|
%% Working directory
|
||||||
|
%% If this directory is not empty, starting up the node applications will fail
|
||||||
|
%% Default: "${ClusterOpts.work_dir}/${nodename}"
|
||||||
|
work_dir => file:name(),
|
||||||
|
|
||||||
|
% Tooling to manage nodes
|
||||||
|
% Default: `ct_slave`.
|
||||||
|
driver => ct_slave | slave
|
||||||
|
}}.
|
||||||
|
|
||||||
|
-spec start([nodespec()], ClusterOpts) ->
|
||||||
|
[node()]
|
||||||
|
when
|
||||||
|
ClusterOpts :: #{
|
||||||
|
%% Working directory
|
||||||
|
%% Everything a test produces should go here. Each node's stuff should go in its
|
||||||
|
%% own directory.
|
||||||
|
work_dir := file:name()
|
||||||
|
}.
|
||||||
|
start(Nodes, ClusterOpts) ->
|
||||||
|
NodeSpecs = mk_nodespecs(Nodes, ClusterOpts),
|
||||||
|
ct:pal("Starting cluster: ~p", [NodeSpecs]),
|
||||||
|
% 1. Start bare nodes with only basic applications running
|
||||||
|
_ = emqx_utils:pmap(fun start_node_init/1, NodeSpecs, ?TIMEOUT_NODE_START_MS),
|
||||||
|
% 2. Start applications needed to enable clustering
|
||||||
|
% Generally, this causes some applications to restart, but we deliberately don't
|
||||||
|
% start them yet.
|
||||||
|
_ = lists:foreach(fun run_node_phase_cluster/1, NodeSpecs),
|
||||||
|
% 3. Start applications after cluster is formed
|
||||||
|
% Cluster-joins are complete, so they shouldn't restart in the background anymore.
|
||||||
|
_ = emqx_utils:pmap(fun run_node_phase_apps/1, NodeSpecs, ?TIMEOUT_APPS_START_MS),
|
||||||
|
[Node || #{name := Node} <- NodeSpecs].
|
||||||
|
|
||||||
|
mk_nodespecs(Nodes, ClusterOpts) ->
|
||||||
|
NodeSpecs = lists:zipwith(
|
||||||
|
fun(N, {Name, Opts}) -> mk_init_nodespec(N, Name, Opts, ClusterOpts) end,
|
||||||
|
lists:seq(1, length(Nodes)),
|
||||||
|
Nodes
|
||||||
|
),
|
||||||
|
CoreNodes = [Node || #{name := Node, role := core} <- NodeSpecs],
|
||||||
|
Backend =
|
||||||
|
case length(CoreNodes) of
|
||||||
|
L when L == length(NodeSpecs) ->
|
||||||
|
mnesia;
|
||||||
|
_ ->
|
||||||
|
rlog
|
||||||
|
end,
|
||||||
|
lists:map(
|
||||||
|
fun(Spec0) ->
|
||||||
|
Spec1 = maps:merge(#{core_nodes => CoreNodes, db_backend => Backend}, Spec0),
|
||||||
|
Spec2 = merge_default_appspecs(Spec1, NodeSpecs),
|
||||||
|
Spec3 = merge_clustering_appspecs(Spec2, NodeSpecs),
|
||||||
|
Spec3
|
||||||
|
end,
|
||||||
|
NodeSpecs
|
||||||
|
).
|
||||||
|
|
||||||
|
mk_init_nodespec(N, Name, NodeOpts, ClusterOpts) ->
|
||||||
|
Node = node_name(Name),
|
||||||
|
BasePort = base_port(N),
|
||||||
|
WorkDir = maps:get(work_dir, ClusterOpts),
|
||||||
|
Defaults = #{
|
||||||
|
name => Node,
|
||||||
|
role => core,
|
||||||
|
apps => [],
|
||||||
|
base_port => BasePort,
|
||||||
|
work_dir => filename:join([WorkDir, Node]),
|
||||||
|
driver => ct_slave
|
||||||
|
},
|
||||||
|
maps:merge(Defaults, NodeOpts).
|
||||||
|
|
||||||
|
merge_default_appspecs(#{apps := Apps} = Spec, NodeSpecs) ->
|
||||||
|
Spec#{apps => [mk_node_appspec(App, Spec, NodeSpecs) || App <- Apps]}.
|
||||||
|
|
||||||
|
merge_clustering_appspecs(#{apps := Apps} = Spec, NodeSpecs) ->
|
||||||
|
AppsClustering = lists:map(
|
||||||
|
fun(App) ->
|
||||||
|
case lists:keyfind(App, 1, Apps) of
|
||||||
|
AppSpec = {App, _} ->
|
||||||
|
AppSpec;
|
||||||
|
false ->
|
||||||
|
{App, default_appspec(App, Spec, NodeSpecs)}
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
?APPS_CLUSTERING
|
||||||
|
),
|
||||||
|
AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)],
|
||||||
|
Spec#{apps => AppsClustering ++ AppsRest}.
|
||||||
|
|
||||||
|
mk_node_appspec({App, Opts}, Spec, NodeSpecs) ->
|
||||||
|
{App, emqx_cth_suite:merge_appspec(default_appspec(App, Spec, NodeSpecs), Opts)};
|
||||||
|
mk_node_appspec(App, Spec, NodeSpecs) ->
|
||||||
|
{App, default_appspec(App, Spec, NodeSpecs)}.
|
||||||
|
|
||||||
|
default_appspec(gen_rpc, #{name := Node}, NodeSpecs) ->
|
||||||
|
NodePorts = lists:foldl(
|
||||||
|
fun(#{name := CNode, base_port := Port}, Acc) ->
|
||||||
|
Acc#{CNode => {tcp, gen_rpc_port(Port)}}
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
NodeSpecs
|
||||||
|
),
|
||||||
|
{tcp, Port} = maps:get(Node, NodePorts),
|
||||||
|
#{
|
||||||
|
override_env => [
|
||||||
|
% NOTE
|
||||||
|
% This is needed to make sure `gen_rpc` peers will find each other.
|
||||||
|
{port_discovery, manual},
|
||||||
|
{tcp_server_port, Port},
|
||||||
|
{client_config_per_node, {internal, NodePorts}}
|
||||||
|
]
|
||||||
|
};
|
||||||
|
default_appspec(mria, #{role := Role, db_backend := Backend}, _NodeSpecs) ->
|
||||||
|
#{
|
||||||
|
override_env => [
|
||||||
|
{node_role, Role},
|
||||||
|
{db_backend, Backend}
|
||||||
|
]
|
||||||
|
};
|
||||||
|
default_appspec(ekka, Spec, _NodeSpecs) ->
|
||||||
|
Overrides =
|
||||||
|
case get_cluster_seeds(Spec) of
|
||||||
|
[_ | _] = Seeds ->
|
||||||
|
% NOTE
|
||||||
|
% Presumably, this is needed for replicants to find core nodes.
|
||||||
|
[{cluster_discovery, {static, [{seeds, Seeds}]}}];
|
||||||
|
[] ->
|
||||||
|
[]
|
||||||
|
end,
|
||||||
|
#{
|
||||||
|
override_env => Overrides
|
||||||
|
};
|
||||||
|
default_appspec(emqx_conf, Spec, _NodeSpecs) ->
|
||||||
|
% NOTE
|
||||||
|
% This usually sets up a lot of `gen_rpc` / `mria` / `ekka` application envs in
|
||||||
|
% `emqx_config:init_load/2` during configuration mapping, so we need to keep them
|
||||||
|
% in sync with the values we set up here.
|
||||||
|
#{
|
||||||
|
name := Node,
|
||||||
|
role := Role,
|
||||||
|
db_backend := Backend,
|
||||||
|
base_port := BasePort,
|
||||||
|
work_dir := WorkDir
|
||||||
|
} = Spec,
|
||||||
|
Listeners = [
|
||||||
|
#{Type => #{default => #{bind => format("127.0.0.1:~p", [Port])}}}
|
||||||
|
|| Type <- [tcp, ssl, ws, wss],
|
||||||
|
Port <- [listener_port(BasePort, Type)]
|
||||||
|
],
|
||||||
|
Cluster =
|
||||||
|
case get_cluster_seeds(Spec) of
|
||||||
|
[_ | _] = Seeds ->
|
||||||
|
% NOTE
|
||||||
|
% Presumably, this is needed for replicants to find core nodes.
|
||||||
|
#{discovery_strategy => static, static => #{seeds => Seeds}};
|
||||||
|
[] ->
|
||||||
|
#{}
|
||||||
|
end,
|
||||||
|
#{
|
||||||
|
config => #{
|
||||||
|
node => #{
|
||||||
|
name => Node,
|
||||||
|
role => Role,
|
||||||
|
cookie => erlang:get_cookie(),
|
||||||
|
% TODO: will it be synced to the same value eventually?
|
||||||
|
data_dir => unicode:characters_to_binary(WorkDir),
|
||||||
|
db_backend => Backend
|
||||||
|
},
|
||||||
|
cluster => Cluster,
|
||||||
|
rpc => #{
|
||||||
|
% NOTE
|
||||||
|
% This (along with `gen_rpc` env overrides) is needed to make sure `gen_rpc`
|
||||||
|
% peers will find each other.
|
||||||
|
protocol => tcp,
|
||||||
|
tcp_server_port => gen_rpc_port(BasePort),
|
||||||
|
port_discovery => manual
|
||||||
|
},
|
||||||
|
listeners => lists:foldl(fun maps:merge/2, #{}, Listeners)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
default_appspec(_App, _, _) ->
|
||||||
|
#{}.
|
||||||
|
|
||||||
|
get_cluster_seeds(#{join_to := undefined}) ->
|
||||||
|
[];
|
||||||
|
get_cluster_seeds(#{join_to := Node}) ->
|
||||||
|
[Node];
|
||||||
|
get_cluster_seeds(#{core_nodes := CoreNodes}) ->
|
||||||
|
CoreNodes.
|
||||||
|
|
||||||
|
start_node_init(Spec = #{name := Node}) ->
|
||||||
|
Node = start_bare_node(Node, Spec),
|
||||||
|
pong = net_adm:ping(Node),
|
||||||
|
% Preserve node spec right on the remote node
|
||||||
|
ok = set_node_opts(Node, Spec),
|
||||||
|
% Make it possible to call `ct:pal` and friends (if running under rebar3)
|
||||||
|
_ = share_load_module(Node, cthr),
|
||||||
|
% Enable snabbkaffe trace forwarding
|
||||||
|
ok = snabbkaffe:forward_trace(Node),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
run_node_phase_cluster(Spec = #{name := Node}) ->
|
||||||
|
ok = load_apps(Node, Spec),
|
||||||
|
ok = start_apps_clustering(Node, Spec),
|
||||||
|
ok = maybe_join_cluster(Node, Spec),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
run_node_phase_apps(Spec = #{name := Node}) ->
|
||||||
|
ok = start_apps(Node, Spec),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
set_node_opts(Node, Spec) ->
|
||||||
|
erpc:call(Node, persistent_term, put, [{?MODULE, opts}, Spec]).
|
||||||
|
|
||||||
|
get_node_opts(Node) ->
|
||||||
|
erpc:call(Node, persistent_term, get, [{?MODULE, opts}]).
|
||||||
|
|
||||||
|
load_apps(Node, #{apps := Apps}) ->
|
||||||
|
erpc:call(Node, emqx_cth_suite, load_apps, [Apps]).
|
||||||
|
|
||||||
|
start_apps_clustering(Node, #{apps := Apps} = Spec) ->
|
||||||
|
SuiteOpts = maps:with([work_dir], Spec),
|
||||||
|
AppsClustering = [lists:keyfind(App, 1, Apps) || App <- ?APPS_CLUSTERING],
|
||||||
|
_Started = erpc:call(Node, emqx_cth_suite, start, [AppsClustering, SuiteOpts]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
start_apps(Node, #{apps := Apps} = Spec) ->
|
||||||
|
SuiteOpts = maps:with([work_dir], Spec),
|
||||||
|
AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)],
|
||||||
|
_Started = erpc:call(Node, emqx_cth_suite, start_apps, [AppsRest, SuiteOpts]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
maybe_join_cluster(_Node, #{role := replicant}) ->
|
||||||
|
ok;
|
||||||
|
maybe_join_cluster(Node, Spec) ->
|
||||||
|
case get_cluster_seeds(Spec) of
|
||||||
|
[JoinTo | _] ->
|
||||||
|
ok = join_cluster(Node, JoinTo);
|
||||||
|
[] ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
join_cluster(Node, JoinTo) ->
|
||||||
|
case erpc:call(Node, ekka, join, [JoinTo]) of
|
||||||
|
ok ->
|
||||||
|
ok;
|
||||||
|
ignore ->
|
||||||
|
ok;
|
||||||
|
Error ->
|
||||||
|
error({failed_to_join_cluster, #{node => Node, error => Error}})
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
stop(Nodes) ->
|
||||||
|
_ = emqx_utils:pmap(fun stop_node/1, Nodes, ?TIMEOUT_NODE_STOP_S * 1000),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
stop_node(Name) ->
|
||||||
|
Node = node_name(Name),
|
||||||
|
try get_node_opts(Node) of
|
||||||
|
Opts ->
|
||||||
|
stop_node(Name, Opts)
|
||||||
|
catch
|
||||||
|
error:{erpc, _} ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
stop_node(Node, #{driver := ct_slave}) ->
|
||||||
|
case ct_slave:stop(Node, [{stop_timeout, ?TIMEOUT_NODE_STOP_S}]) of
|
||||||
|
{ok, _} ->
|
||||||
|
ok;
|
||||||
|
{error, Reason, _} when Reason == not_connected; Reason == not_started ->
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
stop_node(Node, #{driver := slave}) ->
|
||||||
|
slave:stop(Node).
|
||||||
|
|
||||||
|
%% Ports
|
||||||
|
|
||||||
|
base_port(Number) ->
|
||||||
|
10000 + Number * 100.
|
||||||
|
|
||||||
|
gen_rpc_port(BasePort) ->
|
||||||
|
BasePort - 1.
|
||||||
|
|
||||||
|
listener_port(BasePort, tcp) ->
|
||||||
|
BasePort;
|
||||||
|
listener_port(BasePort, ssl) ->
|
||||||
|
BasePort + 1;
|
||||||
|
listener_port(BasePort, quic) ->
|
||||||
|
BasePort + 2;
|
||||||
|
listener_port(BasePort, ws) ->
|
||||||
|
BasePort + 3;
|
||||||
|
listener_port(BasePort, wss) ->
|
||||||
|
BasePort + 4.
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
start_bare_node(Name, #{driver := ct_slave}) ->
|
||||||
|
{ok, Node} = ct_slave:start(
|
||||||
|
node_name(Name),
|
||||||
|
[
|
||||||
|
{kill_if_fail, true},
|
||||||
|
{monitor_master, true},
|
||||||
|
{init_timeout, 20_000},
|
||||||
|
{startup_timeout, 20_000},
|
||||||
|
{erl_flags, erl_flags()},
|
||||||
|
{env, []}
|
||||||
|
]
|
||||||
|
),
|
||||||
|
Node;
|
||||||
|
start_bare_node(Name, #{driver := slave}) ->
|
||||||
|
{ok, Node} = slave:start_link(host(), Name, ebin_path()),
|
||||||
|
Node.
|
||||||
|
|
||||||
|
erl_flags() ->
|
||||||
|
%% One core and redirecting logs to master
|
||||||
|
"+S 1:1 -master " ++ atom_to_list(node()) ++ " " ++ ebin_path().
|
||||||
|
|
||||||
|
ebin_path() ->
|
||||||
|
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
|
||||||
|
|
||||||
|
is_lib(Path) ->
|
||||||
|
string:prefix(Path, code:lib_dir()) =:= nomatch andalso
|
||||||
|
string:str(Path, "_build/default/plugins") =:= 0.
|
||||||
|
|
||||||
|
share_load_module(Node, Module) ->
|
||||||
|
case code:get_object_code(Module) of
|
||||||
|
{Module, Code, Filename} ->
|
||||||
|
{module, Module} = erpc:call(Node, code, load_binary, [Module, Filename, Code]),
|
||||||
|
ok;
|
||||||
|
error ->
|
||||||
|
error
|
||||||
|
end.
|
||||||
|
|
||||||
|
node_name(Name) ->
|
||||||
|
case string:tokens(atom_to_list(Name), "@") of
|
||||||
|
[_Name, _Host] ->
|
||||||
|
%% the name already has a @
|
||||||
|
Name;
|
||||||
|
_ ->
|
||||||
|
list_to_atom(atom_to_list(Name) ++ "@" ++ host())
|
||||||
|
end.
|
||||||
|
|
||||||
|
host() ->
|
||||||
|
[_, Host] = string:tokens(atom_to_list(node()), "@"),
|
||||||
|
Host.
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
format(Format, Args) ->
|
||||||
|
unicode:characters_to_binary(io_lib:format(Format, Args)).
|
|
@ -0,0 +1,374 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_cth_suite).
|
||||||
|
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_authentication.hrl").
|
||||||
|
|
||||||
|
-export([start/2]).
|
||||||
|
-export([stop/1]).
|
||||||
|
|
||||||
|
-export([load_apps/1]).
|
||||||
|
-export([start_apps/2]).
|
||||||
|
-export([start_app/2]).
|
||||||
|
-export([start_app/3]).
|
||||||
|
-export([stop_apps/1]).
|
||||||
|
|
||||||
|
-export([merge_appspec/2]).
|
||||||
|
|
||||||
|
-export_type([appspec/0]).
|
||||||
|
-export_type([appspec_opts/0]).
|
||||||
|
|
||||||
|
-define(NOW,
|
||||||
|
(calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}]))
|
||||||
|
).
|
||||||
|
|
||||||
|
-define(PAL(IMPORTANCE, FMT, ARGS),
|
||||||
|
case erlang:whereis(ct_logs) of
|
||||||
|
undefined ->
|
||||||
|
io:format("*** " ?MODULE_STRING " ~s @ ~p ***~n" ++ FMT ++ "~n", [?NOW, node() | ARGS]);
|
||||||
|
_ ->
|
||||||
|
ct:pal(?MODULE, IMPORTANCE, FMT, ARGS, [{heading, ?MODULE_STRING}])
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
-type appname() :: atom().
|
||||||
|
-type appspec() :: {appname(), appspec_opts()}.
|
||||||
|
|
||||||
|
%% Config structure serializable into HOCON document.
|
||||||
|
-type config() :: #{atom() => scalar() | [scalar()] | config() | [config()]}.
|
||||||
|
-type scalar() :: atom() | number() | string() | binary().
|
||||||
|
|
||||||
|
-type appspec_opts() :: #{
|
||||||
|
%% 1. Enable loading application config
|
||||||
|
%% If not defined or set to `false`, this step will be skipped.
|
||||||
|
%% If application is missing a schema module, this step will fail.
|
||||||
|
%% Merging amounts to appending, unless `false` is used, then merge result is also `false`.
|
||||||
|
config => iodata() | config() | emqx_config:raw_config() | false,
|
||||||
|
|
||||||
|
%% 2. Override the application environment
|
||||||
|
%% If not defined or set to `false`, this step will be skipped.
|
||||||
|
%% Merging amounts to appending, unless `false` is used, then merge result is `[]`.
|
||||||
|
override_env => [{atom(), term()}] | false,
|
||||||
|
|
||||||
|
%% 3. Perform anything right before starting the application
|
||||||
|
%% If not defined or set to `false`, this step will be skipped.
|
||||||
|
%% Merging amounts to redefining.
|
||||||
|
before_start => fun(() -> _) | fun((appname()) -> _) | false,
|
||||||
|
|
||||||
|
%% 4. Starting the application
|
||||||
|
%% If not defined or set to `true`, `application:ensure_all_started/1` is used.
|
||||||
|
%% If custom function is used, it should return list of all applications that were started.
|
||||||
|
%% If set to `false`, application will not be started.
|
||||||
|
%% Merging amounts to redefining.
|
||||||
|
start => fun(() -> {ok, [appname()]}) | fun((appname()) -> {ok, [appname()]}) | boolean(),
|
||||||
|
|
||||||
|
%% 5. Perform anything right after starting the application
|
||||||
|
%% If not defined or set to `false`, this step will be skipped.
|
||||||
|
%% Merging amounts to redefining.
|
||||||
|
after_start => fun(() -> _) | fun((appname()) -> _) | false
|
||||||
|
}.
|
||||||
|
|
||||||
|
%% @doc Start applications with a clean slate.
|
||||||
|
%% Provided appspecs will be merged with defaults defined in `default_appspec/1`.
|
||||||
|
-spec start([appname() | appspec()], SuiteOpts) ->
|
||||||
|
StartedApps :: [appname()]
|
||||||
|
when
|
||||||
|
SuiteOpts :: #{
|
||||||
|
%% Working directory
|
||||||
|
%% Everything a test produces should go here. If this directory is not empty,
|
||||||
|
%% function will raise an error.
|
||||||
|
work_dir := file:name()
|
||||||
|
}.
|
||||||
|
start(Apps, SuiteOpts = #{work_dir := WorkDir}) ->
|
||||||
|
% 1. Prepare appspec instructions
|
||||||
|
AppSpecs = [mk_appspec(App, SuiteOpts) || App <- Apps],
|
||||||
|
% 2. Load every app so that stuff scanning attributes of loaded modules works
|
||||||
|
ok = lists:foreach(fun load_appspec/1, AppSpecs),
|
||||||
|
% 3. Verify that we're running with a clean state.
|
||||||
|
ok = filelib:ensure_dir(filename:join(WorkDir, foo)),
|
||||||
|
ok = verify_clean_suite_state(SuiteOpts),
|
||||||
|
% 4. Setup isolated mnesia directory
|
||||||
|
ok = emqx_common_test_helpers:load(mnesia),
|
||||||
|
ok = application:set_env(mnesia, dir, filename:join([WorkDir, mnesia])),
|
||||||
|
% 5. Start ekka separately.
|
||||||
|
% For some reason it's designed to be started in non-regular way, so we have to track
|
||||||
|
% applications started in the process manually.
|
||||||
|
EkkaSpecs = [{App, proplists:get_value(App, AppSpecs, #{})} || App <- [gen_rpc, mria, ekka]],
|
||||||
|
EkkaApps = start_apps(EkkaSpecs, SuiteOpts),
|
||||||
|
% 6. Start apps following instructions.
|
||||||
|
RestSpecs = [AppSpec || AppSpec <- AppSpecs, not lists:member(AppSpec, EkkaSpecs)],
|
||||||
|
EkkaApps ++ start_appspecs(RestSpecs).
|
||||||
|
|
||||||
|
load_apps(Apps) ->
|
||||||
|
lists:foreach(fun load_appspec/1, [mk_appspec(App, #{}) || App <- Apps]).
|
||||||
|
|
||||||
|
load_appspec({App, _Opts}) ->
|
||||||
|
ok = emqx_common_test_helpers:load(App),
|
||||||
|
load_app_deps(App).
|
||||||
|
|
||||||
|
load_app_deps(App) ->
|
||||||
|
AlreadyLoaded = [A || {A, _, _} <- application:loaded_applications()],
|
||||||
|
case application:get_key(App, applications) of
|
||||||
|
{ok, Deps} ->
|
||||||
|
Apps = Deps -- AlreadyLoaded,
|
||||||
|
ok = lists:foreach(fun emqx_common_test_helpers:load/1, Apps),
|
||||||
|
ok = lists:foreach(fun load_app_deps/1, Apps);
|
||||||
|
undefined ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
start_apps(Apps, SuiteOpts) ->
|
||||||
|
start_appspecs([mk_appspec(App, SuiteOpts) || App <- Apps]).
|
||||||
|
|
||||||
|
start_app(App, StartOpts) ->
|
||||||
|
start_app(App, StartOpts, #{}).
|
||||||
|
|
||||||
|
start_app(App, StartOpts, SuiteOpts) ->
|
||||||
|
start_appspecs([mk_appspec({App, StartOpts}, SuiteOpts)]).
|
||||||
|
|
||||||
|
start_appspecs(AppSpecs) ->
|
||||||
|
lists:flatmap(
|
||||||
|
fun({App, Spec}) -> start_appspec(App, Spec) end,
|
||||||
|
AppSpecs
|
||||||
|
).
|
||||||
|
|
||||||
|
mk_appspec({App, Opts}, SuiteOpts) ->
|
||||||
|
Defaults = default_appspec(App, SuiteOpts),
|
||||||
|
{App, merge_appspec(Defaults, init_spec(Opts))};
|
||||||
|
mk_appspec(App, SuiteOpts) ->
|
||||||
|
Defaults = default_appspec(App, SuiteOpts),
|
||||||
|
{App, Defaults}.
|
||||||
|
|
||||||
|
init_spec(Opts = #{}) ->
|
||||||
|
Opts;
|
||||||
|
init_spec(Config) when is_list(Config); is_binary(Config) ->
|
||||||
|
#{config => [Config, "\n"]}.
|
||||||
|
|
||||||
|
start_appspec(App, StartOpts) ->
|
||||||
|
_ = log_appspec(App, StartOpts),
|
||||||
|
_ = maybe_configure_app(App, StartOpts),
|
||||||
|
_ = maybe_override_env(App, StartOpts),
|
||||||
|
_ = maybe_before_start(App, StartOpts),
|
||||||
|
case maybe_start(App, StartOpts) of
|
||||||
|
{ok, Started} ->
|
||||||
|
?PAL(?STD_IMPORTANCE, "Started applications: ~0p", [Started]),
|
||||||
|
_ = maybe_after_start(App, StartOpts),
|
||||||
|
Started;
|
||||||
|
{error, Reason} ->
|
||||||
|
error({failed_to_start_app, App, Reason})
|
||||||
|
end.
|
||||||
|
|
||||||
|
log_appspec(App, StartOpts) when map_size(StartOpts) > 0 ->
|
||||||
|
Fmt = lists:flatmap(
|
||||||
|
fun(Opt) -> "~n * ~p: " ++ spec_fmt(fc, Opt) end,
|
||||||
|
maps:keys(StartOpts)
|
||||||
|
),
|
||||||
|
Args = lists:flatmap(
|
||||||
|
fun({Opt, V}) -> [Opt, spec_fmt(ffun, {Opt, V})] end,
|
||||||
|
maps:to_list(StartOpts)
|
||||||
|
),
|
||||||
|
?PAL(?STD_IMPORTANCE, "Starting ~p with:" ++ Fmt, [App | Args]);
|
||||||
|
log_appspec(App, #{}) ->
|
||||||
|
?PAL(?STD_IMPORTANCE, "Starting ~p", [App]).
|
||||||
|
|
||||||
|
spec_fmt(fc, config) -> "~n~ts";
|
||||||
|
spec_fmt(fc, _) -> "~p";
|
||||||
|
spec_fmt(ffun, {config, C}) -> render_config(C);
|
||||||
|
spec_fmt(ffun, {_, X}) -> X.
|
||||||
|
|
||||||
|
maybe_configure_app(_App, #{config := false}) ->
|
||||||
|
ok;
|
||||||
|
maybe_configure_app(App, #{config := Config}) ->
|
||||||
|
case app_schema(App) of
|
||||||
|
{ok, SchemaModule} ->
|
||||||
|
configure_app(SchemaModule, Config);
|
||||||
|
{error, Reason} ->
|
||||||
|
error({failed_to_configure_app, App, Reason})
|
||||||
|
end;
|
||||||
|
maybe_configure_app(_App, #{}) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
configure_app(SchemaModule, Config) ->
|
||||||
|
ok = emqx_config:init_load(SchemaModule, render_config(Config)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
maybe_override_env(App, #{override_env := Env = [{_, _} | _]}) ->
|
||||||
|
ok = application:set_env([{App, Env}]);
|
||||||
|
maybe_override_env(_App, #{}) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
maybe_before_start(App, #{before_start := Fun}) when is_function(Fun, 1) ->
|
||||||
|
Fun(App);
|
||||||
|
maybe_before_start(_App, #{before_start := Fun}) when is_function(Fun, 0) ->
|
||||||
|
Fun();
|
||||||
|
maybe_before_start(_App, #{}) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
maybe_start(_App, #{start := false}) ->
|
||||||
|
{ok, []};
|
||||||
|
maybe_start(_App, #{start := Fun}) when is_function(Fun, 0) ->
|
||||||
|
Fun();
|
||||||
|
maybe_start(App, #{start := Fun}) when is_function(Fun, 1) ->
|
||||||
|
Fun(App);
|
||||||
|
maybe_start(App, #{}) ->
|
||||||
|
application:ensure_all_started(App).
|
||||||
|
|
||||||
|
maybe_after_start(App, #{after_start := Fun}) when is_function(Fun, 1) ->
|
||||||
|
Fun(App);
|
||||||
|
maybe_after_start(_App, #{after_start := Fun}) when is_function(Fun, 0) ->
|
||||||
|
Fun();
|
||||||
|
maybe_after_start(_App, #{}) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-spec merge_appspec(appspec_opts(), appspec_opts()) ->
|
||||||
|
appspec_opts().
|
||||||
|
merge_appspec(Opts1, Opts2) ->
|
||||||
|
maps:merge_with(
|
||||||
|
fun
|
||||||
|
(config, C1, C2) -> merge_config(C1, C2);
|
||||||
|
(override_env, E1, E2) -> merge_envs(E1, E2);
|
||||||
|
(_Opt, _Val1, Val2) -> Val2
|
||||||
|
end,
|
||||||
|
init_spec(Opts1),
|
||||||
|
init_spec(Opts2)
|
||||||
|
).
|
||||||
|
|
||||||
|
merge_envs(false, E2) ->
|
||||||
|
E2;
|
||||||
|
merge_envs(_E, false) ->
|
||||||
|
[];
|
||||||
|
merge_envs(E1, E2) ->
|
||||||
|
E1 ++ E2.
|
||||||
|
|
||||||
|
merge_config(false, C2) ->
|
||||||
|
C2;
|
||||||
|
merge_config(_C, false) ->
|
||||||
|
false;
|
||||||
|
merge_config(C1, C2) ->
|
||||||
|
[render_config(C1), "\n", render_config(C2)].
|
||||||
|
|
||||||
|
default_appspec(ekka, _SuiteOpts) ->
|
||||||
|
#{
|
||||||
|
start => fun start_ekka/0
|
||||||
|
};
|
||||||
|
default_appspec(emqx, SuiteOpts) ->
|
||||||
|
#{
|
||||||
|
override_env => [{data_dir, maps:get(work_dir, SuiteOpts, "data")}]
|
||||||
|
};
|
||||||
|
default_appspec(emqx_authz, _SuiteOpts) ->
|
||||||
|
#{
|
||||||
|
config => #{
|
||||||
|
% NOTE
|
||||||
|
% Disable default authorization sources (i.e. acl.conf file rules).
|
||||||
|
authorization => #{sources => []}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
default_appspec(emqx_conf, SuiteOpts) ->
|
||||||
|
Config = #{
|
||||||
|
node => #{
|
||||||
|
name => node(),
|
||||||
|
cookie => erlang:get_cookie(),
|
||||||
|
% FIXME
|
||||||
|
data_dir => unicode:characters_to_binary(maps:get(work_dir, SuiteOpts, "data"))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
% NOTE
|
||||||
|
% Since `emqx_conf_schema` manages config for a lot of applications, it's good to include
|
||||||
|
% their defaults as well.
|
||||||
|
SharedConfig = lists:foldl(
|
||||||
|
fun(App, Acc) ->
|
||||||
|
emqx_utils_maps:deep_merge(Acc, default_config(App, SuiteOpts))
|
||||||
|
end,
|
||||||
|
Config,
|
||||||
|
[
|
||||||
|
emqx,
|
||||||
|
emqx_authz
|
||||||
|
]
|
||||||
|
),
|
||||||
|
#{
|
||||||
|
config => SharedConfig,
|
||||||
|
% NOTE
|
||||||
|
% We inform `emqx` of our config loader before starting `emqx_conf` sothat it won't
|
||||||
|
% overwrite everything with a default configuration.
|
||||||
|
before_start => fun() ->
|
||||||
|
emqx_app:set_config_loader(?MODULE)
|
||||||
|
end
|
||||||
|
};
|
||||||
|
default_appspec(emqx_dashboard, _SuiteOpts) ->
|
||||||
|
#{
|
||||||
|
after_start => fun() ->
|
||||||
|
true = emqx_dashboard_listener:is_ready(infinity)
|
||||||
|
end
|
||||||
|
};
|
||||||
|
default_appspec(_, _) ->
|
||||||
|
#{}.
|
||||||
|
|
||||||
|
default_config(App, SuiteOpts) ->
|
||||||
|
maps:get(config, default_appspec(App, SuiteOpts), #{}).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
start_ekka() ->
|
||||||
|
ok = emqx_common_test_helpers:start_ekka(),
|
||||||
|
{ok, [mnesia, ekka]}.
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
-spec stop(_StartedApps :: [appname()]) ->
|
||||||
|
ok.
|
||||||
|
stop(Apps) ->
|
||||||
|
ok = stop_apps(Apps),
|
||||||
|
clean_suite_state().
|
||||||
|
|
||||||
|
-spec stop_apps(_StartedApps :: [appname()]) ->
|
||||||
|
ok.
|
||||||
|
stop_apps(Apps) ->
|
||||||
|
ok = lists:foreach(fun application:stop/1, lists:reverse(Apps)),
|
||||||
|
ok = lists:foreach(fun application:unload/1, Apps).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
verify_clean_suite_state(#{work_dir := WorkDir}) ->
|
||||||
|
{ok, []} = file:list_dir(WorkDir),
|
||||||
|
none = persistent_term:get(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY, none),
|
||||||
|
[] = emqx_config:get_root_names(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
clean_suite_state() ->
|
||||||
|
_ = persistent_term:erase(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY),
|
||||||
|
_ = emqx_config:erase_all(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
app_schema(App) ->
|
||||||
|
Mod = list_to_atom(atom_to_list(App) ++ "_schema"),
|
||||||
|
try is_list(Mod:roots()) of
|
||||||
|
true -> {ok, Mod};
|
||||||
|
false -> {error, schema_no_roots}
|
||||||
|
catch
|
||||||
|
error:undef ->
|
||||||
|
{error, schema_not_found}
|
||||||
|
end.
|
||||||
|
|
||||||
|
render_config(Config = #{}) ->
|
||||||
|
unicode:characters_to_binary(hocon_pp:do(Config, #{}));
|
||||||
|
render_config(Config) ->
|
||||||
|
unicode:characters_to_binary(Config).
|
|
@ -1114,7 +1114,7 @@ setup_node(Node, Port) ->
|
||||||
%% We load configuration, and than set the special enviroment variable
|
%% We load configuration, and than set the special enviroment variable
|
||||||
%% which says that emqx shouldn't load configuration at startup
|
%% which says that emqx shouldn't load configuration at startup
|
||||||
emqx_config:init_load(emqx_schema),
|
emqx_config:init_load(emqx_schema),
|
||||||
application:set_env(emqx, init_config_load_done, true),
|
emqx_app:set_config_loader(?MODULE),
|
||||||
|
|
||||||
ok = emqx_config:put([listeners, tcp, default, bind], {{127, 0, 0, 1}, Port}),
|
ok = emqx_config:put([listeners, tcp, default, bind], {{127, 0, 0, 1}, Port}),
|
||||||
ok = emqx_config:put([listeners, ssl, default, bind], {{127, 0, 0, 1}, Port + 1}),
|
ok = emqx_config:put([listeners, ssl, default, bind], {{127, 0, 0, 1}, Port + 1}),
|
||||||
|
|
|
@ -39,33 +39,21 @@ groups() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
ok = emqx_common_test_helpers:start_apps(
|
|
||||||
[emqx_conf, emqx_authz],
|
|
||||||
fun set_special_configs/1
|
|
||||||
),
|
|
||||||
%% meck after authz started
|
|
||||||
meck:expect(
|
|
||||||
emqx_authz,
|
|
||||||
acl_conf_file,
|
|
||||||
fun() ->
|
|
||||||
emqx_common_test_helpers:deps_path(emqx_authz, "etc/acl.conf")
|
|
||||||
end
|
|
||||||
),
|
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok = emqx_authz_test_lib:restore_authorizers(),
|
|
||||||
ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_authz]).
|
|
||||||
|
|
||||||
init_per_testcase(_TestCase, Config) ->
|
|
||||||
ok = emqx_authz_test_lib:reset_authorizers(),
|
|
||||||
Config.
|
|
||||||
|
|
||||||
set_special_configs(emqx_authz) ->
|
|
||||||
ok = emqx_authz_test_lib:reset_authorizers();
|
|
||||||
set_special_configs(_) ->
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
init_per_testcase(TestCase, Config) ->
|
||||||
|
Apps = emqx_cth_suite:start(
|
||||||
|
[{emqx_conf, "authorization.no_match = deny"}, emqx_authz],
|
||||||
|
#{work_dir => filename:join(?config(priv_dir, Config), TestCase)}
|
||||||
|
),
|
||||||
|
[{tc_apps, Apps} | Config].
|
||||||
|
|
||||||
|
end_per_testcase(_TestCase, Config) ->
|
||||||
|
emqx_cth_suite:stop(?config(tc_apps, Config)).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Testcases
|
%% Testcases
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -24,8 +24,6 @@
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("snabbkaffe/include/test_macros.hrl").
|
-include_lib("snabbkaffe/include/test_macros.hrl").
|
||||||
|
|
||||||
-define(SUITE_APPS, [emqx_conf, emqx_authn, emqx_management, emqx_rule_engine, emqx_bridge]).
|
|
||||||
|
|
||||||
-define(BRIDGE_TYPE_HTTP, <<"webhook">>).
|
-define(BRIDGE_TYPE_HTTP, <<"webhook">>).
|
||||||
-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
|
-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
|
||||||
-define(URL(PORT, PATH),
|
-define(URL(PORT, PATH),
|
||||||
|
@ -74,6 +72,19 @@
|
||||||
}).
|
}).
|
||||||
-define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)).
|
-define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)).
|
||||||
|
|
||||||
|
-define(APPSPECS, [
|
||||||
|
emqx_conf,
|
||||||
|
emqx,
|
||||||
|
emqx_authn,
|
||||||
|
emqx_management,
|
||||||
|
{emqx_rule_engine, "rule_engine { rules {} }"},
|
||||||
|
{emqx_bridge, "bridges {}"}
|
||||||
|
]).
|
||||||
|
|
||||||
|
-define(APPSPEC_DASHBOARD,
|
||||||
|
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
|
||||||
|
).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
[
|
[
|
||||||
{group, single},
|
{group, single},
|
||||||
|
@ -104,105 +115,43 @@ init_per_suite(Config) ->
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_group(cluster, Config) ->
|
init_per_group(cluster = Name, Config) ->
|
||||||
Cluster = mk_cluster_specs(Config),
|
Nodes = [NodePrimary | _] = mk_cluster(Name, Config),
|
||||||
ct:pal("Starting ~p", [Cluster]),
|
init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
|
||||||
Nodes = [
|
init_per_group(cluster_later_join = Name, Config) ->
|
||||||
emqx_common_test_helpers:start_slave(Name, Opts)
|
Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}),
|
||||||
|| {Name, Opts} <- Cluster
|
init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
|
||||||
],
|
init_per_group(Name, Config) ->
|
||||||
[NodePrimary | NodesRest] = Nodes,
|
WorkDir = filename:join(?config(priv_dir, Config), Name),
|
||||||
ok = erpc:call(NodePrimary, fun() -> init_node(primary) end),
|
Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}),
|
||||||
_ = [ok = erpc:call(Node, fun() -> init_node(regular) end) || Node <- NodesRest],
|
init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]).
|
||||||
[{group, cluster}, {cluster_nodes, Nodes}, {api_node, NodePrimary} | Config];
|
|
||||||
init_per_group(cluster_later_join, Config) ->
|
|
||||||
Cluster = mk_cluster_specs(Config, #{join_to => undefined}),
|
|
||||||
ct:pal("Starting ~p", [Cluster]),
|
|
||||||
Nodes = [
|
|
||||||
emqx_common_test_helpers:start_slave(Name, Opts)
|
|
||||||
|| {Name, Opts} <- Cluster
|
|
||||||
],
|
|
||||||
[NodePrimary | NodesRest] = Nodes,
|
|
||||||
ok = erpc:call(NodePrimary, fun() -> init_node(primary) end),
|
|
||||||
_ = [ok = erpc:call(Node, fun() -> init_node(regular) end) || Node <- NodesRest],
|
|
||||||
[{group, cluster_later_join}, {cluster_nodes, Nodes}, {api_node, NodePrimary} | Config];
|
|
||||||
init_per_group(_, Config) ->
|
|
||||||
ok = emqx_mgmt_api_test_util:init_suite(?SUITE_APPS),
|
|
||||||
ok = load_suite_config(emqx_rule_engine),
|
|
||||||
ok = load_suite_config(emqx_bridge),
|
|
||||||
[{group, single}, {api_node, node()} | Config].
|
|
||||||
|
|
||||||
mk_cluster_specs(Config) ->
|
init_api(Config) ->
|
||||||
mk_cluster_specs(Config, #{}).
|
APINode = ?config(node, Config),
|
||||||
|
{ok, App} = erpc:call(APINode, emqx_common_test_http, create_default_app, []),
|
||||||
|
[{api, App} | Config].
|
||||||
|
|
||||||
mk_cluster_specs(Config, Opts) ->
|
mk_cluster(Name, Config) ->
|
||||||
Specs = [
|
mk_cluster(Name, Config, #{}).
|
||||||
{core, emqx_bridge_api_SUITE1, #{}},
|
|
||||||
{core, emqx_bridge_api_SUITE2, #{}}
|
|
||||||
],
|
|
||||||
CommonOpts = Opts#{
|
|
||||||
env => [{emqx, boot_modules, [broker]}],
|
|
||||||
apps => [],
|
|
||||||
% NOTE
|
|
||||||
% We need to start all those apps _after_ the cluster becomes stable, in the
|
|
||||||
% `init_node/1`. This is because usual order is broken in very subtle way:
|
|
||||||
% 1. Node starts apps including `mria` and `emqx_conf` which starts `emqx_cluster_rpc`.
|
|
||||||
% 2. The `emqx_cluster_rpc` sets up a mnesia table subscription during initialization.
|
|
||||||
% 3. In the meantime `mria` joins the cluster and notices it should restart.
|
|
||||||
% 4. Mnesia subscription becomes lost during restarts (god knows why).
|
|
||||||
% Yet we need to load them before, so that mria / mnesia will know which tables
|
|
||||||
% should be created in the cluster.
|
|
||||||
% TODO
|
|
||||||
% We probably should hide these intricacies behind the `emqx_common_test_helpers`.
|
|
||||||
load_apps => ?SUITE_APPS ++ [emqx_dashboard],
|
|
||||||
env_handler => fun load_suite_config/1,
|
|
||||||
load_schema => false,
|
|
||||||
priv_data_dir => ?config(priv_dir, Config)
|
|
||||||
},
|
|
||||||
emqx_common_test_helpers:emqx_cluster(Specs, CommonOpts).
|
|
||||||
|
|
||||||
init_node(Type) ->
|
mk_cluster(Name, Config, Opts) ->
|
||||||
ok = emqx_common_test_helpers:start_apps(?SUITE_APPS, fun load_suite_config/1),
|
Node1Apps = ?APPSPECS ++ [?APPSPEC_DASHBOARD],
|
||||||
case Type of
|
Node2Apps = ?APPSPECS,
|
||||||
primary ->
|
emqx_cth_cluster:start(
|
||||||
ok = emqx_dashboard_desc_cache:init(),
|
[
|
||||||
ok = emqx_config:put(
|
{emqx_bridge_api_SUITE1, Opts#{role => core, apps => Node1Apps}},
|
||||||
[dashboard, listeners],
|
{emqx_bridge_api_SUITE2, Opts#{role => core, apps => Node2Apps}}
|
||||||
#{http => #{bind => 18083, proxy_header => false}}
|
],
|
||||||
),
|
#{work_dir => filename:join(?config(priv_dir, Config), Name)}
|
||||||
ok = emqx_dashboard:start_listeners(),
|
).
|
||||||
ready = emqx_dashboard_listener:regenerate_minirest_dispatch(),
|
|
||||||
emqx_common_test_http:create_default_app();
|
|
||||||
regular ->
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
load_suite_config(emqx_rule_engine) ->
|
|
||||||
ok = emqx_common_test_helpers:load_config(
|
|
||||||
emqx_rule_engine_schema,
|
|
||||||
<<"rule_engine { rules {} }">>
|
|
||||||
);
|
|
||||||
load_suite_config(emqx_bridge) ->
|
|
||||||
ok = emqx_common_test_helpers:load_config(
|
|
||||||
emqx_bridge_schema,
|
|
||||||
<<"bridges {}">>
|
|
||||||
);
|
|
||||||
load_suite_config(_) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
end_per_group(Group, Config) when
|
end_per_group(Group, Config) when
|
||||||
Group =:= cluster;
|
Group =:= cluster;
|
||||||
Group =:= cluster_later_join
|
Group =:= cluster_later_join
|
||||||
->
|
->
|
||||||
ok = lists:foreach(
|
ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
|
||||||
fun(Node) ->
|
end_per_group(_, Config) ->
|
||||||
_ = erpc:call(Node, emqx_common_test_helpers, stop_apps, [?SUITE_APPS]),
|
emqx_cth_suite:stop(?config(group_apps, Config)),
|
||||||
emqx_common_test_helpers:stop_slave(Node)
|
|
||||||
end,
|
|
||||||
?config(cluster_nodes, Config)
|
|
||||||
);
|
|
||||||
end_per_group(_, _Config) ->
|
|
||||||
emqx_mgmt_api_test_util:end_suite(?SUITE_APPS),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_testcase(t_broken_bpapi_vsn, Config) ->
|
init_per_testcase(t_broken_bpapi_vsn, Config) ->
|
||||||
|
@ -228,7 +177,7 @@ end_per_testcase(t_old_bpapi_vsn, Config) ->
|
||||||
end_per_testcase(_, Config) ->
|
end_per_testcase(_, Config) ->
|
||||||
Sock = ?config(sock, Config),
|
Sock = ?config(sock, Config),
|
||||||
Acceptor = ?config(acceptor, Config),
|
Acceptor = ?config(acceptor, Config),
|
||||||
Node = ?config(api_node, Config),
|
Node = ?config(node, Config),
|
||||||
ok = emqx_common_test_helpers:call_janitor(),
|
ok = emqx_common_test_helpers:call_janitor(),
|
||||||
ok = stop_http_server(Sock, Acceptor),
|
ok = stop_http_server(Sock, Acceptor),
|
||||||
ok = erpc:call(Node, fun clear_resources/0),
|
ok = erpc:call(Node, fun clear_resources/0),
|
||||||
|
@ -900,7 +849,7 @@ t_start_stop_inconsistent_bridge_cluster(Config) ->
|
||||||
start_stop_inconsistent_bridge(Type, Config) ->
|
start_stop_inconsistent_bridge(Type, Config) ->
|
||||||
Port = ?config(port, Config),
|
Port = ?config(port, Config),
|
||||||
URL = ?URL(Port, "abc"),
|
URL = ?URL(Port, "abc"),
|
||||||
Node = ?config(api_node, Config),
|
Node = ?config(node, Config),
|
||||||
|
|
||||||
erpc:call(Node, fun() ->
|
erpc:call(Node, fun() ->
|
||||||
meck:new(emqx_bridge_resource, [passthrough, no_link]),
|
meck:new(emqx_bridge_resource, [passthrough, no_link]),
|
||||||
|
@ -1351,9 +1300,7 @@ t_inconsistent_webhook_request_timeouts(Config) ->
|
||||||
|
|
||||||
t_cluster_later_join_metrics(Config) ->
|
t_cluster_later_join_metrics(Config) ->
|
||||||
Port = ?config(port, Config),
|
Port = ?config(port, Config),
|
||||||
APINode = ?config(api_node, Config),
|
[PrimaryNode, OtherNode | _] = ?config(cluster_nodes, Config),
|
||||||
ClusterNodes = ?config(cluster_nodes, Config),
|
|
||||||
[OtherNode | _] = ClusterNodes -- [APINode],
|
|
||||||
URL1 = ?URL(Port, "path1"),
|
URL1 = ?URL(Port, "path1"),
|
||||||
Name = ?BRIDGE_NAME,
|
Name = ?BRIDGE_NAME,
|
||||||
BridgeParams = ?HTTP_BRIDGE(URL1, Name),
|
BridgeParams = ?HTTP_BRIDGE(URL1, Name),
|
||||||
|
@ -1371,7 +1318,7 @@ t_cluster_later_join_metrics(Config) ->
|
||||||
request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
|
request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
|
||||||
),
|
),
|
||||||
%% Now join the other node join with the api node.
|
%% Now join the other node join with the api node.
|
||||||
ok = erpc:call(OtherNode, ekka, join, [APINode]),
|
ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]),
|
||||||
%% Check metrics; shouldn't crash even if the bridge is not
|
%% Check metrics; shouldn't crash even if the bridge is not
|
||||||
%% ready on the node that just joined the cluster.
|
%% ready on the node that just joined the cluster.
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
|
@ -1419,8 +1366,9 @@ request(Method, {operation, Type, Op, BridgeID}, Body, Config) ->
|
||||||
URL = operation_path(Type, Op, BridgeID, Config),
|
URL = operation_path(Type, Op, BridgeID, Config),
|
||||||
request(Method, URL, Body, Config);
|
request(Method, URL, Body, Config);
|
||||||
request(Method, URL, Body, Config) ->
|
request(Method, URL, Body, Config) ->
|
||||||
|
AuthHeader = emqx_common_test_http:auth_header(?config(api, Config)),
|
||||||
Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
|
Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
|
||||||
emqx_mgmt_api_test_util:request_api(Method, URL, [], auth_header(Config), Body, Opts).
|
emqx_mgmt_api_test_util:request_api(Method, URL, [], AuthHeader, Body, Opts).
|
||||||
|
|
||||||
request(Method, URL, Body, Decoder, Config) ->
|
request(Method, URL, Body, Decoder, Config) ->
|
||||||
case request(Method, URL, Body, Config) of
|
case request(Method, URL, Body, Config) of
|
||||||
|
@ -1436,11 +1384,8 @@ request_json(Method, URLLike, Config) ->
|
||||||
request_json(Method, URLLike, Body, Config) ->
|
request_json(Method, URLLike, Body, Config) ->
|
||||||
request(Method, URLLike, Body, fun json/1, Config).
|
request(Method, URLLike, Body, fun json/1, Config).
|
||||||
|
|
||||||
auth_header(Config) ->
|
|
||||||
erpc:call(?config(api_node, Config), emqx_common_test_http, default_auth_header, []).
|
|
||||||
|
|
||||||
operation_path(node, Oper, BridgeID, Config) ->
|
operation_path(node, Oper, BridgeID, Config) ->
|
||||||
uri(["nodes", ?config(api_node, Config), "bridges", BridgeID, Oper]);
|
uri(["nodes", ?config(node, Config), "bridges", BridgeID, Oper]);
|
||||||
operation_path(cluster, Oper, BridgeID, _Config) ->
|
operation_path(cluster, Oper, BridgeID, _Config) ->
|
||||||
uri(["bridges", BridgeID, Oper]).
|
uri(["bridges", BridgeID, Oper]).
|
||||||
|
|
||||||
|
@ -1448,23 +1393,23 @@ enable_path(Enable, BridgeID) ->
|
||||||
uri(["bridges", BridgeID, "enable", Enable]).
|
uri(["bridges", BridgeID, "enable", Enable]).
|
||||||
|
|
||||||
publish_message(Topic, Body, Config) ->
|
publish_message(Topic, Body, Config) ->
|
||||||
Node = ?config(api_node, Config),
|
Node = ?config(node, Config),
|
||||||
erpc:call(Node, emqx, publish, [emqx_message:make(Topic, Body)]).
|
erpc:call(Node, emqx, publish, [emqx_message:make(Topic, Body)]).
|
||||||
|
|
||||||
update_config(Path, Value, Config) ->
|
update_config(Path, Value, Config) ->
|
||||||
Node = ?config(api_node, Config),
|
Node = ?config(node, Config),
|
||||||
erpc:call(Node, emqx, update_config, [Path, Value]).
|
erpc:call(Node, emqx, update_config, [Path, Value]).
|
||||||
|
|
||||||
get_raw_config(Path, Config) ->
|
get_raw_config(Path, Config) ->
|
||||||
Node = ?config(api_node, Config),
|
Node = ?config(node, Config),
|
||||||
erpc:call(Node, emqx, get_raw_config, [Path]).
|
erpc:call(Node, emqx, get_raw_config, [Path]).
|
||||||
|
|
||||||
add_user_auth(Chain, AuthenticatorID, User, Config) ->
|
add_user_auth(Chain, AuthenticatorID, User, Config) ->
|
||||||
Node = ?config(api_node, Config),
|
Node = ?config(node, Config),
|
||||||
erpc:call(Node, emqx_authentication, add_user, [Chain, AuthenticatorID, User]).
|
erpc:call(Node, emqx_authentication, add_user, [Chain, AuthenticatorID, User]).
|
||||||
|
|
||||||
delete_user_auth(Chain, AuthenticatorID, User, Config) ->
|
delete_user_auth(Chain, AuthenticatorID, User, Config) ->
|
||||||
Node = ?config(api_node, Config),
|
Node = ?config(node, Config),
|
||||||
erpc:call(Node, emqx_authentication, delete_user, [Chain, AuthenticatorID, User]).
|
erpc:call(Node, emqx_authentication, delete_user, [Chain, AuthenticatorID, User]).
|
||||||
|
|
||||||
str(S) when is_list(S) -> S;
|
str(S) when is_list(S) -> S;
|
||||||
|
|
|
@ -22,6 +22,9 @@
|
||||||
-export([get_override_config_file/0]).
|
-export([get_override_config_file/0]).
|
||||||
-export([sync_data_from_node/0]).
|
-export([sync_data_from_node/0]).
|
||||||
|
|
||||||
|
%% Test purposes
|
||||||
|
-export([init_load_done/0]).
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include("emqx_conf.hrl").
|
-include("emqx_conf.hrl").
|
||||||
|
|
||||||
|
@ -46,7 +49,7 @@ stop(_State) ->
|
||||||
%% This function is named 'override' due to historical reasons.
|
%% This function is named 'override' due to historical reasons.
|
||||||
get_override_config_file() ->
|
get_override_config_file() ->
|
||||||
Node = node(),
|
Node = node(),
|
||||||
case emqx_app:get_init_config_load_done() of
|
case init_load_done() of
|
||||||
false ->
|
false ->
|
||||||
{error, #{node => Node, msg => "init_conf_load_not_done"}};
|
{error, #{node => Node, msg => "init_conf_load_not_done"}};
|
||||||
true ->
|
true ->
|
||||||
|
@ -91,7 +94,22 @@ sync_data_from_node() ->
|
||||||
%% ------------------------------------------------------------------------------
|
%% ------------------------------------------------------------------------------
|
||||||
|
|
||||||
init_load() ->
|
init_load() ->
|
||||||
emqx_config:init_load(emqx_conf:schema_module()).
|
case emqx_app:get_config_loader() of
|
||||||
|
Module when Module == emqx; Module == emqx_conf ->
|
||||||
|
ok = emqx_config:init_load(emqx_conf:schema_module()),
|
||||||
|
ok = emqx_app:set_config_loader(emqx_conf),
|
||||||
|
ok;
|
||||||
|
Module ->
|
||||||
|
?SLOG(debug, #{
|
||||||
|
msg => "skip_init_config_load",
|
||||||
|
reason => "Some application has set another config loader",
|
||||||
|
loader => Module
|
||||||
|
})
|
||||||
|
end.
|
||||||
|
|
||||||
|
init_load_done() ->
|
||||||
|
% NOTE: Either us or some higher level (i.e. tests) code loaded config.
|
||||||
|
emqx_app:get_config_loader() =/= emqx.
|
||||||
|
|
||||||
init_conf() ->
|
init_conf() ->
|
||||||
%% Workaround for https://github.com/emqx/mria/issues/94:
|
%% Workaround for https://github.com/emqx/mria/issues/94:
|
||||||
|
@ -99,8 +117,7 @@ init_conf() ->
|
||||||
_ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]),
|
_ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]),
|
||||||
{ok, TnxId} = sync_cluster_conf(),
|
{ok, TnxId} = sync_cluster_conf(),
|
||||||
_ = emqx_app:set_init_tnx_id(TnxId),
|
_ = emqx_app:set_init_tnx_id(TnxId),
|
||||||
ok = init_load(),
|
ok = init_load().
|
||||||
ok = emqx_app:set_init_config_load_done().
|
|
||||||
|
|
||||||
cluster_nodes() ->
|
cluster_nodes() ->
|
||||||
mria:cluster_nodes(cores) -- [node()].
|
mria:cluster_nodes(cores) -- [node()].
|
||||||
|
|
|
@ -215,7 +215,7 @@ assert_no_cluster_conf_copied([Node | Nodes], File) ->
|
||||||
assert_config_load_done(Nodes) ->
|
assert_config_load_done(Nodes) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(Node) ->
|
fun(Node) ->
|
||||||
Done = rpc:call(Node, emqx_app, get_init_config_load_done, []),
|
Done = rpc:call(Node, emqx_conf_app, init_load_done, []),
|
||||||
?assert(Done, #{node => Node})
|
?assert(Done, #{node => Node})
|
||||||
end,
|
end,
|
||||||
Nodes
|
Nodes
|
||||||
|
@ -240,7 +240,6 @@ start_cluster_async(Specs) ->
|
||||||
cluster(Specs, Config) ->
|
cluster(Specs, Config) ->
|
||||||
PrivDataDir = ?config(priv_dir, Config),
|
PrivDataDir = ?config(priv_dir, Config),
|
||||||
Env = [
|
Env = [
|
||||||
{emqx, init_config_load_done, false},
|
|
||||||
{emqx, boot_modules, []}
|
{emqx, boot_modules, []}
|
||||||
],
|
],
|
||||||
emqx_common_test_helpers:emqx_cluster(Specs, [
|
emqx_common_test_helpers:emqx_cluster(Specs, [
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
kernel,
|
kernel,
|
||||||
stdlib,
|
stdlib,
|
||||||
gproc,
|
gproc,
|
||||||
|
emqx,
|
||||||
emqx_s3
|
emqx_s3
|
||||||
]},
|
]},
|
||||||
{env, []},
|
{env, []},
|
||||||
|
|
|
@ -65,31 +65,25 @@ suite() ->
|
||||||
[{timetrap, {seconds, 90}}].
|
[{timetrap, {seconds, 90}}].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_ft], set_special_configs(Config)),
|
% NOTE
|
||||||
Config.
|
% Inhibit local fs GC to simulate it isn't fast enough to collect
|
||||||
|
% complete transfers.
|
||||||
|
Storage = emqx_utils_maps:deep_merge(
|
||||||
|
emqx_ft_test_helpers:local_storage(Config),
|
||||||
|
#{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}}
|
||||||
|
),
|
||||||
|
Apps = emqx_cth_suite:start(
|
||||||
|
[
|
||||||
|
{emqx_ft, #{config => emqx_ft_test_helpers:config(Storage)}}
|
||||||
|
],
|
||||||
|
#{work_dir => ?config(priv_dir, Config)}
|
||||||
|
),
|
||||||
|
[{suite_apps, Apps} | Config].
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(Config) ->
|
||||||
ok = emqx_common_test_helpers:stop_apps([emqx_ft]),
|
ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
set_special_configs(Config) ->
|
|
||||||
fun
|
|
||||||
(emqx_ft) ->
|
|
||||||
% NOTE
|
|
||||||
% Inhibit local fs GC to simulate it isn't fast enough to collect
|
|
||||||
% complete transfers.
|
|
||||||
Storage = emqx_utils_maps:deep_merge(
|
|
||||||
emqx_ft_test_helpers:local_storage(Config),
|
|
||||||
#{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => <<"0s">>}}}}
|
|
||||||
),
|
|
||||||
emqx_ft_test_helpers:load_config(#{
|
|
||||||
<<"enable">> => true,
|
|
||||||
<<"storage">> => Storage
|
|
||||||
});
|
|
||||||
(_) ->
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
init_per_testcase(Case, Config) ->
|
init_per_testcase(Case, Config) ->
|
||||||
ClientId = atom_to_binary(Case),
|
ClientId = atom_to_binary(Case),
|
||||||
case ?config(group, Config) of
|
case ?config(group, Config) of
|
||||||
|
@ -105,39 +99,32 @@ end_per_testcase(_Case, Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_group(Group = cluster, Config) ->
|
init_per_group(Group = cluster, Config) ->
|
||||||
|
WorkDir = ?config(priv_dir, Config),
|
||||||
Cluster = mk_cluster_specs(Config),
|
Cluster = mk_cluster_specs(Config),
|
||||||
ct:pal("Starting ~p", [Cluster]),
|
Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
|
||||||
Nodes = [
|
|
||||||
emqx_common_test_helpers:start_slave(Name, Opts#{join_to => node()})
|
|
||||||
|| {Name, Opts} <- Cluster
|
|
||||||
],
|
|
||||||
[{group, Group}, {cluster_nodes, Nodes} | Config];
|
[{group, Group}, {cluster_nodes, Nodes} | Config];
|
||||||
init_per_group(Group, Config) ->
|
init_per_group(Group, Config) ->
|
||||||
[{group, Group} | Config].
|
[{group, Group} | Config].
|
||||||
|
|
||||||
end_per_group(cluster, Config) ->
|
end_per_group(cluster, Config) ->
|
||||||
ok = lists:foreach(
|
ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
|
||||||
fun emqx_ft_test_helpers:stop_additional_node/1,
|
|
||||||
?config(cluster_nodes, Config)
|
|
||||||
);
|
|
||||||
end_per_group(_Group, _Config) ->
|
end_per_group(_Group, _Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
mk_cluster_specs(Config) ->
|
mk_cluster_specs(_Config) ->
|
||||||
Specs = [
|
CommonOpts = #{
|
||||||
{core, emqx_ft_SUITE1, #{listener_ports => [{tcp, 2883}]}},
|
role => core,
|
||||||
{core, emqx_ft_SUITE2, #{listener_ports => [{tcp, 3883}]}}
|
join_to => node(),
|
||||||
],
|
apps => [
|
||||||
CommOpts = [
|
{emqx_conf, #{start => false}},
|
||||||
{env, [{emqx, boot_modules, [broker, listeners]}]},
|
{emqx, #{override_env => [{boot_modules, [broker, listeners]}]}},
|
||||||
{apps, [emqx_ft]},
|
{emqx_ft, "file_transfer { enable = true }"}
|
||||||
{conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]},
|
]
|
||||||
{env_handler, set_special_configs(Config)}
|
},
|
||||||
],
|
[
|
||||||
emqx_common_test_helpers:emqx_cluster(
|
{emqx_ft_SUITE1, CommonOpts},
|
||||||
Specs,
|
{emqx_ft_SUITE2, CommonOpts}
|
||||||
CommOpts
|
].
|
||||||
).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Tests
|
%% Tests
|
||||||
|
|
|
@ -24,8 +24,6 @@
|
||||||
|
|
||||||
-import(emqx_dashboard_api_test_helpers, [host/0, uri/1]).
|
-import(emqx_dashboard_api_test_helpers, [host/0, uri/1]).
|
||||||
|
|
||||||
-define(SUITE_APPS, [emqx_conf, emqx_ft]).
|
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
[
|
[
|
||||||
{group, single},
|
{group, single},
|
||||||
|
@ -35,62 +33,76 @@ all() ->
|
||||||
groups() ->
|
groups() ->
|
||||||
[
|
[
|
||||||
{single, [], emqx_common_test_helpers:all(?MODULE)},
|
{single, [], emqx_common_test_helpers:all(?MODULE)},
|
||||||
{cluster, [], emqx_common_test_helpers:all(?MODULE)}
|
{cluster, [], emqx_common_test_helpers:all(?MODULE) -- [t_ft_disabled]}
|
||||||
].
|
].
|
||||||
|
|
||||||
suite() ->
|
suite() ->
|
||||||
[{timetrap, {seconds, 90}}].
|
[{timetrap, {seconds, 90}}].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
ok = emqx_mgmt_api_test_util:init_suite(
|
|
||||||
[emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
|
|
||||||
),
|
|
||||||
{ok, _} = emqx:update_config([rpc, port_discovery], manual),
|
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok = emqx_mgmt_api_test_util:end_suite([emqx_ft, emqx_conf]),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
init_per_group(Group = single, Config) ->
|
||||||
|
WorkDir = ?config(priv_dir, Config),
|
||||||
|
Apps = emqx_cth_suite:start(
|
||||||
|
[
|
||||||
|
{emqx, #{}},
|
||||||
|
{emqx_ft, "file_transfer { enable = true }"},
|
||||||
|
{emqx_management, #{}},
|
||||||
|
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
|
||||||
|
],
|
||||||
|
#{work_dir => WorkDir}
|
||||||
|
),
|
||||||
|
{ok, App} = emqx_common_test_http:create_default_app(),
|
||||||
|
[{group, Group}, {group_apps, Apps}, {api, App} | Config];
|
||||||
init_per_group(Group = cluster, Config) ->
|
init_per_group(Group = cluster, Config) ->
|
||||||
|
WorkDir = ?config(priv_dir, Config),
|
||||||
Cluster = mk_cluster_specs(Config),
|
Cluster = mk_cluster_specs(Config),
|
||||||
ct:pal("Starting ~p", [Cluster]),
|
Nodes = [Node1 | _] = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
|
||||||
Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster],
|
{ok, App} = erpc:call(Node1, emqx_common_test_http, create_default_app, []),
|
||||||
InitResult = erpc:multicall(Nodes, fun() -> init_node(Config) end),
|
[{group, Group}, {cluster_nodes, Nodes}, {api, App} | Config].
|
||||||
[] = [{Node, Error} || {Node, {R, Error}} <- lists:zip(Nodes, InitResult), R /= ok],
|
|
||||||
[{group, Group}, {cluster_nodes, Nodes} | Config];
|
|
||||||
init_per_group(Group, Config) ->
|
|
||||||
[{group, Group} | Config].
|
|
||||||
|
|
||||||
|
end_per_group(single, Config) ->
|
||||||
|
{ok, _} = emqx_common_test_http:delete_default_app(),
|
||||||
|
ok = emqx_cth_suite:stop(?config(group_apps, Config));
|
||||||
end_per_group(cluster, Config) ->
|
end_per_group(cluster, Config) ->
|
||||||
ok = lists:foreach(
|
ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
|
||||||
fun emqx_ft_test_helpers:stop_additional_node/1,
|
|
||||||
?config(cluster_nodes, Config)
|
|
||||||
);
|
|
||||||
end_per_group(_Group, _Config) ->
|
end_per_group(_Group, _Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
mk_cluster_specs(_Config) ->
|
mk_cluster_specs(_Config) ->
|
||||||
Specs = [
|
Apps = [
|
||||||
{core, emqx_ft_api_SUITE1, #{listener_ports => [{tcp, 2883}]}},
|
{emqx_conf, #{start => false}},
|
||||||
{core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}},
|
{emqx, #{override_env => [{boot_modules, [broker, listeners]}]}},
|
||||||
{replicant, emqx_ft_api_SUITE3, #{listener_ports => [{tcp, 4883}]}}
|
{emqx_ft, "file_transfer { enable = true }"},
|
||||||
|
{emqx_management, #{}}
|
||||||
],
|
],
|
||||||
CommOpts = #{
|
DashboardConfig =
|
||||||
env => [
|
"dashboard { \n"
|
||||||
{mria, db_backend, rlog},
|
" listeners.http { enable = true, bind = 0 } \n"
|
||||||
{emqx, boot_modules, [broker, listeners]}
|
" default_username = \"\" \n"
|
||||||
],
|
" default_password = \"\" \n"
|
||||||
apps => [],
|
"}\n",
|
||||||
load_apps => ?SUITE_APPS,
|
[
|
||||||
conf => [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]
|
{emqx_ft_api_SUITE1, #{
|
||||||
},
|
role => core,
|
||||||
emqx_common_test_helpers:emqx_cluster(
|
apps => Apps ++
|
||||||
Specs,
|
[
|
||||||
CommOpts
|
{emqx_dashboard, DashboardConfig ++ "dashboard.listeners.http.bind = 18083"}
|
||||||
).
|
]
|
||||||
|
}},
|
||||||
init_node(Config) ->
|
{emqx_ft_api_SUITE2, #{
|
||||||
ok = emqx_common_test_helpers:start_apps(?SUITE_APPS, emqx_ft_test_helpers:env_handler(Config)).
|
role => core,
|
||||||
|
apps => Apps ++ [{emqx_dashboard, DashboardConfig}]
|
||||||
|
}},
|
||||||
|
{emqx_ft_api_SUITE3, #{
|
||||||
|
role => replicant,
|
||||||
|
apps => Apps ++ [{emqx_dashboard, DashboardConfig}]
|
||||||
|
}}
|
||||||
|
].
|
||||||
|
|
||||||
init_per_testcase(Case, Config) ->
|
init_per_testcase(Case, Config) ->
|
||||||
[{tc, Case} | Config].
|
[{tc, Case} | Config].
|
||||||
|
@ -111,7 +123,7 @@ t_list_files(Config) ->
|
||||||
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node),
|
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node),
|
||||||
|
|
||||||
{ok, 200, #{<<"files">> := Files}} =
|
{ok, 200, #{<<"files">> := Files}} =
|
||||||
request_json(get, uri(["file_transfer", "files"])),
|
request_json(get, uri(["file_transfer", "files"]), Config),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
[#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
|
[#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
|
||||||
|
@ -119,7 +131,7 @@ t_list_files(Config) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, 200, #{<<"files">> := FilesTransfer}} =
|
{ok, 200, #{<<"files">> := FilesTransfer}} =
|
||||||
request_json(get, uri(["file_transfer", "files", ClientId, FileId])),
|
request_json(get, uri(["file_transfer", "files", ClientId, FileId]), Config),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
[#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
|
[#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
|
||||||
|
@ -128,21 +140,23 @@ t_list_files(Config) ->
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, 404, #{<<"code">> := <<"FILES_NOT_FOUND">>}},
|
{ok, 404, #{<<"code">> := <<"FILES_NOT_FOUND">>}},
|
||||||
request_json(get, uri(["file_transfer", "files", ClientId, <<"no-such-file">>]))
|
request_json(get, uri(["file_transfer", "files", ClientId, <<"no-such-file">>]), Config)
|
||||||
).
|
).
|
||||||
|
|
||||||
t_download_transfer(Config) ->
|
t_download_transfer(Config) ->
|
||||||
ClientId = client_id(Config),
|
ClientId = client_id(Config),
|
||||||
FileId = <<"f1">>,
|
FileId = <<"f1">>,
|
||||||
|
|
||||||
Node = lists:last(test_nodes(Config)),
|
Nodes = [Node | _] = test_nodes(Config),
|
||||||
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node),
|
NodeUpload = lists:last(Nodes),
|
||||||
|
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, NodeUpload),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
|
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
|
||||||
request_json(
|
request_json(
|
||||||
get,
|
get,
|
||||||
uri(["file_transfer", "file"]) ++ query(#{fileref => FileId})
|
uri(["file_transfer", "file"]) ++ query(#{fileref => FileId}),
|
||||||
|
Config
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
|
||||||
|
@ -151,7 +165,8 @@ t_download_transfer(Config) ->
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["file_transfer", "file"]) ++
|
uri(["file_transfer", "file"]) ++
|
||||||
query(#{fileref => FileId, node => <<"nonode@nohost">>})
|
query(#{fileref => FileId, node => <<"nonode@nohost">>}),
|
||||||
|
Config
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
|
||||||
|
@ -160,7 +175,8 @@ t_download_transfer(Config) ->
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["file_transfer", "file"]) ++
|
uri(["file_transfer", "file"]) ++
|
||||||
query(#{fileref => <<"unknown_file">>, node => node()})
|
query(#{fileref => <<"unknown_file">>, node => Node}),
|
||||||
|
Config
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
|
||||||
|
@ -169,7 +185,8 @@ t_download_transfer(Config) ->
|
||||||
request_json(
|
request_json(
|
||||||
get,
|
get,
|
||||||
uri(["file_transfer", "file"]) ++
|
uri(["file_transfer", "file"]) ++
|
||||||
query(#{fileref => <<>>, node => node()})
|
query(#{fileref => <<>>, node => Node}),
|
||||||
|
Config
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
|
||||||
|
@ -178,14 +195,15 @@ t_download_transfer(Config) ->
|
||||||
request_json(
|
request_json(
|
||||||
get,
|
get,
|
||||||
uri(["file_transfer", "file"]) ++
|
uri(["file_transfer", "file"]) ++
|
||||||
query(#{fileref => <<"/etc/passwd">>, node => node()})
|
query(#{fileref => <<"/etc/passwd">>, node => Node}),
|
||||||
|
Config
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, 200, #{<<"files">> := [File]}} =
|
{ok, 200, #{<<"files">> := [File]}} =
|
||||||
request_json(get, uri(["file_transfer", "files", ClientId, FileId])),
|
request_json(get, uri(["file_transfer", "files", ClientId, FileId]), Config),
|
||||||
|
|
||||||
{ok, 200, Response} = request(get, host() ++ maps:get(<<"uri">>, File)),
|
{ok, 200, Response} = request(get, host() ++ maps:get(<<"uri">>, File), Config),
|
||||||
|
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
<<"data">>,
|
<<"data">>,
|
||||||
|
@ -209,44 +227,47 @@ t_list_files_paging(Config) ->
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, 200, #{<<"files">> := [_, _, _], <<"cursor">> := _}},
|
{ok, 200, #{<<"files">> := [_, _, _], <<"cursor">> := _}},
|
||||||
request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 3}))
|
request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 3}), Config)
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, 200, #{<<"files">> := Files}} =
|
{ok, 200, #{<<"files">> := Files}} =
|
||||||
request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100})),
|
request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100}), Config),
|
||||||
|
|
||||||
?assert(length(Files) >= NFiles),
|
?assert(length(Files) >= NFiles),
|
||||||
|
|
||||||
?assertNotMatch(
|
?assertNotMatch(
|
||||||
{ok, 200, #{<<"cursor">> := _}},
|
{ok, 200, #{<<"cursor">> := _}},
|
||||||
request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100}))
|
request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100}), Config)
|
||||||
),
|
),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
|
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
|
||||||
request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0}))
|
request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0}), Config)
|
||||||
),
|
),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
|
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
|
||||||
request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<>>}))
|
request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<>>}), Config)
|
||||||
),
|
),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
|
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
|
||||||
request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<"{\"\":}">>}))
|
request_json(
|
||||||
|
get, uri(["file_transfer", "files"]) ++ query(#{following => <<"{\"\":}">>}), Config
|
||||||
|
)
|
||||||
),
|
),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
|
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
|
||||||
request_json(
|
request_json(
|
||||||
get,
|
get,
|
||||||
uri(["file_transfer", "files"]) ++ query(#{following => <<"whatsthat!?">>})
|
uri(["file_transfer", "files"]) ++ query(#{following => <<"whatsthat!?">>}),
|
||||||
|
Config
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
|
||||||
PageThrough = fun PageThrough(Query, Acc) ->
|
PageThrough = fun PageThrough(Query, Acc) ->
|
||||||
case request_json(get, uri(["file_transfer", "files"]) ++ query(Query)) of
|
case request_json(get, uri(["file_transfer", "files"]) ++ query(Query), Config) of
|
||||||
{ok, 200, #{<<"files">> := FilesPage, <<"cursor">> := Cursor}} ->
|
{ok, 200, #{<<"files">> := FilesPage, <<"cursor">> := Cursor}} ->
|
||||||
PageThrough(Query#{following => Cursor}, Acc ++ FilesPage);
|
PageThrough(Query#{following => Cursor}, Acc ++ FilesPage);
|
||||||
{ok, 200, #{<<"files">> := FilesPage}} ->
|
{ok, 200, #{<<"files">> := FilesPage}} ->
|
||||||
|
@ -258,17 +279,18 @@ t_list_files_paging(Config) ->
|
||||||
?assertEqual(Files, PageThrough(#{limit => 8}, [])),
|
?assertEqual(Files, PageThrough(#{limit => 8}, [])),
|
||||||
?assertEqual(Files, PageThrough(#{limit => NFiles}, [])).
|
?assertEqual(Files, PageThrough(#{limit => NFiles}, [])).
|
||||||
|
|
||||||
t_ft_disabled(_Config) ->
|
t_ft_disabled(Config) ->
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, 200, _},
|
{ok, 200, _},
|
||||||
request_json(get, uri(["file_transfer", "files"]))
|
request_json(get, uri(["file_transfer", "files"]), Config)
|
||||||
),
|
),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, 400, _},
|
{ok, 400, _},
|
||||||
request_json(
|
request_json(
|
||||||
get,
|
get,
|
||||||
uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>})
|
uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>}),
|
||||||
|
Config
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
|
||||||
|
@ -276,14 +298,15 @@ t_ft_disabled(_Config) ->
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, 503, _},
|
{ok, 503, _},
|
||||||
request_json(get, uri(["file_transfer", "files"]))
|
request_json(get, uri(["file_transfer", "files"]), Config)
|
||||||
),
|
),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, 503, _},
|
{ok, 503, _},
|
||||||
request_json(
|
request_json(
|
||||||
get,
|
get,
|
||||||
uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>, node => node()})
|
uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>, node => node()}),
|
||||||
|
Config
|
||||||
)
|
)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
@ -308,11 +331,12 @@ mk_file_id(Prefix, N) ->
|
||||||
mk_file_name(N) ->
|
mk_file_name(N) ->
|
||||||
"file." ++ integer_to_list(N).
|
"file." ++ integer_to_list(N).
|
||||||
|
|
||||||
request(Method, Url) ->
|
request(Method, Url, Config) ->
|
||||||
emqx_mgmt_api_test_util:request(Method, Url, []).
|
Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
|
||||||
|
emqx_mgmt_api_test_util:request_api(Method, Url, [], auth_header(Config), [], Opts).
|
||||||
|
|
||||||
request_json(Method, Url) ->
|
request_json(Method, Url, Config) ->
|
||||||
case emqx_mgmt_api_test_util:request(Method, Url, []) of
|
case request(Method, Url, Config) of
|
||||||
{ok, Code, Body} ->
|
{ok, Code, Body} ->
|
||||||
{ok, Code, json(Body)};
|
{ok, Code, json(Body)};
|
||||||
Otherwise ->
|
Otherwise ->
|
||||||
|
@ -326,6 +350,10 @@ query(Params) ->
|
||||||
KVs = lists:map(fun({K, V}) -> uri_encode(K) ++ "=" ++ uri_encode(V) end, maps:to_list(Params)),
|
KVs = lists:map(fun({K, V}) -> uri_encode(K) ++ "=" ++ uri_encode(V) end, maps:to_list(Params)),
|
||||||
"?" ++ string:join(KVs, "&").
|
"?" ++ string:join(KVs, "&").
|
||||||
|
|
||||||
|
auth_header(Config) ->
|
||||||
|
#{api_key := ApiKey, api_secret := Secret} = ?config(api, Config),
|
||||||
|
emqx_common_test_http:auth_header(binary_to_list(ApiKey), binary_to_list(Secret)).
|
||||||
|
|
||||||
uri_encode(T) ->
|
uri_encode(T) ->
|
||||||
emqx_http_lib:uri_encode(to_list(T)).
|
emqx_http_lib:uri_encode(to_list(T)).
|
||||||
|
|
||||||
|
|
|
@ -36,11 +36,11 @@ all() ->
|
||||||
].
|
].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
Apps = application:ensure_all_started(gproc),
|
{ok, Apps} = application:ensure_all_started(gproc),
|
||||||
[{suite_apps, Apps} | Config].
|
[{suite_apps, Apps} | Config].
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(Config) ->
|
||||||
ok.
|
emqx_cth_suite:stop_apps(?config(suite_apps, Config)).
|
||||||
|
|
||||||
init_per_testcase(TC, Config) ->
|
init_per_testcase(TC, Config) ->
|
||||||
ok = snabbkaffe:start_trace(),
|
ok = snabbkaffe:start_trace(),
|
||||||
|
|
|
@ -31,22 +31,20 @@ init_per_suite(Config) ->
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_testcase(_Case, Config) ->
|
init_per_testcase(Case, Config) ->
|
||||||
_ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
|
WorkDir = filename:join(?config(priv_dir, Config), Case),
|
||||||
ok = emqx_common_test_helpers:start_apps(
|
Apps = emqx_cth_suite:start(
|
||||||
[emqx_conf, emqx_ft], fun
|
[
|
||||||
(emqx_ft) ->
|
{emqx_conf, #{}},
|
||||||
emqx_ft_test_helpers:load_config(#{});
|
{emqx_ft, #{config => "file_transfer {}"}}
|
||||||
(_) ->
|
],
|
||||||
ok
|
#{work_dir => WorkDir}
|
||||||
end
|
|
||||||
),
|
),
|
||||||
{ok, _} = emqx:update_config([rpc, port_discovery], manual),
|
[{suite_apps, Apps} | Config].
|
||||||
Config.
|
|
||||||
|
|
||||||
end_per_testcase(_Case, _Config) ->
|
end_per_testcase(_Case, Config) ->
|
||||||
ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
|
ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
|
||||||
ok = emqx_config:erase(file_transfer).
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Tests
|
%% Tests
|
||||||
|
|
|
@ -22,45 +22,42 @@
|
||||||
-include_lib("emqx_ft/include/emqx_ft_storage_fs.hrl").
|
-include_lib("emqx_ft/include/emqx_ft_storage_fs.hrl").
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
-include_lib("snabbkaffe/include/test_macros.hrl").
|
-include_lib("snabbkaffe/include/test_macros.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
_ = application:load(emqx_ft),
|
Apps = emqx_cth_suite:start([emqx], #{work_dir => ?config(priv_dir, Config)}),
|
||||||
ok = emqx_common_test_helpers:start_apps([]),
|
[{suite_apps, Apps} | Config].
|
||||||
Config.
|
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(Config) ->
|
||||||
ok = emqx_common_test_helpers:stop_apps([]),
|
ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_testcase(TC, Config) ->
|
init_per_testcase(TC, Config) ->
|
||||||
SegmentsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, segments]),
|
SegmentsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, segments]),
|
||||||
ExportsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, exports]),
|
ExportsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, exports]),
|
||||||
ok = emqx_common_test_helpers:start_app(
|
Started = emqx_cth_suite:start_app(
|
||||||
emqx_ft,
|
emqx_ft,
|
||||||
fun(emqx_ft) ->
|
#{
|
||||||
emqx_ft_test_helpers:load_config(#{
|
config => emqx_ft_test_helpers:config(#{
|
||||||
<<"enable">> => true,
|
<<"local">> => #{
|
||||||
<<"storage">> => #{
|
<<"enable">> => true,
|
||||||
<<"local">> => #{
|
<<"segments">> => #{<<"root">> => SegmentsRoot},
|
||||||
<<"enable">> => true,
|
<<"exporter">> => #{
|
||||||
<<"segments">> => #{<<"root">> => SegmentsRoot},
|
<<"local">> => #{<<"enable">> => true, <<"root">> => ExportsRoot}
|
||||||
<<"exporter">> => #{
|
|
||||||
<<"local">> => #{<<"enable">> => true, <<"root">> => ExportsRoot}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
end
|
}
|
||||||
),
|
),
|
||||||
ok = snabbkaffe:start_trace(),
|
ok = snabbkaffe:start_trace(),
|
||||||
Config.
|
[{tc_apps, Started} | Config].
|
||||||
|
|
||||||
end_per_testcase(_TC, _Config) ->
|
end_per_testcase(_TC, Config) ->
|
||||||
ok = snabbkaffe:stop(),
|
ok = snabbkaffe:stop(),
|
||||||
ok = application:stop(emqx_ft),
|
ok = emqx_cth_suite:stop_apps(?config(tc_apps, Config)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
|
@ -49,6 +49,9 @@ env_handler(Config) ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
config(Storage) ->
|
||||||
|
#{<<"file_transfer">> => #{<<"enable">> => true, <<"storage">> => Storage}}.
|
||||||
|
|
||||||
local_storage(Config) ->
|
local_storage(Config) ->
|
||||||
local_storage(Config, #{exporter => local}).
|
local_storage(Config, #{exporter => local}).
|
||||||
|
|
||||||
|
|
|
@ -200,7 +200,7 @@ t_api_listeners_list_not_ready(Config) when is_list(Config) ->
|
||||||
L1 = get_tcp_listeners(Node1),
|
L1 = get_tcp_listeners(Node1),
|
||||||
|
|
||||||
%% test init_config not ready.
|
%% test init_config not ready.
|
||||||
_ = rpc:call(Node1, application, set_env, [emqx, init_config_load_done, false]),
|
_ = rpc:call(Node1, emqx_app, set_config_loader, [emqx]),
|
||||||
assert_config_load_not_done(Node1),
|
assert_config_load_not_done(Node1),
|
||||||
|
|
||||||
L2 = get_tcp_listeners(Node1),
|
L2 = get_tcp_listeners(Node1),
|
||||||
|
@ -283,12 +283,11 @@ get_tcp_listeners(Node) ->
|
||||||
NodeStatus.
|
NodeStatus.
|
||||||
|
|
||||||
assert_config_load_not_done(Node) ->
|
assert_config_load_not_done(Node) ->
|
||||||
Done = rpc:call(Node, emqx_app, get_init_config_load_done, []),
|
Prio = rpc:call(Node, emqx_app, get_config_loader, []),
|
||||||
?assertNot(Done, #{node => Node}).
|
?assertEqual(emqx, Prio, #{node => Node}).
|
||||||
|
|
||||||
cluster(Specs) ->
|
cluster(Specs) ->
|
||||||
Env = [
|
Env = [
|
||||||
{emqx, init_config_load_done, false},
|
|
||||||
{emqx, boot_modules, []}
|
{emqx, boot_modules, []}
|
||||||
],
|
],
|
||||||
emqx_common_test_helpers:emqx_cluster(Specs, [
|
emqx_common_test_helpers:emqx_cluster(Specs, [
|
||||||
|
@ -299,7 +298,7 @@ cluster(Specs) ->
|
||||||
(emqx) ->
|
(emqx) ->
|
||||||
application:set_env(emqx, boot_modules, []),
|
application:set_env(emqx, boot_modules, []),
|
||||||
%% test init_config not ready.
|
%% test init_config not ready.
|
||||||
application:set_env(emqx, init_config_load_done, false),
|
emqx_app:set_config_loader(emqx),
|
||||||
ok;
|
ok;
|
||||||
(_) ->
|
(_) ->
|
||||||
ok
|
ok
|
||||||
|
|
|
@ -32,7 +32,8 @@ init_suite(Apps, SetConfigs) when is_function(SetConfigs) ->
|
||||||
init_suite(Apps, SetConfigs, Opts) ->
|
init_suite(Apps, SetConfigs, Opts) ->
|
||||||
application:load(emqx_management),
|
application:load(emqx_management),
|
||||||
emqx_common_test_helpers:start_apps(Apps ++ [emqx_dashboard], SetConfigs, Opts),
|
emqx_common_test_helpers:start_apps(Apps ++ [emqx_dashboard], SetConfigs, Opts),
|
||||||
emqx_common_test_http:create_default_app().
|
_ = emqx_common_test_http:create_default_app(),
|
||||||
|
ok.
|
||||||
|
|
||||||
end_suite() ->
|
end_suite() ->
|
||||||
end_suite([]).
|
end_suite([]).
|
||||||
|
|
|
@ -28,13 +28,12 @@ all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
[application:load(App) || App <- apps_to_start() ++ apps_to_load()],
|
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_testcase(t_import_on_cluster, Config) ->
|
init_per_testcase(TC = t_import_on_cluster, Config0) ->
|
||||||
%% Don't import listeners to avoid port conflicts
|
%% Don't import listeners to avoid port conflicts
|
||||||
%% when the same conf will be imported to another cluster
|
%% when the same conf will be imported to another cluster
|
||||||
meck:new(emqx_mgmt_listeners_conf, [passthrough]),
|
meck:new(emqx_mgmt_listeners_conf, [passthrough]),
|
||||||
|
@ -51,23 +50,25 @@ init_per_testcase(t_import_on_cluster, Config) ->
|
||||||
1,
|
1,
|
||||||
{ok, #{changed => [], root_key => gateway}}
|
{ok, #{changed => [], root_key => gateway}}
|
||||||
),
|
),
|
||||||
|
Config = [{tc_name, TC} | Config0],
|
||||||
[{cluster, cluster(Config)} | setup(Config)];
|
[{cluster, cluster(Config)} | setup(Config)];
|
||||||
init_per_testcase(t_verify_imported_mnesia_tab_on_cluster, Config) ->
|
init_per_testcase(TC = t_verify_imported_mnesia_tab_on_cluster, Config0) ->
|
||||||
|
Config = [{tc_name, TC} | Config0],
|
||||||
[{cluster, cluster(Config)} | setup(Config)];
|
[{cluster, cluster(Config)} | setup(Config)];
|
||||||
init_per_testcase(t_mnesia_bad_tab_schema, Config) ->
|
init_per_testcase(t_mnesia_bad_tab_schema, Config) ->
|
||||||
meck:new(emqx_mgmt_data_backup, [passthrough]),
|
meck:new(emqx_mgmt_data_backup, [passthrough]),
|
||||||
meck:expect(emqx_mgmt_data_backup, mnesia_tabs_to_backup, 0, [data_backup_test]),
|
meck:expect(TC = emqx_mgmt_data_backup, mnesia_tabs_to_backup, 0, [data_backup_test]),
|
||||||
setup(Config);
|
setup([{tc_name, TC} | Config]);
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(TC, Config) ->
|
||||||
setup(Config).
|
setup([{tc_name, TC} | Config]).
|
||||||
|
|
||||||
end_per_testcase(t_import_on_cluster, Config) ->
|
end_per_testcase(t_import_on_cluster, Config) ->
|
||||||
cleanup_cluster(?config(cluster, Config)),
|
emqx_cth_cluster:stop(?config(cluster, Config)),
|
||||||
cleanup(Config),
|
cleanup(Config),
|
||||||
meck:unload(emqx_mgmt_listeners_conf),
|
meck:unload(emqx_mgmt_listeners_conf),
|
||||||
meck:unload(emqx_gateway_conf);
|
meck:unload(emqx_gateway_conf);
|
||||||
end_per_testcase(t_verify_imported_mnesia_tab_on_cluster, Config) ->
|
end_per_testcase(t_verify_imported_mnesia_tab_on_cluster, Config) ->
|
||||||
cleanup_cluster(?config(cluster, Config)),
|
emqx_cth_cluster:stop(?config(cluster, Config)),
|
||||||
cleanup(Config);
|
cleanup(Config);
|
||||||
end_per_testcase(t_mnesia_bad_tab_schema, Config) ->
|
end_per_testcase(t_mnesia_bad_tab_schema, Config) ->
|
||||||
cleanup(Config),
|
cleanup(Config),
|
||||||
|
@ -356,8 +357,6 @@ t_mnesia_bad_tab_schema(_Config) ->
|
||||||
|
|
||||||
t_read_files(_Config) ->
|
t_read_files(_Config) ->
|
||||||
DataDir = emqx:data_dir(),
|
DataDir = emqx:data_dir(),
|
||||||
%% Relative "data" path is set in init_per_testcase/2, asserting it must be safe
|
|
||||||
?assertEqual("data", DataDir),
|
|
||||||
{ok, Cwd} = file:get_cwd(),
|
{ok, Cwd} = file:get_cwd(),
|
||||||
AbsDataDir = filename:join(Cwd, DataDir),
|
AbsDataDir = filename:join(Cwd, DataDir),
|
||||||
FileBaseName = "t_read_files_tmp_file",
|
FileBaseName = "t_read_files_tmp_file",
|
||||||
|
@ -388,30 +387,12 @@ t_read_files(_Config) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
setup(Config) ->
|
setup(Config) ->
|
||||||
%% avoid port conflicts if the cluster is started
|
WorkDir = filename:join(work_dir(Config), local),
|
||||||
AppHandler = fun
|
Started = emqx_cth_suite:start(apps_to_start(), #{work_dir => WorkDir}),
|
||||||
(emqx_dashboard) ->
|
[{suite_apps, Started} | Config].
|
||||||
ok = emqx_config:put([dashboard, listeners, http, bind], 0);
|
|
||||||
(_) ->
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
ok = emqx_common_test_helpers:start_apps(apps_to_start(), AppHandler),
|
|
||||||
PrevDataDir = application:get_env(emqx, data_dir),
|
|
||||||
application:set_env(emqx, data_dir, "data"),
|
|
||||||
[{previous_emqx_data_dir, PrevDataDir} | Config].
|
|
||||||
|
|
||||||
cleanup(Config) ->
|
cleanup(Config) ->
|
||||||
emqx_common_test_helpers:stop_apps(apps_to_start()),
|
emqx_cth_suite:stop(?config(suite_apps, Config)).
|
||||||
case ?config(previous_emqx_data_dir, Config) of
|
|
||||||
undefined ->
|
|
||||||
application:unset_env(emqx, data_dir);
|
|
||||||
{ok, Val} ->
|
|
||||||
application:set_env(emqx, data_dir, Val)
|
|
||||||
end.
|
|
||||||
|
|
||||||
cleanup_cluster(ClusterNodes) ->
|
|
||||||
[rpc:call(N, ekka, leave, []) || N <- lists:reverse(ClusterNodes)],
|
|
||||||
[emqx_common_test_helpers:stop_slave(N) || N <- ClusterNodes].
|
|
||||||
|
|
||||||
users(Prefix) ->
|
users(Prefix) ->
|
||||||
[
|
[
|
||||||
|
@ -428,50 +409,18 @@ recompose_version(MajorInt, MinorInt, Patch) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
cluster(Config) ->
|
cluster(Config) ->
|
||||||
PrivDataDir = ?config(priv_dir, Config),
|
Nodes = emqx_cth_cluster:start(
|
||||||
[{Core1, Core1Opts}, {Core2, Core2Opts}, {Replicant, ReplOpts}] =
|
[
|
||||||
emqx_common_test_helpers:emqx_cluster(
|
{data_backup_core1, #{role => core, apps => apps_to_start()}},
|
||||||
[
|
{data_backup_core2, #{role => core, apps => apps_to_start()}},
|
||||||
{core, data_backup_core1},
|
{data_backup_replicant, #{role => replicant, apps => apps_to_start()}}
|
||||||
{core, data_backup_core2},
|
],
|
||||||
{replicant, data_backup_replicant}
|
#{work_dir => work_dir(Config)}
|
||||||
],
|
|
||||||
#{
|
|
||||||
priv_data_dir => PrivDataDir,
|
|
||||||
schema_mod => emqx_conf_schema,
|
|
||||||
apps => apps_to_start(),
|
|
||||||
load_apps => apps_to_start() ++ apps_to_load(),
|
|
||||||
env => [{mria, db_backend, rlog}],
|
|
||||||
load_schema => true,
|
|
||||||
start_autocluster => true,
|
|
||||||
listener_ports => [],
|
|
||||||
conf => [{[dashboard, listeners, http, bind], 0}],
|
|
||||||
env_handler =>
|
|
||||||
fun(_) ->
|
|
||||||
application:set_env(emqx, boot_modules, [broker, router])
|
|
||||||
end
|
|
||||||
}
|
|
||||||
),
|
|
||||||
Node1 = emqx_common_test_helpers:start_slave(Core1, Core1Opts),
|
|
||||||
Node2 = emqx_common_test_helpers:start_slave(Core2, Core2Opts),
|
|
||||||
#{conf := _ReplConf, env := ReplEnv} = ReplOpts,
|
|
||||||
ClusterDiscovery = {static, [{seeds, [Node1, Node2]}]},
|
|
||||||
ReplOpts1 = maps:remove(
|
|
||||||
join_to,
|
|
||||||
ReplOpts#{
|
|
||||||
env => [{ekka, cluster_discovery, ClusterDiscovery} | ReplEnv],
|
|
||||||
env_handler => fun(_) ->
|
|
||||||
application:set_env(emqx, boot_modules, [broker, router]),
|
|
||||||
application:set_env(
|
|
||||||
ekka,
|
|
||||||
cluster_discovery,
|
|
||||||
ClusterDiscovery
|
|
||||||
)
|
|
||||||
end
|
|
||||||
}
|
|
||||||
),
|
),
|
||||||
ReplNode = emqx_common_test_helpers:start_slave(Replicant, ReplOpts1),
|
Nodes.
|
||||||
[Node1, Node2, ReplNode].
|
|
||||||
|
work_dir(Config) ->
|
||||||
|
filename:join(?config(priv_dir, Config), ?config(tc_name, Config)).
|
||||||
|
|
||||||
create_test_tab(Attributes) ->
|
create_test_tab(Attributes) ->
|
||||||
ok = mria:create_table(data_backup_test, [
|
ok = mria:create_table(data_backup_test, [
|
||||||
|
@ -491,8 +440,8 @@ create_test_tab(Attributes) ->
|
||||||
|
|
||||||
apps_to_start() ->
|
apps_to_start() ->
|
||||||
[
|
[
|
||||||
emqx,
|
{emqx_conf, "dashboard.listeners.http.bind = 0"},
|
||||||
emqx_conf,
|
{emqx, #{override_env => [{boot_modules, [broker, router]}]}},
|
||||||
emqx_psk,
|
emqx_psk,
|
||||||
emqx_management,
|
emqx_management,
|
||||||
emqx_dashboard,
|
emqx_dashboard,
|
||||||
|
@ -505,11 +454,9 @@ apps_to_start() ->
|
||||||
emqx_gateway,
|
emqx_gateway,
|
||||||
emqx_exhook,
|
emqx_exhook,
|
||||||
emqx_bridge,
|
emqx_bridge,
|
||||||
emqx_auto_subscribe
|
emqx_auto_subscribe,
|
||||||
].
|
|
||||||
|
|
||||||
apps_to_load() ->
|
% loaded only
|
||||||
[
|
|
||||||
emqx_gateway_lwm2m,
|
emqx_gateway_lwm2m,
|
||||||
emqx_gateway_coap,
|
emqx_gateway_coap,
|
||||||
emqx_gateway_exproto,
|
emqx_gateway_exproto,
|
||||||
|
|
|
@ -523,7 +523,6 @@ group_t_copy_plugin_to_a_new_node({init, Config}) ->
|
||||||
#{
|
#{
|
||||||
apps => [emqx_conf, emqx_plugins],
|
apps => [emqx_conf, emqx_plugins],
|
||||||
env => [
|
env => [
|
||||||
{emqx, init_config_load_done, false},
|
|
||||||
{emqx, boot_modules, []}
|
{emqx, boot_modules, []}
|
||||||
],
|
],
|
||||||
load_schema => false
|
load_schema => false
|
||||||
|
@ -621,7 +620,6 @@ group_t_copy_plugin_to_a_new_node_single_node({init, Config}) ->
|
||||||
#{
|
#{
|
||||||
apps => [emqx_conf, emqx_plugins],
|
apps => [emqx_conf, emqx_plugins],
|
||||||
env => [
|
env => [
|
||||||
{emqx, init_config_load_done, false},
|
|
||||||
{emqx, boot_modules, []}
|
{emqx, boot_modules, []}
|
||||||
],
|
],
|
||||||
env_handler => fun
|
env_handler => fun
|
||||||
|
@ -690,7 +688,6 @@ group_t_cluster_leave({init, Config}) ->
|
||||||
#{
|
#{
|
||||||
apps => [emqx_conf, emqx_plugins],
|
apps => [emqx_conf, emqx_plugins],
|
||||||
env => [
|
env => [
|
||||||
{emqx, init_config_load_done, false},
|
|
||||||
{emqx, boot_modules, []}
|
{emqx, boot_modules, []}
|
||||||
],
|
],
|
||||||
env_handler => fun
|
env_handler => fun
|
||||||
|
|
5
mix.exs
5
mix.exs
|
@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
# in conflict by emqtt and hocon
|
# in conflict by emqtt and hocon
|
||||||
{:getopt, "1.0.2", override: true},
|
{:getopt, "1.0.2", override: true},
|
||||||
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
|
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
|
||||||
{:hocon, github: "emqx/hocon", tag: "0.39.10", override: true},
|
{:hocon, github: "emqx/hocon", tag: "0.39.11", override: true},
|
||||||
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
|
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
|
||||||
{:esasl, github: "emqx/esasl", tag: "0.2.0"},
|
{:esasl, github: "emqx/esasl", tag: "0.2.0"},
|
||||||
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
|
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
|
||||||
|
@ -314,7 +314,8 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
:emqx_prometheus,
|
:emqx_prometheus,
|
||||||
:emqx_auto_subscribe,
|
:emqx_auto_subscribe,
|
||||||
:emqx_slow_subs,
|
:emqx_slow_subs,
|
||||||
:emqx_plugins
|
:emqx_plugins,
|
||||||
|
:emqx_ft
|
||||||
],
|
],
|
||||||
steps: steps,
|
steps: steps,
|
||||||
strip_beams: false
|
strip_beams: false
|
||||||
|
|
|
@ -75,7 +75,7 @@
|
||||||
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
|
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
|
||||||
, {getopt, "1.0.2"}
|
, {getopt, "1.0.2"}
|
||||||
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
|
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
|
||||||
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.10"}}}
|
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.11"}}}
|
||||||
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
|
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
|
||||||
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
|
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
|
||||||
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
|
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
|
||||||
|
|
Loading…
Reference in New Issue