feat(rebalance): improve rebalance usability

* make availability API endpoint public
* allow connections during wait_health_check interval
* make availability status calculation more consistent and lightweight
* refactor tests to get rid of some mocks and to use the cth helpers (emqx_cth_suite, emqx_cth_cluster)
This commit is contained in:
Ilya Averyanov 2023-11-17 12:32:16 +03:00
parent 1395f1c424
commit e93e9ed108
24 changed files with 657 additions and 330 deletions

View File

@ -43,6 +43,7 @@
{emqx_mgmt_trace,2}.
{emqx_node_rebalance,1}.
{emqx_node_rebalance,2}.
{emqx_node_rebalance,3}.
{emqx_node_rebalance_api,1}.
{emqx_node_rebalance_api,2}.
{emqx_node_rebalance_evacuation,1}.

View File

@ -1,6 +1,6 @@
{application, emqx_eviction_agent, [
{description, "EMQX Eviction Agent"},
{vsn, "5.1.4"},
{vsn, "5.1.5"},
{registered, [
emqx_eviction_agent_sup,
emqx_eviction_agent,

View File

@ -15,8 +15,11 @@
-export([
start_link/0,
enable/2,
enable/3,
default_options/0,
disable/1,
status/0,
enable_status/0,
connection_count/0,
all_channels_count/0,
session_count/0,
@ -51,7 +54,7 @@
unhook/0
]).
-export_type([server_reference/0]).
-export_type([server_reference/0, kind/0, options/0]).
-define(CONN_MODULES, [
emqx_connection, emqx_ws_connection, emqx_quic_connection, emqx_eviction_agent_channel
@ -67,15 +70,31 @@
connections := non_neg_integer(),
sessions := non_neg_integer()
}.
-type kind() :: atom().
%% kind() is any() because it was not exported previously
%% and bpapi checker remembered it as any()
-type kind() :: any().
-type options() :: #{
allow_connections => boolean()
}.
%% Starts the eviction agent as a gen_server registered locally under the
%% module name, with an empty init argument list and no start options.
-spec start_link() -> startlink_ret().
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%% Returns the options used when a caller enables the agent without passing
%% any: `allow_connections => false`.
-spec default_options() -> options().
default_options() ->
#{
allow_connections => false
}.
-spec enable(kind(), server_reference()) -> ok_or_error(eviction_agent_busy).
enable(Kind, ServerReference) ->
gen_server:call(?MODULE, {enable, Kind, ServerReference}).
gen_server:call(?MODULE, {enable, Kind, ServerReference, default_options()}).
%% Enables the agent for `Kind` with an explicit `Options` map by sending an
%% `{enable, Kind, ServerReference, Options}` call to the registered server.
%% The `#{} = Options` match in the head means a non-map `Options` fails with
%% `function_clause` in the calling process, before any call is made.
-spec enable(kind(), server_reference(), options()) -> ok_or_error(eviction_agent_busy).
enable(Kind, ServerReference, #{} = Options) ->
gen_server:call(?MODULE, {enable, Kind, ServerReference, Options}).
-spec disable(kind()) -> ok.
disable(Kind) ->
@ -84,16 +103,20 @@ disable(Kind) ->
-spec status() -> status().
status() ->
case enable_status() of
{enabled, _Kind, _ServerReference} ->
{enabled, _Kind, _ServerReference, _Options} ->
{enabled, stats()};
disabled ->
disabled
end.
%% Reads the current enable state from `persistent_term` under the module
%% name as key; yields `disabled` when no term is stored. The read does not
%% go through the gen_server process.
-spec enable_status() -> disabled | {enabled, kind(), server_reference(), options()}.
enable_status() ->
persistent_term:get(?MODULE, disabled).
-spec evict_connections(pos_integer()) -> ok_or_error(disabled).
evict_connections(N) ->
case enable_status() of
{enabled, _Kind, ServerReference} ->
{enabled, _Kind, ServerReference, _Options} ->
ok = do_evict_connections(N, ServerReference);
disabled ->
{error, disabled}
@ -112,15 +135,16 @@ evict_sessions(N, Nodes, ConnState) when
is_list(Nodes) andalso length(Nodes) > 0
->
case enable_status() of
{enabled, _Kind, _ServerReference} ->
{enabled, _Kind, _ServerReference, _Options} ->
ok = do_evict_sessions(N, Nodes, ConnState);
disabled ->
{error, disabled}
end.
-spec purge_sessions(non_neg_integer()) -> ok_or_error(disabled).
purge_sessions(N) ->
case enable_status() of
{enabled, _Kind, _ServerReference} ->
{enabled, _Kind, _ServerReference, _Options} ->
ok = do_purge_sessions(N);
disabled ->
{error, disabled}
@ -135,14 +159,14 @@ init([]) ->
{ok, #{}}.
%% enable
handle_call({enable, Kind, ServerReference}, _From, St) ->
handle_call({enable, Kind, ServerReference, Options}, _From, St) ->
Reply =
case enable_status() of
disabled ->
ok = persistent_term:put(?MODULE, {enabled, Kind, ServerReference});
{enabled, Kind, _ServerReference} ->
ok = persistent_term:put(?MODULE, {enabled, Kind, ServerReference});
{enabled, _OtherKind, _ServerReference} ->
ok = persistent_term:put(?MODULE, {enabled, Kind, ServerReference, Options});
{enabled, Kind, _ServerReference, _Options} ->
ok = persistent_term:put(?MODULE, {enabled, Kind, ServerReference, Options});
{enabled, _OtherKind, _ServerReference, _Options} ->
{error, eviction_agent_busy}
end,
{reply, Reply, St};
@ -152,10 +176,10 @@ handle_call({disable, Kind}, _From, St) ->
case enable_status() of
disabled ->
{error, disabled};
{enabled, Kind, _ServerReference} ->
{enabled, Kind, _ServerReference, _Options} ->
_ = persistent_term:erase(?MODULE),
ok;
{enabled, _OtherKind, _ServerReference} ->
{enabled, _OtherKind, _ServerReference, _Options} ->
{error, eviction_agent_busy}
end,
{reply, Reply, St};
@ -180,8 +204,10 @@ code_change(_Vsn, State, _Extra) ->
on_connect(_ConnInfo, _Props) ->
case enable_status() of
{enabled, _Kind, _ServerReference} ->
{enabled, _Kind, _ServerReference, #{allow_connections := false}} ->
{stop, {error, ?RC_USE_ANOTHER_SERVER}};
{enabled, _Kind, _ServerReference, _Options} ->
ignore;
disabled ->
ignore
end.
@ -192,7 +218,7 @@ on_connack(
Props
) ->
case enable_status() of
{enabled, _Kind, ServerReference} ->
{enabled, _Kind, ServerReference, _Options} ->
{ok, Props#{'Server-Reference' => ServerReference}};
disabled ->
{ok, Props}
@ -214,10 +240,10 @@ unhook() ->
ok = emqx_hooks:del('client.connect', {?MODULE, on_connect}),
ok = emqx_hooks:del('client.connack', {?MODULE, on_connack}).
enable_status() ->
persistent_term:get(?MODULE, disabled).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
% connection management
stats() ->
#{
connections => connection_count(),

View File

@ -15,7 +15,11 @@
-import(
emqx_eviction_agent_test_helpers,
[emqtt_connect/0, emqtt_connect/1, emqtt_connect/2, emqtt_connect_for_publish/1]
[
emqtt_connect/0, emqtt_connect/1, emqtt_connect/2,
emqtt_connect_for_publish/1,
case_specific_node_name/1
]
).
-define(assertPrinted(Printed, Code),
@ -29,11 +33,19 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([emqx_eviction_agent]),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_eviction_agent
],
#{
work_dir => emqx_cth_suite:work_dir(Config)
}
),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_eviction_agent]).
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(Case, Config) ->
_ = emqx_eviction_agent:disable(test_eviction),
@ -41,10 +53,17 @@ init_per_testcase(Case, Config) ->
start_slave(Case, Config).
start_slave(t_explicit_session_takeover, Config) ->
NodeNames =
[
t_explicit_session_takeover_donor,
t_explicit_session_takeover_recipient
],
ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
[{evacuate_test1, 2883}, {evacuate_test2, 3883}],
[emqx_eviction_agent]
Config,
NodeNames,
[emqx_conf, emqx, emqx_eviction_agent]
),
ok = snabbkaffe:start_trace(),
[{evacuate_nodes, ClusterNodes} | Config];
start_slave(_Case, Config) ->
Config.
@ -56,8 +75,7 @@ end_per_testcase(TestCase, Config) ->
stop_slave(t_explicit_session_takeover, Config) ->
emqx_eviction_agent_test_helpers:stop_cluster(
?config(evacuate_nodes, Config),
[emqx_eviction_agent]
?config(evacuate_nodes, Config)
);
stop_slave(_Case, _Config) ->
ok.
@ -77,13 +95,16 @@ t_enable_disable(_Config) ->
{ok, C0} = emqtt_connect(),
ok = emqtt:disconnect(C0),
%% Enable
ok = emqx_eviction_agent:enable(test_eviction, undefined),
%% Can't enable with different kind
?assertMatch(
{error, eviction_agent_busy},
emqx_eviction_agent:enable(bar, undefined)
),
%% Enable with the same kind but different server ref
?assertMatch(
ok,
emqx_eviction_agent:enable(test_eviction, <<"srv">>)
@ -99,6 +120,39 @@ t_enable_disable(_Config) ->
emqtt_connect()
),
%% Enable with the same kind and server ref and explicit options
?assertMatch(
ok,
emqx_eviction_agent:enable(test_eviction, <<"srv">>, #{allow_connections => false})
),
?assertMatch(
{enabled, #{}},
emqx_eviction_agent:status()
),
?assertMatch(
{error, {use_another_server, #{}}},
emqtt_connect()
),
%% Enable with the same kind and server ref and permissive options
?assertMatch(
ok,
emqx_eviction_agent:enable(test_eviction, <<"srv">>, #{allow_connections => true})
),
?assertMatch(
{enabled, #{}},
emqx_eviction_agent:status()
),
?assertMatch(
{ok, _},
emqtt_connect()
),
%% Can't enable using different kind
?assertMatch(
{error, eviction_agent_busy},
emqx_eviction_agent:disable(bar)

View File

@ -22,12 +22,23 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite([emqx_eviction_agent]),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_eviction_agent,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{
work_dir => emqx_cth_suite:work_dir(Config)
}
),
_ = emqx_common_test_http:create_default_app(),
[{apps, Apps} | Config].
end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite([emqx_eviction_agent]),
Config.
emqx_common_test_http:delete_default_app(),
emqx_cth_suite:stop(?config(apps, Config)).
%%--------------------------------------------------------------------
%% Tests

View File

@ -22,12 +22,20 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([emqx_conf, emqx_eviction_agent]),
{ok, _} = emqx:update_config([rpc, port_discovery], manual),
Config.
Apps = emqx_cth_suite:start(
[
emqx_conf,
emqx,
emqx_eviction_agent
],
#{
work_dir => emqx_cth_suite:work_dir(Config)
}
),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_eviction_agent, emqx_conf]).
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)).
%%--------------------------------------------------------------------
%% Tests

View File

@ -14,13 +14,21 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([emqx_eviction_agent]),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_eviction_agent
],
#{
work_dir => emqx_cth_suite:work_dir(Config)
}
),
[{apps, Apps} | Config].
end_per_suite(Config) ->
_ = emqx_eviction_agent:disable(foo),
emqx_common_test_helpers:stop_apps([emqx_eviction_agent]),
Config.
emqx_cth_suite:stop(?config(apps, Config)).
%%--------------------------------------------------------------------
%% Tests

View File

@ -15,13 +15,15 @@
emqtt_try_connect/1,
start_cluster/2,
start_cluster/3,
stop_cluster/2,
stop_cluster/1,
case_specific_node_name/2,
case_specific_node_name/3,
concat_atoms/1
concat_atoms/1,
get_mqtt_port/2,
nodes_with_mqtt_tcp_ports/1
]).
emqtt_connect() ->
@ -83,52 +85,24 @@ emqtt_try_connect(Opts) ->
Error
end.
start_cluster(NamesWithPorts, Apps) ->
start_cluster(NamesWithPorts, Apps, []).
start_cluster(NamesWithPorts, Apps, Env) ->
Specs = lists:map(
fun({ShortName, Port}) ->
{core, ShortName, #{listener_ports => [{tcp, Port}]}}
end,
NamesWithPorts
start_cluster(Config, NodeNames = [Node1 | _], Apps) ->
Spec = #{
role => core,
join_to => emqx_cth_cluster:node_name(Node1),
listeners => true,
apps => Apps
},
Cluster = [{NodeName, Spec} || NodeName <- NodeNames],
ClusterNodes = emqx_cth_cluster:start(
Cluster,
%% Use Node1 to scope the work dirs for all the nodes
#{work_dir => emqx_cth_suite:work_dir(Node1, Config)}
),
Opts0 = [
{env, Env},
{apps, Apps},
{conf,
[{[listeners, Proto, default, enable], false} || Proto <- [ssl, ws, wss]] ++
[{[rpc, mode], async}]}
],
Cluster = emqx_common_test_helpers:emqx_cluster(
Specs,
Opts0
),
NodesWithPorts = [
{
emqx_common_test_helpers:start_slave(Name, Opts),
proplists:get_value(Name, NamesWithPorts)
}
|| {Name, Opts} <- Cluster
],
NodesWithPorts.
nodes_with_mqtt_tcp_ports(ClusterNodes).
stop_cluster(NodesWithPorts, Apps) ->
lists:foreach(
fun({Node, _Port}) ->
lists:foreach(
fun(App) ->
rpc:call(Node, application, stop, [App])
end,
Apps
),
%% This sleep is just to make logs cleaner
ct:sleep(100),
_ = rpc:call(Node, emqx_common_test_helpers, stop_apps, []),
emqx_common_test_helpers:stop_slave(Node)
end,
NodesWithPorts
).
%% Stops a cluster described as a list of `{Node, Port}` pairs. The ports are
%% ignored; the node list is handed to `emqx_cth_cluster:stop/1`, whose result
%% is asserted to be `ok`.
stop_cluster(NodesWithPorts) ->
    {ClusterNodes, _} = lists:unzip(NodesWithPorts),
    ok = emqx_cth_cluster:stop(ClusterNodes).
%% Builds a per-testcase name by passing `Module`, the atom '__' and `Case`
%% to `concat_atoms/1`.
case_specific_node_name(Module, Case) ->
concat_atoms([Module, '__', Case]).
@ -145,3 +119,15 @@ concat_atoms(Atoms) ->
)
)
).
%% Asks `Node` (via erpc) for the `bind` setting of its `default` listener of
%% the given `Type` and returns the port part. The setting is matched as a
%% two-element tuple; any other shape crashes with `badmatch`.
get_mqtt_port(Node, Type) ->
    BindPath = [listeners, Type, default, bind],
    {_Addr, Port} = erpc:call(Node, emqx_config, get, [BindPath]),
    Port.
%% Pairs every node in `Nodes` with the port of its default TCP listener,
%% as reported by `get_mqtt_port/2`. Result order follows `Nodes`.
nodes_with_mqtt_tcp_ports(Nodes) ->
    [{Node, get_mqtt_port(Node, tcp)} || Node <- Nodes].

View File

@ -1,11 +1,12 @@
{application, emqx_node_rebalance, [
{description, "EMQX Node Rebalance"},
{vsn, "5.0.6"},
{vsn, "5.0.7"},
{registered, [
emqx_node_rebalance_sup,
emqx_node_rebalance,
emqx_node_rebalance_agent,
emqx_node_rebalance_evacuation
emqx_node_rebalance_evacuation,
emqx_node_rebalance_purge
]},
{applications, [
kernel,

View File

@ -41,6 +41,8 @@
start_error/0
]).
-define(ENABLE_KIND, ?MODULE).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
@ -143,9 +145,13 @@ handle_event(
state_timeout,
evict_conns,
wait_health_check,
Data
#{donors := DonorNodes} = Data
) ->
?SLOG(warning, #{msg => "node_rebalance_wait_health_check_over"}),
_ = multicall(DonorNodes, enable_rebalance_agent, [
self(), ?ENABLE_KIND, #{allow_connections => false}
]),
?tp(debug, node_rebalance_enable_started_prohibiting, #{}),
{next_state, evicting_conns, Data, [{state_timeout, 0, evict_conns}]};
handle_event(
state_timeout,
@ -232,7 +238,9 @@ enable_rebalance(#{opts := Opts} = Data) ->
false ->
{error, nothing_to_balance};
true ->
_ = multicall(DonorNodes, enable_rebalance_agent, [self()]),
_ = multicall(DonorNodes, enable_rebalance_agent, [
self(), ?ENABLE_KIND, #{allow_connections => true}
]),
{ok, Data#{
donors => DonorNodes,
recipients => RecipientNodes,
@ -242,7 +250,7 @@ enable_rebalance(#{opts := Opts} = Data) ->
end.
disable_rebalance(#{donors := DonorNodes}) ->
_ = multicall(DonorNodes, disable_rebalance_agent, [self()]),
_ = multicall(DonorNodes, disable_rebalance_agent, [self(), ?ENABLE_KIND]),
ok.
evict_conns(#{donors := DonorNodes, recipients := RecipientNodes, opts := Opts} = Data) ->
@ -370,7 +378,7 @@ avg(List) when length(List) >= 1 ->
lists:sum(List) / length(List).
multicall(Nodes, F, A) ->
case apply(emqx_node_rebalance_proto_v2, F, [Nodes | A]) of
case apply(emqx_node_rebalance_proto_v3, F, [Nodes | A]) of
{Results, []} ->
case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
{OkResults, []} ->

View File

@ -11,10 +11,13 @@
-include_lib("stdlib/include/qlc.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(gen_statem).
-export([
start_link/0,
enable/1,
enable/2,
enable/3,
disable/1,
disable/2,
status/0
@ -22,13 +25,13 @@
-export([
init/1,
handle_call/3,
handle_info/2,
handle_cast/2,
code_change/3
callback_mode/0,
handle_event/4,
code_change/4
]).
-define(ENABLE_KIND, emqx_node_rebalance).
-define(SERVER_REFERENCE, undefined).
%%--------------------------------------------------------------------
%% APIs
@ -38,16 +41,21 @@
-spec start_link() -> startlink_ret().
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec enable(pid()) -> ok_or_error(already_enabled | eviction_agent_busy).
enable(CoordinatorPid) ->
enable(CoordinatorPid, ?ENABLE_KIND).
-spec enable(pid(), emqx_eviction_agent:kind()) ->
ok_or_error(already_enabled | eviction_agent_busy).
ok_or_error(invalid_coordinator | eviction_agent_busy).
enable(CoordinatorPid, Kind) ->
gen_server:call(?MODULE, {enable, CoordinatorPid, Kind}).
enable(CoordinatorPid, Kind, emqx_eviction_agent:default_options()).
-spec enable(pid(), emqx_eviction_agent:kind(), emqx_eviction_agent:options()) ->
ok_or_error(invalid_coordinator | eviction_agent_busy).
enable(CoordinatorPid, Kind, Options) ->
gen_statem:call(?MODULE, {enable, CoordinatorPid, Kind, Options}).
-spec disable(pid()) -> ok_or_error(already_disabled | invalid_coordinator).
disable(CoordinatorPid) ->
@ -56,88 +64,113 @@ disable(CoordinatorPid) ->
-spec disable(pid(), emqx_eviction_agent:kind()) ->
ok_or_error(already_disabled | invalid_coordinator).
disable(CoordinatorPid, Kind) ->
gen_server:call(?MODULE, {disable, CoordinatorPid, Kind}).
gen_statem:call(?MODULE, {disable, CoordinatorPid, Kind}).
-spec status() -> status().
status() ->
gen_server:call(?MODULE, status).
gen_statem:call(?MODULE, status).
%%--------------------------------------------------------------------
%% gen_server callbacks
%% gen_statem callbacks
%%--------------------------------------------------------------------
-define(disabled, disabled).
-define(enabled(ST), {enabled, ST}).
callback_mode() ->
handle_event_function.
init([]) ->
{ok, #{}}.
{ok, ?disabled, #{}}.
handle_call({enable, CoordinatorPid, Kind}, _From, St) ->
case St of
#{coordinator_pid := _Pid} ->
{reply, {error, already_enabled}, St};
_ ->
true = link(CoordinatorPid),
EvictionAgentPid = whereis(emqx_eviction_agent),
true = link(EvictionAgentPid),
case emqx_eviction_agent:enable(Kind, undefined) of
ok ->
{reply, ok, #{
coordinator_pid => CoordinatorPid,
eviction_agent_pid => EvictionAgentPid
}};
{error, eviction_agent_busy} ->
true = unlink(EvictionAgentPid),
true = unlink(CoordinatorPid),
{reply, {error, eviction_agent_busy}, St}
end
end;
handle_call({disable, CoordinatorPid, Kind}, _From, St) ->
case St of
#{
coordinator_pid := CoordinatorPid,
eviction_agent_pid := EvictionAgentPid
} ->
_ = emqx_eviction_agent:disable(Kind),
%% disabled status
%% disabled status, enable command
handle_event({call, From}, {enable, CoordinatorPid, Kind, Options}, ?disabled, Data) ->
true = link(CoordinatorPid),
EvictionAgentPid = whereis(emqx_eviction_agent),
true = link(EvictionAgentPid),
case emqx_eviction_agent:enable(Kind, ?SERVER_REFERENCE, Options) of
ok ->
{next_state,
?enabled(#{
coordinator_pid => CoordinatorPid,
eviction_agent_pid => EvictionAgentPid,
kind => Kind
}), Data, {reply, From, ok}};
{error, eviction_agent_busy} ->
true = unlink(EvictionAgentPid),
true = unlink(CoordinatorPid),
NewSt = maps:without(
[coordinator_pid, eviction_agent_pid],
St
),
{reply, ok, NewSt};
#{coordinator_pid := _CoordinatorPid} ->
{reply, {error, invalid_coordinator}, St};
#{} ->
{reply, {error, already_disabled}, St}
{keep_state_and_data, {reply, From, {error, eviction_agent_busy}}}
end;
handle_call(status, _From, St) ->
case St of
#{coordinator_pid := Pid} ->
{reply, {enabled, Pid}, St};
_ ->
{reply, disabled, St}
end;
handle_call(Msg, _From, St) ->
%% disabled status, disable command
handle_event({call, From}, {disable, _CoordinatorPid, _Kind}, ?disabled, _Data) ->
{keep_state_and_data, {reply, From, {error, already_disabled}}};
%% disabled status, status command
handle_event({call, From}, status, ?disabled, _Data) ->
{keep_state_and_data, {reply, From, disabled}};
%% enabled status
%% enabled status, enable command
handle_event(
{call, From},
{enable, CoordinatorPid, Kind, Options},
?enabled(#{
coordinator_pid := CoordinatorPid,
kind := Kind
}),
_Data
) ->
%% just updating options
ok = emqx_eviction_agent:enable(Kind, ?SERVER_REFERENCE, Options),
{keep_state_and_data, {reply, From, ok}};
handle_event({call, From}, {enable, _CoordinatorPid, _Kind, _Options}, ?enabled(_St), _Data) ->
{keep_state_and_data, {reply, From, {error, invalid_coordinator}}};
%% enabled status, disable command
handle_event(
{call, From},
{disable, CoordinatorPid, Kind},
?enabled(#{
coordinator_pid := CoordinatorPid,
eviction_agent_pid := EvictionAgentPid
}),
Data
) ->
_ = emqx_eviction_agent:disable(Kind),
true = unlink(EvictionAgentPid),
true = unlink(CoordinatorPid),
{next_state, ?disabled, Data, {reply, From, ok}};
handle_event({call, From}, {disable, _CoordinatorPid, _Kind}, ?enabled(_St), _Data) ->
{keep_state_and_data, {reply, From, {error, invalid_coordinator}}};
%% enabled status, status command
handle_event({call, From}, status, ?enabled(#{coordinator_pid := CoordinatorPid}), _Data) ->
{keep_state_and_data, {reply, From, {enabled, CoordinatorPid}}};
%% fallbacks
handle_event({call, From}, Msg, State, Data) ->
?SLOG(warning, #{
msg => "unknown_call",
call => Msg,
state => St
state => State,
data => Data
}),
{reply, ignored, St}.
handle_info(Msg, St) ->
?SLOG(warning, #{
msg => "unknown_info",
info => Msg,
state => St
}),
{noreply, St}.
handle_cast(Msg, St) ->
{keep_state_and_data, {reply, From, ignored}};
handle_event(cast, Msg, State, Data) ->
?SLOG(warning, #{
msg => "unknown_cast",
cast => Msg,
state => St
state => State,
data => Data
}),
{noreply, St}.
keep_state_and_data;
handle_event(info, Msg, State, Data) ->
?SLOG(warning, #{
msg => "unknown_info",
info => Msg,
state => State,
data => Data
}),
keep_state_and_data.
code_change(_Vsn, State, _Extra) ->
{ok, State}.
code_change(_Vsn, State, Data, _Extra) ->
{ok, State, Data}.

View File

@ -109,7 +109,8 @@ schema("/load_rebalance/availability_check") ->
responses => #{
200 => response_schema(),
503 => error_codes([?NODE_EVACUATING], <<"Node Evacuating">>)
}
},
security => []
}
};
schema("/load_rebalance/:node/start") ->
@ -248,10 +249,10 @@ schema("/load_rebalance/:node/evacuation/stop") ->
}}.
'/load_rebalance/availability_check'(get, #{}) ->
case emqx_node_rebalance_status:local_status() of
disabled ->
case emqx_node_rebalance_status:availability_status() of
available ->
{200, #{}};
_ ->
unavailable ->
error_response(503, ?NODE_EVACUATING, <<"Node Evacuating">>)
end.

View File

@ -57,7 +57,7 @@
migrate_to => migrate_to(),
wait_health_check => number()
}.
-type start_error() :: already_started.
-type start_error() :: already_started | eviction_agent_busy.
-type stats() :: #{
initial_conns := non_neg_integer(),
initial_sessions := non_neg_integer(),
@ -102,9 +102,9 @@ callback_mode() -> handle_event_function.
init([]) ->
case emqx_node_rebalance_evacuation_persist:read(default_opts()) of
{ok, #{server_reference := ServerReference} = Opts} ->
{ok, Opts} ->
?SLOG(warning, #{msg => "restoring_evacuation_state", opts => Opts}),
case emqx_eviction_agent:enable(?MODULE, ServerReference) of
case enable_eviction_agent(Opts, _AllowConnections = false) of
ok ->
Data = init_data(#{}, Opts),
ok = warn_enabled(),
@ -122,18 +122,26 @@ handle_event(
{call, From},
{start, #{wait_health_check := WaitHealthCheck} = Opts},
disabled,
#{} = Data
Data
) ->
?SLOG(warning, #{
msg => "node_evacuation_started",
opts => Opts
}),
NewData = init_data(Data, Opts),
ok = emqx_node_rebalance_evacuation_persist:save(Opts),
{next_state, waiting_health_check, NewData, [
{state_timeout, seconds(WaitHealthCheck), start_eviction},
{reply, From, ok}
]};
case enable_eviction_agent(Opts, _AllowConnections = true) of
ok ->
?SLOG(warning, #{
msg => "node_evacuation_started",
opts => Opts
}),
NewData = init_data(Data, Opts),
ok = emqx_node_rebalance_evacuation_persist:save(Opts),
{next_state, waiting_health_check, NewData, [
{state_timeout, seconds(WaitHealthCheck), start_eviction},
{reply, From, ok}
]};
{error, eviction_agent_busy} ->
?tp(warning, eviction_agent_busy, #{
data => Data
}),
{keep_state_and_data, [{reply, From, {error, eviction_agent_busy}}]}
end;
handle_event({call, From}, {start, _Opts}, _State, #{}) ->
{keep_state_and_data, [{reply, From, {error, already_started}}]};
%% stop
@ -168,9 +176,9 @@ handle_event(
state_timeout,
start_eviction,
waiting_health_check,
#{server_reference := ServerReference} = Data
Data
) ->
case emqx_eviction_agent:enable(?MODULE, ServerReference) of
case enable_eviction_agent(Data, _AllowConnections = false) of
ok ->
?tp(debug, eviction_agent_started, #{
data => Data
@ -178,10 +186,8 @@ handle_event(
{next_state, evicting_conns, Data, [
{state_timeout, 0, evict_conns}
]};
%% This should never happen
{error, eviction_agent_busy} ->
?tp(warning, eviction_agent_busy, #{
data => Data
}),
{next_state, disabled, deinit(Data)}
end;
%% conn eviction
@ -212,7 +218,7 @@ handle_event(
NewData = Data#{current_conns => 0},
?SLOG(warning, #{msg => "node_evacuation_evict_conns_done"}),
{next_state, waiting_takeover, NewData, [
{state_timeout, timer:seconds(WaitTakeover), evict_sessions}
{state_timeout, seconds(WaitTakeover), evict_sessions}
]}
end;
handle_event(
@ -308,6 +314,9 @@ deinit(Data) ->
maps:keys(default_opts()),
maps:without(Keys, Data).
%% Enables the local eviction agent on behalf of this module, using the
%% `server_reference` found in the given opts/data map and an options map
%% carrying only the requested `allow_connections` flag.
enable_eviction_agent(#{server_reference := ServerReference}, AllowConnections) ->
    AgentOptions = #{allow_connections => AllowConnections},
    emqx_eviction_agent:enable(?MODULE, ServerReference, AgentOptions).
warn_enabled() ->
?SLOG(warning, #{msg => "node_evacuation_enabled"}),
io:format(

View File

@ -199,7 +199,7 @@ deinit(Data) ->
maps:without(Keys, Data).
multicall(Nodes, F, A) ->
case apply(emqx_node_rebalance_proto_v2, F, [Nodes | A]) of
case apply(emqx_node_rebalance_proto_v3, F, [Nodes | A]) of
{Results, []} ->
case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
{_OkResults, []} ->

View File

@ -5,6 +5,7 @@
-module(emqx_node_rebalance_status).
-export([
availability_status/0,
local_status/0,
local_status/1,
global_status/0,
@ -23,6 +24,13 @@
%% APIs
%%--------------------------------------------------------------------
%% Derives node availability from the eviction agent's enable state alone:
%% `available` while the agent is disabled, `unavailable` whenever it is
%% enabled, regardless of kind, server reference or options.
-spec availability_status() -> available | unavailable.
availability_status() ->
    AgentStatus = emqx_eviction_agent:enable_status(),
    case AgentStatus of
        disabled ->
            available;
        {enabled, _Kind, _ServerReference, _Options} ->
            unavailable
    end.
-spec local_status() -> disabled | {evacuation, map()} | {purge, map()} | {rebalance, map()}.
local_status() ->
Checks = [

View File

@ -0,0 +1,96 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_node_rebalance_proto_v3).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
available_nodes/1,
evict_connections/2,
evict_sessions/4,
connection_counts/1,
session_counts/1,
enable_rebalance_agent/2,
disable_rebalance_agent/2,
disconnected_session_counts/1,
%% Introduced in v2:
enable_rebalance_agent/3,
disable_rebalance_agent/3,
purge_sessions/2,
%% Introduced in v3:
enable_rebalance_agent/4
]).
-include_lib("emqx/include/bpapi.hrl").
-include_lib("emqx/include/types.hrl").
%% Release string reported for this protocol version.
introduced_in() ->
"5.4.0".
%% Calls `emqx_node_rebalance:is_node_available/0` on every node in `Nodes`.
-spec available_nodes([node()]) -> emqx_rpc:multicall_result(node()).
available_nodes(Nodes) ->
rpc:multicall(Nodes, emqx_node_rebalance, is_node_available, []).
%% Calls `emqx_eviction_agent:evict_connections(Count)` on every node in
%% `Nodes`.
-spec evict_connections([node()], non_neg_integer()) ->
emqx_rpc:multicall_result(ok_or_error(disabled)).
evict_connections(Nodes, Count) ->
rpc:multicall(Nodes, emqx_eviction_agent, evict_connections, [Count]).
%% Calls `emqx_eviction_agent:evict_sessions(Count, RecipientNodes, ConnState)`
%% on every node in `Nodes`.
-spec evict_sessions([node()], non_neg_integer(), [node()], emqx_channel:conn_state()) ->
emqx_rpc:multicall_result(ok_or_error(disabled)).
evict_sessions(Nodes, Count, RecipientNodes, ConnState) ->
rpc:multicall(Nodes, emqx_eviction_agent, evict_sessions, [Count, RecipientNodes, ConnState]).
%% Calls `emqx_node_rebalance:connection_count/0` on every node in `Nodes`.
-spec connection_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}).
connection_counts(Nodes) ->
rpc:multicall(Nodes, emqx_node_rebalance, connection_count, []).
%% Calls `emqx_node_rebalance:session_count/0` on every node in `Nodes`.
-spec session_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}).
session_counts(Nodes) ->
rpc:multicall(Nodes, emqx_node_rebalance, session_count, []).
%% Calls `emqx_node_rebalance_agent:enable(OwnerPid)` on every node in `Nodes`.
%% NOTE(review): this spec lists `already_enabled`, while the 4-arity variant
%% in this module lists `invalid_coordinator` instead — confirm which error
%% reasons the agent can actually return before relying on the spec.
-spec enable_rebalance_agent([node()], pid()) ->
emqx_rpc:multicall_result(ok_or_error(already_enabled | eviction_agent_busy)).
enable_rebalance_agent(Nodes, OwnerPid) ->
rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid]).
%% Calls `emqx_node_rebalance_agent:disable(OwnerPid)` on every node in
%% `Nodes`.
-spec disable_rebalance_agent([node()], pid()) ->
emqx_rpc:multicall_result(ok_or_error(already_disabled | invalid_coordinator)).
disable_rebalance_agent(Nodes, OwnerPid) ->
rpc:multicall(Nodes, emqx_node_rebalance_agent, disable, [OwnerPid]).
%% Calls `emqx_node_rebalance:disconnected_session_count/0` on every node in
%% `Nodes`.
-spec disconnected_session_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}).
disconnected_session_counts(Nodes) ->
rpc:multicall(Nodes, emqx_node_rebalance, disconnected_session_count, []).
%% Introduced in v2:
%% Calls `emqx_node_rebalance_agent:enable(OwnerPid, Kind)` on every node in
%% `Nodes`.
%% NOTE(review): this spec lists `already_enabled`, while the 4-arity variant
%% in this module lists `invalid_coordinator` instead — confirm which error
%% reasons the agent can actually return before relying on the spec.
-spec enable_rebalance_agent([node()], pid(), emqx_eviction_agent:kind()) ->
emqx_rpc:multicall_result(ok_or_error(already_enabled | eviction_agent_busy)).
enable_rebalance_agent(Nodes, OwnerPid, Kind) ->
rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid, Kind]).
%% Calls `emqx_node_rebalance_agent:disable(OwnerPid, Kind)` on every node in
%% `Nodes`.
-spec disable_rebalance_agent([node()], pid(), emqx_eviction_agent:kind()) ->
emqx_rpc:multicall_result(ok_or_error(already_disabled | invalid_coordinator)).
disable_rebalance_agent(Nodes, OwnerPid, Kind) ->
rpc:multicall(Nodes, emqx_node_rebalance_agent, disable, [OwnerPid, Kind]).
%% Calls `emqx_eviction_agent:purge_sessions(Count)` on every node in `Nodes`.
-spec purge_sessions([node()], non_neg_integer()) ->
emqx_rpc:multicall_result(ok_or_error(disabled)).
purge_sessions(Nodes, Count) ->
rpc:multicall(Nodes, emqx_eviction_agent, purge_sessions, [Count]).
%% Introduced in v3:
%% Calls `emqx_node_rebalance_agent:enable(OwnerPid, Kind, Options)` on every
%% node in `Nodes`, forwarding the eviction agent options map unchanged.
-spec enable_rebalance_agent(
[node()], pid(), emqx_eviction_agent:kind(), emqx_eviction_agent:options()
) ->
emqx_rpc:multicall_result(ok_or_error(eviction_agent_busy | invalid_coordinator)).
enable_rebalance_agent(Nodes, OwnerPid, Kind, Options) ->
rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid, Kind, Options]).

View File

@ -16,39 +16,46 @@
-import(
emqx_eviction_agent_test_helpers,
[emqtt_connect_many/1, emqtt_connect_many/2, stop_many/1, case_specific_node_name/3]
[
emqtt_connect_many/1,
emqtt_connect_many/2,
emqtt_try_connect/1,
stop_many/1,
case_specific_node_name/3,
start_cluster/3,
stop_cluster/1
]
).
-define(START_APPS, [emqx_eviction_agent, emqx_node_rebalance]).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{
work_dir => ?config(priv_dir, Config)
}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([]),
ok.
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(Case, Config) ->
ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
NodeNames =
[
{case_specific_node_name(?MODULE, Case, '_donor'), 2883},
{case_specific_node_name(?MODULE, Case, '_recipient'), 3883}
case_specific_node_name(?MODULE, Case, '_donor'),
case_specific_node_name(?MODULE, Case, '_recipient')
],
?START_APPS
ClusterNodes = start_cluster(
Config,
NodeNames,
[emqx, emqx_eviction_agent, emqx_node_rebalance]
),
ok = snabbkaffe:start_trace(),
[{cluster_nodes, ClusterNodes} | Config].
end_per_testcase(_Case, Config) ->
ok = snabbkaffe:stop(),
ok = emqx_eviction_agent_test_helpers:stop_cluster(
?config(cluster_nodes, Config),
?START_APPS
).
stop_cluster(?config(cluster_nodes, Config)).
%%--------------------------------------------------------------------
%% Tests
@ -227,3 +234,43 @@ t_available_nodes(Config) ->
[[DonorNode, RecipientNode]]
)
).
%% Checks that a donor node keeps accepting new connections right after a
%% rebalance is started, and refuses them (use_another_server) once the
%% `node_rebalance_enable_started_prohibiting` trace event has been observed.
t_before_health_check_over(Config) ->
process_flag(trap_exit, true),
%% cluster_nodes is expected to hold exactly two {Node, Port} pairs:
%% donor first, recipient second.
[{DonorNode, DonorPort}, {RecipientNode, _RecipientPort}] = ?config(cluster_nodes, Config),
Nodes = [DonorNode, RecipientNode],
%% Put all the load on the donor so that there is something to rebalance.
Conns = emqtt_connect_many(DonorPort, 50),
Opts = #{
conn_evict_rate => 1,
sess_evict_rate => 1,
evict_interval => 1000,
abs_conn_threshold => 1,
abs_sess_threshold => 1,
rel_conn_threshold => 1.0,
rel_sess_threshold => 1.0,
wait_health_check => 2,
wait_takeover => 100,
nodes => Nodes
},
%% Start the rebalance and, before the trace event arrives, verify that a
%% fresh connection to the donor still succeeds. The whole step must see
%% the event within 5000 ms.
?assertWaitEvent(
begin
ok = rpc:call(DonorNode, emqx_node_rebalance, start, [Opts]),
?assertMatch(
ok,
emqtt_try_connect([{port, DonorPort}])
)
end,
#{?snk_kind := node_rebalance_enable_started_prohibiting},
5000
),
%% After the event, new connections to the donor must be rejected.
?assertMatch(
{error, {use_another_server, #{}}},
emqtt_try_connect([{port, DonorPort}])
),
stop_many(Conns).

View File

@ -38,12 +38,13 @@ groups() ->
].
init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_eviction_agent, emqx_node_rebalance]),
Config.
Apps = emqx_cth_suite:start([emqx, emqx_eviction_agent, emqx_node_rebalance], #{
work_dir => ?config(priv_dir, Config)
}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_eviction_agent, emqx_node_rebalance]),
ok.
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
init_per_group(local, Config) ->
[{cluster, false} | Config];
@ -56,9 +57,13 @@ end_per_group(_Group, _Config) ->
init_per_testcase(Case, Config) ->
case ?config(cluster, Config) of
true ->
ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
[{case_specific_node_name(?MODULE, Case), 2883}],
[emqx_eviction_agent, emqx_node_rebalance]
ClusterNodes = emqx_cth_cluster:start(
[
{case_specific_node_name(?MODULE, Case), #{
apps => [emqx, emqx_eviction_agent, emqx_node_rebalance]
}}
],
#{work_dir => emqx_cth_suite:work_dir(Case, Config)}
),
[{cluster_nodes, ClusterNodes} | Config];
false ->
@ -68,10 +73,7 @@ init_per_testcase(Case, Config) ->
end_per_testcase(_Case, Config) ->
case ?config(cluster, Config) of
true ->
emqx_eviction_agent_test_helpers:stop_cluster(
?config(cluster_nodes, Config),
[emqx_eviction_agent, emqx_node_rebalance]
);
emqx_cth_cluster:stop(?config(cluster_nodes, Config));
false ->
ok
end.
@ -94,7 +96,13 @@ t_enable_disable(_Config) ->
),
?assertEqual(
{error, already_enabled},
{error, invalid_coordinator},
emqx_node_rebalance_agent:enable(self(), other_rebalance)
),
%% Options update
?assertEqual(
ok,
emqx_node_rebalance_agent:enable(self())
),
@ -150,7 +158,7 @@ t_unknown_messages(_Config) ->
t_rebalance_agent_coordinator_fail(Config) ->
process_flag(trap_exit, true),
[{Node, _}] = ?config(cluster_nodes, Config),
[Node] = ?config(cluster_nodes, Config),
CoordinatorPid = spawn_link(
fun() ->
@ -189,7 +197,7 @@ t_rebalance_agent_coordinator_fail(Config) ->
t_rebalance_agent_fail(Config) ->
process_flag(trap_exit, true),
[{Node, _}] = ?config(cluster_nodes, Config),
[Node] = ?config(cluster_nodes, Config),
CoordinatorPid = spawn_link(
fun() ->

View File

@ -13,6 +13,7 @@
-import(
emqx_mgmt_api_test_util,
[
request_api/3,
request/2,
request/3,
uri/1
@ -24,18 +25,17 @@
[emqtt_connect_many/2, stop_many/1, case_specific_node_name/3]
).
-define(START_APPS, [emqx_eviction_agent, emqx_node_rebalance]).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps(?START_APPS),
Config.
Apps = emqx_cth_suite:start([emqx, emqx_eviction_agent, emqx_node_rebalance], #{
work_dir => ?config(priv_dir, Config)
}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps(?START_APPS),
ok.
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(Case, Config) ->
DonorNode = case_specific_node_name(?MODULE, Case, '_donor'),
@ -57,7 +57,6 @@ init_per_testcase(Case, Config) ->
[{cluster_nodes, ClusterNodes} | Config].
end_per_testcase(_Case, Config) ->
Nodes = ?config(cluster_nodes, Config),
erpc:multicall(Nodes, meck, unload, []),
_ = emqx_cth_cluster:stop(Nodes),
ok.
@ -473,28 +472,31 @@ t_start_stop_rebalance(Config) ->
t_availability_check(Config) ->
[DonorNode | _] = ?config(cluster_nodes, Config),
?assertMatch(
{ok, 200, #{}},
api_get(["load_rebalance", "availability_check"])
{ok, _},
api_get_noauth(["load_rebalance", "availability_check"])
),
ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [#{}]),
?assertMatch(
{ok, 503, _},
api_get(["load_rebalance", "availability_check"])
{error, {_, 503, _}},
api_get_noauth(["load_rebalance", "availability_check"])
),
ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, stop, []),
?assertMatch(
{ok, 200, #{}},
api_get(["load_rebalance", "availability_check"])
{ok, _},
api_get_noauth(["load_rebalance", "availability_check"])
).
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
%% Sends a GET request to the API endpoint identified by Path, using an
%% authorization header built from the literal credentials
%% "invalid"/"password" (presumably not valid for any API user, so the call
%% only succeeds for endpoints that do not enforce authentication).
%% Returns whatever request_api/3 returns.
api_get_noauth(Path) ->
    Url = uri(Path),
    BogusAuthHeader = emqx_common_test_http:auth_header("invalid", "password"),
    request_api(get, Url, BogusAuthHeader).
api_get(Path) ->
case request(get, uri(Path)) of
{ok, Code, ResponseBody} ->

View File

@ -15,27 +15,38 @@
[emqtt_connect_many/2, stop_many/1, case_specific_node_name/3]
).
-define(START_APPS, [emqx_eviction_agent, emqx_node_rebalance]).
-define(START_APPS, [emqx, emqx_eviction_agent, emqx_node_rebalance]).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps(?START_APPS),
Config.
Apps = emqx_cth_suite:start(?START_APPS, #{
work_dir => ?config(priv_dir, Config)
}),
[{apps, Apps} | Config].
end_per_suite(Config) ->
emqx_common_test_helpers:stop_apps(lists:reverse(?START_APPS)),
Config.
emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(Case = t_rebalance, Config) ->
_ = emqx_node_rebalance_evacuation:stop(),
ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
Nodes =
[Node1 | _] =
[
{case_specific_node_name(?MODULE, Case, '_donor'), 2883},
{case_specific_node_name(?MODULE, Case, '_recipient'), 3883}
case_specific_node_name(?MODULE, Case, '_1'),
case_specific_node_name(?MODULE, Case, '_2')
],
?START_APPS
Spec = #{
role => core,
join_to => emqx_cth_cluster:node_name(Node1),
listeners => true,
apps => ?START_APPS
},
Cluster = [{Node, Spec} || Node <- Nodes],
ClusterNodes = emqx_cth_cluster:start(
Cluster,
#{work_dir => emqx_cth_suite:work_dir(Case, Config)}
),
[{cluster_nodes, ClusterNodes} | Config];
init_per_testcase(_Case, Config) ->
@ -46,10 +57,7 @@ init_per_testcase(_Case, Config) ->
end_per_testcase(t_rebalance, Config) ->
_ = emqx_node_rebalance_evacuation:stop(),
_ = emqx_node_rebalance:stop(),
_ = emqx_eviction_agent_test_helpers:stop_cluster(
?config(cluster_nodes, Config),
?START_APPS
);
_ = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
end_per_testcase(_Case, _Config) ->
_ = emqx_node_rebalance_evacuation:stop(),
_ = emqx_node_rebalance:stop().
@ -157,6 +165,8 @@ t_evacuation(_Config) ->
).
t_purge(_Config) ->
process_flag(trap_exit, true),
%% start with invalid args
?assertNot(
emqx_node_rebalance_cli:cli(["start", "--purge", "--foo-bar"])
@ -187,40 +197,44 @@ t_purge(_Config) ->
atom_to_list(node())
])
),
with_some_sessions(fun() ->
?assert(
emqx_node_rebalance_cli:cli([
"start",
"--purge",
"--purge-rate",
"10"
])
),
%% status
ok = emqx_node_rebalance_cli:cli(["status"]),
ok = emqx_node_rebalance_cli:cli(["node-status"]),
ok = emqx_node_rebalance_cli:cli(["node-status", atom_to_list(node())]),
Conns = emqtt_connect_many(get_mqtt_port(node(), tcp), 100),
?assertMatch(
{enabled, #{}},
emqx_node_rebalance_purge:status()
),
?assert(
emqx_node_rebalance_cli:cli([
"start",
"--purge",
"--purge-rate",
"10"
])
),
%% status
ok = emqx_node_rebalance_cli:cli(["status"]),
ok = emqx_node_rebalance_cli:cli(["node-status"]),
ok = emqx_node_rebalance_cli:cli(["node-status", atom_to_list(node())]),
?assertMatch(
{enabled, #{}},
emqx_node_rebalance_purge:status()
),
%% already enabled
?assertNot(
emqx_node_rebalance_cli:cli([
"start",
"--purge",
"--purge-rate",
"10"
])
),
%% already enabled
?assertNot(
emqx_node_rebalance_cli:cli([
"start",
"--purge",
"--purge-rate",
"10"
])
),
true = emqx_node_rebalance_cli:cli(["stop"]),
ok
end),
%% stop
true = emqx_node_rebalance_cli:cli(["stop"]),
%% stop when not started
false = emqx_node_rebalance_cli:cli(["stop"]),
?assertEqual(
@ -228,12 +242,13 @@ t_purge(_Config) ->
emqx_node_rebalance_purge:status()
),
ok.
ok = stop_many(Conns).
t_rebalance(Config) ->
process_flag(trap_exit, true),
[{DonorNode, DonorPort}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
[DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
DonorPort = get_mqtt_port(DonorNode, tcp),
%% start with invalid args
?assertNot(
@ -364,11 +379,6 @@ emqx_node_rebalance_cli(Node, Args) ->
Result
end.
%% to avoid it finishing too fast
with_some_sessions(Fn) ->
emqx_common_test_helpers:with_mock(
emqx_eviction_agent,
all_channels_count,
fun() -> 100 end,
Fn
).
%% Returns the port of the `default' listener of the given Type on Node.
%% The value is read remotely from the `bind' setting of that listener, which
%% is expected to be an {Address, Port} pair; any other shape crashes with
%% badmatch.
get_mqtt_port(Node, Type) ->
    BindKeyPath = [listeners, Type, default, bind],
    {_Addr, ListenerPort} = erpc:call(Node, emqx_config, get, [BindKeyPath]),
    ListenerPort.

View File

@ -15,7 +15,13 @@
-import(
emqx_eviction_agent_test_helpers,
[emqtt_connect/1, emqtt_try_connect/1, case_specific_node_name/3]
[
emqtt_connect/1,
emqtt_try_connect/1,
case_specific_node_name/3,
start_cluster/3,
stop_cluster/1
]
).
all() -> [{group, one_node}, {group, two_node}].
@ -37,12 +43,13 @@ one_node_cases() ->
emqx_common_test_helpers:all(?MODULE) -- two_node_cases().
init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{
work_dir => ?config(priv_dir, Config)
}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([]),
ok.
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
init_per_group(one_node, Config) ->
[{cluster_type, one_node} | Config];
@ -53,30 +60,23 @@ end_per_group(_Group, _Config) ->
ok.
init_per_testcase(Case, Config) ->
NodesWithPorts =
NodeNames =
case ?config(cluster_type, Config) of
one_node ->
[{case_specific_node_name(?MODULE, Case, '_evacuated'), 2883}];
[case_specific_node_name(?MODULE, Case, '_evacuated')];
two_node ->
[
{case_specific_node_name(?MODULE, Case, '_evacuated'), 2883},
{case_specific_node_name(?MODULE, Case, '_recipient'), 3883}
case_specific_node_name(?MODULE, Case, '_evacuated'),
case_specific_node_name(?MODULE, Case, '_recipient')
]
end,
ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
NodesWithPorts,
[emqx_eviction_agent, emqx_node_rebalance],
[{emqx, data_dir, case_specific_data_dir(Case, Config)}]
),
ClusterNodes = start_cluster(Config, NodeNames, [emqx, emqx_eviction_agent, emqx_node_rebalance]),
ok = snabbkaffe:start_trace(),
[{cluster_nodes, ClusterNodes} | Config].
end_per_testcase(_Case, Config) ->
ok = snabbkaffe:stop(),
ok = emqx_eviction_agent_test_helpers:stop_cluster(
?config(cluster_nodes, Config),
[emqx_eviction_agent, emqx_node_rebalance]
).
stop_cluster(?config(cluster_nodes, Config)).
%%--------------------------------------------------------------------
%% Tests
@ -89,10 +89,9 @@ t_agent_busy(Config) ->
ok = rpc:call(DonorNode, emqx_eviction_agent, enable, [other_rebalance, undefined]),
?assertWaitEvent(
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
#{?snk_kind := eviction_agent_busy},
5000
?assertEqual(
{error, eviction_agent_busy},
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)])
).
t_already_started(Config) ->
@ -118,7 +117,13 @@ t_start(Config) ->
[{DonorNode, DonorPort}] = ?config(cluster_nodes, Config),
?assertWaitEvent(
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
begin
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
?assertMatch(
ok,
emqtt_try_connect([{port, DonorPort}])
)
end,
#{?snk_kind := eviction_agent_started},
5000
),

View File

@ -18,7 +18,9 @@
[
emqtt_connect/1,
emqtt_try_connect/1,
case_specific_node_name/3
case_specific_node_name/3,
stop_many/1,
get_mqtt_port/2
]
).
@ -41,11 +43,13 @@ one_node_cases() ->
emqx_common_test_helpers:all(?MODULE) -- two_nodes_cases().
init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{
work_dir => ?config(priv_dir, Config)
}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([]),
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)),
ok.
init_per_group(one_node, Config) ->
@ -78,7 +82,7 @@ init_per_testcase(TestCase, Config) ->
Cluster = [{Node, Spec} || Node <- Nodes],
ClusterNodes = emqx_cth_cluster:start(
Cluster,
#{work_dir => ?config(priv_dir, Config)}
#{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
),
ok = snabbkaffe:start_trace(),
[{cluster_nodes, ClusterNodes} | Config].
@ -128,20 +132,12 @@ case_specific_data_dir(Case, Config) ->
PrivDir -> filename:join(PrivDir, atom_to_list(Case))
end.
get_mqtt_port(Node, Type) ->
{_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
Port.
%% to avoid it finishing too fast
with_some_sessions(Node, Fn) ->
erpc:call(Node, fun() ->
emqx_common_test_helpers:with_mock(
emqx_eviction_agent,
all_channels_count,
fun() -> 100 end,
Fn
)
end).
Port = get_mqtt_port(Node, tcp),
Conns = emqtt_connect_many(Port, 100),
_ = erpc:call(Node, Fn),
ok = stop_many(Conns).
drain_exits([ClientPid | Rest]) ->
receive
@ -189,6 +185,7 @@ t_agent_busy(Config) ->
ok.
t_already_started(Config) ->
process_flag(trap_exit, true),
[Node] = ?config(cluster_nodes, Config),
with_some_sessions(Node, fun() ->
ok = emqx_node_rebalance_purge:start(opts(Config)),
@ -216,6 +213,7 @@ t_not_started(Config) ->
).
t_start(Config) ->
process_flag(trap_exit, true),
[Node] = ?config(cluster_nodes, Config),
Port = get_mqtt_port(Node, tcp),
@ -233,6 +231,7 @@ t_start(Config) ->
ok.
t_non_persistence(Config) ->
process_flag(trap_exit, true),
[Node] = ?config(cluster_nodes, Config),
Port = get_mqtt_port(Node, tcp),
@ -284,6 +283,7 @@ t_unknown_messages(Config) ->
%%--------------------------------------------------------------------
t_already_started_two(Config) ->
process_flag(trap_exit, true),
[Node1, _Node2] = ?config(cluster_nodes, Config),
with_some_sessions(Node1, fun() ->
ok = emqx_node_rebalance_purge:start(opts(Config)),

View File

@ -32,6 +32,7 @@ init_per_suite(Config) ->
Apps = [
emqx_conf,
emqx,
emqx_eviction_agent,
emqx_node_rebalance
],
Cluster = [

View File

@ -0,0 +1,4 @@
Made `/api/v5/load_rebalance/availability_check` public, i.e. not requiring authentication. This simplifies load balancer setup.
Made rebalance/evacuation more graceful during the wait health check phase. Connections to nodes marked for eviction are no longer prohibited during this phase.
During this phase it is unknown whether these nodes are all marked unhealthy by the load balancer, so prohibiting connections to them may cause multiple unsuccessful attempts to reconnect.