277 lines
7.7 KiB
Erlang
277 lines
7.7 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_node_rebalance_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
-include_lib("emqx/include/asserts.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
-import(
|
|
emqx_eviction_agent_test_helpers,
|
|
[
|
|
emqtt_connect_many/1,
|
|
emqtt_connect_many/2,
|
|
emqtt_try_connect/1,
|
|
stop_many/1,
|
|
case_specific_node_name/3,
|
|
start_cluster/3,
|
|
stop_cluster/1
|
|
]
|
|
).
|
|
|
|
%% Test-case discovery for this suite is delegated to
%% emqx_common_test_helpers:all/1, given this module as the suite.
all() ->
    Suite = ?MODULE,
    emqx_common_test_helpers:all(Suite).
|
|
|
|
%% Suite setup: start the `emqx' application on the local (CT) node, using
%% the suite's private directory as its work directory. The handle returned
%% by emqx_cth_suite:start/2 is stored under `apps' so that end_per_suite/1
%% can stop exactly what was started here.
init_per_suite(Config) ->
    WorkDir = ?config(priv_dir, Config),
    StartedApps = emqx_cth_suite:start([emqx], #{work_dir => WorkDir}),
    [{apps, StartedApps} | Config].
|
|
|
|
%% Suite teardown: stop the applications recorded by init_per_suite/1.
end_per_suite(Config) ->
    StartedApps = ?config(apps, Config),
    emqx_cth_suite:stop(StartedApps).
|
|
|
|
%% Per-test setup: start a fresh two-node cluster whose node names are
%% derived from the running test case (a `_donor' node first, then a
%% `_recipient' node). Both nodes run emqx, the eviction agent and node
%% rebalance. A snabbkaffe trace is then started so the test cases can wait
%% on trace events.
%%
%% The cluster handle is stored under `cluster_nodes'. The test cases
%% destructure it as `[{DonorNode, DonorPort}, {RecipientNode, RecipientPort}]'.
init_per_testcase(Case, Config) ->
    ClusterApps = [emqx, emqx_eviction_agent, emqx_node_rebalance],
    NodeNames = [
        case_specific_node_name(?MODULE, Case, Suffix)
     || Suffix <- ['_donor', '_recipient']
    ],
    ClusterNodes = start_cluster(Config, NodeNames, ClusterApps),
    ok = snabbkaffe:start_trace(),
    [{cluster_nodes, ClusterNodes} | Config].
|
|
|
|
%% Per-test teardown, undoing init_per_testcase/2: stop the snabbkaffe trace
%% first, then stop the test's cluster.
end_per_testcase(_Case, Config) ->
    ok = snabbkaffe:stop(),
    ClusterNodes = ?config(cluster_nodes, Config),
    stop_cluster(ClusterNodes).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Tests
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Happy-path rebalance.
%%
%% 1. Connect 500 clients to the donor.
%% 2. Start a rebalance over both nodes.
%% 3. Wait (timeout 10000) for the `emqx_node_rebalance_evict_sess_over'
%%    trace event.
%%
%% Afterwards, the recipient must be within 50 (the configured absolute
%% thresholds) of the donor for both connection count and disconnected-session
%% count. Total session counts are only logged.
t_rebalance(Config) ->
    process_flag(trap_exit, true),

    [{Donor, DonorPort}, {Recipient, _RecipientPort}] = ?config(cluster_nodes, Config),

    Clients = emqtt_connect_many(DonorPort, 500),

    RebalanceOpts = #{
        nodes => [Donor, Recipient],
        conn_evict_rate => 10,
        sess_evict_rate => 10,
        evict_interval => 10,
        abs_conn_threshold => 50,
        abs_sess_threshold => 50,
        rel_conn_threshold => 1.0,
        rel_sess_threshold => 1.0,
        wait_health_check => 0.01,
        wait_takeover => 0.01
    },

    ?assertWaitEvent(
        ok = rpc:call(Donor, emqx_node_rebalance, start, [RebalanceOpts]),
        #{?snk_kind := emqx_node_rebalance_evict_sess_over},
        10000
    ),

    %% Same three eviction-agent probes for each node, issued in order:
    %% connections, all sessions, disconnected sessions.
    Probes = [
        {connection_count, []},
        {session_count, []},
        {session_count, [disconnected]}
    ],
    [DonorConns, DonorSess, DonorDSess] =
        [rpc:call(Donor, emqx_eviction_agent, Fun, Args) || {Fun, Args} <- Probes],
    [RecipConns, RecipSess, RecipDSess] =
        [rpc:call(Recipient, emqx_eviction_agent, Fun, Args) || {Fun, Args} <- Probes],

    ct:pal(
        "Donor: conn=~p, sess=~p, dsess=~p",
        [DonorConns, DonorSess, DonorDSess]
    ),
    ct:pal(
        "Recipient: conn=~p, sess=~p, dsess=~p",
        [RecipConns, RecipSess, RecipDSess]
    ),

    ?assert(DonorConns - 50 =< RecipConns),
    ?assert(DonorDSess - 50 =< RecipDSess),

    ok = stop_many(Clients).
|
|
|
|
%% Recipient goes away mid-rebalance.
%%
%% With 500 clients on the donor, start a rebalance and immediately stop the
%% recipient peer node. After the `emqx_node_rebalance_started' trace event
%% has been seen (timeout 1000), the donor's rebalance status must read
%% `disabled'.
t_rebalance_node_crash(Config) ->
    process_flag(trap_exit, true),

    [{Donor, DonorPort}, {Recipient, _RecipientPort}] = ?config(cluster_nodes, Config),

    Clients = emqtt_connect_many(DonorPort, 500),

    RebalanceOpts = #{
        nodes => [Donor, Recipient],
        conn_evict_rate => 10,
        sess_evict_rate => 10,
        evict_interval => 10,
        abs_conn_threshold => 50,
        abs_sess_threshold => 50,
        rel_conn_threshold => 1.0,
        rel_sess_threshold => 1.0,
        wait_health_check => 0.01,
        wait_takeover => 0.01
    },

    ?assertWaitEvent(
        begin
            ok = rpc:call(Donor, emqx_node_rebalance, start, [RebalanceOpts]),
            %% Take the recipient down right after the rebalance was accepted.
            emqx_common_test_helpers:stop_peer(Recipient)
        end,
        #{?snk_kind := emqx_node_rebalance_started},
        1000
    ),

    DonorStatus = rpc:call(Donor, emqx_node_rebalance, status, []),
    ?assertEqual(disabled, DonorStatus),

    ok = stop_many(Clients).
|
|
|
|
%% Refusal when there is nothing to balance.
%%
%% emqx_node_rebalance:start/1 must return `{error, nothing_to_balance}' in
%% two situations:
%%   - when the donor has no clients at all;
%%   - when the donor has only 50 clients, which equals the configured
%%     absolute thresholds.
t_no_need_to_rebalance(Config) ->
    process_flag(trap_exit, true),

    [{Donor, DonorPort}, {Recipient, _RecipientPort}] = ?config(cluster_nodes, Config),

    RebalanceOpts = #{
        nodes => [Donor, Recipient],
        conn_evict_rate => 10,
        sess_evict_rate => 10,
        evict_interval => 10,
        abs_conn_threshold => 50,
        abs_sess_threshold => 50,
        rel_conn_threshold => 1.0,
        rel_sess_threshold => 1.0,
        wait_health_check => 0.01,
        wait_takeover => 0.01
    },

    StartRebalance = fun() ->
        rpc:call(Donor, emqx_node_rebalance, start, [RebalanceOpts])
    end,

    %% No clients connected yet.
    ?assertEqual({error, nothing_to_balance}, StartRebalance()),

    %% Still refused with 50 clients on the donor.
    Clients = emqtt_connect_many(DonorPort, 50),
    ?assertEqual({error, nothing_to_balance}, StartRebalance()),

    ok = stop_many(Clients).
|
|
|
|
%% Robustness against stray messages. The case name keeps its historical
%% spelling because it is the test-case identifier.
%%
%% The registered `emqx_node_rebalance' process on the donor receives an
%% unknown info message, an unknown cast and an unknown call. The call must
%% be answered with `ignored'. This is checked twice:
%%   - while idle;
%%   - again after start/1 has returned `ok'.
t_unknown_mesages(Config) ->
    process_flag(trap_exit, true),
    [{Donor, DonorPort}, {Recipient, _RecipientPort}] = ?config(cluster_nodes, Config),

    Clients = emqtt_connect_many(DonorPort, 500),

    RebalanceOpts = #{
        nodes => [Donor, Recipient],
        abs_conn_threshold => 50,
        wait_health_check => 100
    },

    Server = rpc:call(Donor, erlang, whereis, [emqx_node_rebalance]),

    %% Send one unknown message of each kind: info, cast, call.
    Poke = fun() ->
        Server ! unknown,
        ok = gen_server:cast(Server, unknown),
        ?assertEqual(ignored, gen_server:call(Server, unknown))
    end,

    Poke(),
    ok = rpc:call(Donor, emqx_node_rebalance, start, [RebalanceOpts]),
    Poke(),

    ok = stop_many(Clients).
|
|
|
|
%% Node availability.
%%
%% Enabling the eviction agent on the recipient (kind `test_rebalance')
%% makes that node "occupied". available_nodes/1, evaluated on the donor over
%% both nodes, must then report only the donor.
t_available_nodes(Config) ->
    [{Donor, _DonorPort}, {Recipient, _RecipientPort}] = ?config(cluster_nodes, Config),

    ok = rpc:call(Recipient, emqx_eviction_agent, enable, [test_rebalance, undefined]),

    Candidates = [Donor, Recipient],
    Available = rpc:call(Donor, emqx_node_rebalance, available_nodes, [Candidates]),
    ?assertEqual([Donor], Available).
|
|
|
|
%% Connection admission around the health-check window.
%%
%% The rebalance is started with `wait_health_check => 2' and 50 clients on
%% the donor.
%%   - Right after start/1 returns `ok', a brand-new client must still be able
%%     to connect to the donor.
%%   - Once the `node_rebalance_enable_started_prohibiting' trace event has
%%     been observed (timeout 5000), a new connection attempt must be
%%     rejected with `{error, {use_another_server, Map}}'.
t_before_health_check_over(Config) ->
    process_flag(trap_exit, true),

    [{DonorNode, DonorPort}, {RecipientNode, _RecipientPort}] = ?config(cluster_nodes, Config),

    Nodes = [DonorNode, RecipientNode],

    Conns = emqtt_connect_many(DonorPort, 50),

    Opts = #{
        conn_evict_rate => 1,
        sess_evict_rate => 1,
        evict_interval => 1000,
        abs_conn_threshold => 1,
        abs_sess_threshold => 1,
        rel_conn_threshold => 1.0,
        rel_sess_threshold => 1.0,
        wait_health_check => 2,
        wait_takeover => 100,
        nodes => Nodes
    },

    ?assertWaitEvent(
        begin
            ok = rpc:call(DonorNode, emqx_node_rebalance, start, [Opts]),
            %% Still inside the health-check window: new clients are accepted.
            ?assertMatch(
                ok,
                emqtt_try_connect([{port, DonorPort}])
            )
        end,
        #{?snk_kind := node_rebalance_enable_started_prohibiting},
        5000
    ),
    %% Connections are now prohibited and redirected elsewhere.
    ?assertMatch(
        {error, {use_another_server, #{}}},
        emqtt_try_connect([{port, DonorPort}])
    ),

    %% Match `ok' like every other case in this suite, so a failure to stop
    %% the clients fails the test instead of being silently discarded.
    ok = stop_many(Conns).
|