453 lines
14 KiB
Erlang
453 lines
14 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_node_rebalance).
|
|
|
|
-include("emqx_node_rebalance.hrl").
|
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("emqx/include/types.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
-export([
|
|
start/1,
|
|
status/0,
|
|
status/1,
|
|
stop/0
|
|
]).
|
|
|
|
-export([start_link/0]).
|
|
|
|
-behaviour(gen_statem).
|
|
|
|
-export([
|
|
init/1,
|
|
callback_mode/0,
|
|
handle_event/4,
|
|
code_change/4
|
|
]).
|
|
|
|
-export([
|
|
is_node_available/0,
|
|
available_nodes/1,
|
|
connection_count/0,
|
|
session_count/0,
|
|
disconnected_session_count/0
|
|
]).
|
|
|
|
-export_type([
|
|
start_opts/0,
|
|
start_error/0
|
|
]).
|
|
|
|
-define(ENABLE_KIND, ?MODULE).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% APIs
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Options accepted by start/1. Every key is optional: start/1 merges the
%% supplied map over default_opts/0, so missing keys fall back to defaults.
-type start_opts() :: #{
    %% argument handed to evict_connections on the selected donor nodes
    %% in every connection eviction round
    conn_evict_rate => pos_integer(),
    %% argument handed to evict_sessions on the selected donor nodes
    %% in every session eviction round
    sess_evict_rate => pos_integer(),
    %% seconds to stay in wait_health_check before evicting connections
    wait_health_check => number(),
    %% seconds to stay in wait_takeover before evicting sessions
    wait_takeover => number(),
    %% connections: donors are balanced when
    %% DonorAvg =< RecipientAvg + abs_conn_threshold ...
    abs_conn_threshold => pos_integer(),
    %% ... or when DonorAvg =< RecipientAvg * rel_conn_threshold
    rel_conn_threshold => number(),
    %% same two checks, applied to session counts
    abs_sess_threshold => pos_integer(),
    rel_sess_threshold => number(),
    %% nodes taking part in the rebalance
    nodes => [node()]
}.

%% Error reasons start/1 can reply with.
%% `nothing_to_balance' is returned when no donor/recipient split exists or
%% the averages are already within the configured thresholds.
-type start_error() :: already_started | nothing_to_balance | [{node(), term()}].
|
|
|
|
%% Start a rebalance through the locally registered coordinator.
%% Caller-supplied options override the values from default_opts/0.
-spec start(start_opts()) -> ok_or_error(start_error()).
start(StartOpts) ->
    EffectiveOpts = maps:merge(default_opts(), StartOpts),
    Request = {start, EffectiveOpts},
    gen_statem:call(?MODULE, Request).
|
|
|
|
%% Stop a running rebalance; replies {error, not_started} when the
%% coordinator is in the disabled state.
-spec stop() -> ok_or_error(not_started).
stop() ->
    Request = stop,
    gen_statem:call(?MODULE, Request).
|
|
|
|
%% Query the locally registered coordinator for its rebalance status.
-spec status() -> disabled | {enabled, map()}.
status() ->
    Request = status,
    gen_statem:call(?MODULE, Request).
|
|
|
|
%% Same as status/0, but addressed to an explicit coordinator process.
-spec status(pid()) -> disabled | {enabled, map()}.
status(Pid) ->
    Request = status,
    gen_statem:call(Pid, Request).
|
|
|
|
%% Start the coordinator state machine, registered locally under the
%% module name.
-spec start_link() -> startlink_ret().
start_link() ->
    RegName = {local, ?MODULE},
    gen_statem:start_link(RegName, ?MODULE, [], []).
|
|
|
|
%% Return the subset of Nodes that answered the availability query with an
%% atom; every other reply is dropped.
%% NOTE(review): presumably the remote side runs is_node_available/0, which
%% returns node() on success - confirm against the proto module.
-spec available_nodes(list(node())) -> list(node()).
available_nodes(Nodes) when is_list(Nodes) ->
    {Replies, _} = emqx_node_rebalance_proto_v2:available_nodes(Nodes),
    [Reply || Reply <- Replies, is_atom(Reply)].
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% gen_statem callbacks
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% All states are served by the single handle_event/4 callback.
callback_mode() ->
    handle_event_function.
|
|
|
|
%% states: disabled, wait_health_check, evicting_conns, wait_takeover, evicting_sessions
|
|
|
|
%% The coordinator starts in the disabled state with empty data.
init([]) ->
    ?tp(debug, emqx_node_rebalance_started, #{}),
    InitialData = #{},
    {ok, disabled, InitialData}.
|
|
|
|
%% start
|
|
%% Event handler shared by every state.
%%
%% State flow driven by state timeouts:
%%   disabled -> wait_health_check -> evicting_conns -> wait_takeover
%%            -> evicting_sessions -> disabled
%%
%% {start, Opts}: accepted only while disabled; any other state replies
%% {error, already_started}.
handle_event(
    {call, Caller},
    {start, #{wait_health_check := HealthCheckSecs} = StartOpts},
    disabled,
    #{} = StData
) ->
    case enable_rebalance(StData#{opts => StartOpts}) of
        {error, Why} ->
            ?SLOG(warning, #{
                msg => "node_rebalance_enable_failed",
                reason => Why
            }),
            {keep_state_and_data, [{reply, Caller, {error, Why}}]};
        {ok, EnabledData} ->
            ?SLOG(warning, #{msg => "node_rebalance_enabled", opts => StartOpts}),
            Actions = [
                {state_timeout, seconds(HealthCheckSecs), evict_conns},
                {reply, Caller, ok}
            ],
            {next_state, wait_health_check, EnabledData, Actions}
    end;
handle_event({call, Caller}, {start, _Opts}, _State, #{}) ->
    {keep_state_and_data, [{reply, Caller, {error, already_started}}]};
%% stop: rejected while disabled, otherwise the donor agents are disabled
%% and the machine returns to the disabled state.
handle_event({call, Caller}, stop, disabled, #{}) ->
    {keep_state_and_data, [{reply, Caller, {error, not_started}}]};
handle_event({call, Caller}, stop, _State, StData) ->
    ok = disable_rebalance(StData),
    ?SLOG(warning, #{msg => "node_rebalance_stopped"}),
    {next_state, disabled, deinit(StData), [{reply, Caller, ok}]};
%% status: the reply carries the collected data plus the current state and
%% the node this coordinator runs on.
handle_event({call, Caller}, status, disabled, #{}) ->
    {keep_state_and_data, [{reply, Caller, disabled}]};
handle_event({call, Caller}, status, State, StData) ->
    Stats = get_stats(State, StData),
    Report = Stats#{
        state => State,
        coordinator_node => node()
    },
    {keep_state_and_data, [{reply, Caller, {enabled, Report}}]};
%% Health-check wait is over: re-enable the donor agents with
%% allow_connections => false, then start evicting immediately
%% (zero state timeout).
handle_event(
    state_timeout,
    evict_conns,
    wait_health_check,
    #{donors := Donors} = StData
) ->
    ?SLOG(warning, #{msg => "node_rebalance_wait_health_check_over"}),
    AgentArgs = [self(), ?ENABLE_KIND, #{allow_connections => false}],
    _ = multicall(Donors, enable_rebalance_agent, AgentArgs),
    ?tp(debug, node_rebalance_enable_started_prohibiting, #{}),
    {next_state, evicting_conns, StData, [{state_timeout, 0, evict_conns}]};
%% One connection eviction round; repeated every evict_interval until
%% evict_conns/1 reports ok.
handle_event(
    state_timeout,
    evict_conns,
    evicting_conns,
    #{opts := #{wait_takeover := TakeoverSecs, evict_interval := Interval}} = StData
) ->
    case evict_conns(StData) of
        {continue, UpdatedData} ->
            {keep_state, UpdatedData, [{state_timeout, Interval, evict_conns}]};
        ok ->
            ?SLOG(warning, #{msg => "node_rebalance_evict_conns_over"}),
            {next_state, wait_takeover, StData, [
                {state_timeout, seconds(TakeoverSecs), evict_sessions}
            ]}
    end;
%% Takeover wait is over: start evicting sessions immediately.
handle_event(state_timeout, evict_sessions, wait_takeover, StData) ->
    ?SLOG(warning, #{msg => "node_rebalance_wait_takeover_over"}),
    {next_state, evicting_sessions, StData, [{state_timeout, 0, evict_sessions}]};
%% One session eviction round; once evict_sessions/1 reports ok the donor
%% agents are disabled and the rebalance is finished.
handle_event(
    state_timeout,
    evict_sessions,
    evicting_sessions,
    #{opts := #{evict_interval := Interval}} = StData
) ->
    case evict_sessions(StData) of
        {continue, UpdatedData} ->
            {keep_state, UpdatedData, [{state_timeout, Interval, evict_sessions}]};
        ok ->
            ?tp(debug, emqx_node_rebalance_evict_sess_over, #{}),
            ?SLOG(warning, #{msg => "node_rebalance_evict_sessions_over"}),
            ok = disable_rebalance(StData),
            ?SLOG(warning, #{msg => "node_rebalance_finished_successfully"}),
            {next_state, disabled, deinit(StData)}
    end;
%% Anything not handled above is logged; unknown calls still get a reply
%% so the caller does not block until its timeout.
handle_event({call, Caller}, Unknown, _State, _StData) ->
    ?SLOG(warning, #{msg => "node_rebalance_unknown_call", call => Unknown}),
    {keep_state_and_data, [{reply, Caller, ignored}]};
handle_event(info, Unknown, _State, _StData) ->
    ?SLOG(warning, #{msg => "node_rebalance_unknown_info", info => Unknown}),
    keep_state_and_data;
handle_event(cast, Unknown, _State, _StData) ->
    ?SLOG(warning, #{msg => "node_rebalance_unknown_cast", cast => Unknown}),
    keep_state_and_data.
|
|
|
|
%% Code upgrades keep both the state name and the data unchanged.
code_change(_OldVsn, StateName, StData, _Extra) ->
    {ok, StateName, StData}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% internal funs
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Decide whether a rebalance is needed and, if so, prepare the donor nodes.
%%
%% Nodes whose connection count is at or above the average over all
%% participating nodes become donors, the rest become recipients.
%% Returns {ok, Data} extended with `donors', `recipients' and the initial
%% connection/session counts, or {error, nothing_to_balance}.
%% multicall/3 raises error({bad_nodes, _}) when a node fails to answer.
enable_rebalance(#{opts := #{nodes := []}}) ->
    %% With no participating nodes the average is undefined (avg/1 has no
    %% clause for an empty list); reply with an error instead of crashing
    %% the coordinator.
    {error, nothing_to_balance};
enable_rebalance(#{opts := Opts} = Data) ->
    Nodes = maps:get(nodes, Opts),
    ConnCounts = multicall(Nodes, connection_counts, []),
    SessCounts = multicall(Nodes, session_counts, []),
    {_, Counts} = lists:unzip(ConnCounts),
    Avg = avg(Counts),
    {DonorCounts, RecipientCounts} = lists:partition(
        fun({_Node, Count}) ->
            Count >= Avg
        end,
        ConnCounts
    ),
    ?SLOG(warning, #{
        msg => "node_rebalance_enabling",
        conn_counts => ConnCounts,
        donor_counts => DonorCounts,
        recipient_counts => RecipientCounts
    }),
    {DonorNodes, _} = lists:unzip(DonorCounts),
    {RecipientNodes, _} = lists:unzip(RecipientCounts),
    case need_rebalance(DonorNodes, RecipientNodes, ConnCounts, SessCounts, Opts) of
        false ->
            {error, nothing_to_balance};
        true ->
            %% Connections are still allowed at this stage; they are
            %% prohibited once the wait_health_check state timeout fires.
            _ = multicall(DonorNodes, enable_rebalance_agent, [
                self(), ?ENABLE_KIND, #{allow_connections => true}
            ]),
            {ok, Data#{
                donors => DonorNodes,
                recipients => RecipientNodes,
                initial_conn_counts => maps:from_list(ConnCounts),
                initial_sess_counts => maps:from_list(SessCounts)
            }}
    end.
|
|
|
|
%% Disable the rebalance agent on every donor node. The per-node replies
%% are discarded; multicall/3 raises if any node fails.
disable_rebalance(#{donors := Donors}) ->
    AgentArgs = [self(), ?ENABLE_KIND],
    _ = multicall(Donors, disable_rebalance_agent, AgentArgs),
    ok.
|
|
|
|
%% Run one connection eviction round.
%%
%% Returns `ok' when the donor average is within the connection thresholds
%% of the recipient average. Otherwise asks every donor whose count exceeds
%% the recipient average to evict connections, and returns
%% {continue, Data} extended with the averages and per-node counts
%% measured in this round.
evict_conns(#{donors := DonorNodes, recipients := RecipientNodes, opts := Opts} = Data) ->
    DonorPairs = multicall(DonorNodes, connection_counts, []),
    RecipientPairs = multicall(RecipientNodes, connection_counts, []),
    DonorAvg = avg([Count || {_Node, Count} <- DonorPairs]),
    RecipientAvg = avg([Count || {_Node, Count} <- RecipientPairs]),
    Limits = thresholds(conn, Opts),
    case within_thresholds(DonorAvg, RecipientAvg, Limits) of
        false ->
            Rate = maps:get(conn_evict_rate, Opts),
            Targets = nodes_to_evict(RecipientAvg, DonorPairs),
            ?SLOG(warning, #{
                msg => "node_rebalance_evict_conns",
                donor_conn_avg => DonorAvg,
                recipient_conn_avg => RecipientAvg,
                thresholds => Limits,
                nodes => Targets,
                counts => Rate
            }),
            _ = multicall(Targets, evict_connections, [Rate]),
            {continue, Data#{
                donor_conn_avg => DonorAvg,
                recipient_conn_avg => RecipientAvg,
                donor_conn_counts => maps:from_list(DonorPairs),
                recipient_conn_counts => maps:from_list(RecipientPairs)
            }};
        true ->
            ok
    end.
|
|
|
|
%% Run one session eviction round, based on disconnected session counts.
%%
%% Returns `ok' when the donor average is within the session thresholds of
%% the recipient average. Otherwise asks every donor whose count exceeds
%% the recipient average to evict disconnected sessions towards the
%% recipient nodes, and returns {continue, Data} extended with the averages
%% and per-node counts measured in this round.
evict_sessions(#{donors := DonorNodes, recipients := RecipientNodes, opts := Opts} = Data) ->
    DonorPairs = multicall(DonorNodes, disconnected_session_counts, []),
    RecipientPairs = multicall(RecipientNodes, disconnected_session_counts, []),
    DonorAvg = avg([Count || {_Node, Count} <- DonorPairs]),
    RecipientAvg = avg([Count || {_Node, Count} <- RecipientPairs]),
    Limits = thresholds(sess, Opts),
    case within_thresholds(DonorAvg, RecipientAvg, Limits) of
        false ->
            Rate = maps:get(sess_evict_rate, Opts),
            Targets = nodes_to_evict(RecipientAvg, DonorPairs),
            ?SLOG(warning, #{
                msg => "node_rebalance_evict_sessions",
                donor_sess_avg => DonorAvg,
                recipient_sess_avg => RecipientAvg,
                thresholds => Limits,
                nodes => Targets,
                counts => Rate
            }),
            EvictArgs = [Rate, RecipientNodes, disconnected],
            _ = multicall(Targets, evict_sessions, EvictArgs),
            {continue, Data#{
                donor_sess_avg => DonorAvg,
                recipient_sess_avg => RecipientAvg,
                donor_sess_counts => maps:from_list(DonorPairs),
                recipient_sess_counts => maps:from_list(RecipientPairs)
            }};
        true ->
            ok
    end.
|
|
|
|
%% Tell whether a rebalance is worthwhile.
%% There is nothing to do without at least one donor and one recipient;
%% otherwise a rebalance is needed as soon as either the connection or the
%% session averages are outside their thresholds.
need_rebalance([], _RecipientNodes, _ConnCounts, _SessCounts, _Opts) ->
    false;
need_rebalance(_DonorNodes, [], _ConnCounts, _SessCounts, _Opts) ->
    false;
need_rebalance(DonorNodes, RecipientNodes, ConnCounts, SessCounts, Opts) ->
    DonorConnAvg = avg_for_nodes(DonorNodes, ConnCounts),
    RecipientConnAvg = avg_for_nodes(RecipientNodes, ConnCounts),
    DonorSessAvg = avg_for_nodes(DonorNodes, SessCounts),
    RecipientSessAvg = avg_for_nodes(RecipientNodes, SessCounts),
    %% Balanced only if both connections and sessions are within limits;
    %% the session check is skipped when connections are already off.
    Needed =
        not (within_thresholds(DonorConnAvg, RecipientConnAvg, thresholds(conn, Opts)) andalso
            within_thresholds(DonorSessAvg, RecipientSessAvg, thresholds(sess, Opts))),
    ?tp(
        debug,
        emqx_node_rebalance_need_rebalance,
        #{
            donors => DonorNodes,
            recipients => RecipientNodes,
            conn_counts => ConnCounts,
            sess_counts => SessCounts,
            opts => Opts,
            result => Needed
        }
    ),
    Needed.
|
|
|
|
%% Average of the counts belonging to Nodes, taken from a list of
%% {Node, Count} pairs.
avg_for_nodes(Nodes, Counts) ->
    CountByNode = maps:from_list(Counts),
    Selected = maps:with(Nodes, CountByNode),
    avg(maps:values(Selected)).
|
|
|
|
%% True when Value does not exceed the goal by more than the absolute
%% threshold, or does not exceed the goal scaled by the relative threshold.
%% The relative check is evaluated only if the absolute one fails.
within_thresholds(Value, GoalValue, {AbsThres, RelThres}) ->
    case Value =< GoalValue + AbsThres of
        true ->
            true;
        false ->
            Value =< GoalValue * RelThres
    end.
|
|
|
|
%% Pick the {Absolute, Relative} threshold pair for connections (`conn')
%% or sessions (`sess') out of the rebalance options.
thresholds(conn, #{abs_conn_threshold := AbsConn, rel_conn_threshold := RelConn}) ->
    {AbsConn, RelConn};
thresholds(sess, #{abs_sess_threshold := AbsSess, rel_sess_threshold := RelSess}) ->
    {AbsSess, RelSess}.
|
|
|
|
%% Names of the nodes whose count is strictly above Goal, in input order.
%% NodeCounts is a list of {Node, Count} pairs.
nodes_to_evict(Goal, NodeCounts) ->
    lists:filtermap(
        fun({Node, Count}) ->
            case Count > Goal of
                true -> {true, Node};
                false -> false
            end
        end,
        NodeCounts
    ).
|
|
|
|
%% Statistics reported by the status call: nothing while disabled,
%% otherwise the complete state data.
get_stats(State, Data) ->
    case State of
        disabled -> #{};
        _ -> Data
    end.
|
|
|
|
%% Arithmetic mean of a non-empty list of numbers, always a float.
%% An empty list has no mean, so it deliberately matches no clause and
%% fails with function_clause. The head matches on the list shape instead
%% of walking the whole list with length/1 inside a guard.
avg([_ | _] = List) ->
    lists:sum(List) / length(List).
|
|
|
|
%% Call function F of the rebalance protocol module on Nodes with extra
%% arguments A and return [{Node, Value}] in the order of Nodes.
%%
%% Raises error({bad_nodes, _}) when the protocol call reports bad nodes,
%% or when any per-node reply is neither `ok' nor {ok, Value}; in the
%% latter case the error carries the offending {Node, Reply} pairs.
multicall(Nodes, F, A) ->
    case apply(emqx_node_rebalance_proto_v3, F, [Nodes | A]) of
        {_, [_ | _] = Unreachable} ->
            error({bad_nodes, Unreachable});
        {Replies, []} ->
            Tagged = lists:zip(Nodes, Replies),
            case [Pair || Pair <- Tagged, not is_ok(Pair)] of
                [] ->
                    [{Node, ok_result(Reply)} || {Node, Reply} <- Tagged];
                Failed ->
                    error({bad_nodes, Failed})
            end
    end.
|
|
|
|
%% True for a {Node, Reply} pair whose reply is `ok' or {ok, _};
%% false for anything else.
is_ok({_Node, Reply}) ->
    case Reply of
        {ok, _} -> true;
        ok -> true;
        _ -> false
    end;
is_ok(_) ->
    false.
|
|
|
|
%% Strip the `ok' tag from a successful reply; a bare `ok' stays `ok'.
ok_result({ok, Value}) ->
    Value;
ok_result(ok) ->
    ok.
|
|
|
|
%% Connection count reported by the local eviction agent, tagged with `ok'.
connection_count() ->
    Count = emqx_eviction_agent:connection_count(),
    {ok, Count}.
|
|
|
|
%% Session count reported by the local eviction agent, tagged with `ok'.
session_count() ->
    Count = emqx_eviction_agent:session_count(),
    {ok, Count}.
|
|
|
|
%% Count of sessions the local eviction agent reports for the
%% `disconnected' filter, tagged with `ok'.
disconnected_session_count() ->
    Count = emqx_eviction_agent:session_count(disconnected),
    {ok, Count}.
|
|
|
|
%% Defaults that start/1 merges the caller's options over.
default_opts() ->
    #{
        %% participating nodes: every running node unless overridden
        nodes => all_nodes(),

        %% waiting periods, converted with seconds/1 before use
        wait_health_check => ?DEFAULT_WAIT_HEALTH_CHECK,
        wait_takeover => ?DEFAULT_WAIT_TAKEOVER,

        %% delay between eviction rounds, used directly as a state timeout
        evict_interval => ?EVICT_INTERVAL,

        %% connection eviction
        conn_evict_rate => ?DEFAULT_CONN_EVICT_RATE,
        abs_conn_threshold => ?DEFAULT_ABS_CONN_THRESHOLD,
        rel_conn_threshold => ?DEFAULT_REL_CONN_THRESHOLD,

        %% session eviction
        sess_evict_rate => ?DEFAULT_SESS_EVICT_RATE,
        abs_sess_threshold => ?DEFAULT_ABS_SESS_THRESHOLD,
        rel_sess_threshold => ?DEFAULT_REL_SESS_THRESHOLD
    }.
|
|
|
|
%% Drop everything a rebalance run stored in the state data, so the
%% disabled state does not keep values from a finished or stopped run.
%% `donors' and `recipients' are removed as well: they are written by
%% enable_rebalance/1 on every start and are only read while a rebalance
%% is running, so keeping them would just retain stale node lists.
deinit(Data) ->
    Keys = [
        donors,
        recipients,
        recipient_conn_avg,
        recipient_sess_avg,
        donor_conn_avg,
        donor_sess_avg,
        recipient_conn_counts,
        recipient_sess_counts,
        donor_conn_counts,
        donor_sess_counts,
        initial_conn_counts,
        initial_sess_counts,
        opts
    ],
    maps:without(Keys, Data).
|
|
|
|
%% Return node() when this node can take part in a rebalance: the
%% rebalance agent process is registered and the eviction agent reports
%% `disabled'. Any other situation fails with badmatch instead of
%% returning a value.
is_node_available() ->
    AgentPid = whereis(emqx_node_rebalance_agent),
    true = is_pid(AgentPid),
    disabled = emqx_eviction_agent:status(),
    node().
|
|
|
|
%% Default set of participating nodes: whatever emqx reports as running.
all_nodes() ->
    Running = emqx:running_nodes(),
    Running.
|
|
|
|
%% Convert a possibly fractional number of seconds into an integer number
%% of milliseconds, rounded to the nearest one.
seconds(Sec) ->
    Millis = timer:seconds(Sec),
    round(Millis).
|