emqx/apps/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl

306 lines
12 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_node_rebalance_cli).
%% APIs
-export([
load/0,
unload/0,
cli/1
]).
load() ->
emqx_ctl:register_command(rebalance, {?MODULE, cli}, []).
unload() ->
emqx_ctl:unregister_command(rebalance).
cli(["start" | StartArgs]) ->
case start_args(StartArgs) of
{evacuation, Opts} ->
case emqx_node_rebalance_evacuation:status() of
disabled ->
ok = emqx_node_rebalance_evacuation:start(Opts),
emqx_ctl:print("Rebalance(evacuation) started~n"),
true;
{enabled, _} ->
emqx_ctl:print("Rebalance is already enabled~n"),
false
end;
{rebalance, Opts} ->
case emqx_node_rebalance:start(Opts) of
ok ->
emqx_ctl:print("Rebalance started~n"),
true;
{error, Reason} ->
emqx_ctl:print("Rebalance start error: ~p~n", [Reason]),
false
end;
{error, Error} ->
emqx_ctl:print("Rebalance start error: ~s~n", [Error]),
false
end;
cli(["node-status", NodeStr]) ->
case emqx_utils:safe_to_existing_atom(NodeStr, utf8) of
{ok, Node} ->
node_status(emqx_node_rebalance_status:local_status(Node));
{error, _} ->
emqx_ctl:print("Node status error: invalid node~n"),
false
end;
cli(["node-status"]) ->
node_status(emqx_node_rebalance_status:local_status());
cli(["status"]) ->
#{
evacuations := Evacuations,
rebalances := Rebalances
} = emqx_node_rebalance_status:global_status(),
lists:foreach(
fun({Node, Status}) ->
emqx_ctl:print(
"--------------------------------------------------------------------~n"
),
emqx_ctl:print(
"Node ~p: evacuation~n~s",
[Node, emqx_node_rebalance_status:format_local_status(Status)]
)
end,
Evacuations
),
lists:foreach(
fun({Node, Status}) ->
emqx_ctl:print(
"--------------------------------------------------------------------~n"
),
emqx_ctl:print(
"Node ~p: rebalance coordinator~n~s",
[Node, emqx_node_rebalance_status:format_coordinator_status(Status)]
)
end,
Rebalances
);
cli(["stop"]) ->
case emqx_node_rebalance_evacuation:status() of
{enabled, _} ->
ok = emqx_node_rebalance_evacuation:stop(),
emqx_ctl:print("Rebalance(evacuation) stopped~n"),
true;
disabled ->
case emqx_node_rebalance:status() of
{enabled, _} ->
ok = emqx_node_rebalance:stop(),
emqx_ctl:print("Rebalance stopped~n"),
true;
disabled ->
emqx_ctl:print("Rebalance is already disabled~n"),
false
end
end;
cli(_) ->
emqx_ctl:usage(
[
{
"rebalance start --evacuation \\\n"
" [--redirect-to \"Host1:Port1 Host2:Port2 ...\"] \\\n"
" [--conn-evict-rate CountPerSec] \\\n"
" [--migrate-to \"node1@host1 node2@host2 ...\"] \\\n"
" [--wait-takeover Secs] \\\n"
" [--sess-evict-rate CountPerSec]",
"Start current node evacuation with optional server redirect to the specified servers"
},
{
"rebalance start \\\n"
" [--nodes \"node1@host1 node2@host2\"] \\\n"
" [--wait-health-check Secs] \\\n"
" [--conn-evict-rate ConnPerSec] \\\n"
" [--abs-conn-threshold Count] \\\n"
" [--rel-conn-threshold Fraction] \\\n"
" [--conn-evict-rate ConnPerSec] \\\n"
" [--wait-takeover Secs] \\\n"
" [--sess-evict-rate CountPerSec] \\\n"
" [--abs-sess-threshold Count] \\\n"
" [--rel-sess-threshold Fraction]",
"Start rebalance on the specified nodes using the current node as the coordinator"
},
{"rebalance node-status", "Get current node rebalance status"},
{"rebalance node-status \"node1@host1\"", "Get remote node rebalance status"},
{"rebalance status",
"Get statuses of all current rebalance/evacuation processes across the cluster"},
{"rebalance stop", "Stop node rebalance"}
]
).
node_status(NodeStatus) ->
case NodeStatus of
{Process, Status} when Process =:= evacuation orelse Process =:= rebalance ->
emqx_ctl:print(
"Rebalance type: ~p~n~s~n",
[Process, emqx_node_rebalance_status:format_local_status(Status)]
);
disabled ->
emqx_ctl:print("Rebalance disabled~n");
Other ->
emqx_ctl:print("Error detecting rebalance status: ~p~n", [Other])
end.
start_args(Args) ->
case collect_args(Args, #{}) of
{ok, #{"--evacuation" := true} = Collected} ->
case validate_evacuation(maps:to_list(Collected), #{}) of
{ok, Validated} ->
{evacuation, Validated};
{error, _} = Error ->
Error
end;
{ok, #{} = Collected} ->
case validate_rebalance(maps:to_list(Collected), #{}) of
{ok, Validated} ->
{rebalance, Validated};
{error, _} = Error ->
Error
end;
{error, _} = Error ->
Error
end.
collect_args([], Map) ->
{ok, Map};
%% evacuation
collect_args(["--evacuation" | Args], Map) ->
collect_args(Args, Map#{"--evacuation" => true});
collect_args(["--redirect-to", ServerReference | Args], Map) ->
collect_args(Args, Map#{"--redirect-to" => ServerReference});
collect_args(["--migrate-to", MigrateTo | Args], Map) ->
collect_args(Args, Map#{"--migrate-to" => MigrateTo});
%% rebalance
collect_args(["--nodes", Nodes | Args], Map) ->
collect_args(Args, Map#{"--nodes" => Nodes});
collect_args(["--wait-health-check", WaitHealthCheck | Args], Map) ->
collect_args(Args, Map#{"--wait-health-check" => WaitHealthCheck});
collect_args(["--abs-conn-threshold", AbsConnThres | Args], Map) ->
collect_args(Args, Map#{"--abs-conn-threshold" => AbsConnThres});
collect_args(["--rel-conn-threshold", RelConnThres | Args], Map) ->
collect_args(Args, Map#{"--rel-conn-threshold" => RelConnThres});
collect_args(["--abs-sess-threshold", AbsSessThres | Args], Map) ->
collect_args(Args, Map#{"--abs-sess-threshold" => AbsSessThres});
collect_args(["--rel-sess-threshold", RelSessThres | Args], Map) ->
collect_args(Args, Map#{"--rel-sess-threshold" => RelSessThres});
%% common
collect_args(["--conn-evict-rate", ConnEvictRate | Args], Map) ->
collect_args(Args, Map#{"--conn-evict-rate" => ConnEvictRate});
collect_args(["--wait-takeover", WaitTakeover | Args], Map) ->
collect_args(Args, Map#{"--wait-takeover" => WaitTakeover});
collect_args(["--sess-evict-rate", SessEvictRate | Args], Map) ->
collect_args(Args, Map#{"--sess-evict-rate" => SessEvictRate});
%% fallback
collect_args(Args, _Map) ->
{error, io_lib:format("unknown arguments: ~p", [Args])}.
validate_evacuation([], Map) ->
{ok, Map};
validate_evacuation([{"--evacuation", _} | Rest], Map) ->
validate_evacuation(Rest, Map);
validate_evacuation([{"--redirect-to", ServerReference} | Rest], Map) ->
validate_evacuation(Rest, Map#{server_reference => list_to_binary(ServerReference)});
validate_evacuation([{"--conn-evict-rate", _} | _] = Opts, Map) ->
validate_pos_int(conn_evict_rate, Opts, Map, fun validate_evacuation/2);
validate_evacuation([{"--sess-evict-rate", _} | _] = Opts, Map) ->
validate_pos_int(sess_evict_rate, Opts, Map, fun validate_evacuation/2);
validate_evacuation([{"--wait-takeover", _} | _] = Opts, Map) ->
validate_pos_int(wait_takeover, Opts, Map, fun validate_evacuation/2);
validate_evacuation([{"--migrate-to", MigrateTo} | Rest], Map) ->
case strings_to_atoms(string:tokens(MigrateTo, ", ")) of
{_, Invalid} when Invalid =/= [] ->
{error, io_lib:format("invalid --migrate-to, invalid nodes: ~p", [Invalid])};
{Nodes, []} ->
case emqx_node_rebalance_evacuation:available_nodes(Nodes) of
[] ->
{error, "invalid --migrate-to, no nodes"};
Nodes ->
validate_evacuation(Rest, Map#{migrate_to => Nodes});
OtherNodes ->
{error,
io_lib:format(
"invalid --migrate-to, unavailable nodes: ~p",
[Nodes -- OtherNodes]
)}
end
end;
validate_evacuation(Rest, _Map) ->
{error, io_lib:format("unknown evacuation arguments: ~p", [Rest])}.
validate_rebalance([], Map) ->
{ok, Map};
validate_rebalance([{"--wait-health-check", _} | _] = Opts, Map) ->
validate_pos_int(wait_health_check, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--conn-evict-rate", _} | _] = Opts, Map) ->
validate_pos_int(conn_evict_rate, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--sess-evict-rate", _} | _] = Opts, Map) ->
validate_pos_int(sess_evict_rate, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--abs-conn-threshold", _} | _] = Opts, Map) ->
validate_pos_int(abs_conn_threshold, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--rel-conn-threshold", _} | _] = Opts, Map) ->
validate_fraction(rel_conn_threshold, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--abs-sess-threshold", _} | _] = Opts, Map) ->
validate_pos_int(abs_sess_threshold, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--rel-sess-threshold", _} | _] = Opts, Map) ->
validate_fraction(rel_sess_threshold, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--wait-takeover", _} | _] = Opts, Map) ->
validate_pos_int(wait_takeover, Opts, Map, fun validate_rebalance/2);
validate_rebalance([{"--nodes", NodeStr} | Rest], Map) ->
case strings_to_atoms(string:tokens(NodeStr, ", ")) of
{_, Invalid} when Invalid =/= [] ->
{error, io_lib:format("invalid --nodes, invalid nodes: ~p", [Invalid])};
{Nodes, []} ->
case emqx_node_rebalance:available_nodes(Nodes) of
[] ->
{error, "invalid --nodes, no nodes"};
Nodes ->
validate_rebalance(Rest, Map#{nodes => Nodes});
OtherNodes ->
{error,
io_lib:format(
"invalid --nodes, unavailable nodes: ~p",
[Nodes -- OtherNodes]
)}
end
end;
validate_rebalance(Rest, _Map) ->
{error, io_lib:format("unknown rebalance arguments: ~p", [Rest])}.
validate_fraction(Name, [{OptionName, Value} | Rest], Map, Next) ->
case string:to_float(Value) of
{Num, ""} when Num > 1.0 ->
Next(Rest, Map#{Name => Num});
_ ->
{error, "invalid " ++ OptionName ++ " value"}
end.
validate_pos_int(Name, [{OptionName, Value} | Rest], Map, Next) ->
case string:to_integer(Value) of
{Int, ""} when Int > 0 ->
Next(Rest, Map#{Name => Int});
_ ->
{error, "invalid " ++ OptionName ++ " value"}
end.
strings_to_atoms(Strings) ->
strings_to_atoms(Strings, [], []).
strings_to_atoms([], Atoms, Invalid) ->
{lists:reverse(Atoms), lists:reverse(Invalid)};
strings_to_atoms([Str | Rest], Atoms, Invalid) ->
case emqx_utils:safe_to_existing_atom(Str, utf8) of
{ok, Atom} ->
strings_to_atoms(Rest, [Atom | Atoms], Invalid);
{error, _} ->
strings_to_atoms(Rest, Atoms, [Str | Invalid])
end.