734 lines
21 KiB
Erlang
734 lines
21 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%--------------------------------------------------------------------
|
|
-module(emqx_node_rebalance_api).
|
|
|
|
-behaviour(minirest_api).
|
|
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
|
|
|
|
%% Swagger specs from hocon schema
|
|
-export([
|
|
api_spec/0,
|
|
paths/0,
|
|
schema/1,
|
|
namespace/0
|
|
]).
|
|
|
|
-export([
|
|
fields/1,
|
|
roots/0
|
|
]).
|
|
|
|
%% API callbacks
|
|
-export([
|
|
'/load_rebalance/status'/2,
|
|
'/load_rebalance/global_status'/2,
|
|
'/load_rebalance/availability_check'/2,
|
|
'/load_rebalance/:node/start'/2,
|
|
'/load_rebalance/:node/stop'/2,
|
|
'/load_rebalance/:node/evacuation/start'/2,
|
|
'/load_rebalance/:node/evacuation/stop'/2
|
|
]).
|
|
|
|
%% Schema examples
|
|
-export([
|
|
rebalance_example/0,
|
|
rebalance_evacuation_example/0,
|
|
translate/2
|
|
]).
|
|
|
|
-import(hoconsc, [mk/2, ref/1, ref/2]).
|
|
-import(emqx_dashboard_swagger, [error_codes/2]).
|
|
|
|
-define(BAD_REQUEST, 'BAD_REQUEST').
|
|
-define(NODE_EVACUATING, 'NODE_EVACUATING').
|
|
-define(RPC_ERROR, 'RPC_ERROR').
|
|
-define(NOT_FOUND, 'NOT_FOUND').
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% API Spec
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Returns the namespace string used by this schema module.
namespace() ->
    "load_rebalance".
|
|
|
|
%% Builds the API spec for this module through emqx_dashboard_swagger,
%% passing the `check_schema => true' option.
api_spec() ->
    SpecOptions = #{check_schema => true},
    emqx_dashboard_swagger:spec(?MODULE, SpecOptions).
|
|
|
|
%% Lists every HTTP path served by this module. Each entry has a matching
%% schema/1 clause and an exported handler function of the same name.
paths() ->
    Suffixes = [
        "status",
        "global_status",
        "availability_check",
        ":node/start",
        ":node/stop",
        ":node/evacuation/start",
        ":node/evacuation/stop"
    ],
    ["/load_rebalance/" ++ Suffix || Suffix <- Suffixes].
|
|
|
|
%% Returns the operation schema (operation id, HTTP method, summary,
%% description and responses) for each path listed in paths/0.
schema("/load_rebalance/status") ->
    get_operation(
        '/load_rebalance/status',
        <<"Get rebalance status">>,
        "load_rebalance_status",
        #{200 => local_status_response_schema()}
    );
schema("/load_rebalance/global_status") ->
    %% NOTE(review): the handler replies with a map holding `evacuations' and
    %% `rebalances', yet the 200 response is documented with response_schema()
    %% and fields(global_status) is not referenced here -- confirm intent.
    get_operation(
        '/load_rebalance/global_status',
        <<"Get global rebalance status">>,
        "load_rebalance_global_status",
        #{200 => response_schema()}
    );
schema("/load_rebalance/availability_check") ->
    get_operation(
        '/load_rebalance/availability_check',
        <<"Node rebalance availability check">>,
        "load_rebalance_availability_check",
        #{
            200 => response_schema(),
            503 => error_codes([?NODE_EVACUATING], <<"Node Evacuating">>)
        }
    );
schema("/load_rebalance/:node/start") ->
    RequestBody = emqx_dashboard_swagger:schema_with_examples(
        ref(rebalance_start),
        rebalance_example()
    ),
    node_post_operation(
        '/load_rebalance/:node/start',
        <<"Start rebalancing with the node as coordinator">>,
        "load_rebalance_start",
        #{'requestBody' => RequestBody}
    );
schema("/load_rebalance/:node/stop") ->
    node_post_operation(
        '/load_rebalance/:node/stop',
        <<"Stop rebalancing coordinated by the node">>,
        "load_rebalance_stop",
        #{}
    );
schema("/load_rebalance/:node/evacuation/start") ->
    RequestBody = emqx_dashboard_swagger:schema_with_examples(
        ref(rebalance_evacuation_start),
        rebalance_evacuation_example()
    ),
    node_post_operation(
        '/load_rebalance/:node/evacuation/start',
        <<"Start evacuation on a node">>,
        "load_rebalance_evacuation_start",
        #{'requestBody' => RequestBody}
    );
schema("/load_rebalance/:node/evacuation/stop") ->
    node_post_operation(
        '/load_rebalance/:node/evacuation/stop',
        <<"Stop evacuation on a node">>,
        "load_rebalance_evacuation_stop",
        #{}
    ).

%% Schema of a GET operation tagged `load_rebalance'.
get_operation(OperationId, Summary, DescId, Responses) ->
    #{
        'operationId' => OperationId,
        get => #{
            tags => [<<"load_rebalance">>],
            summary => Summary,
            description => ?DESC(DescId),
            responses => Responses
        }
    }.

%% Schema of a POST operation on a `:node' path. All of them share the node
%% path parameter and the 200/400/404 responses; Extra carries the optional
%% `requestBody' entry.
node_post_operation(OperationId, Summary, DescId, Extra) ->
    Common = #{
        tags => [<<"load_rebalance">>],
        summary => Summary,
        description => ?DESC(DescId),
        parameters => [param_node()],
        responses => #{
            200 => response_schema(),
            400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
            404 => error_codes([?NOT_FOUND], <<"Not Found">>)
        }
    },
    #{
        'operationId' => OperationId,
        post => maps:merge(Common, Extra)
    }.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handlers
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% GET handler: reports the local status obtained from
%% emqx_node_rebalance_status:local_status/0. A `disabled' status is returned
%% as-is; a running rebalance or evacuation is rendered via format_status/2.
'/load_rebalance/status'(get, #{}) ->
    case emqx_node_rebalance_status:local_status() of
        disabled ->
            {200, #{status => disabled}};
        {Process, Stats} when Process =:= rebalance; Process =:= evacuation ->
            {200, format_status(Process, Stats)}
    end.
|
|
|
|
%% GET handler: takes the `evacuations' and `rebalances' lists from
%% emqx_node_rebalance_status:global_status/0 and returns each of them as a
%% list of maps carrying a `node' key.
'/load_rebalance/global_status'(get, #{}) ->
    GlobalStatus = emqx_node_rebalance_status:global_status(),
    #{evacuations := Evacuations, rebalances := Rebalances} = GlobalStatus,
    Body = #{
        evacuations => format_as_map_list(Evacuations),
        rebalances => format_as_map_list(Rebalances)
    },
    {200, Body}.
|
|
|
|
%% GET handler: replies 200 with an empty body while the eviction agent is
%% disabled, and 503 NODE_EVACUATING while it is enabled.
'/load_rebalance/availability_check'(get, #{}) ->
    AgentStatus = emqx_eviction_agent:status(),
    case AgentStatus of
        {enabled, _Stats} ->
            error_response(503, ?NODE_EVACUATING, <<"Node Evacuating">>);
        disabled ->
            {200, #{}}
    end.
|
|
|
|
%% POST handler: translates the request body against the `rebalance_start'
%% schema, validates the optional `nodes' list and asks the node from the
%% path to start rebalancing over RPC.
'/load_rebalance/:node/start'(post, #{bindings := #{node := NodeBin}, body := Params0}) ->
    StartOnNode = fun(Node) ->
        Translated = translate(rebalance_start, Params0),
        CallNode = fun(Validated) ->
            RpcResult = emqx_node_rebalance_api_proto_v1:node_rebalance_start(Node, Validated),
            wrap_rpc(Node, RpcResult)
        end,
        with_nodes_at_key(nodes, Translated, CallNode)
    end,
    emqx_utils_api:with_node(NodeBin, StartOnNode).
|
|
|
|
%% POST handler: asks the node from the path, over RPC, to stop the
%% rebalancing it coordinates.
'/load_rebalance/:node/stop'(post, #{bindings := #{node := NodeBin}}) ->
    StopOnNode = fun(Node) ->
        RpcResult = emqx_node_rebalance_api_proto_v1:node_rebalance_stop(Node),
        wrap_rpc(Node, RpcResult)
    end,
    emqx_utils_api:with_node(NodeBin, StopOnNode).
|
|
|
|
%% POST handler: translates the request body against the
%% `rebalance_evacuation_start' schema, validates the optional `migrate_to'
%% node list and asks the node from the path to start evacuation over RPC.
'/load_rebalance/:node/evacuation/start'(post, #{
    bindings := #{node := NodeBin}, body := Params0
}) ->
    StartOnNode = fun(Node) ->
        Translated = translate(rebalance_evacuation_start, Params0),
        CallNode = fun(Validated) ->
            RpcResult = emqx_node_rebalance_api_proto_v1:node_rebalance_evacuation_start(
                Node, Validated
            ),
            wrap_rpc(Node, RpcResult)
        end,
        with_nodes_at_key(migrate_to, Translated, CallNode)
    end,
    emqx_utils_api:with_node(NodeBin, StartOnNode).
|
|
|
|
%% POST handler: asks the node from the path, over RPC, to stop its
%% evacuation.
'/load_rebalance/:node/evacuation/stop'(post, #{bindings := #{node := NodeBin}}) ->
    StopOnNode = fun(Node) ->
        RpcResult = emqx_node_rebalance_api_proto_v1:node_rebalance_evacuation_stop(Node),
        wrap_rpc(Node, RpcResult)
    end,
    emqx_utils_api:with_node(NodeBin, StopOnNode).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Helpers
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Converts the result of an RPC made to Node into an HTTP response:
%%   ok               -> 200 with an empty body
%%   {error, Reason}  -> 400 BAD_REQUEST
%%   {badrpc, Reason} -> 503 RPC_ERROR
%% Any other result is deliberately not handled and crashes the caller.
wrap_rpc(Node, RPCResult) ->
    case RPCResult of
        {badrpc, RpcReason} ->
            RpcMessage = io_lib:format("RPC error on node ~p: ~p", [Node, RpcReason]),
            error_response(503, ?RPC_ERROR, RpcMessage);
        {error, Reason} ->
            ErrorMessage = io_lib:format("error on node ~p: ~p", [Node, Reason]),
            error_response(400, ?BAD_REQUEST, ErrorMessage);
        ok ->
            {200, #{}}
    end.
|
|
|
|
%% Adds `process => Process' and `status => enabled' to the Stats map,
%% overriding any existing values under those keys.
format_status(Process, Stats) ->
    maps:merge(Stats, #{process => Process, status => enabled}).
|
|
|
|
%% Validates the optional list of node names stored under Key in Params.
%%
%% Returns:
%%   {ok, Params1}                  - Key is absent (Params returned as-is),
%%                                    or every name parsed to an existing atom
%%                                    and every node is available; Params1
%%                                    then holds the node atoms under Key.
%%   {error, {invalid, BinNodes}}   - names that could not be parsed.
%%   {error, {unavailable, Nodes}}  - nodes missing from the result of
%%                                    emqx_node_rebalance_evacuation:available_nodes/1.
validate_nodes(Key, Params) when is_map_key(Key, Params) ->
    BinNodes = maps:get(Key, Params),
    {ValidNodes, InvalidNodes} = lists:foldl(
        fun(BinNode, {Nodes, UnknownNodes}) ->
            case parse_node(BinNode) of
                {ok, Node} -> {[Node | Nodes], UnknownNodes};
                {error, _} -> {Nodes, [BinNode | UnknownNodes]}
            end
        end,
        {[], []},
        BinNodes
    ),
    case InvalidNodes of
        [] ->
            AvailableNodes = emqx_node_rebalance_evacuation:available_nodes(ValidNodes),
            %% Decide by membership rather than by exact list equality, so
            %% that a reordered or deduplicated (but complete) answer is not
            %% reported as `{unavailable, []}'.
            case [Node || Node <- ValidNodes, not lists:member(Node, AvailableNodes)] of
                [] -> {ok, Params#{Key => ValidNodes}};
                UnavailableNodes -> {error, {unavailable, UnavailableNodes}}
            end;
        _ ->
            {error, {invalid, InvalidNodes}}
    end;
validate_nodes(_Key, Params) ->
    {ok, Params}.
|
|
|
|
%% Runs Fun with the validated Params when the node list under Key passes
%% validate_nodes/2; otherwise replies with a 400 error response describing
%% the offending nodes.
with_nodes_at_key(Key, Params, Fun) ->
    case validate_nodes(Key, Params) of
        {ok, ValidatedParams} ->
            Fun(ValidatedParams);
        {error, {invalid, InvalidNodes}} ->
            InvalidMsg = io_lib:format("Invalid nodes: ~p", [InvalidNodes]),
            error_response(400, ?BAD_REQUEST, InvalidMsg);
        {error, {unavailable, UnavailableNodes}} ->
            %% NOTE(review): HTTP status 400 is paired with the NOT_FOUND
            %% error code here, while schema/1 documents NOT_FOUND under
            %% 404 -- confirm which one clients are expected to see.
            UnavailableMsg = io_lib:format("Nodes unavailable: ~p", [UnavailableNodes]),
            error_response(400, ?NOT_FOUND, UnavailableMsg)
    end.
|
|
|
|
%% Converts a binary node name into an atom without ever creating a new atom.
%% Returns {ok, Node}, or {error, {unknown, Bin}} when no such atom exists.
parse_node(Bin) when is_binary(Bin) ->
    try binary_to_existing_atom(Bin, utf8) of
        Node -> {ok, Node}
    catch
        error:badarg -> {error, {unknown, Bin}}
    end.
|
|
|
|
%% Turns a list of {Node, InfoMap} pairs into a list of maps, storing the
%% node under the `node' key of each map. Order is preserved.
format_as_map_list([]) ->
    [];
format_as_map_list([{Node, Info} | Rest]) ->
    [Info#{node => Node} | format_as_map_list(Rest)].
|
|
|
|
%% Builds an {HttpCode, Body} reply whose body is produced by the ?ERROR_MSG
%% macro from the error code and message.
error_response(HttpCode, Code, Message) ->
    Body = ?ERROR_MSG(Code, Message),
    {HttpCode, Body}.
|
|
|
|
%% Removes from the list of {Key, Value} pairs every pair whose key is a
%% member of Keys. The order of the remaining pairs is preserved.
without(_Keys, []) ->
    [];
without(Keys, [{Key, _} = Pair | Rest]) ->
    case lists:member(Key, Keys) of
        true -> without(Keys, Rest);
        false -> [Pair | without(Keys, Rest)]
    end.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Schema
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% Checks Conf against the struct named Ref of this schema module using
%% hocon_tconf:check_plain/4 (with atom keys requested) and returns the
%% checked value found under Ref.
translate(Ref, Conf) ->
    RootName = atom_to_binary(Ref),
    Checked = hocon_tconf:check_plain(
        ?MODULE, #{RootName => Conf}, #{atom_key => true}, [Ref]
    ),
    #{Ref := TranslatedConf} = Checked,
    TranslatedConf.
|
|
|
|
%% Schema of the required `node' path parameter shared by the `:node' paths.
param_node() ->
    Meta = #{
        in => path,
        desc => ?DESC(param_node),
        required => true
    },
    {node, mk(binary(), Meta)}.
|
|
|
|
%% Field definitions of every struct declared by this schema module.
fields(rebalance_start) ->
    [
        optional_field("wait_health_check", emqx_schema:duration_s(), wait_health_check),
        optional_field("conn_evict_rate", pos_integer(), conn_evict_rate),
        optional_field("sess_evict_rate", pos_integer(), sess_evict_rate),
        optional_field("abs_conn_threshold", pos_integer(), abs_conn_threshold),
        optional_field(
            "rel_conn_threshold",
            number(),
            rel_conn_threshold,
            #{validator => [fun(Value) -> Value > 1.0 end]}
        ),
        optional_field("abs_sess_threshold", pos_integer(), abs_sess_threshold),
        optional_field(
            "rel_sess_threshold",
            number(),
            rel_sess_threshold,
            #{validator => [fun(Value) -> Value > 1.0 end]}
        ),
        optional_field("wait_takeover", emqx_schema:duration_s(), wait_takeover),
        optional_field(
            "nodes",
            list(binary()),
            rebalance_nodes,
            #{validator => [fun(Values) -> length(Values) > 0 end]}
        )
    ];
fields(rebalance_evacuation_start) ->
    [
        optional_field("conn_evict_rate", pos_integer(), conn_evict_rate),
        optional_field("sess_evict_rate", pos_integer(), sess_evict_rate),
        optional_field("redirect_to", binary(), redirect_to),
        optional_field("wait_takeover", pos_integer(), wait_takeover),
        optional_field("migrate_to", nonempty_list(binary()), migrate_to)
    ];
fields(local_status_disabled) ->
    [
        %% NOTE(review): reuses the `local_status_enabled' description id --
        %% confirm this is intended rather than a copy-paste leftover.
        required_field("status", disabled, local_status_enabled)
    ];
fields(local_status_enabled) ->
    [
        required_field("status", enabled, local_status_enabled),
        required_field("process", hoconsc:union([rebalance, evacuation]), local_status_process),
        required_field("state", atom(), local_status_state),
        optional_field("coordinator_node", binary(), local_status_coordinator_node),
        optional_field(
            "connection_eviction_rate", pos_integer(), local_status_connection_eviction_rate
        ),
        optional_field(
            "session_eviction_rate", pos_integer(), local_status_session_eviction_rate
        ),
        optional_field("connection_goal", non_neg_integer(), local_status_connection_goal),
        optional_field("session_goal", non_neg_integer(), local_status_session_goal),
        optional_field(
            "disconnected_session_goal",
            non_neg_integer(),
            local_status_disconnected_session_goal
        ),
        optional_field("session_recipients", list(binary()), local_status_session_recipients),
        optional_field("recipients", list(binary()), local_status_recipients),
        optional_field("stats", ref(status_stats), local_status_stats)
    ];
fields(status_stats) ->
    [
        required_field("initial_connected", non_neg_integer(), status_stats_initial_connected),
        required_field("current_connected", non_neg_integer(), status_stats_current_connected),
        required_field("initial_sessions", non_neg_integer(), status_stats_initial_sessions),
        required_field("current_sessions", non_neg_integer(), status_stats_current_sessions),
        optional_field(
            "current_disconnected_sessions",
            non_neg_integer(),
            status_stats_current_disconnected_sessions
        )
    ];
fields(global_coordinator_status) ->
    %% Starts from the local "enabled" status fields minus the listed ones,
    %% then appends the coordinator-specific fields.
    Inherited = without(
        ["status", "process", "session_goal", "session_recipients", "stats"],
        fields(local_status_enabled)
    ),
    Inherited ++
        [
            optional_field("donors", list(binary()), coordinator_status_donors),
            optional_field("donor_conn_avg", non_neg_integer(), coordinator_status_donor_conn_avg),
            optional_field("donor_sess_avg", non_neg_integer(), coordinator_status_donor_sess_avg),
            required_field("node", binary(), coordinator_status_node)
        ];
fields(global_evacuation_status) ->
    Inherited = without(["status", "process"], fields(local_status_enabled)),
    Inherited ++
        [
            required_field("node", binary(), evacuation_status_node)
        ];
fields(global_status) ->
    [
        optional_field(
            "evacuations",
            hoconsc:array(ref(global_evacuation_status)),
            global_status_evacuations
        ),
        optional_field(
            "rebalances",
            hoconsc:array(ref(global_coordinator_status)),
            global_status_rebalances
        )
    ].

%% A field that is not required and has no metadata beyond its description.
optional_field(Name, Type, DescId) ->
    optional_field(Name, Type, DescId, #{}).

%% A field that is not required; Extra supplies additional metadata such as
%% a `validator'.
optional_field(Name, Type, DescId, Extra) ->
    {Name, mk(Type, Extra#{desc => ?DESC(DescId), required => false})}.

%% A required field.
required_field(Name, Type, DescId) ->
    {Name, mk(Type, #{desc => ?DESC(DescId), required => true})}.
|
|
|
|
%% Example request body used in the schema of the rebalance start endpoint.
rebalance_example() ->
    maps:from_list([
        {wait_health_check, 10},
        {conn_evict_rate, 10},
        {sess_evict_rate, 20},
        {abs_conn_threshold, 10},
        {rel_conn_threshold, 1.5},
        {abs_sess_threshold, 10},
        {rel_sess_threshold, 1.5},
        {wait_takeover, 10},
        {nodes, [<<"othernode@127.0.0.1">>]}
    ]).
|
|
|
|
%% Example request body used in the schema of the evacuation start endpoint.
rebalance_evacuation_example() ->
    maps:from_list([
        {conn_evict_rate, 100},
        {sess_evict_rate, 100},
        {redirect_to, <<"othernode:1883">>},
        {wait_takeover, 10},
        {migrate_to, [<<"othernode@127.0.0.1">>]}
    ]).
|
|
|
|
%% Response schema of the local status endpoint: a union of the "disabled"
%% and "enabled" status structs.
local_status_response_schema() ->
    Members = [ref(Struct) || Struct <- [local_status_disabled, local_status_enabled]],
    hoconsc:union(Members).
|
|
|
|
%% Generic map-typed response schema described by the `empty_response'
%% description id.
response_schema() ->
    Meta = #{desc => ?DESC(empty_response)},
    mk(map(), Meta).
|
|
|
|
%% This schema module declares no root fields.
roots() ->
    [].
|