Merge pull request #10967 from savonarola/0607-rebalance-fixes

Fix rebalance issues
This commit is contained in:
Ilya Averyanov 2023-06-08 08:29:37 +03:00 committed by GitHub
commit b9f1a70214
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 98 additions and 53 deletions

View File

@ -48,8 +48,8 @@
-type start_opts() :: #{
conn_evict_rate => pos_integer(),
sess_evict_rate => pos_integer(),
wait_health_check => pos_integer(),
wait_takeover => pos_integer(),
wait_health_check => number(),
wait_takeover => number(),
abs_conn_threshold => pos_integer(),
rel_conn_threshold => number(),
abs_sess_threshold => pos_integer(),
@ -438,7 +438,7 @@ is_node_available() ->
node().
all_nodes() ->
mria_mnesia:running_nodes().
emqx:running_nodes().
seconds(Sec) ->
round(timer:seconds(Sec)).

View File

@ -202,10 +202,10 @@ schema("/load_rebalance/:node/evacuation/stop") ->
}}.
'/load_rebalance/availability_check'(get, #{}) ->
case emqx_eviction_agent:status() of
case emqx_node_rebalance_status:local_status() of
disabled ->
{200, #{}};
{enabled, _Stats} ->
_ ->
error_response(503, ?NODE_EVACUATING, <<"Node Evacuating">>)
end.
@ -258,11 +258,11 @@ wrap_rpc(Node, RPCResult) ->
{200, #{}};
{error, Reason} ->
error_response(
400, ?BAD_REQUEST, io_lib:format("error on node ~p: ~p", [Node, Reason])
400, ?BAD_REQUEST, binfmt("error on node ~p: ~p", [Node, Reason])
);
{badrpc, Reason} ->
error_response(
503, ?RPC_ERROR, io_lib:format("RPC error on node ~p: ~p", [Node, Reason])
503, ?RPC_ERROR, binfmt("RPC error on node ~p: ~p", [Node, Reason])
)
end.
@ -299,9 +299,9 @@ with_nodes_at_key(Key, Params, Fun) ->
{ok, Params1} ->
Fun(Params1);
{error, {unavailable, Nodes}} ->
error_response(400, ?NOT_FOUND, io_lib:format("Nodes unavailable: ~p", [Nodes]));
error_response(400, ?NOT_FOUND, binfmt("Nodes unavailable: ~p", [Nodes]));
{error, {invalid, Nodes}} ->
error_response(400, ?BAD_REQUEST, io_lib:format("Invalid nodes: ~p", [Nodes]))
error_response(400, ?BAD_REQUEST, binfmt("Invalid nodes: ~p", [Nodes]))
end.
parse_node(Bin) when is_binary(Bin) ->
@ -331,6 +331,8 @@ without(Keys, Props) ->
Props
).
binfmt(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)).
%%------------------------------------------------------------------------------
%% Schema
%%------------------------------------------------------------------------------
@ -432,6 +434,14 @@ fields(rebalance_start) ->
];
fields(rebalance_evacuation_start) ->
[
{"wait_health_check",
mk(
emqx_schema:timeout_duration_s(),
#{
desc => ?DESC(wait_health_check),
required => false
}
)},
{"conn_evict_rate",
mk(
pos_integer(),
@ -712,6 +722,7 @@ rebalance_example() ->
rebalance_evacuation_example() ->
#{
wait_health_check => 10,
conn_evict_rate => 100,
sess_evict_rate => 100,
redirect_to => <<"othernode:1883">>,

View File

@ -103,6 +103,7 @@ cli(_) ->
[
{
"rebalance start --evacuation \\\n"
" [--wait-health-check Secs] \\\n"
" [--redirect-to \"Host1:Port1 Host2:Port2 ...\"] \\\n"
" [--conn-evict-rate CountPerSec] \\\n"
" [--migrate-to \"node1@host1 node2@host2 ...\"] \\\n"
@ -182,8 +183,6 @@ collect_args(["--migrate-to", MigrateTo | Args], Map) ->
%% rebalance
collect_args(["--nodes", Nodes | Args], Map) ->
collect_args(Args, Map#{"--nodes" => Nodes});
collect_args(["--wait-health-check", WaitHealthCheck | Args], Map) ->
collect_args(Args, Map#{"--wait-health-check" => WaitHealthCheck});
collect_args(["--abs-conn-threshold", AbsConnThres | Args], Map) ->
collect_args(Args, Map#{"--abs-conn-threshold" => AbsConnThres});
collect_args(["--rel-conn-threshold", RelConnThres | Args], Map) ->
@ -193,6 +192,8 @@ collect_args(["--abs-sess-threshold", AbsSessThres | Args], Map) ->
collect_args(["--rel-sess-threshold", RelSessThres | Args], Map) ->
collect_args(Args, Map#{"--rel-sess-threshold" => RelSessThres});
%% common
collect_args(["--wait-health-check", WaitHealthCheck | Args], Map) ->
collect_args(Args, Map#{"--wait-health-check" => WaitHealthCheck});
collect_args(["--conn-evict-rate", ConnEvictRate | Args], Map) ->
collect_args(Args, Map#{"--conn-evict-rate" => ConnEvictRate});
collect_args(["--wait-takeover", WaitTakeover | Args], Map) ->
@ -207,6 +208,8 @@ validate_evacuation([], Map) ->
{ok, Map};
validate_evacuation([{"--evacuation", _} | Rest], Map) ->
validate_evacuation(Rest, Map);
validate_evacuation([{"--wait-health-check", _} | _] = Opts, Map) ->
validate_pos_int(wait_health_check, Opts, Map, fun validate_evacuation/2);
validate_evacuation([{"--redirect-to", ServerReference} | Rest], Map) ->
validate_evacuation(Rest, Map#{server_reference => list_to_binary(ServerReference)});
validate_evacuation([{"--conn-evict-rate", _} | _] = Opts, Map) ->

View File

@ -53,10 +53,11 @@
server_reference => emqx_eviction_agent:server_reference(),
conn_evict_rate => pos_integer(),
sess_evict_rate => pos_integer(),
wait_takeover => pos_integer(),
migrate_to => migrate_to()
wait_takeover => number(),
migrate_to => migrate_to(),
wait_health_check => number()
}.
-type start_error() :: already_started | eviction_agent_busy.
-type start_error() :: already_started.
-type stats() :: #{
initial_conns := non_neg_integer(),
initial_sessions := non_neg_integer(),
@ -97,7 +98,7 @@ available_nodes(Nodes) when is_list(Nodes) ->
callback_mode() -> handle_event_function.
%% states: disabled, evicting_conns, waiting_takeover, evicting_sessions, prohibiting
%% states: disabled, waiting_health_check, evicting_conns, waiting_takeover, evicting_sessions, prohibiting
init([]) ->
case emqx_node_rebalance_evacuation_persist:read(default_opts()) of
@ -119,25 +120,20 @@ init([]) ->
%% start
handle_event(
{call, From},
{start, #{server_reference := ServerReference} = Opts},
{start, #{wait_health_check := WaitHealthCheck} = Opts},
disabled,
#{} = Data
) ->
case emqx_eviction_agent:enable(?MODULE, ServerReference) of
ok ->
NewData = init_data(Data, Opts),
ok = emqx_node_rebalance_evacuation_persist:save(Opts),
?SLOG(warning, #{
msg => "node_evacuation_started",
opts => Opts
}),
{next_state, evicting_conns, NewData, [
{state_timeout, 0, evict_conns},
{reply, From, ok}
]};
{error, eviction_agent_busy} ->
{keep_state_and_data, [{reply, From, {error, eviction_agent_busy}}]}
end;
?SLOG(warning, #{
msg => "node_evacuation_started",
opts => Opts
}),
NewData = init_data(Data, Opts),
ok = emqx_node_rebalance_evacuation_persist:save(Opts),
{next_state, waiting_health_check, NewData, [
{state_timeout, seconds(WaitHealthCheck), start_eviction},
{reply, From, ok}
]};
handle_event({call, From}, {start, _Opts}, _State, #{}) ->
{keep_state_and_data, [{reply, From, {error, already_started}}]};
%% stop
@ -167,6 +163,27 @@ handle_event({call, From}, status, State, #{migrate_to := MigrateTo} = Data) ->
{keep_state_and_data, [
{reply, From, {enabled, Stats#{state => State, migrate_to => migrate_to(MigrateTo)}}}
]};
%% start eviction
handle_event(
state_timeout,
start_eviction,
waiting_health_check,
#{server_reference := ServerReference} = Data
) ->
case emqx_eviction_agent:enable(?MODULE, ServerReference) of
ok ->
?tp(debug, eviction_agent_started, #{
data => Data
}),
{next_state, evicting_conns, Data, [
{state_timeout, 0, evict_conns}
]};
{error, eviction_agent_busy} ->
?tp(warning, eviction_agent_busy, #{
data => Data
}),
{next_state, disabled, deinit(Data)}
end;
%% conn eviction
handle_event(
state_timeout,
@ -270,12 +287,14 @@ default_opts() ->
conn_evict_rate => ?DEFAULT_CONN_EVICT_RATE,
sess_evict_rate => ?DEFAULT_SESS_EVICT_RATE,
wait_takeover => ?DEFAULT_WAIT_TAKEOVER,
wait_health_check => ?DEFAULT_WAIT_HEALTH_CHECK,
migrate_to => undefined
}.
init_data(Data0, Opts) ->
Data1 = maps:merge(Data0, Opts),
{enabled, #{connections := ConnCount, sessions := SessCount}} = emqx_eviction_agent:status(),
ConnCount = emqx_eviction_agent:connection_count(),
SessCount = emqx_eviction_agent:session_count(),
Data1#{
initial_conns => ConnCount,
current_conns => ConnCount,
@ -305,4 +324,7 @@ is_node_available() ->
node().
all_nodes() ->
mria_mnesia:running_nodes() -- [node()].
emqx:running_nodes() -- [node()].
seconds(Sec) ->
round(timer:seconds(Sec)).

View File

@ -21,24 +21,16 @@
%% APIs
%%--------------------------------------------------------------------
%% do not persist `migrate_to`:
%% * after restart there is nothing to migrate
%% * this value may be invalid after node was offline
-type persisted_start_opts() :: #{
server_reference => emqx_eviction_agent:server_reference(),
conn_evict_rate => pos_integer(),
sess_evict_rate => pos_integer(),
wait_takeover => pos_integer()
}.
-type start_opts() :: #{
server_reference => emqx_eviction_agent:server_reference(),
conn_evict_rate => pos_integer(),
sess_evict_rate => pos_integer(),
wait_takeover => pos_integer(),
migrate_to => emqx_node_rebalance_evacuation:migrate_to()
wait_takeover => number(),
migrate_to => emqx_node_rebalance_evacuation:migrate_to(),
wait_health_check => number()
}.
-spec save(persisted_start_opts()) -> ok_or_error(term()).
-spec save(start_opts()) -> ok_or_error(term()).
save(
#{
server_reference := ServerReference,
@ -50,7 +42,7 @@ save(
(is_binary(ServerReference) orelse ServerReference =:= undefined) andalso
is_integer(ConnEvictRate) andalso ConnEvictRate > 0 andalso
is_integer(SessEvictRate) andalso SessEvictRate > 0 andalso
is_integer(WaitTakeover) andalso WaitTakeover >= 0
is_number(WaitTakeover) andalso WaitTakeover >= 0
->
Filepath = evacuation_filepath(),
case filelib:ensure_dir(Filepath) of

View File

@ -69,6 +69,7 @@ t_start_evacuation_validation(Config) ->
#{sess_evict_rate => <<"sess">>},
#{redirect_to => 123},
#{wait_takeover => <<"wait">>},
#{wait_health_check => <<"wait">>},
#{migrate_to => []},
#{migrate_to => <<"migrate_to">>},
#{migrate_to => [<<"bad_node">>]},
@ -103,6 +104,7 @@ t_start_evacuation_validation(Config) ->
conn_evict_rate => 10,
sess_evict_rate => 10,
wait_takeover => 10,
wait_health_check => 10,
redirect_to => <<"srv">>,
migrate_to => [atom_to_binary(RecipientNode)]
}

View File

@ -86,11 +86,13 @@ end_per_testcase(_Case, Config) ->
t_agent_busy(Config) ->
[{DonorNode, _DonorPort}] = ?config(cluster_nodes, Config),
ok = rpc:call(DonorNode, emqx_eviction_agent, enable, [other_rebalance, undefined]),
?assertEqual(
{error, eviction_agent_busy},
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)])
?assertWaitEvent(
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
#{?snk_kind := eviction_agent_busy},
5000
).
t_already_started(Config) ->
@ -115,7 +117,12 @@ t_start(Config) ->
[{DonorNode, DonorPort}] = ?config(cluster_nodes, Config),
ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
?assertWaitEvent(
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
#{?snk_kind := eviction_agent_started},
5000
),
?assertMatch(
{error, {use_another_server, #{}}},
emqtt_try_connect([{port, DonorPort}])
@ -126,7 +133,11 @@ t_persistence(Config) ->
[{DonorNode, DonorPort}] = ?config(cluster_nodes, Config),
ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
?assertWaitEvent(
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
#{?snk_kind := eviction_agent_started},
5000
),
?assertMatch(
{error, {use_another_server, #{}}},
@ -179,7 +190,7 @@ t_conn_evicted(Config) ->
?assertWaitEvent(
ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
#{?snk_kind := node_evacuation_evict_conn},
1000
5000
),
?assertMatch(
@ -251,6 +262,7 @@ opts(Config) ->
conn_evict_rate => 10,
sess_evict_rate => 10,
wait_takeover => 1,
wait_health_check => 1,
migrate_to => migrate_to(Config)
}.

View File

@ -0,0 +1,3 @@
Fixed error message formatting in rebalance API: previously they could be displayed as unclear dumps of internal Erlang structures.
Added `wait_health_check` option to node evacuation CLI and API. This is a time interval when the node reports "unhealthy status" without beginning actual evacuation. We need this to allow a Load Balancer (if any) to remove the evacuated node from balancing and not forward (re)connecting clients to the evacuated node.

View File

@ -49,7 +49,7 @@ param_node.label:
"""Node name"""
wait_health_check.desc:
"""Time to wait before starting the rebalance process, in seconds"""
"""Time to wait before starting the rebalance/evacuation process, in seconds"""
wait_health_check.label:
"""Wait health check"""