chore(rebalance): rebase and review fixes

This commit is contained in:
Ilya Averyanov 2023-05-04 23:33:46 +03:00
parent 609f7bd8fd
commit e683d28973
19 changed files with 673 additions and 582 deletions

View File

@ -602,14 +602,14 @@ all_channels() ->
ets:select(?CHAN_TAB, Pat).
%% @doc Get clientinfo for all clients with sessions
channel_with_session_table(ConnModules) ->
channel_with_session_table(ConnModuleList) ->
Ms = ets:fun2ms(
fun({{ClientId, _ChanPid}, Info, _Stats}) ->
{ClientId, Info}
end
),
Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]),
ConnModuleMap = maps:from_list([{Mod, true} || Mod <- ConnModules]),
ConnModules = sets:from_list(ConnModuleList, [{version, 2}]),
qlc:q([
{ClientId, ConnState, ConnInfo, ClientInfo}
|| {ClientId, #{
@ -618,7 +618,7 @@ channel_with_session_table(ConnModules) ->
conninfo := #{clean_start := false, conn_mod := ConnModule} = ConnInfo
}} <-
Table,
maps:is_key(ConnModule, ConnModuleMap)
sets:is_element(ConnModule, ConnModules)
]).
%% @doc Get all local connection query handle

View File

@ -72,4 +72,6 @@ is_running_node(Node) ->
handle_result({ok, Result}) ->
?OK(Result);
handle_result({error, Reason}) ->
?BAD_REQUEST(Reason).
?BAD_REQUEST(Reason);
handle_result({HTTPCode, Content}) when is_integer(HTTPCode) ->
{HTTPCode, Content}.

View File

@ -1,14 +0,0 @@
emqx_eviction_agent_api {
node_eviction_status_get {
desc {
en: "Get the node eviction status"
zh: "获取节点驱逐状态"
}
label {
en: "Node Eviction Status"
zh: "节点驱逐状态"
}
}
}

View File

@ -48,7 +48,9 @@
-export_type([server_reference/0]).
-define(CONN_MODULES, [emqx_connection, emqx_ws_connection, emqx_eviction_agent_channel]).
-define(CONN_MODULES, [
emqx_connection, emqx_ws_connection, emqx_quic_connection, emqx_eviction_agent_channel
]).
%%--------------------------------------------------------------------
%% APIs

View File

@ -6,8 +6,6 @@
-behaviour(application).
-emqx_plugin(?MODULE).
-export([
start/2,
stop/1

View File

@ -13,8 +13,6 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-logger_header("[Evicted Channel]").
-export([
start_link/1,
start_supervised/1,
@ -33,13 +31,6 @@
code_change/3
]).
-import(
emqx_misc,
[
maybe_apply/2
]
).
-type opts() :: #{
conninfo := emqx_types:conninfo(),
clientinfo := emqx_types:clientinfo()
@ -133,7 +124,7 @@ handle_call(
) ->
ok = emqx_session:takeover(Session),
%% TODO: Should not drain deliver here (side effect)
Delivers = emqx_misc:drain_deliver(),
Delivers = emqx_utils:drain_deliver(),
AllPendings = lists:append(Delivers, Pendings),
?tp(
debug,
@ -156,7 +147,7 @@ handle_call(Req, _From, Channel) ->
{reply, ignored, Channel}.
handle_info(Deliver = {deliver, _Topic, _Msg}, Channel) ->
Delivers = [Deliver | emqx_misc:drain_deliver()],
Delivers = [Deliver | emqx_utils:drain_deliver()],
{noreply, handle_deliver(Delivers, Channel)};
handle_info(expire_session, Channel) ->
{stop, expired, Channel};
@ -186,7 +177,6 @@ code_change(_OldVsn, Channel, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
%% TODO: sync with emqx_channel
handle_deliver(
Delivers,
#{
@ -239,7 +229,7 @@ set_expiry_timer(#{conninfo := ConnInfo} = Channel) ->
open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) ->
Channel = channel(ConnInfo, ClientInfo),
case emqx_cm:open_session(false, ClientInfo, ConnInfo) of
case emqx_cm:open_session(_CleanSession = false, ClientInfo, ConnInfo) of
{ok, #{present := false}} ->
?SLOG(
info,
@ -259,7 +249,7 @@ open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) ->
node => node()
}
),
Pendings1 = lists:usort(lists:append(Pendings0, emqx_misc:drain_deliver())),
Pendings1 = lists:usort(lists:append(Pendings0, emqx_utils:drain_deliver())),
NSession = emqx_session:enqueue(
ClientInfo,
emqx_session:ignore_local(
@ -352,7 +342,7 @@ info(Channel) ->
#{
conninfo => maps:get(conninfo, Channel, undefined),
clientinfo => maps:get(clientinfo, Channel, undefined),
session => maybe_apply(
session => emqx_utils:maybe_apply(
fun emqx_session:info/1,
maps:get(session, Channel, undefined)
),

View File

@ -362,13 +362,77 @@ t_will_msg(_Config) ->
ok = emqtt:disconnect(C).
t_ws_conn(_Config) ->
erlang:process_flag(trap_exit, true),
ClientId = <<"ws_client">>,
{ok, C} = emqtt:start_link([
{proto_ver, v5},
{clientid, ClientId},
{port, 8083},
{ws_path, "/mqtt"}
]),
{ok, _} = emqtt:ws_connect(C),
ok = emqx_eviction_agent:enable(test_eviction, undefined),
?assertEqual(
1,
emqx_eviction_agent:connection_count()
),
?assertWaitEvent(
ok = emqx_eviction_agent:evict_connections(1),
#{?snk_kind := emqx_cm_connected_client_count_dec},
1000
),
?assertEqual(
0,
emqx_eviction_agent:connection_count()
).
-ifndef(BUILD_WITHOUT_QUIC).
t_quic_conn(_Config) ->
erlang:process_flag(trap_exit, true),
QuicPort = emqx_common_test_helpers:select_free_port(quic),
application:ensure_all_started(quicer),
emqx_common_test_helpers:ensure_quic_listener(?MODULE, QuicPort),
ClientId = <<"quic_client">>,
{ok, C} = emqtt:start_link([
{proto_ver, v5},
{clientid, ClientId},
{port, QuicPort}
]),
{ok, _} = emqtt:quic_connect(C),
ok = emqx_eviction_agent:enable(test_eviction, undefined),
?assertEqual(
1,
emqx_eviction_agent:connection_count()
),
?assertWaitEvent(
ok = emqx_eviction_agent:evict_connections(1),
#{?snk_kind := emqx_cm_connected_client_count_dec},
1000
),
?assertEqual(
0,
emqx_eviction_agent:connection_count()
).
-endif.
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
% sn_connect_and_subscribe(ClientId, Topic) ->
% emqx_eviction_agent_test_helpers:sn_connect_and_subscribe(ClientId, Topic).
assert_receive_publish([]) ->
ok;
assert_receive_publish([#{payload := Msg, topic := Topic} | Rest]) ->

View File

@ -81,12 +81,11 @@ start_cluster(NamesWithPorts, Apps, Env) ->
NamesWithPorts
),
Opts0 = [
{env, [{emqx, boot_modules, [broker, listeners]}]},
{env, [{emqx, boot_modules, [broker, listeners]}] ++ Env},
{apps, Apps},
{conf,
[{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]] ++
[{[rpc, mode], async}]},
{env, Env}
[{[rpc, mode], async}]}
],
Cluster = emqx_common_test_helpers:emqx_cluster(
Specs,
@ -99,12 +98,6 @@ start_cluster(NamesWithPorts, Apps, Env) ->
}
|| {Name, Opts} <- Cluster
],
ok = lists:foreach(
fun({Node, _Port}) ->
snabbkaffe:forward_trace(Node)
end,
NodesWithPorts
),
NodesWithPorts.
stop_cluster(NodesWithPorts, Apps) ->

View File

@ -1,490 +0,0 @@
emqx_node_rebalance_api {
## API Request Fields
load_rebalance_status {
desc {
en: "Get rebalance status of the current node"
zh: "获取当前节点的rebalance状态"
}
label {
en: "Get rebalance status"
zh: "获取rebalance状态"
}
}
load_rebalance_global_status {
desc {
en: "Get status of all rebalance/evacuation processes across the cluster"
zh: "获取集群中所有rebalance/evacuation进程的状态"
}
label {
en: "Get global rebalance status"
zh: "获取全局rebalance状态"
}
}
load_rebalance_availability_check {
desc {
en: "Check if the node is being evacuated or rebalanced"
zh: "检查节点是否正在被evacuate或rebalance"
}
label {
en: "Availability check"
zh: "可用性检查"
}
}
load_rebalance_start {
desc {
en: "Start rebalance process"
zh: "启动rebalance进程"
}
label {
en: "Start rebalance"
zh: "启动rebalance"
}
}
load_rebalance_stop {
desc {
en: "Stop rebalance process"
zh: "停止rebalance进程"
}
label {
en: "Stop rebalance"
zh: "停止rebalance"
}
}
load_rebalance_evacuation_start {
desc {
en: "Start evacuation process"
zh: "启动evacuation进程"
}
label {
en: "Start evacuation"
zh: "启动evacuation"
}
}
load_rebalance_evacuation_stop {
desc {
en: "Stop evacuation process"
zh: "停止evacuation进程"
}
label {
en: "Stop evacuation"
zh: "停止evacuation"
}
}
param_node {
desc {
en: "Node name"
zh: "节点名称"
}
label {
en: "Node name"
zh: "节点名称"
}
}
wait_health_check {
desc {
en: "Time to wait before starting the rebalance process, in seconds"
zh: "启动rebalance进程前等待的时间单位为秒"
}
label {
en: "Wait health check"
zh: "等待健康检查"
}
}
conn_evict_rate {
desc {
en: "The rate of evicting connections, in connections per second"
zh: "逐出连接的速率,以每秒连接数表示"
}
label {
en: "Connection eviction rate"
zh: "连接驱逐率"
}
}
sess_evict_rate {
desc {
en: "The rate of evicting sessions, in sessions per second"
zh: "逐出会话的速率,以每秒会话为单位"
}
label {
en: "Session eviction rate"
zh: "会话驱逐率"
}
}
abs_conn_threshold {
desc {
en: "Maximum desired difference between the number of connections on the node and the average number of connections on the recipient nodes"
zh: "节点上的连接数与接收节点上的平均连接数之间的最大期望差值"
}
label {
en: "Absolute connection threshold"
zh: "绝对连接阈值"
}
}
rel_conn_threshold {
desc {
en: "Maximum desired fraction between the number of connections on the node and the average number of connections on the recipient nodes"
zh: "节点上的连接数与接收节点上的平均连接数之间的最大期望分数"
}
label {
en: "Relative connection threshold"
zh: "相对连接阈值"
}
}
abs_sess_threshold {
desc {
en: "Maximum desired difference between the number of sessions on the node and the average number of sessions on the recipient nodes"
zh: "节点上的会话数与接收节点上的平均会话数之间的最大期望差异"
}
label {
en: "Absolute session threshold"
zh: "绝对会话阈值"
}
}
rel_sess_threshold {
desc {
en: "Maximum desired fraction between the number of sessions on the node and the average number of sessions on the recipient nodes"
zh: "节点上的会话数与接收节点上的平均会话数之间的最大期望分数"
}
label {
en: "Relative session threshold"
zh: "相对会话阈值"
}
}
wait_takeover {
desc {
en: "Time to wait before starting session evacuation process, in seconds"
zh: "开始会话疏散过程之前等待的时间,以秒为单位"
}
label {
en: "Wait takeover"
zh: "等待接管"
}
}
redirect_to {
desc {
en: "Server reference to redirect clients to (MQTTv5 Server redirection)"
zh: "将客户端重定向到的服务器参考MQTTv5 服务器重定向)"
}
label {
en: "Redirect to"
zh: "重定向至"
}
}
migrate_to {
desc {
en: "Nodes to migrate sessions to"
zh: "将会话迁移到的节点"
}
label {
en: "Migrate to"
zh: "迁移到"
}
}
rebalance_nodes {
desc {
en: "Nodes to participate in rebalance"
zh: "参与rebalance的节点"
}
label {
en: "Rebalance nodes"
zh: "重新平衡节点"
}
}
## API Response Fields
local_status_enabled {
desc {
en: "Whether the node is being evacuated"
zh: "节点是否正在撤离"
}
label {
en: "Local evacuation status"
zh: "当地避难状况"
}
}
local_status_process {
desc {
en: "The process that is being performed on the node: evacuation or rebalance"
zh: "正在节点上执行的过程:疏散或重新平衡"
}
label {
en: "Node process"
zh: "节点进程"
}
}
local_status_state {
desc {
en: "The state of the process that is being performed on the node"
zh: "正在节点上执行的进程的状态"
}
label {
en: "Rebalance/evacuation current state"
zh: "重新平衡/疏散当前状态"
}
}
local_status_coordinator_node {
desc {
en: "The node that is coordinating rebalance process"
zh: "协调再平衡过程的节点"
}
label {
en: "Coordinator node"
zh: "协调节点"
}
}
local_status_connection_eviction_rate {
desc {
en: "The rate of evicting connections, in connections per second"
zh: "逐出连接的速率,以每秒连接数表示"
}
label {
en: "Connection eviction rate"
zh: "连接驱逐率"
}
}
local_status_session_eviction_rate {
desc {
en: "The rate of evicting sessions, in sessions per second"
zh: "逐出会话的速率,以每秒会话为单位"
}
label {
en: "Session eviction rate"
zh: "会话驱逐率"
}
}
local_status_connection_goal {
desc {
en: "The number of connections that the node should have after the rebalance/evacuation process"
zh: "节点在重新平衡/疏散过程后应该拥有的连接数"
}
label {
en: "Connection goal"
zh: "连接目标"
}
}
local_status_session_goal {
desc {
en: "The number of sessions that the node should have after the evacuation process"
zh: "疏散过程后节点应有的会话数"
}
label {
en: "Session goal"
zh: "会话目标"
}
}
local_status_disconnected_session_goal {
desc {
en: "The number of disconnected sessions that the node should have after the rebalance process"
zh: "重新平衡过程后节点应具有的断开连接的会话数"
}
label {
en: "Disconnected session goal"
zh: "断开连接的会话目标"
}
}
local_status_session_recipients {
desc {
en: "List of nodes to which sessions are being evacuated"
zh: "会话被疏散到的节点列表"
}
label {
en: "Session recipients"
zh: "会话收件人"
}
}
local_status_recipients {
desc {
en: "List of nodes to which connections/sessions are being evacuated during rebalance"
zh: "在重新平衡期间连接/会话被疏散到的节点列表"
}
label {
en: "Recipients"
zh: "收件人"
}
}
local_status_stats {
desc {
en: "Statistics of the evacuation/rebalance process"
zh: "疏散/再平衡过程的统计"
}
label {
en: "Statistics"
zh: "统计数据"
}
}
status_stats_initial_connected {
desc {
en: "The number of connections on the node before the evacuation/rebalance process"
zh: "疏散/重新平衡过程之前节点上的连接数"
}
label {
en: "Initial connected"
zh: "初始连接"
}
}
status_stats_current_connected {
desc {
en: "Current number of connections on the node"
zh: "节点上的当前连接数"
}
label {
en: "Current connections"
zh: "当前连接"
}
}
status_stats_initial_sessions {
desc {
en: "The number of sessions on the node before the evacuation/rebalance process"
zh: "疏散/重新平衡过程之前节点上的会话数"
}
label {
en: "Initial sessions"
zh: "初始会话"
}
}
status_stats_current_sessions {
desc {
en: "Current number of sessions on the node"
zh: "节点上的当前会话数"
}
label {
en: "Current sessions"
zh: "当前会话"
}
}
status_stats_current_disconnected_sessions {
desc {
en: "Current number of disconnected sessions on the node"
zh: "节点上当前断开连接的会话数"
}
label {
en: "Current disconnected sessions"
zh: "当前断开连接的会话"
}
}
coordinator_status_donors {
desc {
en: "List of nodes from which connections/sessions are being evacuated"
zh: "正在疏散连接/会话的节点列表"
}
label {
en: "Donors"
zh: "捐助者"
}
}
coordinator_status_donor_conn_avg {
desc {
en: "Average number of connections per donor node"
zh: "每个供体节点的平均连接数"
}
label {
en: "Donor connections average"
zh: "捐助者连接平均值"
}
}
coordinator_status_donor_sess_avg {
desc {
en: "Average number of sessions per donor node"
zh: "每个供体节点的平均会话数"
}
label {
en: "Donor sessions average"
zh: "平均捐助会议"
}
}
coordinator_status_node {
desc {
en: "The node that is coordinating the evacuation/rebalance process"
zh: "协调疏散/再平衡过程的节点"
}
label {
en: "Coordinator node"
zh: "协调节点"
}
}
evacuation_status_node {
desc {
en: "The node that is being evacuated"
zh: "正在撤离的节点"
}
label {
en: "Evacuated node"
zh: "疏散节点"
}
}
global_status_evacuations {
desc {
en: "List of nodes that are being evacuated"
zh: "正在撤离的节点列表"
}
label {
en: "Evacuations"
zh: "疏散"
}
}
global_status_rebalances {
desc {
en: "List of nodes that coordinate a rebalance"
zh: "协调再平衡的节点列表"
}
label {
en: "Rebalances"
zh: "再平衡"
}
}
empty_response {
desc {
en: "The response is empty"
zh: "响应为空"
}
label {
en: "Empty response"
zh: "空响应"
}
}
}

View File

@ -8,6 +8,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
%% Swagger specs from hocon schema
-export([
@ -44,9 +45,9 @@
-import(emqx_dashboard_swagger, [error_codes/2]).
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(NODE_UNAVAILABLE, 'NODE_UNAVAILABLE').
-define(NODE_EVACUATING, 'NODE_EVACUATING').
-define(RPC_ERROR, 'RPC_ERROR').
-define(NOT_FOUND, 'NOT_FOUND').
%%--------------------------------------------------------------------
%% API Spec
@ -120,7 +121,8 @@ schema("/load_rebalance/:node/start") ->
),
responses => #{
200 => response_schema(),
400 => error_codes([?BAD_REQUEST, ?NODE_UNAVAILABLE], <<"Bad Request">>)
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
404 => error_codes([?NOT_FOUND], <<"Not Found">>)
}
}
};
@ -134,7 +136,8 @@ schema("/load_rebalance/:node/stop") ->
parameters => [param_node()],
responses => #{
200 => response_schema(),
400 => error_codes([?BAD_REQUEST, ?NODE_UNAVAILABLE], <<"Bad Request">>)
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
404 => error_codes([?NOT_FOUND], <<"Not Found">>)
}
}
};
@ -153,7 +156,8 @@ schema("/load_rebalance/:node/evacuation/start") ->
),
responses => #{
200 => response_schema(),
400 => error_codes([?BAD_REQUEST, ?NODE_UNAVAILABLE], <<"Bad Request">>)
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
404 => error_codes([?NOT_FOUND], <<"Not Found">>)
}
}
};
@ -167,7 +171,8 @@ schema("/load_rebalance/:node/evacuation/stop") ->
parameters => [param_node()],
responses => #{
200 => response_schema(),
400 => error_codes([?BAD_REQUEST, ?NODE_UNAVAILABLE], <<"Bad Request">>)
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
404 => error_codes([?NOT_FOUND], <<"Not Found">>)
}
}
}.
@ -205,7 +210,7 @@ schema("/load_rebalance/:node/evacuation/stop") ->
end.
'/load_rebalance/:node/start'(post, #{bindings := #{node := NodeBin}, body := Params0}) ->
with_node(NodeBin, fun(Node) ->
emqx_utils_api:with_node(NodeBin, fun(Node) ->
Params1 = translate(rebalance_start, Params0),
with_nodes_at_key(nodes, Params1, fun(Params2) ->
wrap_rpc(
@ -215,7 +220,7 @@ schema("/load_rebalance/:node/evacuation/stop") ->
end).
'/load_rebalance/:node/stop'(post, #{bindings := #{node := NodeBin}}) ->
with_node(NodeBin, fun(Node) ->
emqx_utils_api:with_node(NodeBin, fun(Node) ->
wrap_rpc(
Node, emqx_node_rebalance_api_proto_v1:node_rebalance_stop(Node)
)
@ -224,7 +229,7 @@ schema("/load_rebalance/:node/evacuation/stop") ->
'/load_rebalance/:node/evacuation/start'(post, #{
bindings := #{node := NodeBin}, body := Params0
}) ->
with_node(NodeBin, fun(Node) ->
emqx_utils_api:with_node(NodeBin, fun(Node) ->
Params1 = translate(rebalance_evacuation_start, Params0),
with_nodes_at_key(migrate_to, Params1, fun(Params2) ->
wrap_rpc(
@ -237,7 +242,7 @@ schema("/load_rebalance/:node/evacuation/stop") ->
end).
'/load_rebalance/:node/evacuation/stop'(post, #{bindings := #{node := NodeBin}}) ->
with_node(NodeBin, fun(Node) ->
emqx_utils_api:with_node(NodeBin, fun(Node) ->
wrap_rpc(
Node, emqx_node_rebalance_api_proto_v1:node_rebalance_evacuation_stop(Node)
)
@ -288,19 +293,13 @@ validate_nodes(Key, Params) when is_map_key(Key, Params) ->
validate_nodes(_Key, Params) ->
{ok, Params}.
with_node(BinNode, Fun) ->
case parse_node(BinNode) of
{ok, Node} -> Fun(Node);
{error, _} -> error_response(400, ?BAD_REQUEST, [<<"Invalid node: ">>, BinNode])
end.
with_nodes_at_key(Key, Params, Fun) ->
Res = validate_nodes(Key, Params),
case Res of
{ok, Params1} ->
Fun(Params1);
{error, {unavailable, Nodes}} ->
error_response(400, ?NODE_UNAVAILABLE, io_lib:format("Nodes unavailable: ~p", [Nodes]));
error_response(400, ?NOT_FOUND, io_lib:format("Nodes unavailable: ~p", [Nodes]));
{error, {invalid, Nodes}} ->
error_response(400, ?BAD_REQUEST, io_lib:format("Invalid nodes: ~p", [Nodes]))
end.
@ -322,10 +321,7 @@ format_as_map_list(List) ->
).
error_response(HttpCode, Code, Message) ->
{HttpCode, #{
code => atom_to_binary(Code),
message => iolist_to_binary(Message)
}}.
{HttpCode, ?ERROR_MSG(Code, Message)}.
without(Keys, Props) ->
lists:filter(
@ -470,11 +466,10 @@ fields(rebalance_evacuation_start) ->
)},
{"migrate_to",
mk(
list(binary()),
nonempty_list(binary()),
#{
desc => ?DESC(migrate_to),
required => false,
validator => [fun(Values) -> length(Values) > 0 end]
required => false
}
)}
];

View File

@ -43,7 +43,7 @@ cli(["start" | StartArgs]) ->
false
end;
cli(["node-status", NodeStr]) ->
case emqx_misc:safe_to_existing_atom(NodeStr, utf8) of
case emqx_utils:safe_to_existing_atom(NodeStr, utf8) of
{ok, Node} ->
node_status(emqx_node_rebalance_status:local_status(Node));
{error, _} ->
@ -297,7 +297,7 @@ strings_to_atoms(Strings) ->
strings_to_atoms([], Atoms, Invalid) ->
{lists:reverse(Atoms), lists:reverse(Invalid)};
strings_to_atoms([Str | Rest], Atoms, Invalid) ->
case emqx_misc:safe_to_existing_atom(Str, utf8) of
case emqx_utils:safe_to_existing_atom(Str, utf8) of
{ok, Atom} ->
strings_to_atoms(Rest, [Atom | Atoms], Invalid);
{error, _} ->

View File

@ -55,7 +55,7 @@ save(
Filepath = evacuation_filepath(),
case filelib:ensure_dir(Filepath) of
ok ->
JsonData = emqx_json:encode(
JsonData = emqx_utils_json:encode(
prepare_for_encode(maps:with(persist_keys(), Data)),
[pretty]
),
@ -72,7 +72,7 @@ clear() ->
read(DefaultOpts) ->
case file:read_file(evacuation_filepath()) of
{ok, Data} ->
case emqx_json:safe_decode(Data, [return_maps]) of
case emqx_utils_json:safe_decode(Data, [return_maps]) of
{ok, Map} when is_map(Map) ->
{ok, map_to_opts(DefaultOpts, Map)};
_NotAMap ->

View File

@ -208,7 +208,7 @@ format_local_status_field({session_goal, SessGoal}) ->
format_local_status_field({disconnected_session_goal, DisconnSessGoal}) ->
io_lib:format("Disconnected session goal: ~p~n", [DisconnSessGoal]);
format_local_status_field({session_recipients, SessionRecipients}) ->
io_lib:format("Session recipient nodes: ~p~n", [SessionRecipients]);
io_lib:format("Session recipient nodes: ~p~n", [SessionRecipients]);
format_local_status_field({recipients, Recipients}) ->
io_lib:format("Recipient nodes: ~p~n", [Recipients]);
format_local_status_field({donors, Donors}) ->

View File

@ -88,7 +88,7 @@ t_start_evacuation_validation(Config) ->
BadOpts
),
?assertMatch(
{ok, 400, #{}},
{ok, 404, #{}},
api_post(
["load_rebalance", "bad@node", "evacuation", "start"],
#{}
@ -148,7 +148,7 @@ t_start_rebalance_validation(Config) ->
BadOpts
),
?assertMatch(
{ok, 400, #{}},
{ok, 404, #{}},
api_post(
["load_rebalance", "bad@node", "start"],
#{}
@ -346,7 +346,7 @@ t_start_stop_rebalance(Config) ->
}
]
}},
api_get(["load_rebalance", "global_status"])
GlobalStatusResponse
),
?assertMatch(

View File

@ -22,21 +22,20 @@ all() -> [{group, one_node}, {group, two_node}].
groups() ->
[
{one_node, [], [
t_agent_busy,
t_already_started,
t_not_started,
t_start,
t_persistence,
t_unknown_messages
]},
{two_node, [], [
t_conn_evicted,
t_migrate_to,
t_session_evicted
]}
{one_node, [], one_node_cases()},
{two_node, [], two_node_cases()}
].
two_node_cases() ->
[
t_conn_evicted,
t_migrate_to,
t_session_evicted
].
one_node_cases() ->
emqx_common_test_helpers:all(?MODULE) -- two_node_cases().
init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([]),
Config.

View File

@ -0,0 +1,9 @@
emqx_eviction_agent_api {
node_eviction_status_get.desc:
"""Get the node eviction status"""
node_eviction_status_get.label:
"""Node Eviction Status"""
}

View File

@ -0,0 +1,267 @@
emqx_node_rebalance_api {
load_rebalance_status.desc:
"""Get rebalance status of the current node"""
load_rebalance_status.label:
"""Get rebalance status"""
load_rebalance_global_status.desc:
"""Get status of all rebalance/evacuation processes across the cluster"""
load_rebalance_global_status.label:
"""Get global rebalance status"""
load_rebalance_availability_check.desc:
"""Check if the node is being evacuated or rebalanced"""
load_rebalance_availability_check.label:
"""Availability check"""
load_rebalance_start.desc:
"""Start rebalance process"""
load_rebalance_start.label:
"""Start rebalance"""
load_rebalance_stop.desc:
"""Stop rebalance process"""
load_rebalance_stop.label:
"""Stop rebalance"""
load_rebalance_evacuation_start.desc:
"""Start evacuation process"""
load_rebalance_evacuation_start.label:
"""Start evacuation"""
load_rebalance_evacuation_stop.desc:
"""Stop evacuation process"""
load_rebalance_evacuation_stop.label:
"""Stop evacuation"""
param_node.desc:
"""Node name"""
param_node.label:
"""Node name"""
wait_health_check.desc:
"""Time to wait before starting the rebalance process, in seconds"""
wait_health_check.label:
"""Wait health check"""
conn_evict_rate.desc:
"""The rate of evicting connections, in connections per second"""
conn_evict_rate.label:
"""Connection eviction rate"""
sess_evict_rate.desc:
"""The rate of evicting sessions, in sessions per second"""
sess_evict_rate.label:
"""Session eviction rate"""
abs_conn_threshold.desc:
"""Maximum desired difference between the number of connections on the node and the average number of connections on the recipient nodes"""
abs_conn_threshold.label:
"""Absolute connection threshold"""
rel_conn_threshold.desc:
"""Maximum desired fraction between the number of connections on the node and the average number of connections on the recipient nodes"""
rel_conn_threshold.label:
"""Relative connection threshold"""
abs_sess_threshold.desc:
"""Maximum desired difference between the number of sessions on the node and the average number of sessions on the recipient nodes"""
abs_sess_threshold.label:
"""Absolute session threshold"""
rel_sess_threshold.desc:
"""Maximum desired fraction between the number of sessions on the node and the average number of sessions on the recipient nodes"""
rel_sess_threshold.label:
"""Relative session threshold"""
wait_takeover.desc:
"""Time to wait before starting session evacuation process, in seconds"""
wait_takeover.label:
"""Wait takeover"""
redirect_to.desc:
"""Server reference to redirect clients to (MQTTv5 Server redirection)"""
redirect_to.label:
"""Redirect to"""
migrate_to.desc:
"""Nodes to migrate sessions to"""
migrate_to.label:
"""Migrate to"""
rebalance_nodes.desc:
"""Nodes to participate in rebalance"""
rebalance_nodes.label:
"""Rebalance nodes"""
local_status_enabled.desc:
"""Whether the node is being evacuated"""
local_status_enabled.label:
"""Local evacuation status"""
local_status_process.desc:
"""The process that is being performed on the node: evacuation or rebalance"""
local_status_process.label:
"""Node process"""
local_status_state.desc:
"""The state of the process that is being performed on the node"""
local_status_state.label:
"""Rebalance/evacuation current state"""
local_status_coordinator_node.desc:
"""The node that is coordinating rebalance process"""
local_status_coordinator_node.label:
"""Coordinator node"""
local_status_connection_eviction_rate.desc:
"""The rate of evicting connections, in connections per second"""
local_status_connection_eviction_rate.label:
"""Connection eviction rate"""
local_status_session_eviction_rate.desc:
"""The rate of evicting sessions, in sessions per second"""
local_status_session_eviction_rate.label:
"""Session eviction rate"""
local_status_connection_goal.desc:
"""The number of connections that the node should have after the rebalance/evacuation process"""
local_status_connection_goal.label:
"""Connection goal"""
local_status_session_goal.desc:
"""The number of sessions that the node should have after the evacuation process"""
local_status_session_goal.label:
"""Session goal"""
local_status_disconnected_session_goal.desc:
"""The number of disconnected sessions that the node should have after the rebalance process"""
local_status_disconnected_session_goal.label:
"""Disconnected session goal"""
local_status_session_recipients.desc:
"""List of nodes to which sessions are being evacuated"""
local_status_session_recipients.label:
"""Session recipients"""
local_status_recipients.desc:
"""List of nodes to which connections/sessions are being evacuated during rebalance"""
local_status_recipients.label:
"""Recipients"""
local_status_stats.desc:
"""Statistics of the evacuation/rebalance process"""
local_status_stats.label:
"""Statistics"""
status_stats_initial_connected.desc:
"""The number of connections on the node before the evacuation/rebalance process"""
status_stats_initial_connected.label:
"""Initial connected"""
status_stats_current_connected.desc:
"""Current number of connections on the node"""
status_stats_current_connected.label:
"""Current connections"""
status_stats_initial_sessions.desc:
"""The number of sessions on the node before the evacuation/rebalance process"""
status_stats_initial_sessions.label:
"""Initial sessions"""
status_stats_current_sessions.desc:
"""Current number of sessions on the node"""
status_stats_current_sessions.label:
"""Current sessions"""
status_stats_current_disconnected_sessions.desc:
"""Current number of disconnected sessions on the node"""
status_stats_current_disconnected_sessions.label:
"""Current disconnected sessions"""
coordinator_status_donors.desc:
"""List of nodes from which connections/sessions are being evacuated"""
coordinator_status_donors.label:
"""Donors"""
coordinator_status_donor_conn_avg.desc:
"""Average number of connections per donor node"""
coordinator_status_donor_conn_avg.label:
"""Donor connections average"""
coordinator_status_donor_sess_avg.desc:
"""Average number of sessions per donor node"""
coordinator_status_donor_sess_avg.label:
"""Donor sessions average"""
coordinator_status_node.desc:
"""The node that is coordinating the evacuation/rebalance process"""
coordinator_status_node.label:
"""Coordinator node"""
evacuation_status_node.desc:
"""The node that is being evacuated"""
evacuation_status_node.label:
"""Evacuated node"""
global_status_evacuations.desc:
"""List of nodes that are being evacuated"""
global_status_evacuations.label:
"""Evacuations"""
global_status_rebalances.desc:
"""List of nodes that coordinate a rebalance"""
global_status_rebalances.label:
"""Rebalances"""
empty_response.desc:
"""The response is empty"""
empty_response.label:
"""Empty response"""
}

View File

@ -0,0 +1,9 @@
emqx_eviction_agent_api {
node_eviction_status_get.desc:
"""获取节点驱逐状态"""
node_eviction_status_get.label:
"""节点驱逐状态"""
}

View File

@ -0,0 +1,267 @@
emqx_node_rebalance_api {
load_rebalance_status.desc:
"""获取当前节点的rebalance状态"""
load_rebalance_status.label:
"""获取rebalance状态"""
load_rebalance_global_status.desc:
"""获取集群中所有rebalance/evacuation进程的状态"""
load_rebalance_global_status.label:
"""获取全局rebalance状态"""
load_rebalance_availability_check.desc:
"""检查节点是否正在被evacuate或rebalance"""
load_rebalance_availability_check.label:
"""可用性检查"""
load_rebalance_start.desc:
"""启动rebalance进程"""
load_rebalance_start.label:
"""启动rebalance"""
load_rebalance_stop.desc:
"""停止rebalance进程"""
load_rebalance_stop.label:
"""停止rebalance"""
load_rebalance_evacuation_start.desc:
"""启动evacuation进程"""
load_rebalance_evacuation_start.label:
"""启动evacuation"""
load_rebalance_evacuation_stop.desc:
"""停止evacuation进程"""
load_rebalance_evacuation_stop.label:
"""停止evacuation"""
param_node.desc:
"""节点名称"""
param_node.label:
"""节点名称"""
wait_health_check.desc:
"""启动rebalance进程前等待的时间单位为秒"""
wait_health_check.label:
"""等待健康检查"""
conn_evict_rate.desc:
"""逐出连接的速率,以每秒连接数表示"""
conn_evict_rate.label:
"""连接驱逐率"""
sess_evict_rate.desc:
"""逐出会话的速率,以每秒会话为单位"""
sess_evict_rate.label:
"""会话驱逐率"""
abs_conn_threshold.desc:
"""节点上的连接数与接收节点上的平均连接数之间的最大期望差值"""
abs_conn_threshold.label:
"""绝对连接阈值"""
rel_conn_threshold.desc:
"""节点上的连接数与接收节点上的平均连接数之间的最大期望分数"""
rel_conn_threshold.label:
"""相对连接阈值"""
abs_sess_threshold.desc:
"""节点上的会话数与接收节点上的平均会话数之间的最大期望差异"""
abs_sess_threshold.label:
"""绝对会话阈值"""
rel_sess_threshold.desc:
"""节点上的会话数与接收节点上的平均会话数之间的最大期望分数"""
rel_sess_threshold.label:
"""相对会话阈值"""
wait_takeover.desc:
"""开始会话疏散过程之前等待的时间,以秒为单位"""
wait_takeover.label:
"""等待接管"""
redirect_to.desc:
"""将客户端重定向到的服务器参考MQTTv5 服务器重定向)"""
redirect_to.label:
"""重定向至"""
migrate_to.desc:
"""将会话迁移到的节点"""
migrate_to.label:
"""迁移到"""
rebalance_nodes.desc:
"""参与rebalance的节点"""
rebalance_nodes.label:
"""重新平衡节点"""
local_status_enabled.desc:
"""节点是否正在撤离"""
local_status_enabled.label:
"""当地避难状况"""
local_status_process.desc:
"""正在节点上执行的过程:疏散或重新平衡"""
local_status_process.label:
"""节点进程"""
local_status_state.desc:
"""正在节点上执行的进程的状态"""
local_status_state.label:
"""重新平衡/疏散当前状态"""
local_status_coordinator_node.desc:
"""协调再平衡过程的节点"""
local_status_coordinator_node.label:
"""协调节点"""
local_status_connection_eviction_rate.desc:
"""逐出连接的速率,以每秒连接数表示"""
local_status_connection_eviction_rate.label:
"""连接驱逐率"""
local_status_session_eviction_rate.desc:
"""逐出会话的速率,以每秒会话为单位"""
local_status_session_eviction_rate.label:
"""会话驱逐率"""
local_status_connection_goal.desc:
"""节点在重新平衡/疏散过程后应该拥有的连接数"""
local_status_connection_goal.label:
"""连接目标"""
local_status_session_goal.desc:
"""疏散过程后节点应有的会话数"""
local_status_session_goal.label:
"""会话目标"""
local_status_disconnected_session_goal.desc:
"""重新平衡过程后节点应具有的断开连接的会话数"""
local_status_disconnected_session_goal.label:
"""断开连接的会话目标"""
local_status_session_recipients.desc:
"""会话被疏散到的节点列表"""
local_status_session_recipients.label:
"""会话收件人"""
local_status_recipients.desc:
"""在重新平衡期间连接/会话被疏散到的节点列表"""
local_status_recipients.label:
"""收件人"""
local_status_stats.desc:
"""疏散/再平衡过程的统计"""
local_status_stats.label:
"""统计数据"""
status_stats_initial_connected.desc:
"""疏散/重新平衡过程之前节点上的连接数"""
status_stats_initial_connected.label:
"""初始连接"""
status_stats_current_connected.desc:
"""节点上的当前连接数"""
status_stats_current_connected.label:
"""当前连接"""
status_stats_initial_sessions.desc:
"""疏散/重新平衡过程之前节点上的会话数"""
status_stats_initial_sessions.label:
"""初始会话"""
status_stats_current_sessions.desc:
"""节点上的当前会话数"""
status_stats_current_sessions.label:
"""当前会话"""
status_stats_current_disconnected_sessions.desc:
"""节点上当前断开连接的会话数"""
status_stats_current_disconnected_sessions.label:
"""当前断开连接的会话"""
coordinator_status_donors.desc:
"""正在疏散连接/会话的节点列表"""
coordinator_status_donors.label:
"""捐助者"""
coordinator_status_donor_conn_avg.desc:
"""每个供体节点的平均连接数"""
coordinator_status_donor_conn_avg.label:
"""捐助者连接平均值"""
coordinator_status_donor_sess_avg.desc:
"""每个供体节点的平均会话数"""
coordinator_status_donor_sess_avg.label:
"""平均捐助会议"""
coordinator_status_node.desc:
"""协调疏散/再平衡过程的节点"""
coordinator_status_node.label:
"""协调节点"""
evacuation_status_node.desc:
"""正在撤离的节点"""
evacuation_status_node.label:
"""疏散节点"""
global_status_evacuations.desc:
"""正在撤离的节点列表"""
global_status_evacuations.label:
"""疏散"""
global_status_rebalances.desc:
"""协调再平衡的节点列表"""
global_status_rebalances.label:
"""再平衡"""
empty_response.desc:
"""响应为空"""
empty_response.label:
"""空响应"""
}