Merge pull request #12508 from keynslug/fix/explicit-tab-mgmt

feat: manage mria tables explicitly during apps startup
This commit is contained in:
Andrew Mayorov 2024-02-16 11:11:59 +01:00 committed by GitHub
commit a945892c52
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
60 changed files with 652 additions and 960 deletions

View File

@ -21,12 +21,9 @@
-include("emqx.hrl").
-include("logger.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-export([create_tables/0]).
-export([start_link/0]).
%% API
-export([
activate/1,
@ -86,7 +83,7 @@
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
create_tables() ->
ok = mria:create_table(
?ACTIVATED_ALARM,
[
@ -106,7 +103,8 @@ mnesia(boot) ->
{record_name, deactivated_alarm},
{attributes, record_info(fields, deactivated_alarm)}
]
).
),
[?ACTIVATED_ALARM, ?DEACTIVATED_ALARM].
%%--------------------------------------------------------------------
%% API

View File

@ -25,9 +25,7 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-export([create_tables/0]).
-export([start_link/0, stop/0]).
@ -79,7 +77,7 @@
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
create_tables() ->
Options = [
{type, set},
{rlog_shard, ?COMMON_SHARD},
@ -89,7 +87,8 @@ mnesia(boot) ->
{storage_properties, [{ets, [{read_concurrency, true}]}]}
],
ok = mria:create_table(?BANNED_INDIVIDUAL_TAB, Options),
ok = mria:create_table(?BANNED_RULE_TAB, Options).
ok = mria:create_table(?BANNED_RULE_TAB, Options),
[?BANNED_INDIVIDUAL_TAB, ?BANNED_RULE_TAB].
%%--------------------------------------------------------------------
%% Data backup

View File

@ -23,6 +23,10 @@
-export([init/1]).
start_link() ->
ok = mria:wait_for_tables(
emqx_shared_sub:create_tables() ++
emqx_exclusive_subscription:create_tables()
),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%%--------------------------------------------------------------------

View File

@ -32,6 +32,7 @@
%%--------------------------------------------------------------------
start_link() ->
ok = mria:wait_for_tables(emqx_banned:create_tables()),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%%--------------------------------------------------------------------

View File

@ -22,14 +22,11 @@
-logger_header("[exclusive]").
%% Mnesia bootstrap
-export([mnesia/1]).
-export([create_tables/0]).
%% For upgrade
-export([on_add_module/0, on_delete_module/0]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-export([
check_subscribe/2,
unsubscribe/2,
@ -53,7 +50,7 @@
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
create_tables() ->
StoreProps = [
{ets, [
{read_concurrency, true},
@ -68,14 +65,14 @@ mnesia(boot) ->
{attributes, record_info(fields, exclusive_subscription)},
{storage_properties, StoreProps}
]),
ok = mria_rlog:wait_for_shards([?EXCLUSIVE_SHARD], infinity).
[?TAB].
%%--------------------------------------------------------------------
%% Upgrade
%%--------------------------------------------------------------------
on_add_module() ->
mnesia(boot).
mria:wait_for_tables(create_tables()).
on_delete_module() ->
clear().

View File

@ -24,9 +24,7 @@
-include_lib("emqx/include/emqx_router.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-export([create_tables/0]).
-export([start_link/2]).
@ -123,7 +121,7 @@
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
create_tables() ->
mria_config:set_dirty_shard(?ROUTE_SHARD, true),
ok = mria:create_table(?ROUTE_TAB, [
{type, bag},
@ -151,7 +149,8 @@ mnesia(boot) ->
{decentralized_counters, true}
]}
]}
]).
]),
[?ROUTE_TAB, ?ROUTE_TAB_FILTERS].
%%--------------------------------------------------------------------
%% Start a router

View File

@ -25,9 +25,7 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-export([create_tables/0]).
%% API
-export([
@ -63,7 +61,7 @@
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
create_tables() ->
ok = mria:create_table(?ROUTING_NODE, [
{type, set},
{rlog_shard, ?ROUTE_SHARD},
@ -71,7 +69,8 @@ mnesia(boot) ->
{record_name, routing_node},
{attributes, record_info(fields, routing_node)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]).
]),
[?ROUTING_NODE].
%%--------------------------------------------------------------------
%% API

View File

@ -24,6 +24,11 @@
start_link() ->
%% Init and log routing table type
ok = mria:wait_for_tables(
emqx_trie:create_trie() ++
emqx_router:create_tables() ++
emqx_router_helper:create_tables()
),
ok = emqx_router:init_schema(),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

View File

@ -25,9 +25,7 @@
-include("types.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-export([create_tables/0]).
%% APIs
-export([start_link/0]).
@ -107,14 +105,15 @@
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
create_tables() ->
ok = mria:create_table(?TAB, [
{type, bag},
{rlog_shard, ?SHARED_SUB_SHARD},
{storage, ram_copies},
{record_name, emqx_shared_subscription},
{attributes, record_info(fields, emqx_shared_subscription)}
]).
]),
[?TAB].
%%--------------------------------------------------------------------
%% API

View File

@ -22,6 +22,7 @@
-export([init/1]).
start_link() ->
_ = mria:wait_for_tables(emqx_alarm:create_tables()),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->

View File

@ -20,13 +20,11 @@
%% Mnesia bootstrap
-export([
mnesia/1,
create_trie/0,
wait_for_tables/0,
create_session_trie/1
]).
-boot_mnesia({mnesia, [boot]}).
%% Trie APIs
-export([
insert/1,
@ -65,8 +63,8 @@
%%--------------------------------------------------------------------
%% @doc Create or replicate topics table.
-spec mnesia(boot | copy) -> ok.
mnesia(boot) ->
-spec create_trie() -> [mria:table()].
create_trie() ->
%% Optimize storage
StoreProps = [
{ets, [
@ -80,7 +78,8 @@ mnesia(boot) ->
{attributes, record_info(fields, ?TRIE)},
{type, ordered_set},
{storage_properties, StoreProps}
]).
]),
[?TRIE].
create_session_trie(Type) ->
Storage =

View File

@ -93,15 +93,22 @@ default_auth_header() ->
create_default_app() ->
Now = erlang:system_time(second),
ExpiredAt = Now + timer:minutes(10),
emqx_mgmt_auth:create(
?DEFAULT_APP_ID,
?DEFAULT_APP_KEY,
?DEFAULT_APP_SECRET,
true,
ExpiredAt,
<<"default app key for test">>,
?ROLE_API_SUPERUSER
).
case
emqx_mgmt_auth:create(
?DEFAULT_APP_ID,
?DEFAULT_APP_KEY,
?DEFAULT_APP_SECRET,
true,
ExpiredAt,
<<"default app key for test">>,
?ROLE_API_SUPERUSER
)
of
{ok, App} ->
{ok, App};
{error, name_already_existed} ->
{ok, _} = emqx_mgmt_auth:read(?DEFAULT_APP_ID)
end.
delete_default_app() ->
emqx_mgmt_auth:delete(?DEFAULT_APP_ID).

View File

@ -38,7 +38,7 @@
%% in `end_per_suite/1` or `end_per_group/2`) with the result from step 2.
-module(emqx_cth_cluster).
-export([start/1, start/2, restart/2]).
-export([start/1, start/2, restart/1, restart/2]).
-export([stop/1, stop_node/1]).
-export([start_bare_nodes/1, start_bare_nodes/2]).
@ -162,6 +162,9 @@ wait_clustered([Node | Nodes] = All, Check, Deadline) ->
wait_clustered(All, Check, Deadline)
end.
restart(NodeSpec) ->
restart(maps:get(name, NodeSpec), NodeSpec).
restart(Node, Spec) ->
ct:pal("Stopping peer node ~p", [Node]),
ok = emqx_cth_peer:stop(Node),

View File

@ -177,10 +177,9 @@ load_appspec({App, _Opts}) ->
load_app_deps(App).
load_app_deps(App) ->
AlreadyLoaded = [A || {A, _, _} <- application:loaded_applications()],
case application:get_key(App, applications) of
{ok, Deps} ->
Apps = Deps -- AlreadyLoaded,
Apps = [D || D <- Deps, application:get_key(D, id) == undefined],
ok = lists:foreach(fun emqx_common_test_helpers:load/1, Apps),
ok = lists:foreach(fun load_app_deps/1, Apps);
undefined ->
@ -471,9 +470,12 @@ clean_suite_state() ->
app_schema(App) ->
Mod = list_to_atom(atom_to_list(App) ++ "_schema"),
try is_list(Mod:roots()) of
true -> {ok, Mod};
false -> {error, schema_no_roots}
try
Exports = Mod:module_info(exports),
case lists:member({roots, 0}, Exports) of
true -> {ok, Mod};
false -> {error, schema_no_roots}
end
catch
error:undef ->
{error, schema_not_found}

View File

@ -21,6 +21,7 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() ->
[
@ -43,14 +44,14 @@ end_per_group(_, _) ->
ok.
init_per_suite(Config) ->
application:load(emqx),
ok = ekka:start(),
Config.
Apps = emqx_cth_suite:start(
[{emqx, #{override_env => [{boot_modules, [broker]}]}}],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_Config) ->
ekka:stop(),
mria:stop(),
mria_mnesia:delete_schema().
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(_TestCase, Config) ->
clear_tables(),
@ -185,7 +186,8 @@ t_delete3(_) ->
?assertEqual([], ?TRIE:match(<<"sensor">>)),
?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>, ?TRIE)).
clear_tables() -> emqx_trie:clear_tables().
clear_tables() ->
emqx_trie:clear_tables().
trans(Fun) ->
mria:transaction(?ROUTE_SHARD, Fun).

View File

@ -25,6 +25,7 @@
start(_StartType, _StartArgs) ->
ok = emqx_authz_mnesia:init_tables(),
ok = emqx_authn_mnesia:init_tables(),
ok = emqx_authn_scram_mnesia:init_tables(),
ok = emqx_authz:register_source(?AUTHZ_TYPE, emqx_authz_mnesia),
ok = emqx_authn:register_provider(?AUTHN_TYPE_SIMPLE, emqx_authn_mnesia),
ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_mnesia),

View File

@ -55,7 +55,7 @@
do_update_user/3
]).
-export([mnesia/1, init_tables/0]).
-export([init_tables/0]).
-export([backup_tables/0]).
@ -69,8 +69,6 @@
is_superuser :: boolean()
}).
-boot_mnesia({mnesia, [boot]}).
-define(TAB, ?MODULE).
-define(AUTHN_QSCHEMA, [
{<<"like_user_id">>, binary},
@ -83,8 +81,8 @@
%%------------------------------------------------------------------------------
%% @doc Create or replicate tables.
-spec mnesia(boot | copy) -> ok.
mnesia(boot) ->
-spec create_tables() -> [mria:table()].
create_tables() ->
ok = mria:create_table(?TAB, [
{rlog_shard, ?AUTHN_SHARD},
{type, ordered_set},
@ -92,12 +90,13 @@ mnesia(boot) ->
{record_name, user_info},
{attributes, record_info(fields, user_info)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]).
]),
[?TAB].
%% Init
-spec init_tables() -> ok.
init_tables() ->
ok = mria_rlog:wait_for_shards([?AUTHN_SHARD], infinity).
ok = mria:wait_for_tables(create_tables()).
%%------------------------------------------------------------------------------
%% Data backup

View File

@ -65,9 +65,7 @@
-type user_group() :: binary().
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-export([init_tables/0]).
-record(user_info, {
user_id,
@ -84,8 +82,8 @@
%%------------------------------------------------------------------------------
%% @doc Create or replicate tables.
-spec mnesia(boot | copy) -> ok.
mnesia(boot) ->
-spec create_tables() -> [mria:table()].
create_tables() ->
ok = mria:create_table(?TAB, [
{rlog_shard, ?AUTHN_SHARD},
{type, ordered_set},
@ -93,7 +91,12 @@ mnesia(boot) ->
{record_name, user_info},
{attributes, record_info(fields, user_info)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]).
]),
[?TAB].
-spec init_tables() -> ok.
init_tables() ->
mria:wait_for_tables(create_tables()).
%%------------------------------------------------------------------------------
%% Data backup

View File

@ -56,7 +56,6 @@
%% Management API
-export([
mnesia/1,
init_tables/0,
store_rules/2,
purge_rules/0,
@ -74,17 +73,16 @@
-compile(nowarn_export_all).
-endif.
-boot_mnesia({mnesia, [boot]}).
-spec mnesia(boot | copy) -> ok.
mnesia(boot) ->
-spec create_tables() -> [mria:table()].
create_tables() ->
ok = mria:create_table(?ACL_TABLE, [
{type, ordered_set},
{rlog_shard, ?ACL_SHARDED},
{storage, disc_copies},
{attributes, record_info(fields, ?ACL_TABLE)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]).
]),
[?ACL_TABLE].
%%--------------------------------------------------------------------
%% emqx_authz callbacks
@ -138,7 +136,7 @@ backup_tables() -> [?ACL_TABLE].
%% Init
-spec init_tables() -> ok.
init_tables() ->
ok = mria_rlog:wait_for_shards([?ACL_SHARDED], infinity).
ok = mria:wait_for_tables(create_tables()).
%% @doc Update authz rules
-spec store_rules(who(), rules()) -> ok.

View File

@ -17,7 +17,7 @@
-behaviour(gen_server).
%% API
-export([start_link/0, mnesia/1]).
-export([start_link/0, create_tables/0]).
%% Note: multicall functions are statically checked by
%% `emqx_bapi_trans' and `emqx_bpapi_static_checks' modules. Don't
@ -65,8 +65,6 @@
-export_type([tnx_id/0, succeed_num/0]).
-boot_mnesia({mnesia, [boot]}).
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_conf.hrl").
@ -99,7 +97,8 @@
%%%===================================================================
%%% API
%%%===================================================================
mnesia(boot) ->
create_tables() ->
ok = mria:create_table(?CLUSTER_MFA, [
{type, ordered_set},
{rlog_shard, ?CLUSTER_RPC_SHARD},
@ -113,7 +112,11 @@ mnesia(boot) ->
{storage, disc_copies},
{record_name, cluster_rpc_commit},
{attributes, record_info(fields, cluster_rpc_commit)}
]).
]),
[
?CLUSTER_MFA,
?CLUSTER_COMMIT
].
start_link() ->
start_link(node(), ?MODULE, get_retry_ms()).

View File

@ -3,7 +3,7 @@
{vsn, "0.1.34"},
{registered, []},
{mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib, emqx_ctl]},
{applications, [kernel, stdlib]},
{env, []},
{modules, []}
]}.

View File

@ -27,6 +27,7 @@
-include("emqx_conf.hrl").
start(_StartType, _StartArgs) ->
ok = mria:wait_for_tables(emqx_cluster_rpc:create_tables()),
try
ok = init_conf()
catch

View File

@ -42,10 +42,8 @@ suite() -> [{timetrap, {minutes, 5}}].
groups() -> [].
init_per_suite(Config) ->
application:load(emqx_conf),
ok = ekka:start(),
ok = emqx_common_test_helpers:start_apps([]),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
ok = mria:wait_for_tables(emqx_cluster_rpc:create_tables()),
ok = emqx_config:put([node, cluster_call, retry_interval], 1000),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
meck:expect(emqx_alarm, activate, 3, ok),
@ -56,10 +54,6 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([]),
ekka:stop(),
mria:stop(),
meck:unload(mria),
mria_mnesia:delete_schema(),
meck:unload(emqx_alarm),
ok.

View File

@ -27,9 +27,10 @@ all() ->
t_copy_conf_override_on_restarts(Config) ->
ct:timetrap({seconds, 120}),
snabbkaffe:fix_ct_logging(),
Cluster = cluster(
[cluster_spec({core, 1}), cluster_spec({core, 2}), cluster_spec({core, 3})], Config
?FUNCTION_NAME,
[cluster_spec({core, 1}), cluster_spec({core, 2}), cluster_spec({core, 3})],
Config
),
%% 1. Start all nodes
@ -42,7 +43,7 @@ t_copy_conf_override_on_restarts(Config) ->
%% 3. Restart nodes in the same order. This should not
%% crash and eventually all nodes should be ready.
start_cluster_async(Cluster),
restart_cluster_async(Cluster),
timer:sleep(15000),
@ -54,11 +55,12 @@ t_copy_conf_override_on_restarts(Config) ->
end.
t_copy_new_data_dir(Config) ->
net_kernel:start(['master1@127.0.0.1', longnames]),
ct:timetrap({seconds, 120}),
snabbkaffe:fix_ct_logging(),
Cluster = cluster(
[cluster_spec({core, 4}), cluster_spec({core, 5}), cluster_spec({core, 6})], Config
?FUNCTION_NAME,
[cluster_spec({core, 4}), cluster_spec({core, 5}), cluster_spec({core, 6})],
Config
),
%% 1. Start all nodes
@ -81,11 +83,11 @@ t_copy_new_data_dir(Config) ->
end.
t_copy_deprecated_data_dir(Config) ->
net_kernel:start(['master2@127.0.0.1', longnames]),
ct:timetrap({seconds, 120}),
snabbkaffe:fix_ct_logging(),
Cluster = cluster(
[cluster_spec({core, 7}), cluster_spec({core, 8}), cluster_spec({core, 9})], Config
?FUNCTION_NAME,
[cluster_spec({core, 7}), cluster_spec({core, 8}), cluster_spec({core, 9})],
Config
),
%% 1. Start all nodes
@ -108,11 +110,11 @@ t_copy_deprecated_data_dir(Config) ->
end.
t_no_copy_from_newer_version_node(Config) ->
net_kernel:start(['master2@127.0.0.1', longnames]),
ct:timetrap({seconds, 120}),
snabbkaffe:fix_ct_logging(),
Cluster = cluster(
[cluster_spec({core, 10}), cluster_spec({core, 11}), cluster_spec({core, 12})], Config
?FUNCTION_NAME,
[cluster_spec({core, 10}), cluster_spec({core, 11}), cluster_spec({core, 12})],
Config
),
OKs = [ok, ok, ok],
[First | Rest] = Nodes = start_cluster(Cluster),
@ -222,39 +224,29 @@ assert_config_load_done(Nodes) ->
).
stop_cluster(Nodes) ->
emqx_utils:pmap(fun emqx_common_test_helpers:stop_peer/1, Nodes).
emqx_cth_cluster:stop(Nodes).
start_cluster(Specs) ->
[emqx_common_test_helpers:start_peer(Name, Opts) || {Name, Opts} <- Specs].
emqx_cth_cluster:start(Specs).
start_cluster_async(Specs) ->
restart_cluster_async(Specs) ->
[
begin
Opts1 = maps:remove(join_to, Opts),
spawn_link(fun() -> emqx_common_test_helpers:start_peer(Name, Opts1) end),
timer:sleep(7_000)
_Pid = spawn_link(emqx_cth_cluster, restart, [Spec]),
timer:sleep(1_000)
end
|| {Name, Opts} <- Specs
|| Spec <- Specs
].
cluster(Specs, Config) ->
PrivDataDir = ?config(priv_dir, Config),
Env = [
{emqx, boot_modules, []}
cluster(TC, Specs, Config) ->
Apps = [
{emqx, #{override_env => [{boot_modules, [broker]}]}},
{emqx_conf, #{}}
],
emqx_common_test_helpers:emqx_cluster(Specs, [
{env, Env},
{apps, [emqx_conf]},
{load_schema, false},
{priv_data_dir, PrivDataDir},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, []),
ok;
(_) ->
ok
end}
]).
emqx_cth_cluster:mk_nodespecs(
[{Name, #{role => Role, apps => Apps}} || {Role, Name} <- Specs],
#{work_dir => emqx_cth_suite:work_dir(TC, Config)}
).
cluster_spec({Type, Num}) ->
{Type, list_to_atom(atom_to_list(?MODULE) ++ integer_to_list(Num))}.

View File

@ -22,12 +22,9 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-boot_mnesia({mnesia, [boot]}).
-behaviour(emqx_db_backup).
%% Mnesia bootstrap
-export([mnesia/1]).
-export([create_tables/0]).
-export([
add_user/4,
@ -70,7 +67,7 @@
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
create_tables() ->
ok = mria:create_table(?ADMIN, [
{type, set},
{rlog_shard, ?DASHBOARD_SHARD},
@ -83,7 +80,8 @@ mnesia(boot) ->
{write_concurrency, true}
]}
]}
]).
]),
[?ADMIN].
%%--------------------------------------------------------------------
%% Data backup

View File

@ -26,7 +26,12 @@
-include("emqx_dashboard.hrl").
start(_StartType, _StartArgs) ->
ok = mria_rlog:wait_for_shards([?DASHBOARD_SHARD], infinity),
Tables = lists:append([
emqx_dashboard_admin:create_tables(),
emqx_dashboard_token:create_tables(),
emqx_dashboard_monitor:create_tables()
]),
ok = mria:wait_for_tables(Tables),
{ok, Sup} = emqx_dashboard_sup:start_link(),
case emqx_dashboard:start_listeners() of
ok ->

View File

@ -22,8 +22,7 @@
-behaviour(gen_server).
-boot_mnesia({mnesia, [boot]}).
-export([create_tables/0]).
-export([start_link/0]).
-export([
@ -35,8 +34,6 @@
code_change/3
]).
-export([mnesia/1]).
-export([
samplers/0,
samplers/2,
@ -67,14 +64,15 @@
data :: map()
}).
mnesia(boot) ->
create_tables() ->
ok = mria:create_table(?TAB, [
{type, set},
{local_content, true},
{storage, disc_copies},
{record_name, emqx_monit},
{attributes, record_info(fields, emqx_monit)}
]).
]),
[?TAB].
%% -------------------------------------------------------------------------------------------------
%% API

View File

@ -18,6 +18,8 @@
-include("emqx_dashboard.hrl").
-export([create_tables/0]).
-export([
sign/2,
verify/2,
@ -27,10 +29,6 @@
destroy_by_username/1
]).
-boot_mnesia({mnesia, [boot]}).
-export([mnesia/1]).
-ifdef(TEST).
-export([lookup_by_username/1, clean_expired_jwt/1]).
-endif.
@ -87,7 +85,7 @@ salt() ->
<<X:16/big-unsigned-integer>> = crypto:strong_rand_bytes(2),
iolist_to_binary(io_lib:format("~4.16.0b", [X])).
mnesia(boot) ->
create_tables() ->
ok = mria:create_table(?TAB, [
{type, set},
{rlog_shard, ?DASHBOARD_SHARD},
@ -100,7 +98,8 @@ mnesia(boot) ->
{write_concurrency, true}
]}
]}
]).
]),
[?TAB].
%%--------------------------------------------------------------------
%% jwt apply

View File

@ -9,6 +9,7 @@
-include_lib("emqx_dashboard/include/emqx_dashboard.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-import(emqx_dashboard_sso_cli, [admins/1]).
@ -24,15 +25,19 @@
all() -> [t_add, t_passwd, t_del].
init_per_suite(Config) ->
_ = application:load(emqx_conf),
emqx_config:save_schema_mod_and_names(emqx_dashboard_schema),
emqx_mgmt_api_test_util:init_suite([emqx_dashboard, emqx_dashboard_sso]),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_dashboard,
emqx_dashboard_sso
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_Config) ->
All = emqx_dashboard_admin:all_users(),
[emqx_dashboard_admin:remove_user(Name) || #{username := Name} <- All],
emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_dashboard_sso]).
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
t_add(_) ->
admins(["add", "user1", "password1"]),

View File

@ -44,31 +44,28 @@ all() ->
].
init_per_suite(Config) ->
_ = application:load(emqx_conf),
emqx_config:save_schema_mod_and_names(emqx_dashboard_schema),
emqx_mgmt_api_test_util:init_suite([emqx_dashboard, emqx_dashboard_sso]),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"},
emqx_dashboard_sso
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
_ = emqx_common_test_http:create_default_app(),
[{suite_apps, Apps} | Config].
end_per_suite(_Config) ->
All = emqx_dashboard_admin:all_users(),
[emqx_dashboard_admin:remove_user(Name) || #{username := Name} <- All],
emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_dashboard_sso]).
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(Case, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(),
?MODULE:Case({init, Config}),
Config.
end_per_testcase(Case, Config) ->
?MODULE:Case({'end', Config}),
case erlang:whereis(node()) of
undefined ->
ok;
P ->
erlang:unlink(P),
erlang:exit(P, kill)
end,
ok.
?MODULE:Case({'end', Config}).
t_bad_create({init, Config}) ->
Config;

View File

@ -24,16 +24,11 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("emqx_conf/include/emqx_conf.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl).
-define(OTHER_CLUSTER_NAME_ATOM, test_emqx_cluster).
-define(OTHER_CLUSTER_NAME_STRING, "test_emqx_cluster").
-define(CONF_DEFAULT, <<
"\n"
"exhook {\n"
" servers = [\n"
" { name = default,\n"
@ -54,8 +49,6 @@
"}\n"
>>).
-import(emqx_common_test_helpers, [on_exit/1]).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
@ -63,47 +56,30 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Cfg) ->
application:load(emqx_conf),
ok = ekka:start(),
application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
meck:expect(emqx_alarm, activate, 3, ok),
meck:expect(emqx_alarm, deactivate, 3, ok),
_ = emqx_exhook_demo_svr:start(),
load_cfg(?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([emqx_exhook]),
Cfg.
end_per_suite(_Cfg) ->
application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM),
ekka:stop(),
mria:stop(),
mria_mnesia:delete_schema(),
meck:unload(emqx_alarm),
emqx_common_test_helpers:stop_apps([emqx_exhook]),
emqx_exhook_demo_svr:stop().
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(),
timer:sleep(200),
Config.
init_per_testcase(TC, Config) ->
Apps = emqx_cth_suite:start(
[
emqx,
{emqx_conf, emqx_conf(TC)},
{emqx_exhook, ?CONF_DEFAULT}
],
#{work_dir => emqx_cth_suite:work_dir(TC, Config)}
),
[{tc_apps, Apps} | Config].
end_per_testcase(_, _Config) ->
case erlang:whereis(node()) of
undefined ->
ok;
P ->
erlang:unlink(P),
erlang:exit(P, kill)
end,
emqx_common_test_helpers:call_janitor(),
ok.
end_per_testcase(_, Config) ->
ok = emqx_cth_suite:stop(?config(tc_apps, Config)).
load_cfg(Cfg) ->
ok = emqx_common_test_helpers:load_config(emqx_exhook_schema, Cfg).
emqx_conf(t_cluster_name) ->
io_lib:format("cluster.name = ~p", [?OTHER_CLUSTER_NAME_STRING]);
emqx_conf(_) ->
#{}.
%%--------------------------------------------------------------------
%% Test cases
@ -320,23 +296,6 @@ t_misc_test(_) ->
ok.
t_cluster_name(_) ->
SetEnvFun =
fun
(emqx) ->
application:set_env(ekka, cluster_name, ?OTHER_CLUSTER_NAME_ATOM);
(emqx_exhook) ->
ok
end,
stop_apps([emqx, emqx_exhook]),
emqx_common_test_helpers:start_apps([emqx, emqx_exhook], SetEnvFun),
on_exit(fun() ->
stop_apps([emqx, emqx_exhook]),
load_cfg(?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([emqx_exhook]),
mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT])
end),
?assertEqual(?OTHER_CLUSTER_NAME_STRING, emqx_sys:cluster_name()),
emqx_exhook_mgr:disable(<<"default">>),
@ -364,7 +323,7 @@ t_stop_timeout(_) ->
),
%% stop application
application:stop(emqx_exhook),
ok = application:stop(emqx_exhook),
?block_until(#{?snk_kind := exhook_mgr_terminated}, 20000),
%% all exhook hooked point should be unloaded
@ -379,7 +338,7 @@ t_stop_timeout(_) ->
?assertEqual(false, lists:any(fun(M) -> M == emqx_exhook_handler end, Mods)),
%% ensure started for other tests
emqx_common_test_helpers:start_apps([emqx_exhook]),
{ok, _} = application:ensure_all_started(emqx_exhook),
snabbkaffe:stop(),
meck:unload(emqx_exhook_demo_svr).
@ -510,10 +469,6 @@ data_file(Name) ->
cert_file(Name) ->
data_file(filename:join(["certs", Name])).
%% FIXME: this creates inter-test dependency
stop_apps(Apps) ->
emqx_common_test_helpers:stop_apps(Apps, #{erase_all_configs => false}).
shuffle(List) ->
Sorted = lists:sort(lists:map(fun(L) -> {rand:uniform(), L} end, List)),
lists:map(fun({_, L}) -> L end, Sorted).

View File

@ -20,16 +20,13 @@
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl).
-define(CONF_DEFAULT, <<
"\n"
"exhook {\n"
" servers =\n"
" [ { name = default,\n"
@ -56,54 +53,34 @@ all() ->
].
init_per_suite(Config) ->
application:load(emqx_conf),
ok = ekka:start(),
application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
meck:expect(emqx_alarm, activate, 3, ok),
meck:expect(emqx_alarm, deactivate, 3, ok),
_ = emqx_exhook_demo_svr:start(),
load_cfg(?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_exhook]),
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_management,
{emqx_exhook, ?CONF_DEFAULT},
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
[Conf] = emqx:get_raw_config([exhook, servers]),
[{template, Conf} | Config].
[{suite_apps, Apps}, {template, Conf} | Config].
end_per_suite(Config) ->
application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM),
ekka:stop(),
mria:stop(),
mria_mnesia:delete_schema(),
meck:unload(emqx_alarm),
emqx_mgmt_api_test_util:end_suite([emqx_exhook]),
emqx_exhook_demo_svr:stop(),
emqx_exhook_demo_svr:stop(<<"test1">>),
Config.
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(t_add, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(),
_ = emqx_exhook_demo_svr:start(<<"test1">>, 9001),
timer:sleep(200),
Config;
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(),
timer:sleep(200),
Config.
end_per_testcase(_, Config) ->
case erlang:whereis(node()) of
undefined ->
ok;
P ->
erlang:unlink(P),
erlang:exit(P, kill)
end,
Config.
load_cfg(Cfg) ->
ok = emqx_common_test_helpers:load_config(emqx_exhook_schema, Cfg).
end_per_testcase(_, _Config) ->
ok.
%%--------------------------------------------------------------------
%% Test cases

View File

@ -27,7 +27,6 @@
-define(TARGET_HOOK, 'message.publish').
-define(CONF, <<
"\n"
"exhook {\n"
" servers = [\n"
" { name = succed,\n"
@ -48,24 +47,24 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Cfg) ->
application:load(emqx_conf),
meck:new(emqx_exhook_mgr, [non_strict, passthrough, no_link]),
meck:new(emqx_exhook_demo_svr, [non_strict, passthrough, no_link]),
meck:expect(emqx_exhook_mgr, refresh_tick, fun() -> ok end),
init_injections(hook_injects()),
emqx_exhook_SUITE:load_cfg(?CONF),
_ = emqx_exhook_demo_svr:start(),
_ = emqx_exhook_demo_svr:start(failed, 9001),
emqx_common_test_helpers:start_apps([emqx_exhook]),
Cfg.
Apps = emqx_cth_suite:start(
[emqx, {emqx_exhook, ?CONF}],
#{work_dir => emqx_cth_suite:work_dir(Cfg)}
),
[{suite_apps, Apps} | Cfg].
end_per_suite(_Cfg) ->
end_per_suite(Cfg) ->
meck:unload(emqx_exhook_demo_svr),
meck:unload(emqx_exhook_mgr),
emqx_exhook_demo_svr:stop(),
emqx_exhook_demo_svr:stop(failed),
emqx_common_test_helpers:stop_apps([emqx_exhook]).
ok = emqx_cth_suite:stop(?config(suite_apps, Cfg)).
init_per_testcase(_, Config) ->
clear_metrics(),

View File

@ -17,121 +17,32 @@ all() ->
init_per_suite(Config) ->
emqx_license_test_lib:mock_parser(),
_ = application:load(emqx_conf),
emqx_config:save_schema_mod_and_names(emqx_license_schema),
emqx_common_test_helpers:start_apps([emqx_license], fun set_special_configs/1),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
{emqx_license, "license { key = \"default\" }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_) ->
end_per_suite(Config) ->
emqx_license_test_lib:unmock_parser(),
emqx_common_test_helpers:stop_apps([emqx_license]),
ok.
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(Case, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Paths = set_override_paths(Case),
Config0 = setup_test(Case, Config),
Paths ++ Config0 ++ Config.
setup_test(Case, Config) ++ Config.
end_per_testcase(Case, Config) ->
clean_overrides(Case, Config),
teardown_test(Case, Config),
ok.
teardown_test(Case, Config).
set_override_paths(_TestCase) ->
[].
clean_overrides(_TestCase, _Config) ->
ok.
setup_test(TestCase, Config) when
TestCase =:= t_update_file_cluster_backup
->
DataDir = ?config(data_dir, Config),
{LicenseKey, _License} = mk_license(
[
%% license format version
"220111",
%% license type
"0",
%% customer type
"10",
%% customer name
"Foo",
%% customer email
"contact@foo.com",
%% deplayment name
"bar-deployment",
%% start date
"20220111",
%% days
"100000",
%% max connections
"19"
]
),
Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core],
[
{apps, [emqx_conf, emqx_license]},
{load_schema, false},
{schema_mod, emqx_enterprise_schema},
{env_handler, fun
(emqx) ->
emqx_config:save_schema_mod_and_names(emqx_enterprise_schema),
%% emqx_config:save_schema_mod_and_names(emqx_license_schema),
application:set_env(emqx, boot_modules, []),
application:set_env(
emqx,
data_dir,
filename:join([
DataDir,
TestCase,
node()
])
),
ok;
(emqx_conf) ->
emqx_config:save_schema_mod_and_names(emqx_enterprise_schema),
%% emqx_config:save_schema_mod_and_names(emqx_license_schema),
application:set_env(
emqx,
data_dir,
filename:join([
DataDir,
TestCase,
node()
])
),
ok;
(emqx_license) ->
set_special_configs(emqx_license),
ok;
(_) ->
ok
end}
]
),
Nodes = [emqx_common_test_helpers:start_peer(Name, Opts) || {Name, Opts} <- Cluster],
[{nodes, Nodes}, {cluster, Cluster}, {old_license, LicenseKey}];
setup_test(_TestCase, _Config) ->
[].
teardown_test(_TestCase, _Config) ->
ok.
set_special_configs(emqx_license) ->
Config = #{key => default},
emqx_config:put([license], Config),
RawConfig = #{<<"key">> => <<"default">>},
emqx_config:put_raw([<<"license">>], RawConfig);
set_special_configs(_) ->
ok.
assert_on_nodes(Nodes, RunFun, CheckFun) ->
Res = [{N, erpc:call(N, RunFun)} || N <- Nodes],
lists:foreach(CheckFun, Res).
%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------

View File

@ -14,34 +14,35 @@
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(CtConfig) ->
_ = application:load(emqx_conf),
init_per_suite(Config) ->
emqx_license_test_lib:mock_parser(),
ok = emqx_common_test_helpers:start_apps([emqx_license], fun set_special_configs/1),
CtConfig.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
{emqx_license, #{
config => #{license => #{key => emqx_license_test_lib:default_test_license()}}
}}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_) ->
end_per_suite(Config) ->
emqx_license_test_lib:unmock_parser(),
ok = emqx_common_test_helpers:stop_apps([emqx_license]).
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(t_default_limits, Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_license]),
ok = application:stop(emqx_license),
Config;
init_per_testcase(_Case, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(t_default_limits, _Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_license], fun set_special_configs/1);
{ok, _} = application:ensure_all_started(emqx_license);
end_per_testcase(_Case, _Config) ->
ok.
set_special_configs(emqx_license) ->
Config = #{key => emqx_license_test_lib:default_test_license()},
emqx_config:put([license], Config);
set_special_configs(_) ->
ok.
%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------

View File

@ -14,32 +14,29 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
_ = application:load(emqx_conf),
emqx_config:save_schema_mod_and_names(emqx_license_schema),
emqx_common_test_helpers:start_apps([emqx_license], fun set_special_configs/1),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
{emqx_license, #{
config => #{license => #{key => emqx_license_test_lib:default_license()}}
}}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_) ->
emqx_common_test_helpers:stop_apps([emqx_license]),
ok.
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(_Case, Config) ->
emqx_license_test_lib:mock_parser(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(_Case, _Config) ->
emqx_license_test_lib:unmock_parser(),
ok.
set_special_configs(emqx_license) ->
Config = #{key => emqx_license_test_lib:default_license()},
emqx_config:put([license], Config),
RawConfig = #{<<"key">> => emqx_license_test_lib:default_license()},
emqx_config:put_raw([<<"license">>], RawConfig);
set_special_configs(_) ->
ok.
%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------

View File

@ -7,7 +7,6 @@
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@ -20,41 +19,34 @@ all() ->
init_per_suite(Config) ->
emqx_license_test_lib:mock_parser(),
_ = application:load(emqx_conf),
emqx_config:save_schema_mod_and_names(emqx_license_schema),
emqx_common_test_helpers:start_apps([emqx_license, emqx_dashboard], fun set_special_configs/1),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
{emqx_license, #{
config => #{
license => #{
key => emqx_license_test_lib:make_license(#{max_connections => "100"}),
connection_low_watermark => <<"75%">>,
connection_high_watermark => <<"80%">>
}
}
}},
{emqx_dashboard,
"dashboard {"
"\n listeners.http { enable = true, bind = 18083 }"
"\n default_username = \"license_admin\""
"\n}"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_) ->
emqx_common_test_helpers:stop_apps([emqx_license, emqx_dashboard]),
LicenseKey = emqx_license_test_lib:make_license(#{max_connections => "100"}),
Config = #{key => LicenseKey},
emqx_config:put([license], Config),
RawConfig = #{<<"key">> => LicenseKey},
emqx_config:put_raw([<<"license">>], RawConfig),
end_per_suite(Config) ->
emqx_license_test_lib:unmock_parser(),
ok.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config(<<"license_admin">>);
set_special_configs(emqx_license) ->
LicenseKey = emqx_license_test_lib:make_license(#{max_connections => "100"}),
Config = #{
key => LicenseKey, connection_low_watermark => 0.75, connection_high_watermark => 0.8
},
emqx_config:put([license], Config),
RawConfig = #{
<<"key">> => LicenseKey,
<<"connection_low_watermark">> => <<"75%">>,
<<"connection_high_watermark">> => <<"80%">>
},
emqx_config:put_raw([<<"license">>], RawConfig),
ok;
set_special_configs(_) ->
ok.
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(_TestCase, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(_TestCase, _Config) ->

View File

@ -14,26 +14,20 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
_ = application:load(emqx_conf),
emqx_common_test_helpers:start_apps([emqx_license], fun set_special_configs/1),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
{emqx_license, #{
config => #{license => #{key => emqx_license_test_lib:default_license()}}
}}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_) ->
emqx_common_test_helpers:stop_apps([emqx_license]),
ok.
init_per_testcase(_Case, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(_Case, _Config) ->
ok.
set_special_configs(emqx_license) ->
Config = #{key => emqx_license_test_lib:default_license()},
emqx_config:put([license], Config);
set_special_configs(_) ->
ok.
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
%%------------------------------------------------------------------------------
%% Tests

View File

@ -15,26 +15,20 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
_ = application:load(emqx_conf),
emqx_common_test_helpers:start_apps([emqx_license], fun set_special_configs/1),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
{emqx_license, #{
config => #{license => #{key => emqx_license_test_lib:default_license()}}
}}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_) ->
emqx_common_test_helpers:stop_apps([emqx_license]),
ok.
init_per_testcase(_Case, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(_Case, _Config) ->
ok.
set_special_configs(emqx_license) ->
Config = #{key => emqx_license_test_lib:default_license()},
emqx_config:put([license], Config);
set_special_configs(_) ->
ok.
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
%%------------------------------------------------------------------------------
%% Tests

View File

@ -28,6 +28,7 @@
-include("emqx_mgmt.hrl").
start(_Type, _Args) ->
ok = mria:wait_for_tables(emqx_mgmt_auth:create_tables()),
case emqx_mgmt_auth:init_bootstrap_file() of
ok ->
emqx_conf:add_handler([api_key], emqx_mgmt_auth),

View File

@ -22,8 +22,8 @@
-behaviour(emqx_db_backup).
%% API
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-export([create_tables/0]).
-behaviour(emqx_config_handler).
-export([
@ -70,7 +70,7 @@
-define(DEFAULT_HASH_LEN, 16).
mnesia(boot) ->
create_tables() ->
Fields = record_info(fields, ?APP),
ok = mria:create_table(?APP, [
{type, set},
@ -78,7 +78,8 @@ mnesia(boot) ->
{storage, disc_copies},
{record_name, ?APP},
{attributes, Fields}
]).
]),
[?APP].
%%--------------------------------------------------------------------
%% Data backup

View File

@ -37,13 +37,27 @@ end_per_suite(_) ->
%% cases
%%--------------------------------------------------------------------
t_cluster_query(_Config) ->
t_cluster_query(Config) ->
net_kernel:start(['master@127.0.0.1', longnames]),
ct:timetrap({seconds, 120}),
snabbkaffe:fix_ct_logging(),
[{Name, Opts}, {Name1, Opts1}] = cluster_specs(),
Node1 = emqx_common_test_helpers:start_peer(Name, Opts),
Node2 = emqx_common_test_helpers:start_peer(Name1, Opts1),
ListenerConf = fun(Port) ->
io_lib:format(
"\n listeners.tcp.default.bind = ~p"
"\n listeners.ssl.default.enable = false"
"\n listeners.ws.default.enable = false"
"\n listeners.wss.default.enable = false",
[Port]
)
end,
Nodes =
[Node1, Node2] = emqx_cth_cluster:start(
[
{corenode1, #{role => core, apps => [{emqx, ListenerConf(2883)}, emqx_management]}},
{corenode2, #{role => core, apps => [{emqx, ListenerConf(3883)}, emqx_management]}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
try
process_flag(trap_exit, true),
ClientLs1 = [start_emqtt_client(Node1, I, 2883) || I <- lists:seq(1, 10)],
@ -168,13 +182,19 @@ t_cluster_query(_Config) ->
_ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1),
_ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs2)
after
emqx_common_test_helpers:stop_peer(Node1),
emqx_common_test_helpers:stop_peer(Node2)
end,
ok.
emqx_cth_cluster:stop(Nodes)
end.
t_bad_rpc(_) ->
emqx_mgmt_api_test_util:init_suite(),
t_bad_rpc(Config) ->
Apps = emqx_cth_suite:start(
[
emqx,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
process_flag(trap_exit, true),
ClientLs1 = [start_emqtt_client(node(), I, 1883) || I <- lists:seq(1, 10)],
Path = emqx_mgmt_api_test_util:api_path(["clients?limit=2&page=2"]),
@ -187,35 +207,13 @@ t_bad_rpc(_) ->
after
_ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1),
meck:unload(emqx),
emqx_mgmt_api_test_util:end_suite()
emqx_cth_suite:stop(Apps)
end.
%%--------------------------------------------------------------------
%% helpers
%%--------------------------------------------------------------------
cluster_specs() ->
Specs =
%% default listeners port
[
{core, corenode1, #{listener_ports => [{tcp, 2883}]}},
{core, corenode2, #{listener_ports => [{tcp, 3883}]}}
],
CommOpts =
[
{env, [{emqx, boot_modules, all}]},
{apps, []},
{conf, [
{[listeners, ssl, default, enable], false},
{[listeners, ws, default, enable], false},
{[listeners, wss, default, enable], false}
]}
],
emqx_common_test_helpers:emqx_cluster(
Specs,
CommOpts
).
start_emqtt_client(Node0, N, Port) ->
Node = atom_to_binary(Node0),
ClientId = iolist_to_binary([Node, "-", integer_to_binary(N)]),

View File

@ -27,14 +27,19 @@
all() ->
[
{group, with_defaults_in_file},
{group, without_defaults_in_file}
{group, without_defaults_in_file},
{group, max_connections}
].
groups() ->
AllTests = emqx_common_test_helpers:all(?MODULE),
MaxConnTests = [
t_max_connection_default
],
[
{with_defaults_in_file, AllTests},
{without_defaults_in_file, AllTests}
{with_defaults_in_file, AllTests -- MaxConnTests},
{without_defaults_in_file, AllTests -- MaxConnTests},
{max_connections, MaxConnTests}
].
init_per_suite(Config) ->
@ -44,29 +49,39 @@ end_per_suite(_Config) ->
ok.
init_per_group(without_defaults_in_file, Config) ->
emqx_mgmt_api_test_util:init_suite([emqx_conf]),
Config;
init_group_apps(#{}, Config);
init_per_group(with_defaults_in_file, Config) ->
%% we have to materialize the config file with default values for this test group
%% because we want to test the deletion of non-existing listener
%% if there is no config file, the such deletion would result in a deletion
%% of the default listener.
Name = atom_to_list(?MODULE) ++ "-default-listeners",
TmpConfFullPath = inject_tmp_config_content(Name, default_listeners_hocon_text()),
emqx_mgmt_api_test_util:init_suite([emqx_conf]),
[{injected_conf_file, TmpConfFullPath} | Config].
PrivDir = ?config(priv_dir, Config),
FileName = filename:join([PrivDir, "etc", atom_to_list(?MODULE) ++ "-default-listeners"]),
ok = filelib:ensure_dir(FileName),
ok = file:write_file(FileName, default_listeners_hocon_text()),
init_group_apps("include \"" ++ FileName ++ "\"", Config);
init_per_group(max_connections, Config) ->
init_group_apps(
io_lib:format("listeners.tcp.max_connection_test {bind = \"0.0.0.0:~p\"}", [?PORT]),
Config
).
end_per_group(Group, Config) ->
emqx_conf:tombstone([listeners, tcp, new], #{override_to => cluster}),
emqx_conf:tombstone([listeners, tcp, new1], #{override_to => local}),
case Group =:= with_defaults_in_file of
true ->
{_, File} = lists:keyfind(injected_conf_file, 1, Config),
ok = file:delete(File);
false ->
ok
end,
emqx_mgmt_api_test_util:end_suite([emqx_conf]).
init_group_apps(Config, CTConfig) ->
Apps = emqx_cth_suite:start(
[
{emqx_conf, Config},
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{
work_dir => emqx_cth_suite:work_dir(CTConfig)
}
),
{ok, _} = emqx_common_test_http:create_default_app(),
[{suite_apps, Apps} | CTConfig].
end_per_group(_Group, Config) ->
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(Case, Config) ->
try
@ -84,16 +99,6 @@ end_per_testcase(Case, Config) ->
ok
end.
t_max_connection_default({init, Config}) ->
emqx_mgmt_api_test_util:end_suite([emqx_conf]),
Port = integer_to_binary(?PORT),
Bin = <<"listeners.tcp.max_connection_test {bind = \"0.0.0.0:", Port/binary, "\"}">>,
TmpConfName = atom_to_list(?FUNCTION_NAME) ++ ".conf",
TmpConfFullPath = inject_tmp_config_content(TmpConfName, Bin),
emqx_mgmt_api_test_util:init_suite([emqx_conf]),
[{tmp_config_file, TmpConfFullPath} | Config];
t_max_connection_default({'end', Config}) ->
ok = file:delete(proplists:get_value(tmp_config_file, Config));
t_max_connection_default(Config) when is_list(Config) ->
#{<<"listeners">> := Listeners} = emqx_mgmt_api_listeners:do_list_listeners(),
Target = lists:filter(
@ -189,13 +194,19 @@ t_wss_crud_listeners_by_id(Config) when is_list(Config) ->
crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, 34000).
t_api_listeners_list_not_ready(Config) when is_list(Config) ->
net_kernel:start(['listeners@127.0.0.1', longnames]),
ct:timetrap({seconds, 120}),
snabbkaffe:fix_ct_logging(),
Cluster = [{Name, Opts}, {Name1, Opts1}] = cluster([core, core]),
ct:pal("Starting ~p", [Cluster]),
Node1 = emqx_common_test_helpers:start_peer(Name, Opts),
Node2 = emqx_common_test_helpers:start_peer(Name1, Opts1),
Apps = [
{emqx, #{after_start => fun() -> emqx_app:set_config_loader(emqx) end}},
{emqx_conf, #{}}
],
Nodes =
[Node1, Node2] = emqx_cth_cluster:start(
[
{t_api_listeners_list_not_ready1, #{role => core, apps => Apps}},
{t_api_listeners_list_not_ready2, #{role => core, apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
try
L1 = get_tcp_listeners(Node1),
@ -214,8 +225,7 @@ t_api_listeners_list_not_ready(Config) when is_list(Config) ->
?assert(length(L1) > length(L2), Comment),
?assertEqual(length(L2), length(L3), Comment)
after
emqx_common_test_helpers:stop_peer(Node1),
emqx_common_test_helpers:stop_peer(Node2)
emqx_cth_cluster:stop(Nodes)
end.
t_clear_certs(Config) when is_list(Config) ->
@ -296,25 +306,6 @@ assert_config_load_not_done(Node) ->
Prio = rpc:call(Node, emqx_app, get_config_loader, []),
?assertEqual(emqx, Prio, #{node => Node}).
cluster(Specs) ->
Env = [
{emqx, boot_modules, []}
],
emqx_common_test_helpers:emqx_cluster(Specs, [
{env, Env},
{apps, [emqx_conf]},
{load_schema, false},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, []),
%% test init_config not ready.
emqx_app:set_config_loader(emqx),
ok;
(_) ->
ok
end}
]).
crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, PortBase) ->
OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]),
NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
@ -529,15 +520,3 @@ default_listeners_hocon_text() ->
Listeners = hocon_tconf:make_serializable(Sc, #{}, #{}),
Config = #{<<"listeners">> => Listeners},
hocon_pp:do(Config, #{}).
%% inject a 'include' at the end of emqx.conf.all
%% the 'include' can be kept after test,
%% as long as the file has been deleted it is a no-op
inject_tmp_config_content(TmpFile, Content) ->
Etc = filename:join(["etc", "emqx.conf.all"]),
Inc = filename:join(["etc", TmpFile]),
ConfFile = emqx_common_test_helpers:app_path(emqx_conf, Etc),
TmpFileFullPath = emqx_common_test_helpers:app_path(emqx_conf, Inc),
ok = file:write_file(TmpFileFullPath, Content),
ok = file:write_file(ConfFile, ["\ninclude \"", TmpFileFullPath, "\"\n"], [append]),
TmpFileFullPath.

View File

@ -19,16 +19,25 @@
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]),
Config.
Apps = emqx_cth_suite:start(
[
emqx_conf,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
[{suite_apps, Apps} | Config].
end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(t_log_path, Config) ->
emqx_config_logger:add_handler(),
@ -121,16 +130,17 @@ t_node_metrics_api(_) ->
emqx_mgmt_api_test_util:request_api(get, BadNodePath)
).
t_multiple_nodes_api(_) ->
net_kernel:start(['node_api@127.0.0.1', longnames]),
t_multiple_nodes_api(Config) ->
ct:timetrap({seconds, 120}),
snabbkaffe:fix_ct_logging(),
Seq1 = list_to_atom(atom_to_list(?MODULE) ++ "1"),
Seq2 = list_to_atom(atom_to_list(?MODULE) ++ "2"),
Cluster = [{Name, Opts}, {Name1, Opts1}] = cluster([{core, Seq1}, {core, Seq2}]),
ct:pal("Starting ~p", [Cluster]),
Node1 = emqx_common_test_helpers:start_peer(Name, Opts),
Node2 = emqx_common_test_helpers:start_peer(Name1, Opts1),
Nodes =
[Node1, Node2] = emqx_cth_cluster:start(
[
{t_multiple_nodes_api1, #{role => core, apps => [emqx_conf, emqx_management]}},
{t_multiple_nodes_api2, #{role => core, apps => [emqx_conf, emqx_management]}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
try
{200, NodesList} = rpc:call(Node1, emqx_mgmt_api_nodes, nodes, [get, #{}]),
All = [Node1, Node2],
@ -148,22 +158,6 @@ t_multiple_nodes_api(_) ->
]),
?assertMatch(#{node := Node1}, Node11)
after
emqx_common_test_helpers:stop_peer(Node1),
emqx_common_test_helpers:stop_peer(Node2)
emqx_cth_cluster:stop(Nodes)
end,
ok.
cluster(Specs) ->
Env = [{emqx, boot_modules, []}],
emqx_common_test_helpers:emqx_cluster(Specs, [
{env, Env},
{apps, [emqx_conf, emqx_management]},
{load_schema, false},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, []),
ok;
(_) ->
ok
end}
]).

View File

@ -30,8 +30,9 @@ init_suite(Apps, SetConfigs) when is_function(SetConfigs) ->
init_suite(Apps, SetConfigs, #{}).
init_suite(Apps, SetConfigs, Opts) ->
application:load(emqx_management),
emqx_common_test_helpers:start_apps(Apps ++ [emqx_dashboard], SetConfigs, Opts),
emqx_common_test_helpers:start_apps(
Apps ++ [emqx_management, emqx_dashboard], SetConfigs, Opts
),
_ = emqx_common_test_http:create_default_app(),
ok.
@ -40,8 +41,7 @@ end_suite() ->
end_suite(Apps) ->
emqx_common_test_http:delete_default_app(),
emqx_common_test_helpers:stop_apps(Apps ++ [emqx_dashboard]),
application:unload(emqx_management),
emqx_common_test_helpers:stop_apps(Apps ++ [emqx_management, emqx_dashboard]),
ok.
set_special_configs(emqx_dashboard) ->

View File

@ -25,12 +25,8 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-export([
create_tables/0,
start_link/0,
on_message_publish/1
]).
@ -118,14 +114,16 @@
%%------------------------------------------------------------------------------
%% Mnesia bootstrap
%%------------------------------------------------------------------------------
mnesia(boot) ->
create_tables() ->
ok = mria:create_table(?TAB, [
{type, ordered_set},
{storage, disc_copies},
{local_content, true},
{record_name, delayed_message},
{attributes, record_info(fields, delayed_message)}
]).
]),
[?TAB].
%%------------------------------------------------------------------------------
%% Hooks

View File

@ -24,6 +24,7 @@
]).
start(_Type, _Args) ->
ok = mria:wait_for_tables(emqx_delayed:create_tables()),
{ok, Sup} = emqx_modules_sup:start_link(),
maybe_enable_modules(),
{ok, Sup}.

View File

@ -73,23 +73,21 @@ end_per_group(_Group, _Config) ->
ok.
init_per_suite(Config) ->
WorkDir = proplists:get_value(data_dir, Config),
filelib:ensure_path(WorkDir),
OrigInstallDir = emqx_plugins:get_config(install_dir, undefined),
emqx_common_test_helpers:start_apps([emqx_conf, emqx_plugins]),
emqx_plugins:put_config(install_dir, WorkDir),
[{orig_install_dir, OrigInstallDir} | Config].
WorkDir = emqx_cth_suite:work_dir(Config),
InstallDir = filename:join([WorkDir, "plugins"]),
Apps = emqx_cth_suite:start(
[
emqx_conf,
emqx_ctl,
{emqx_plugins, #{config => #{plugins => #{install_dir => InstallDir}}}}
],
#{work_dir => WorkDir}
),
ok = filelib:ensure_path(InstallDir),
[{suite_apps, Apps}, {install_dir, InstallDir} | Config].
end_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_config:erase(plugins),
%% restore config
case proplists:get_value(orig_install_dir, Config) of
undefined -> ok;
OrigInstallDir -> emqx_plugins:put_config(install_dir, OrigInstallDir)
end,
emqx_common_test_helpers:stop_apps([emqx_plugins, emqx_conf]),
ok.
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(TestCase, Config) ->
emqx_plugins:put_configured([]),
@ -206,7 +204,7 @@ t_demo_install_start_stop_uninstall(Config) ->
%% but since we are using hocon:load to load it
%% ad-hoc test files can be in hocon format
write_info_file(Config, NameVsn, Content) ->
WorkDir = proplists:get_value(data_dir, Config),
WorkDir = proplists:get_value(install_dir, Config),
InfoFile = filename:join([WorkDir, NameVsn, "release.json"]),
ok = filelib:ensure_dir(InfoFile),
ok = file:write_file(InfoFile, Content).
@ -371,7 +369,7 @@ t_bad_tar_gz({init, Config}) ->
t_bad_tar_gz({'end', _Config}) ->
ok;
t_bad_tar_gz(Config) ->
WorkDir = proplists:get_value(data_dir, Config),
WorkDir = proplists:get_value(install_dir, Config),
FakeTarTz = filename:join([WorkDir, "fake-vsn.tar.gz"]),
ok = file:write_file(FakeTarTz, "a\n"),
?assertMatch(
@ -396,7 +394,7 @@ t_bad_tar_gz(Config) ->
%% create with incomplete info file
%% failed install attempts should not leave behind extracted dir
t_bad_tar_gz2({init, Config}) ->
WorkDir = proplists:get_value(data_dir, Config),
WorkDir = proplists:get_value(install_dir, Config),
NameVsn = "foo-0.2",
%% this an invalid info file content (description missing)
BadInfo = "name=foo, rel_vsn=\"0.2\", rel_apps=[foo]",
@ -422,7 +420,7 @@ t_bad_tar_gz2(Config) ->
%% test that we even cleanup content that doesn't match the expected name-vsn
%% pattern
t_tar_vsn_content_mismatch({init, Config}) ->
WorkDir = proplists:get_value(data_dir, Config),
WorkDir = proplists:get_value(install_dir, Config),
NameVsn = "bad_tar-0.2",
%% this an invalid info file content
BadInfo = "name=foo, rel_vsn=\"0.2\", rel_apps=[\"foo-0.2\"], description=\"lorem ipsum\"",
@ -606,7 +604,7 @@ t_load_config_from_cli(Config) when is_list(Config) ->
ok.
group_t_copy_plugin_to_a_new_node({init, Config}) ->
WorkDir = proplists:get_value(data_dir, Config),
WorkDir = proplists:get_value(install_dir, Config),
FromInstallDir = filename:join(WorkDir, atom_to_list(plugins_copy_from)),
file:del_dir_r(FromInstallDir),
ok = filelib:ensure_path(FromInstallDir),
@ -614,25 +612,25 @@ group_t_copy_plugin_to_a_new_node({init, Config}) ->
file:del_dir_r(ToInstallDir),
ok = filelib:ensure_path(ToInstallDir),
#{package := Package, release_name := PluginName} = get_demo_plugin_package(FromInstallDir),
[{CopyFrom, CopyFromOpts}, {CopyTo, CopyToOpts}] =
emqx_common_test_helpers:emqx_cluster(
Apps = [
emqx,
emqx_conf,
emqx_ctl,
emqx_plugins
],
[SpecCopyFrom, SpecCopyTo] =
emqx_cth_cluster:mk_nodespecs(
[
{core, plugins_copy_from},
{core, plugins_copy_to}
{plugins_copy_from, #{role => core, apps => Apps}},
{plugins_copy_to, #{role => core, apps => Apps}}
],
#{
apps => [emqx_conf, emqx_plugins],
env => [
{emqx, boot_modules, []}
],
load_schema => false
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
}
),
CopyFromNode = emqx_common_test_helpers:start_peer(
CopyFrom, maps:remove(join_to, CopyFromOpts)
),
[CopyFromNode] = emqx_cth_cluster:start([SpecCopyFrom#{join_to => undefined}]),
ok = rpc:call(CopyFromNode, emqx_plugins, put_config, [install_dir, FromInstallDir]),
CopyToNode = emqx_common_test_helpers:start_peer(CopyTo, maps:remove(join_to, CopyToOpts)),
[CopyToNode] = emqx_cth_cluster:start([SpecCopyTo#{join_to => undefined}]),
ok = rpc:call(CopyToNode, emqx_plugins, put_config, [install_dir, ToInstallDir]),
NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
ok = rpc:call(CopyFromNode, emqx_plugins, ensure_installed, [NameVsn]),
@ -656,16 +654,9 @@ group_t_copy_plugin_to_a_new_node({init, Config}) ->
| Config
];
group_t_copy_plugin_to_a_new_node({'end', Config}) ->
CopyFromNode = proplists:get_value(copy_from_node, Config),
CopyToNode = proplists:get_value(copy_to_node, Config),
ok = rpc:call(CopyFromNode, emqx_config, delete_override_conf_files, []),
ok = rpc:call(CopyToNode, emqx_config, delete_override_conf_files, []),
rpc:call(CopyToNode, ekka, leave, []),
rpc:call(CopyFromNode, ekka, leave, []),
ok = emqx_common_test_helpers:stop_peer(CopyToNode),
ok = emqx_common_test_helpers:stop_peer(CopyFromNode),
ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)),
ok = file:del_dir_r(proplists:get_value(from_install_dir, Config));
CopyFromNode = ?config(copy_from_node, Config),
CopyToNode = ?config(copy_to_node, Config),
ok = emqx_cth_cluster:stop([CopyFromNode, CopyToNode]);
group_t_copy_plugin_to_a_new_node(Config) ->
CopyFromNode = proplists:get_value(copy_from_node, Config),
CopyToNode = proplists:get_value(copy_to_node, Config),
@ -706,62 +697,48 @@ group_t_copy_plugin_to_a_new_node(Config) ->
%% checks that we can start a cluster with a lone node.
group_t_copy_plugin_to_a_new_node_single_node({init, Config}) ->
PrivDataDir = ?config(priv_dir, Config),
ToInstallDir = filename:join(PrivDataDir, "plugins_copy_to"),
WorkDir = ?config(install_dir, Config),
ToInstallDir = filename:join(WorkDir, "plugins_copy_to"),
file:del_dir_r(ToInstallDir),
ok = filelib:ensure_path(ToInstallDir),
#{package := Package, release_name := PluginName} = get_demo_plugin_package(ToInstallDir),
NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
[{CopyTo, CopyToOpts}] =
emqx_common_test_helpers:emqx_cluster(
[
{core, plugins_copy_to}
],
#{
apps => [emqx_conf, emqx_plugins],
env => [
{emqx, boot_modules, []}
],
env_handler => fun
(emqx_plugins) ->
ok = emqx_plugins:put_config(install_dir, ToInstallDir),
%% this is to simulate an user setting the state
%% via environment variables before starting the node
ok = emqx_plugins:put_config(
states,
[#{name_vsn => NameVsn, enable => true}]
),
ok;
(_) ->
ok
end,
priv_data_dir => PrivDataDir,
schema_mod => emqx_conf_schema,
load_schema => true
Apps = [
emqx,
emqx_conf,
emqx_ctl,
{emqx_plugins, #{
config => #{
plugins => #{
install_dir => ToInstallDir,
states => [#{name_vsn => NameVsn, enable => true}]
}
}
),
}}
],
[CopyToNode] = emqx_cth_cluster:start(
[{plugins_copy_to, #{role => core, apps => Apps}}],
#{
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
}
),
[
{to_install_dir, ToInstallDir},
{copy_to_node_name, CopyTo},
{copy_to_opts, CopyToOpts},
{copy_to_node, CopyToNode},
{name_vsn, NameVsn},
{plugin_name, PluginName}
| Config
];
group_t_copy_plugin_to_a_new_node_single_node({'end', Config}) ->
CopyToNode = proplists:get_value(copy_to_node_name, Config),
ok = emqx_common_test_helpers:stop_peer(CopyToNode),
ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)),
ok;
CopyToNode = proplists:get_value(copy_to_node, Config),
ok = emqx_cth_cluster:stop([CopyToNode]);
group_t_copy_plugin_to_a_new_node_single_node(Config) ->
CopyTo = ?config(copy_to_node_name, Config),
CopyToOpts = ?config(copy_to_opts, Config),
CopyToNode = ?config(copy_to_node, Config),
ToInstallDir = ?config(to_install_dir, Config),
NameVsn = proplists:get_value(name_vsn, Config),
%% Start the node for the first time. The plugin should start
%% successfully even if it's not extracted yet. Simply starting
%% the node would crash if not working properly.
CopyToNode = emqx_common_test_helpers:start_peer(CopyTo, CopyToOpts),
ct:pal("~p config:\n ~p", [
CopyToNode, erpc:call(CopyToNode, emqx_plugins, get_config, [[], #{}])
]),
@ -775,52 +752,44 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) ->
ok.
group_t_cluster_leave({init, Config}) ->
PrivDataDir = ?config(priv_dir, Config),
ToInstallDir = filename:join(PrivDataDir, "plugins_copy_to"),
WorkDir = ?config(install_dir, Config),
ToInstallDir = filename:join(WorkDir, "plugins_copy_to"),
file:del_dir_r(ToInstallDir),
ok = filelib:ensure_path(ToInstallDir),
#{package := Package, release_name := PluginName} = get_demo_plugin_package(ToInstallDir),
NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
Cluster =
emqx_common_test_helpers:emqx_cluster(
[core, core],
#{
apps => [emqx_conf, emqx_plugins],
env => [
{emqx, boot_modules, []}
],
env_handler => fun
(emqx_plugins) ->
ok = emqx_plugins:put_config(install_dir, ToInstallDir),
%% this is to simulate an user setting the state
%% via environment variables before starting the node
ok = emqx_plugins:put_config(
states,
[#{name_vsn => NameVsn, enable => true}]
),
ok;
(_) ->
ok
end,
priv_data_dir => PrivDataDir,
schema_mod => emqx_conf_schema,
load_schema => true
Apps = [
emqx,
emqx_conf,
emqx_ctl,
{emqx_plugins, #{
config => #{
plugins => #{
install_dir => ToInstallDir,
states => [#{name_vsn => NameVsn, enable => true}]
}
}
),
Nodes = [emqx_common_test_helpers:start_peer(Name, Opts) || {Name, Opts} <- Cluster],
}}
],
Nodes = emqx_cth_cluster:start(
[
{group_t_cluster_leave1, #{role => core, apps => Apps}},
{group_t_cluster_leave2, #{role => core, apps => Apps}}
],
#{
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
}
),
[
{to_install_dir, ToInstallDir},
{cluster, Cluster},
{nodes, Nodes},
{name_vsn, NameVsn},
{plugin_name, PluginName}
| Config
];
group_t_cluster_leave({'end', Config}) ->
Nodes = proplists:get_value(nodes, Config),
[ok = emqx_common_test_helpers:stop_peer(N) || N <- Nodes],
ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)),
ok;
Nodes = ?config(nodes, Config),
ok = emqx_cth_cluster:stop(Nodes);
group_t_cluster_leave(Config) ->
[N1, N2] = ?config(nodes, Config),
NameVsn = proplists:get_value(name_vsn, Config),

View File

@ -17,13 +17,12 @@
-module(emqx_prometheus_SUITE).
-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
-compile(nowarn_export_all).
-compile(export_all).
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(LEGACY_CONF_DEFAULT, <<
"\n"
"prometheus {\n"
" push_gateway_server = \"http://127.0.0.1:9091\"\n"
" interval = \"1s\"\n"
@ -38,6 +37,7 @@
" vm_msacc_collector = disabled\n"
"}\n"
>>).
-define(CONF_DEFAULT, #{
<<"prometheus">> =>
#{
@ -84,40 +84,29 @@ common_tests() ->
emqx_common_test_helpers:all(?MODULE).
init_per_group(new_config, Config) ->
init_group(),
load_config(),
emqx_common_test_helpers:start_apps([emqx_prometheus]),
%% coverage olp metrics
{ok, _} = emqx:update_config([overload_protection, enable], true),
Config;
Apps = emqx_cth_suite:start(
[
%% coverage olp metrics
{emqx, "overload_protection.enable = true"},
{emqx_license, "license.key = default"},
{emqx_prometheus, #{config => config(default)}}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config];
init_per_group(legacy_config, Config) ->
init_group(),
load_legacy_config(),
emqx_common_test_helpers:start_apps([emqx_prometheus]),
{ok, _} = emqx:update_config([overload_protection, enable], false),
Config.
init_group() ->
application:load(emqx_conf),
ok = ekka:start(),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
meck:expect(emqx_alarm, activate, 3, ok),
meck:expect(emqx_alarm, deactivate, 3, ok),
meck:new(emqx_license_checker, [non_strict, passthrough, no_link]),
meck:expect(emqx_license_checker, expiry_epoch, fun() -> 1859673600 end).
end_group() ->
ekka:stop(),
mria:stop(),
mria_mnesia:delete_schema(),
meck:unload(emqx_alarm),
meck:unload(emqx_license_checker),
emqx_common_test_helpers:stop_apps([emqx_prometheus]).
Apps = emqx_cth_suite:start(
[
{emqx, "overload_protection.enable = false"},
{emqx_license, "license.key = default"},
{emqx_prometheus, #{config => config(legacy)}}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_group(_Group, Config) ->
end_group(),
Config.
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(t_assert_push, Config) ->
meck:new(httpc, [passthrough]),
@ -137,11 +126,10 @@ end_per_testcase(t_assert_push, _Config) ->
end_per_testcase(_Testcase, _Config) ->
ok.
load_config() ->
ok = emqx_common_test_helpers:load_config(emqx_prometheus_schema, ?CONF_DEFAULT).
load_legacy_config() ->
ok = emqx_common_test_helpers:load_config(emqx_prometheus_schema, ?LEGACY_CONF_DEFAULT).
config(default) ->
?CONF_DEFAULT;
config(legacy) ->
?LEGACY_CONF_DEFAULT.
%%--------------------------------------------------------------------
%% Test cases

View File

@ -21,9 +21,6 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
%%--------------------------------------------------------------------
%% Setups
@ -41,41 +38,38 @@ groups() ->
].
init_per_suite(Config) ->
emqx_prometheus_SUITE:init_group(),
emqx_mgmt_api_test_util:init_suite([emqx_conf]),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_management,
{emqx_prometheus, #{start => false}},
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"},
{emqx_license, "license.key = default"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
[{suite_apps, Apps} | Config].
end_per_suite(Config) ->
emqx_prometheus_SUITE:end_group(),
emqx_mgmt_api_test_util:end_suite([emqx_conf]),
Config.
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_group(new_config, Config) ->
emqx_common_test_helpers:start_apps(
[emqx_prometheus],
fun(App) -> set_special_configs(App, new_config) end
Apps = emqx_cth_suite:start_app(
emqx_prometheus,
#{config => emqx_prometheus_SUITE:config(default)}
),
Config;
[{group_apps, Apps} | Config];
init_per_group(legacy_config, Config) ->
emqx_common_test_helpers:start_apps(
[emqx_prometheus],
fun(App) -> set_special_configs(App, legacy_config) end
Apps = emqx_cth_suite:start_app(
emqx_prometheus,
#{config => emqx_prometheus_SUITE:config(legacy)}
),
Config.
[{group_apps, Apps} | Config].
end_per_group(_Group, Config) ->
_ = application:stop(emqx_prometheus),
Config.
set_special_configs(emqx_dashboard, _) ->
emqx_dashboard_api_test_helpers:set_default_config();
set_special_configs(emqx_prometheus, new_config) ->
emqx_prometheus_SUITE:load_config(),
ok;
set_special_configs(emqx_prometheus, legacy_config) ->
emqx_prometheus_SUITE:load_legacy_config(),
ok;
set_special_configs(_App, _) ->
ok.
ok = emqx_cth_suite:stop_apps(?config(group_apps, Config)).
%%--------------------------------------------------------------------
%% Cases

View File

@ -2,7 +2,7 @@
{application, emqx_psk, [
{description, "EMQX PSK"},
% strict semver, bump manually!
{vsn, "5.0.5"},
{vsn, "5.0.6"},
{modules, []},
{registered, [emqx_psk_sup]},
{applications, [kernel, stdlib]},

View File

@ -32,6 +32,7 @@
]).
-export([
create_tables/0,
start_link/0,
stop/0
]).
@ -63,10 +64,6 @@
extra :: term()
}).
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-include("emqx_psk.hrl").
-define(CR, 13).
@ -81,8 +78,8 @@
%%------------------------------------------------------------------------------
%% @doc Create or replicate tables.
-spec mnesia(boot | copy) -> ok.
mnesia(boot) ->
-spec create_tables() -> [mria:table()].
create_tables() ->
ok = mria:create_table(?TAB, [
{rlog_shard, ?PSK_SHARD},
{type, ordered_set},
@ -90,7 +87,8 @@ mnesia(boot) ->
{record_name, psk_entry},
{attributes, record_info(fields, psk_entry)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]).
]),
[?TAB].
%%------------------------------------------------------------------------------
%% Data backup

View File

@ -26,7 +26,7 @@
-include("emqx_psk.hrl").
start(_Type, _Args) ->
ok = mria:wait_for_tables([?TAB]),
ok = mria:wait_for_tables(emqx_psk:create_tables()),
emqx_conf:add_handler([?PSK_KEY], emqx_psk),
{ok, Sup} = emqx_psk_sup:start_link(),
{ok, Sup}.

View File

@ -34,30 +34,25 @@ groups() ->
].
init_per_suite(Config) ->
meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_config, get, fun
([psk_authentication, enable]) -> true;
([psk_authentication, chunk_size]) -> 50;
(KeyPath) -> meck:passthrough([KeyPath])
end),
meck:expect(emqx_config, get, fun
([psk_authentication, init_file], _) ->
filename:join([
code:lib_dir(emqx_psk, test),
"data/init.psk"
]);
([psk_authentication, separator], _) ->
<<":">>;
(KeyPath, Default) ->
meck:passthrough([KeyPath, Default])
end),
emqx_common_test_helpers:start_apps([emqx_psk]),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
{emqx_psk, #{
config => #{
psk_authentication => #{
enable => true,
init_file => filename:join(?config(data_dir, Config), "init.psk"),
separator => <<":">>
}
}
}}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_) ->
meck:unload(emqx_config),
emqx_common_test_helpers:stop_apps([emqx_psk]),
ok.
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
t_psk_lookup(_) ->
PSKIdentity1 = <<"myclient1">>,

View File

@ -47,8 +47,6 @@ common_tests() ->
emqx_common_test_helpers:all(?MODULE) -- [t_reindex].
-define(BASE_CONF, <<
""
"\n"
"retainer {\n"
" enable = true\n"
" msg_clear_interval = 0s\n"
@ -64,7 +62,6 @@ common_tests() ->
" max_retained_messages = 0\n"
" }\n"
"}"
""
>>).
%%--------------------------------------------------------------------
@ -72,18 +69,14 @@ common_tests() ->
%%--------------------------------------------------------------------
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([emqx_conf]),
load_conf(),
emqx_limiter_sup:start_link(),
timer:sleep(200),
ok = application:ensure_started(?APP),
Config.
Apps = emqx_cth_suite:start(
[emqx, emqx_conf, app_spec()],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_Config) ->
ekka:stop(),
mria:stop(),
mria_mnesia:delete_schema(),
emqx_common_test_helpers:stop_apps([?APP, emqx_conf]).
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_group(mnesia_without_indices, Config) ->
mnesia:clear_table(?TAB_INDEX_META),
@ -113,10 +106,8 @@ init_per_testcase(t_get_basic_usage_info, Config) ->
init_per_testcase(_TestCase, Config) ->
Config.
load_conf() ->
ok = emqx_config:delete_override_conf_files(),
emqx_ratelimiter_SUITE:init_config(),
ok = emqx_config:init_load(emqx_retainer_schema, ?BASE_CONF).
app_spec() ->
{emqx_retainer, ?BASE_CONF}.
%%--------------------------------------------------------------------
%% Test Cases

View File

@ -22,18 +22,20 @@
-include("emqx_retainer.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_retainer_SUITE:load_conf(),
%% Start Apps
emqx_common_test_helpers:start_apps([emqx_retainer]),
Config.
Apps = emqx_cth_suite:start(
[emqx, emqx_conf, emqx_retainer_SUITE:app_spec()],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_retainer]).
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)).
t_reindex_status(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["reindex", "status"]).

View File

@ -20,17 +20,19 @@
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_retainer_SUITE:load_conf(),
%% Start Apps
emqx_common_test_helpers:start_apps([emqx_retainer]),
Config.
Apps = emqx_cth_suite:start(
[emqx, emqx_conf, emqx_retainer_SUITE:app_spec()],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_retainer]).
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)).
client_info(Key, Client) ->
maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).

View File

@ -20,76 +20,51 @@
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
-define(NOW, erlang:system_time(millisecond)).
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(LANTENCY, 101).
-define(BASE_CONF, <<
""
"\n"
"slow_subs {\n"
" enable = true\n"
" top_k_num = 5\n"
" threshold = 100ms\n"
" expire_interval = 5m\n"
" stats_type = whole\n"
" }"
""
"}"
>>).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
application:load(emqx_conf),
ok = ekka:start(),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
meck:expect(emqx_alarm, activate, 3, ok),
meck:expect(emqx_alarm, deactivate, 3, ok),
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
{emqx_slow_subs, ?BASE_CONF}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
ok = emqx_common_test_helpers:load_config(emqx_slow_subs_schema, ?BASE_CONF),
emqx_common_test_helpers:start_apps([emqx_slow_subs]),
Config.
end_per_suite(_Config) ->
ekka:stop(),
mria:stop(),
mria_mnesia:delete_schema(),
meck:unload(emqx_alarm),
emqx_common_test_helpers:stop_apps([emqx_slow_subs]).
init_per_testcase(t_expire, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(),
update_config(<<"expire_interval">>, <<"1500ms">>),
Config;
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(),
Config.
end_per_testcase(_, _) ->
case erlang:whereis(node()) of
undefined ->
ok;
P ->
erlang:unlink(P),
erlang:exit(P, kill)
end,
ok.
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
t_pub(_) ->
_ = [stats_with_type(Type) || Type <- [whole, internal, response]],
ok.
t_expire(_) ->
_ = update_config(<<"expire_interval">>, <<"1500ms">>),
Now = ?NOW,
Each = fun(I) ->
ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),

View File

@ -20,10 +20,8 @@
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx_management/include/emqx_mgmt.hrl").
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
-define(HOST, "http://127.0.0.1:18083/").
@ -32,63 +30,43 @@
-define(BASE_PATH, "api").
-define(NOW, erlang:system_time(millisecond)).
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(CONF_DEFAULT, <<
""
"\n"
"slow_subs\n"
"{\n"
"slow_subs {\n"
" enable = true\n"
" top_k_num = 5,\n"
" expire_interval = 60s\n"
" stats_type = whole\n"
"}"
""
>>).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
application:load(emqx_conf),
ok = ekka:start(),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
meck:expect(emqx_alarm, activate, 3, ok),
meck:expect(emqx_alarm, deactivate, 3, ok),
ok = emqx_common_test_helpers:load_config(emqx_slow_subs_schema, ?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_slow_subs]),
{ok, _} = application:ensure_all_started(emqx_auth),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_auth,
emqx_conf,
emqx_management,
{emqx_slow_subs, ?CONF_DEFAULT},
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
_ = emqx_common_test_http:create_default_app(),
[{suite_apps, Apps} | Config].
end_per_suite(Config) ->
ekka:stop(),
mria:stop(),
mria_mnesia:delete_schema(),
meck:unload(emqx_alarm),
application:stop(emqx_auth),
emqx_mgmt_api_test_util:end_suite([emqx_slow_subs]),
Config.
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(),
application:ensure_all_started(emqx_slow_subs),
timer:sleep(500),
{ok, _} = application:ensure_all_started(emqx_slow_subs),
Config.
end_per_testcase(_, Config) ->
application:stop(emqx_slow_subs),
case erlang:whereis(node()) of
undefined ->
ok;
P ->
erlang:unlink(P),
erlang:exit(P, kill)
end,
Config.
end_per_testcase(_, _Config) ->
ok = application:stop(emqx_slow_subs).
t_get_history(_) ->
Now = ?NOW,