emqx/apps/emqx_node_rebalance/test/emqx_node_rebalance_SUITE.erl

277 lines
7.7 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_node_rebalance_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-import(
emqx_eviction_agent_test_helpers,
[
emqtt_connect_many/1,
emqtt_connect_many/2,
emqtt_try_connect/1,
stop_many/1,
case_specific_node_name/3,
start_cluster/3,
stop_cluster/1
]
).
%% Common Test callback. The list of test cases is whatever
%% emqx_common_test_helpers:all/1 returns for this module.
all() ->
    TestCases = emqx_common_test_helpers:all(?MODULE),
    TestCases.
%% Suite setup: starts `emqx` through emqx_cth_suite:start/2, using the
%% Common Test `priv_dir` as the work directory, and stores the value
%% returned by the helper under the `apps` key for end_per_suite/1.
init_per_suite(Config) ->
    WorkDir = ?config(priv_dir, Config),
    StartedApps = emqx_cth_suite:start([emqx], #{work_dir => WorkDir}),
    [{apps, StartedApps} | Config].
%% Suite teardown: hands the `apps` entry saved by init_per_suite/1
%% back to emqx_cth_suite:stop/1.
end_per_suite(Config) ->
    StartedApps = ?config(apps, Config),
    emqx_cth_suite:stop(StartedApps).
%% Per-test setup: starts a two-node cluster (donor first, recipient second)
%% with node names derived from the test case name, then starts a snabbkaffe
%% trace. The value returned by start_cluster/3 is stored under
%% `cluster_nodes`; the tests destructure it as a list of {Node, Port} pairs.
init_per_testcase(Case, Config) ->
    DonorName = case_specific_node_name(?MODULE, Case, '_donor'),
    RecipientName = case_specific_node_name(?MODULE, Case, '_recipient'),
    AppsToStart = [emqx, emqx_eviction_agent, emqx_node_rebalance],
    Cluster = start_cluster(Config, [DonorName, RecipientName], AppsToStart),
    ok = snabbkaffe:start_trace(),
    [{cluster_nodes, Cluster} | Config].
%% Per-test teardown: stops snabbkaffe first, then stops the cluster that
%% init_per_testcase/2 stored under `cluster_nodes`.
end_per_testcase(_Case, Config) ->
    ok = snabbkaffe:stop(),
    Cluster = ?config(cluster_nodes, Config),
    stop_cluster(Cluster).
%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------
%% Happy-path rebalance.
%%
%% Calls emqtt_connect_many/2 with the donor port and 500, starts a rebalance
%% on the donor, and waits (timeout argument 10000) for the
%% `emqx_node_rebalance_evict_sess_over` trace event. Afterwards the eviction
%% agent counters of both nodes are logged, and the donor's connection and
%% disconnected-session counts must not exceed the recipient's by more
%% than 50 (the same figure as the abs_*_threshold options below).
t_rebalance(Config) ->
    process_flag(trap_exit, true),
    [{Donor, DonorPort}, {Recipient, _RecipientPort}] = ?config(cluster_nodes, Config),
    Clients = emqtt_connect_many(DonorPort, 500),
    RebalanceOpts = #{
        nodes => [Donor, Recipient],
        conn_evict_rate => 10,
        sess_evict_rate => 10,
        evict_interval => 10,
        abs_conn_threshold => 50,
        abs_sess_threshold => 50,
        rel_conn_threshold => 1.0,
        rel_sess_threshold => 1.0,
        wait_health_check => 0.01,
        wait_takeover => 0.01
    },
    ?assertWaitEvent(
        ok = rpc:call(Donor, emqx_node_rebalance, start, [RebalanceOpts]),
        #{?snk_kind := emqx_node_rebalance_evict_sess_over},
        10000
    ),
    %% Small wrapper so each counter query below reads as one short line.
    AgentCall = fun(Node, Fun, Args) ->
        rpc:call(Node, emqx_eviction_agent, Fun, Args)
    end,
    %% The donor is queried before the recipient, as in the original ordering.
    DonorConns = AgentCall(Donor, connection_count, []),
    DonorSess = AgentCall(Donor, session_count, []),
    DonorDSess = AgentCall(Donor, session_count, [disconnected]),
    RecipientConns = AgentCall(Recipient, connection_count, []),
    RecipientSess = AgentCall(Recipient, session_count, []),
    RecipientDSess = AgentCall(Recipient, session_count, [disconnected]),
    ct:pal("Donor: conn=~p, sess=~p, dsess=~p", [DonorConns, DonorSess, DonorDSess]),
    ct:pal(
        "Recipient: conn=~p, sess=~p, dsess=~p",
        [RecipientConns, RecipientSess, RecipientDSess]
    ),
    ?assert(DonorConns - 50 =< RecipientConns),
    ?assert(DonorDSess - 50 =< RecipientDSess),
    ok = stop_many(Clients).
%% Rebalance must end up disabled when the recipient goes away.
%%
%% Starts a rebalance on the donor and immediately stops the recipient peer,
%% waiting (timeout argument 1000) for the `emqx_node_rebalance_started`
%% trace event. The donor must then report `disabled` as rebalance status.
t_rebalance_node_crash(Config) ->
    process_flag(trap_exit, true),
    [{Donor, DonorPort}, {Recipient, _RecipientPort}] = ?config(cluster_nodes, Config),
    Clients = emqtt_connect_many(DonorPort, 500),
    RebalanceOpts = #{
        nodes => [Donor, Recipient],
        conn_evict_rate => 10,
        sess_evict_rate => 10,
        evict_interval => 10,
        abs_conn_threshold => 50,
        abs_sess_threshold => 50,
        rel_conn_threshold => 1.0,
        rel_sess_threshold => 1.0,
        wait_health_check => 0.01,
        wait_takeover => 0.01
    },
    ?assertWaitEvent(
        begin
            ok = rpc:call(Donor, emqx_node_rebalance, start, [RebalanceOpts]),
            emqx_common_test_helpers:stop_peer(Recipient)
        end,
        #{?snk_kind := emqx_node_rebalance_started},
        1000
    ),
    StatusOnDonor = rpc:call(Donor, emqx_node_rebalance, status, []),
    ?assertEqual(disabled, StatusOnDonor),
    ok = stop_many(Clients).
%% Starting a rebalance must be refused with `{error, nothing_to_balance}`
%% both before this test opens any connection and after it calls
%% emqtt_connect_many/2 with the donor port and 50 (the same figure as the
%% abs_*_threshold options).
t_no_need_to_rebalance(Config) ->
    process_flag(trap_exit, true),
    [{Donor, DonorPort}, {Recipient, _RecipientPort}] = ?config(cluster_nodes, Config),
    RebalanceOpts = #{
        nodes => [Donor, Recipient],
        conn_evict_rate => 10,
        sess_evict_rate => 10,
        evict_interval => 10,
        abs_conn_threshold => 50,
        abs_sess_threshold => 50,
        rel_conn_threshold => 1.0,
        rel_sess_threshold => 1.0,
        wait_health_check => 0.01,
        wait_takeover => 0.01
    },
    TryStart = fun() ->
        rpc:call(Donor, emqx_node_rebalance, start, [RebalanceOpts])
    end,
    %% First attempt: nothing connected by this test yet.
    ?assertEqual({error, nothing_to_balance}, TryStart()),
    %% Second attempt: with the connections opened below in place.
    Clients = emqtt_connect_many(DonorPort, 50),
    ?assertEqual({error, nothing_to_balance}, TryStart()),
    ok = stop_many(Clients).
%% The process registered as `emqx_node_rebalance` on the donor must tolerate
%% unexpected traffic: a raw `unknown` message, an `unknown` cast, and an
%% `unknown` call that must be answered with `ignored`. The probe is run once
%% before a rebalance is started and once after.
t_unknown_mesages(Config) ->
    process_flag(trap_exit, true),
    [{Donor, DonorPort}, {Recipient, _RecipientPort}] = ?config(cluster_nodes, Config),
    Clients = emqtt_connect_many(DonorPort, 500),
    RebalanceOpts = #{
        nodes => [Donor, Recipient],
        abs_conn_threshold => 50,
        wait_health_check => 100
    },
    Server = rpc:call(Donor, erlang, whereis, [emqx_node_rebalance]),
    %% Raw message, cast, then call, in that order.
    ProbeWithUnknowns = fun() ->
        Server ! unknown,
        ok = gen_server:cast(Server, unknown),
        ?assertEqual(
            ignored,
            gen_server:call(Server, unknown)
        )
    end,
    ProbeWithUnknowns(),
    ok = rpc:call(Donor, emqx_node_rebalance, start, [RebalanceOpts]),
    ProbeWithUnknowns(),
    ok = stop_many(Clients).
%% emqx_node_rebalance:available_nodes/1 must leave out a node whose eviction
%% agent is already enabled.
t_available_nodes(Config) ->
    [{Donor, _DonorPort}, {Recipient, _RecipientPort}] = ?config(cluster_nodes, Config),
    %% Enable the eviction agent on the recipient so that it counts as
    %% "occupied" and is therefore not available for rebalance.
    ok = rpc:call(Recipient, emqx_eviction_agent, enable, [test_rebalance, undefined]),
    %% Only the donor should be available, since the recipient is "occupied".
    Candidates = [Donor, Recipient],
    Available = rpc:call(Donor, emqx_node_rebalance, available_nodes, [Candidates]),
    ?assertEqual([Donor], Available).
%% New connections to the donor are accepted right after a rebalance is
%% started and rejected once the `node_rebalance_enable_started_prohibiting`
%% trace event has been observed.
%%
%% Steps:
%%   1. Call emqtt_connect_many/2 with the donor port and 50.
%%   2. Start a rebalance with `wait_health_check => 2` and, inside the same
%%      async action, assert that emqtt_try_connect/1 on the donor port still
%%      returns `ok`.
%%   3. Wait (timeout argument 5000) for the prohibiting event.
%%   4. Assert that a further connect attempt yields
%%      `{error, {use_another_server, _}}`.
%%
%% NOTE(review): the units of wait_health_check / wait_takeover are not
%% visible in this suite; presumably seconds -- confirm against
%% emqx_node_rebalance.
t_before_health_check_over(Config) ->
    process_flag(trap_exit, true),
    [{DonorNode, DonorPort}, {RecipientNode, _RecipientPort}] = ?config(cluster_nodes, Config),
    Nodes = [DonorNode, RecipientNode],
    Conns = emqtt_connect_many(DonorPort, 50),
    Opts = #{
        conn_evict_rate => 1,
        sess_evict_rate => 1,
        evict_interval => 1000,
        abs_conn_threshold => 1,
        abs_sess_threshold => 1,
        rel_conn_threshold => 1.0,
        rel_sess_threshold => 1.0,
        wait_health_check => 2,
        wait_takeover => 100,
        nodes => Nodes
    },
    ?assertWaitEvent(
        begin
            ok = rpc:call(DonorNode, emqx_node_rebalance, start, [Opts]),
            %% Connecting is still expected to succeed at this point.
            ?assertMatch(
                ok,
                emqtt_try_connect([{port, DonorPort}])
            )
        end,
        #{?snk_kind := node_rebalance_enable_started_prohibiting},
        5000
    ),
    %% After the prohibiting event the donor redirects new clients.
    ?assertMatch(
        {error, {use_another_server, #{}}},
        emqtt_try_connect([{port, DonorPort}])
    ),
    %% Assert the cleanup result, as every other test in this suite does.
    ok = stop_many(Conns).