239 lines
8.5 KiB
Erlang
239 lines
8.5 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_node_rebalance_status).
|
|
|
|
-export([
|
|
local_status/0,
|
|
local_status/1,
|
|
global_status/0,
|
|
format_local_status/1,
|
|
format_coordinator_status/1
|
|
]).
|
|
|
|
%% For RPC
|
|
-export([
|
|
evacuation_status/0,
|
|
rebalance_status/0
|
|
]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% APIs
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec local_status() -> disabled | {evacuation, map()} | {rebalance, map()}.
|
|
local_status() ->
|
|
case emqx_node_rebalance_evacuation:status() of
|
|
{enabled, Status} ->
|
|
{evacuation, evacuation(Status)};
|
|
disabled ->
|
|
case emqx_node_rebalance_agent:status() of
|
|
{enabled, CoordinatorPid} ->
|
|
case emqx_node_rebalance:status(CoordinatorPid) of
|
|
{enabled, Status} ->
|
|
local_rebalance(Status, node());
|
|
disabled ->
|
|
disabled
|
|
end;
|
|
disabled ->
|
|
disabled
|
|
end
|
|
end.
|
|
|
|
-spec local_status(node()) -> disabled | {evacuation, map()} | {rebalance, map()}.
|
|
local_status(Node) ->
|
|
emqx_node_rebalance_status_proto_v1:local_status(Node).
|
|
|
|
-spec format_local_status(map()) -> iodata().
|
|
format_local_status(Status) ->
|
|
format_status(Status, local_status_field_format_order()).
|
|
|
|
-spec global_status() -> #{rebalances := [{node(), map()}], evacuations := [{node(), map()}]}.
|
|
global_status() ->
|
|
Nodes = mria_mnesia:running_nodes(),
|
|
{RebalanceResults, _} = emqx_node_rebalance_status_proto_v1:rebalance_status(Nodes),
|
|
Rebalances = [
|
|
{Node, coordinator_rebalance(Status)}
|
|
|| {Node, {enabled, Status}} <- RebalanceResults
|
|
],
|
|
{EvacuatioResults, _} = emqx_node_rebalance_status_proto_v1:evacuation_status(Nodes),
|
|
Evacuations = [{Node, evacuation(Status)} || {Node, {enabled, Status}} <- EvacuatioResults],
|
|
#{rebalances => Rebalances, evacuations => Evacuations}.
|
|
|
|
-spec format_coordinator_status(map()) -> iodata().
|
|
format_coordinator_status(Status) ->
|
|
format_status(Status, coordinator_status_field_format_order()).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
evacuation(Status) ->
|
|
#{
|
|
state => maps:get(state, Status),
|
|
connection_eviction_rate => maps:get(conn_evict_rate, Status),
|
|
session_eviction_rate => maps:get(sess_evict_rate, Status),
|
|
connection_goal => 0,
|
|
session_goal => 0,
|
|
session_recipients => maps:get(migrate_to, Status),
|
|
stats => #{
|
|
initial_connected => maps:get(initial_conns, Status),
|
|
current_connected => maps:get(current_conns, Status),
|
|
initial_sessions => maps:get(initial_sessions, Status),
|
|
current_sessions => maps:get(current_sessions, Status)
|
|
}
|
|
}.
|
|
|
|
local_rebalance(#{donors := Donors} = Stats, Node) ->
|
|
case lists:member(Node, Donors) of
|
|
true -> {rebalance, donor_rebalance(Stats, Node)};
|
|
false -> disabled
|
|
end.
|
|
|
|
donor_rebalance(Status, Node) ->
|
|
Opts = maps:get(opts, Status),
|
|
InitialConnCounts = maps:get(initial_conn_counts, Status),
|
|
InitialSessCounts = maps:get(initial_sess_counts, Status),
|
|
|
|
CurrentStats = #{
|
|
initial_connected => maps:get(Node, InitialConnCounts),
|
|
initial_sessions => maps:get(Node, InitialSessCounts),
|
|
current_connected => emqx_eviction_agent:connection_count(),
|
|
current_sessions => emqx_eviction_agent:session_count(),
|
|
current_disconnected_sessions => emqx_eviction_agent:session_count(
|
|
disconnected
|
|
)
|
|
},
|
|
maps:from_list(
|
|
[
|
|
{state, maps:get(state, Status)},
|
|
{coordinator_node, maps:get(coordinator_node, Status)},
|
|
{connection_eviction_rate, maps:get(conn_evict_rate, Opts)},
|
|
{session_eviction_rate, maps:get(sess_evict_rate, Opts)},
|
|
{recipients, maps:get(recipients, Status)},
|
|
{stats, CurrentStats}
|
|
] ++
|
|
[
|
|
{connection_goal, maps:get(recipient_conn_avg, Status)}
|
|
|| maps:is_key(recipient_conn_avg, Status)
|
|
] ++
|
|
[
|
|
{disconnected_session_goal, maps:get(recipient_sess_avg, Status)}
|
|
|| maps:is_key(recipient_sess_avg, Status)
|
|
]
|
|
).
|
|
|
|
coordinator_rebalance(Status) ->
|
|
Opts = maps:get(opts, Status),
|
|
maps:from_list(
|
|
[
|
|
{state, maps:get(state, Status)},
|
|
{coordinator_node, maps:get(coordinator_node, Status)},
|
|
{connection_eviction_rate, maps:get(conn_evict_rate, Opts)},
|
|
{session_eviction_rate, maps:get(sess_evict_rate, Opts)},
|
|
{recipients, maps:get(recipients, Status)},
|
|
{donors, maps:get(donors, Status)}
|
|
] ++
|
|
[
|
|
{connection_goal, maps:get(recipient_conn_avg, Status)}
|
|
|| maps:is_key(recipient_conn_avg, Status)
|
|
] ++
|
|
[
|
|
{disconnected_session_goal, maps:get(recipient_sess_avg, Status)}
|
|
|| maps:is_key(recipient_sess_avg, Status)
|
|
] ++
|
|
[
|
|
{donor_conn_avg, maps:get(donor_conn_avg, Status)}
|
|
|| maps:is_key(donor_conn_avg, Status)
|
|
] ++
|
|
[
|
|
{donor_sess_avg, maps:get(donor_sess_avg, Status)}
|
|
|| maps:is_key(donor_sess_avg, Status)
|
|
]
|
|
).
|
|
|
|
local_status_field_format_order() ->
|
|
[
|
|
state,
|
|
coordinator_node,
|
|
connection_eviction_rate,
|
|
session_eviction_rate,
|
|
connection_goal,
|
|
session_goal,
|
|
disconnected_session_goal,
|
|
session_recipients,
|
|
recipients,
|
|
stats
|
|
].
|
|
|
|
coordinator_status_field_format_order() ->
|
|
[
|
|
state,
|
|
coordinator_node,
|
|
donors,
|
|
recipients,
|
|
connection_eviction_rate,
|
|
session_eviction_rate,
|
|
connection_goal,
|
|
disconnected_session_goal,
|
|
donor_conn_avg,
|
|
donor_sess_avg
|
|
].
|
|
|
|
format_status(Status, FieldOrder) ->
|
|
Fields = lists:flatmap(
|
|
fun(FieldName) ->
|
|
maps:to_list(maps:with([FieldName], Status))
|
|
end,
|
|
FieldOrder
|
|
),
|
|
lists:map(
|
|
fun format_local_status_field/1,
|
|
Fields
|
|
).
|
|
|
|
format_local_status_field({state, State}) ->
|
|
io_lib:format("Rebalance state: ~p~n", [State]);
|
|
format_local_status_field({coordinator_node, Node}) ->
|
|
io_lib:format("Coordinator node: ~p~n", [Node]);
|
|
format_local_status_field({connection_eviction_rate, ConnEvictRate}) ->
|
|
io_lib:format("Connection eviction rate: ~p connections/second~n", [ConnEvictRate]);
|
|
format_local_status_field({session_eviction_rate, SessEvictRate}) ->
|
|
io_lib:format("Session eviction rate: ~p sessions/second~n", [SessEvictRate]);
|
|
format_local_status_field({connection_goal, ConnGoal}) ->
|
|
io_lib:format("Connection goal: ~p~n", [ConnGoal]);
|
|
format_local_status_field({session_goal, SessGoal}) ->
|
|
io_lib:format("Session goal: ~p~n", [SessGoal]);
|
|
format_local_status_field({disconnected_session_goal, DisconnSessGoal}) ->
|
|
io_lib:format("Disconnected session goal: ~p~n", [DisconnSessGoal]);
|
|
format_local_status_field({session_recipients, SessionRecipients}) ->
|
|
io_lib:format("Session recipient nodes: ~p~n", [SessionRecipients]);
|
|
format_local_status_field({recipients, Recipients}) ->
|
|
io_lib:format("Recipient nodes: ~p~n", [Recipients]);
|
|
format_local_status_field({donors, Donors}) ->
|
|
io_lib:format("Donor nodes: ~p~n", [Donors]);
|
|
format_local_status_field({donor_conn_avg, DonorConnAvg}) ->
|
|
io_lib:format("Current average donor node connection count: ~p~n", [DonorConnAvg]);
|
|
format_local_status_field({donor_sess_avg, DonorSessAvg}) ->
|
|
io_lib:format("Current average donor node disconnected session count: ~p~n", [DonorSessAvg]);
|
|
format_local_status_field({stats, Stats}) ->
|
|
format_local_stats(Stats).
|
|
|
|
format_local_stats(Stats) ->
|
|
[
|
|
"Channel statistics:\n"
|
|
| lists:map(
|
|
fun({Name, Value}) ->
|
|
io_lib:format(" ~p: ~p~n", [Name, Value])
|
|
end,
|
|
maps:to_list(Stats)
|
|
)
|
|
].
|
|
|
|
evacuation_status() ->
|
|
{node(), emqx_node_rebalance_evacuation:status()}.
|
|
|
|
rebalance_status() ->
|
|
{node(), emqx_node_rebalance:status()}.
|