306 lines
12 KiB
Erlang
306 lines
12 KiB
Erlang
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_node_rebalance_cli).
|
|
|
|
%% APIs
|
|
-export([
|
|
load/0,
|
|
unload/0,
|
|
cli/1
|
|
]).
|
|
|
|
%% @doc Register the `rebalance' command with emqx_ctl, routing its
%% invocations to `cli/1' in this module.
load() ->
    Handler = {?MODULE, cli},
    emqx_ctl:register_command(rebalance, Handler, []).
|
|
|
|
%% @doc Unregister the `rebalance' command from emqx_ctl.
unload() ->
    Command = rebalance,
    emqx_ctl:unregister_command(Command).
|
|
|
|
%% @doc Handle the `rebalance' command line.
%%
%% Sub-commands:
%%   start [Options]     parse the options and start either an evacuation
%%                       (when `--evacuation' is given) or a rebalance
%%   node-status [Node]  print the status of the local or the named node
%%   status              print every evacuation / rebalance reported by
%%                       emqx_node_rebalance_status:global_status/0
%%   stop                stop the running evacuation or rebalance
%% Any other argument list prints the usage text.
%%
%% `start' and `stop' return `true' when something was started/stopped and
%% `false' otherwise; `node-status' with an unknown node name returns
%% `false'. The remaining clauses return whatever the printing call returns.
cli(["start" | StartArgs]) ->
    case start_args(StartArgs) of
        {evacuation, Opts} ->
            start_evacuation(Opts);
        {rebalance, Opts} ->
            start_rebalance(Opts);
        {error, Error} ->
            emqx_ctl:print("Rebalance start error: ~s~n", [Error]),
            false
    end;
cli(["node-status", NodeStr]) ->
    %% Only already-existing atoms are accepted, so arbitrary user input
    %% cannot grow the atom table.
    case emqx_utils:safe_to_existing_atom(NodeStr, utf8) of
        {ok, Node} ->
            node_status(emqx_node_rebalance_status:local_status(Node));
        {error, _} ->
            emqx_ctl:print("Node status error: invalid node~n"),
            false
    end;
cli(["node-status"]) ->
    node_status(emqx_node_rebalance_status:local_status());
cli(["status"]) ->
    #{
        evacuations := Evacuations,
        rebalances := Rebalances
    } = emqx_node_rebalance_status:global_status(),
    print_node_statuses(
        "evacuation",
        fun emqx_node_rebalance_status:format_local_status/1,
        Evacuations
    ),
    print_node_statuses(
        "rebalance coordinator",
        fun emqx_node_rebalance_status:format_coordinator_status/1,
        Rebalances
    );
cli(["stop"]) ->
    case emqx_node_rebalance_evacuation:status() of
        {enabled, _} ->
            ok = emqx_node_rebalance_evacuation:stop(),
            emqx_ctl:print("Rebalance(evacuation) stopped~n"),
            true;
        disabled ->
            stop_rebalance()
    end;
cli(_) ->
    emqx_ctl:usage(
        [
            {
                "rebalance start --evacuation \\\n"
                " [--redirect-to \"Host1:Port1 Host2:Port2 ...\"] \\\n"
                " [--conn-evict-rate CountPerSec] \\\n"
                " [--migrate-to \"node1@host1 node2@host2 ...\"] \\\n"
                " [--wait-takeover Secs] \\\n"
                " [--sess-evict-rate CountPerSec]",
                "Start current node evacuation with optional server redirect to the specified servers"
            },

            %% Each option is listed exactly once (the original text
            %% repeated --conn-evict-rate).
            {
                "rebalance start \\\n"
                " [--nodes \"node1@host1 node2@host2\"] \\\n"
                " [--wait-health-check Secs] \\\n"
                " [--conn-evict-rate ConnPerSec] \\\n"
                " [--abs-conn-threshold Count] \\\n"
                " [--rel-conn-threshold Fraction] \\\n"
                " [--wait-takeover Secs] \\\n"
                " [--sess-evict-rate CountPerSec] \\\n"
                " [--abs-sess-threshold Count] \\\n"
                " [--rel-sess-threshold Fraction]",
                "Start rebalance on the specified nodes using the current node as the coordinator"
            },

            {"rebalance node-status", "Get current node rebalance status"},

            {"rebalance node-status \"node1@host1\"", "Get remote node rebalance status"},

            {"rebalance status",
                "Get statuses of all current rebalance/evacuation processes across the cluster"},

            {"rebalance stop", "Stop node rebalance"}
        ]
    ).

%% Start an evacuation unless one is already enabled on this node.
%% A failed start is reported and yields `false', the same way the
%% rebalance path reports its errors, instead of crashing the CLI handler.
%% NOTE(review): assumes emqx_node_rebalance_evacuation:start/1 returns
%% `ok | {error, Reason}' like emqx_node_rebalance:start/1 - confirm.
start_evacuation(Opts) ->
    case emqx_node_rebalance_evacuation:status() of
        disabled ->
            case emqx_node_rebalance_evacuation:start(Opts) of
                ok ->
                    emqx_ctl:print("Rebalance(evacuation) started~n"),
                    true;
                {error, Reason} ->
                    emqx_ctl:print("Rebalance(evacuation) start error: ~p~n", [Reason]),
                    false
            end;
        {enabled, _} ->
            emqx_ctl:print("Rebalance is already enabled~n"),
            false
    end.

%% Start a rebalance with the validated options and report the outcome.
start_rebalance(Opts) ->
    case emqx_node_rebalance:start(Opts) of
        ok ->
            emqx_ctl:print("Rebalance started~n"),
            true;
        {error, Reason} ->
            emqx_ctl:print("Rebalance start error: ~p~n", [Reason]),
            false
    end.

%% Stop a running rebalance; used when no evacuation is enabled.
stop_rebalance() ->
    case emqx_node_rebalance:status() of
        {enabled, _} ->
            ok = emqx_node_rebalance:stop(),
            emqx_ctl:print("Rebalance stopped~n"),
            true;
        disabled ->
            emqx_ctl:print("Rebalance is already disabled~n"),
            false
    end.

%% Print a separator line and a "Node <node>: <Role>" section for every
%% {Node, Status} pair, rendering the status with Format.
print_node_statuses(Role, Format, NodeStatuses) ->
    lists:foreach(
        fun({Node, Status}) ->
            emqx_ctl:print(
                "--------------------------------------------------------------------~n"
            ),
            emqx_ctl:print("Node ~p: ~s~n~s", [Node, Role, Format(Status)])
        end,
        NodeStatuses
    ).
|
|
|
|
%% @doc Print the rebalance status of a single node.
%%
%% Accepts `{evacuation | rebalance, Status}', the atom `disabled', or any
%% other term, which is printed as a status-detection error.
node_status({Process, Status}) when Process =:= evacuation; Process =:= rebalance ->
    Formatted = emqx_node_rebalance_status:format_local_status(Status),
    emqx_ctl:print("Rebalance type: ~p~n~s~n", [Process, Formatted]);
node_status(disabled) ->
    emqx_ctl:print("Rebalance disabled~n");
node_status(Other) ->
    emqx_ctl:print("Error detecting rebalance status: ~p~n", [Other]).
|
|
|
|
%% @doc Parse and validate the arguments of `rebalance start'.
%%
%% Returns `{evacuation, Opts}' when `--evacuation' was given,
%% `{rebalance, Opts}' otherwise, or `{error, Message}' when collecting or
%% validating the arguments fails.
start_args(Args) ->
    case collect_args(Args, #{}) of
        {ok, Collected} ->
            validate_collected(Collected);
        {error, _} = Error ->
            Error
    end.

%% Pick the validator matching the requested mode and tag its result.
validate_collected(#{"--evacuation" := true} = Collected) ->
    tag_validated(evacuation, validate_evacuation(maps:to_list(Collected), #{}));
validate_collected(#{} = Collected) ->
    tag_validated(rebalance, validate_rebalance(maps:to_list(Collected), #{})).

%% Replace the `ok' tag of a successful validation with the mode name;
%% errors pass through untouched.
tag_validated(Mode, {ok, Validated}) ->
    {Mode, Validated};
tag_validated(_Mode, {error, _} = Error) ->
    Error.
|
|
|
|
%% @doc Collect raw command-line arguments into a map keyed by option name.
%%
%% `--evacuation' is a bare flag stored as `true'; every other known option
%% consumes the following argument as its (still unparsed) string value.
%% Returns `{ok, Map}', or `{error, Message}' carrying the arguments that
%% remained when an unknown option (or a known option without a value) was
%% met.
collect_args([], Map) ->
    {ok, Map};
collect_args(["--evacuation" | Args], Map) ->
    collect_args(Args, Map#{"--evacuation" => true});
collect_args([Option, Value | Args] = Remaining, Map) ->
    case is_value_option(Option) of
        true ->
            collect_args(Args, Map#{Option => Value});
        false ->
            {error, io_lib:format("unknown arguments: ~p", [Remaining])}
    end;
collect_args(Remaining, _Map) ->
    {error, io_lib:format("unknown arguments: ~p", [Remaining])}.

%% Options that must be followed by a value.
%% evacuation
is_value_option("--redirect-to") -> true;
is_value_option("--migrate-to") -> true;
%% rebalance
is_value_option("--nodes") -> true;
is_value_option("--wait-health-check") -> true;
is_value_option("--abs-conn-threshold") -> true;
is_value_option("--rel-conn-threshold") -> true;
is_value_option("--abs-sess-threshold") -> true;
is_value_option("--rel-sess-threshold") -> true;
%% common
is_value_option("--conn-evict-rate") -> true;
is_value_option("--wait-takeover") -> true;
is_value_option("--sess-evict-rate") -> true;
is_value_option(_) -> false.
|
|
|
|
%% @doc Validate collected evacuation options and convert them into the
%% option map passed to the evacuation start call.
%%
%% Takes the `{OptionName, RawValue}' pairs produced from the collected
%% argument map and an accumulator. Returns `{ok, Opts}' or
%% `{error, Message}' on the first invalid or unknown option.
validate_evacuation([], Map) ->
    {ok, Map};
validate_evacuation([{"--evacuation", _} | Rest], Map) ->
    %% The mode flag itself carries no option value.
    validate_evacuation(Rest, Map);
validate_evacuation([{"--redirect-to", ServerReference} | Rest], Map) ->
    Reference = list_to_binary(ServerReference),
    validate_evacuation(Rest, Map#{server_reference => Reference});
validate_evacuation([{"--migrate-to", MigrateTo} | Rest], Map) ->
    case migrate_to_nodes(MigrateTo) of
        {ok, Nodes} ->
            validate_evacuation(Rest, Map#{migrate_to => Nodes});
        {error, _} = Error ->
            Error
    end;
validate_evacuation([{"--conn-evict-rate", _} | _] = Opts, Map) ->
    validate_pos_int(conn_evict_rate, Opts, Map, fun validate_evacuation/2);
validate_evacuation([{"--sess-evict-rate", _} | _] = Opts, Map) ->
    validate_pos_int(sess_evict_rate, Opts, Map, fun validate_evacuation/2);
validate_evacuation([{"--wait-takeover", _} | _] = Opts, Map) ->
    validate_pos_int(wait_takeover, Opts, Map, fun validate_evacuation/2);
validate_evacuation(Rest, _Map) ->
    {error, io_lib:format("unknown evacuation arguments: ~p", [Rest])}.

%% Parse a space/comma separated node list given to --migrate-to and make
%% sure every node is a known atom and reported as available.
migrate_to_nodes(MigrateTo) ->
    case strings_to_atoms(string:tokens(MigrateTo, ", ")) of
        {Nodes, []} ->
            Available = emqx_node_rebalance_evacuation:available_nodes(Nodes),
            check_migrate_to(Nodes, Available);
        {_, Invalid} ->
            {error, io_lib:format("invalid --migrate-to, invalid nodes: ~p", [Invalid])}
    end.

%% Accept the requested nodes only when the available list is exactly the
%% requested list (same nodes, same order).
check_migrate_to(_Nodes, []) ->
    {error, "invalid --migrate-to, no nodes"};
check_migrate_to(Nodes, Available) when Available =:= Nodes ->
    {ok, Nodes};
check_migrate_to(Nodes, Available) ->
    {error,
        io_lib:format(
            "invalid --migrate-to, unavailable nodes: ~p",
            [Nodes -- Available]
        )}.
|
|
|
|
%% @doc Validate collected rebalance options and convert them into the
%% option map passed to the rebalance start call.
%%
%% Takes the `{OptionName, RawValue}' pairs produced from the collected
%% argument map and an accumulator. Returns `{ok, Opts}' or
%% `{error, Message}' on the first invalid or unknown option.
validate_rebalance([], Map) ->
    {ok, Map};
validate_rebalance([{"--nodes", NodeStr} | Rest], Map) ->
    case rebalance_nodes(NodeStr) of
        {ok, Nodes} ->
            validate_rebalance(Rest, Map#{nodes => Nodes});
        {error, _} = Error ->
            Error
    end;
%% positive integer options
validate_rebalance([{"--wait-health-check", _} | _] = Opts, Map) ->
    validate_pos_int(wait_health_check, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--wait-takeover", _} | _] = Opts, Map) ->
    validate_pos_int(wait_takeover, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--conn-evict-rate", _} | _] = Opts, Map) ->
    validate_pos_int(conn_evict_rate, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--sess-evict-rate", _} | _] = Opts, Map) ->
    validate_pos_int(sess_evict_rate, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--abs-conn-threshold", _} | _] = Opts, Map) ->
    validate_pos_int(abs_conn_threshold, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--abs-sess-threshold", _} | _] = Opts, Map) ->
    validate_pos_int(abs_sess_threshold, Opts, Map, fun validate_rebalance/2);
%% fraction options
validate_rebalance([{"--rel-conn-threshold", _} | _] = Opts, Map) ->
    validate_fraction(rel_conn_threshold, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--rel-sess-threshold", _} | _] = Opts, Map) ->
    validate_fraction(rel_sess_threshold, Opts, Map, fun validate_rebalance/2);
validate_rebalance(Rest, _Map) ->
    {error, io_lib:format("unknown rebalance arguments: ~p", [Rest])}.

%% Parse a space/comma separated node list given to --nodes and make sure
%% every node is a known atom and reported as available.
rebalance_nodes(NodeStr) ->
    case strings_to_atoms(string:tokens(NodeStr, ", ")) of
        {Nodes, []} ->
            Available = emqx_node_rebalance:available_nodes(Nodes),
            check_rebalance_nodes(Nodes, Available);
        {_, Invalid} ->
            {error, io_lib:format("invalid --nodes, invalid nodes: ~p", [Invalid])}
    end.

%% Accept the requested nodes only when the available list is exactly the
%% requested list (same nodes, same order).
check_rebalance_nodes(_Nodes, []) ->
    {error, "invalid --nodes, no nodes"};
check_rebalance_nodes(Nodes, Available) when Available =:= Nodes ->
    {ok, Nodes};
check_rebalance_nodes(Nodes, Available) ->
    {error,
        io_lib:format(
            "invalid --nodes, unavailable nodes: ~p",
            [Nodes -- Available]
        )}.
|
|
|
|
%% @doc Validate the head option as a number strictly greater than 1.0 and
%% continue validation with `Next'.
%%
%% Name - key under which the parsed float is stored in Map
%% Opts - `[{OptionName, RawValue} | Rest]'; only the head is consumed
%% Map  - accumulator of already validated options
%% Next - continuation called as `Next(Rest, UpdatedMap)' on success
%%
%% Returns the result of `Next', or `{error, Message}' when the value is
%% not a number or is not greater than 1.0.
validate_fraction(Name, [{OptionName, Value} | Rest], Map, Next) ->
    case parse_fraction(Value) of
        {ok, Num} when Num > 1.0 ->
            Next(Rest, Map#{Name => Num});
        _ ->
            {error, "invalid " ++ OptionName ++ " value"}
    end.

%% Parse a complete string as a float. string:to_float/1 rejects input
%% without a decimal point, so integer-formatted values such as "2" are
%% parsed with string:to_integer/1 and converted to a float.
parse_fraction(Value) ->
    case string:to_float(Value) of
        {Float, ""} ->
            {ok, Float};
        _ ->
            case string:to_integer(Value) of
                {Int, ""} -> {ok, float(Int)};
                _ -> error
            end
    end.
|
|
|
|
%% @doc Validate the head option as a strictly positive integer and
%% continue validation with `Next'.
%%
%% Name - key under which the parsed integer is stored in Map
%% Opts - `[{OptionName, RawValue} | Rest]'; only the head is consumed
%% Map  - accumulator of already validated options
%% Next - continuation called as `Next(Rest, UpdatedMap)' on success
%%
%% Returns the result of `Next', or `{error, Message}' when the value is
%% not an integer greater than zero.
validate_pos_int(Name, [{OptionName, Value} | Rest], Map, Next) ->
    case parse_pos_int(Value) of
        {ok, Int} ->
            Next(Rest, Map#{Name => Int});
        error ->
            {error, lists:append(["invalid ", OptionName, " value"])}
    end.

%% Parse a complete string as an integer greater than zero; trailing
%% characters make the value invalid.
parse_pos_int(Value) ->
    case string:to_integer(Value) of
        {Int, ""} when Int > 0 -> {ok, Int};
        _ -> error
    end.
|
|
|
|
%% @doc Convert strings to atoms via emqx_utils:safe_to_existing_atom/2.
%%
%% Returns `{Atoms, Invalid}': the atoms of the strings that converted
%% successfully and the strings that did not, each in input order.
strings_to_atoms(Strings) ->
    Classify = fun(Str, {Atoms, Invalid}) ->
        case emqx_utils:safe_to_existing_atom(Str, utf8) of
            {ok, Atom} ->
                {[Atom | Atoms], Invalid};
            {error, _} ->
                {Atoms, [Str | Invalid]}
        end
    end,
    %% Both accumulators are built in reverse; restore input order at the end.
    {RevAtoms, RevInvalid} = lists:foldl(Classify, {[], []}, Strings),
    {lists:reverse(RevAtoms), lists:reverse(RevInvalid)}.
|