feat(gw): change the listeners api fields

This commit is contained in:
JianBo He 2022-07-26 11:03:13 +08:00
parent 2c71d27a89
commit bad82b29ce
6 changed files with 100 additions and 56 deletions

View File

@ -112,6 +112,13 @@ emqx_gateway_api_listeners {
}
}
listener_status {
desc {
en: """listener status """
zh: """监听器状态"""
}
}
listener_node_status {
desc {
en: """listener status of each node in the cluster"""

View File

@ -81,7 +81,7 @@ paths() ->
listeners(get, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(GwName, _) ->
Result = get_cluster_listeners_info(GwName),
Result = lists:map(fun bind2str/1, get_cluster_listeners_info(GwName)),
{200, Result}
end);
listeners(post, #{bindings := #{name := Name0}, body := LConf}) ->
@ -119,7 +119,7 @@ listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) ->
with_gateway(Name0, fun(_GwName, _) ->
case emqx_gateway_conf:listener(ListenerId) of
{ok, Listener} ->
{200, Listener};
{200, bind2str(Listener)};
{error, not_found} ->
return_http_error(404, "Listener not found");
{error, Reason} ->
@ -266,11 +266,14 @@ get_cluster_listeners_info(GwName) ->
ClusterStatus
),
{MaxCons, CurrCons} = emqx_gateway_http:sum_cluster_connections(NodeStatus),
{MaxCons, CurrCons, Running} = aggregate_listener_status(NodeStatus),
Listener#{
max_connections => MaxCons,
current_connections => CurrCons,
status => #{
running => Running,
max_connections => MaxCons,
current_connections => CurrCons
},
node_status => NodeStatus
}
end,
@ -292,20 +295,23 @@ do_listeners_cluster_status(Listeners) ->
fun({Id, ListenOn}, Acc) ->
BinId = erlang:atom_to_binary(Id),
{ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId),
Curr =
{Running, Curr} =
try esockd:get_current_connections({Id, ListenOn}) of
Int -> Int
Int -> {true, Int}
catch
%% not started
error:not_found ->
0
{false, 0}
end,
Acc#{
Id => #{
node => Node,
current_connections => Curr,
%% XXX: Since it is taken from raw-conf, it is possible a string
max_connections => int(Max)
status => #{
running => Running,
current_connections => Curr,
%% XXX: Since it is taken from raw-conf, it is possible a string
max_connections => int(Max)
}
}
}
end,
@ -317,6 +323,31 @@ int(B) when is_binary(B) ->
binary_to_integer(B);
int(I) when is_integer(I) ->
I.
aggregate_listener_status(NodeStatus) ->
aggregate_listener_status(NodeStatus, 0, 0, undefined).
aggregate_listener_status(
[
#{status := #{running := Running, max_connections := Max, current_connections := Current}}
| T
],
MaxAcc,
CurrAcc,
RunningAcc
) ->
NRunning = aggregate_running(Running, RunningAcc),
aggregate_listener_status(T, MaxAcc + Max, Current + CurrAcc, NRunning);
aggregate_listener_status([], MaxAcc, CurrAcc, RunningAcc) ->
{MaxAcc, CurrAcc, RunningAcc}.
aggregate_running(R, R) -> R;
aggregate_running(R, undefined) -> R;
aggregate_running(_, _) -> inconsistent.
bind2str(Listener = #{bind := Bind}) ->
Listener#{bind := iolist_to_binary(emqx_gateway_utils:format_listenon(Bind))};
bind2str(Listener = #{<<"bind">> := Bind}) ->
Listener#{<<"bind">> := iolist_to_binary(emqx_gateway_utils:format_listenon(Bind))}.
%%--------------------------------------------------------------------
%% Swagger defines
@ -590,22 +621,25 @@ params_paging_in_qs() ->
roots() ->
[listener].
fields(listener_node_status) ->
fields(listener_status) ->
[
{current_connections, mk(non_neg_integer(), #{desc => ?DESC(current_connections)})},
{status,
mk(ref(emqx_mgmt_api_listeners, status), #{
desc => ?DESC(listener_status)
})},
{node_status,
mk(hoconsc:array(ref(emqx_mgmt_api_listeners, node_status)), #{
desc => ?DESC(listener_node_status)
})}
];
fields(tcp_listener) ->
emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status);
emqx_gateway_api:fields(tcp_listener) ++ fields(listener_status);
fields(ssl_listener) ->
emqx_gateway_api:fields(ssl_listener) ++ fields(listener_node_status);
emqx_gateway_api:fields(ssl_listener) ++ fields(listener_status);
fields(udp_listener) ->
emqx_gateway_api:fields(udp_listener) ++ fields(listener_node_status);
emqx_gateway_api:fields(udp_listener) ++ fields(listener_status);
fields(dtls_listener) ->
emqx_gateway_api:fields(dtls_listener) ++ fields(listener_node_status);
emqx_gateway_api:fields(dtls_listener) ++ fields(listener_status);
fields(_) ->
[].
@ -623,12 +657,19 @@ listener_node_status_schema() ->
examples_listener_list() ->
Convert = fun(Cfg) ->
Cfg#{
current_connections => 0,
status => #{
running => true,
max_connections => 1024000,
current_connections => 10
},
node_status => [
#{
node => <<"127.0.0.1">>,
current_connections => 0,
max_connections => 1024000
node => <<"emqx@127.0.0.1">>,
status => #{
running => true,
current_connections => 10,
max_connections => 1024000
}
}
]
}

View File

@ -181,24 +181,11 @@ do_convert_listener(GwName, LType, Conf) ->
do_convert_listener2(GwName, LType, LName, LConf) ->
ListenerId = emqx_gateway_utils:listener_id(GwName, LType, LName),
Running = emqx_gateway_utils:is_running(ListenerId, LConf),
bind2str(
LConf#{
id => ListenerId,
type => LType,
name => LName,
running => Running
}
).
bind2str(LConf = #{bind := Bind}) when is_integer(Bind) ->
maps:put(bind, integer_to_binary(Bind), LConf);
bind2str(LConf = #{<<"bind">> := Bind}) when is_integer(Bind) ->
maps:put(<<"bind">>, integer_to_binary(Bind), LConf);
bind2str(LConf = #{bind := Bind}) when is_binary(Bind) ->
LConf;
bind2str(LConf = #{<<"bind">> := Bind}) when is_binary(Bind) ->
LConf.
LConf#{
id => ListenerId,
type => LType,
name => LName
}.
get_bind(#{bind := Bind}) ->
emqx_gateway_utils:parse_listenon(Bind);

View File

@ -287,12 +287,8 @@ apply(F, A2) when
->
erlang:apply(F, A2).
format_listenon(Port) when is_integer(Port) ->
io_lib:format("0.0.0.0:~w", [Port]);
format_listenon({Addr, Port}) when is_list(Addr) ->
io_lib:format("~ts:~w", [Addr, Port]);
format_listenon({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]).
format_listenon(Term) ->
emqx_mgmt_util:format_listen_on(Term).
parse_listenon(Port) when is_integer(Port) ->
Port;

View File

@ -582,7 +582,7 @@ listeners([]) ->
end,
Info =
[
{listen_on, {string, format_listen_on(Bind)}},
{listen_on, {string, emqx_mgmt_util:format_listen_on(Port)}},
{acceptors, Acceptors},
{proxy_protocol, ProxyProtocol},
{running, Running}
@ -802,15 +802,6 @@ indent_print({Key, {string, Val}}) ->
indent_print({Key, Val}) ->
emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]).
format_listen_on(Port) when is_integer(Port) ->
io_lib:format("0.0.0.0:~w", [Port]);
format_listen_on({Addr, Port}) when is_list(Addr) ->
io_lib:format("~ts:~w", [Addr, Port]);
format_listen_on({Addr, Port}) when is_tuple(Addr) andalso tuple_size(Addr) == 4 ->
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]);
format_listen_on({Addr, Port}) when is_tuple(Addr) andalso tuple_size(Addr) == 8 ->
io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]).
name(Filter) ->
iolist_to_binary(["CLI-", Filter]).

View File

@ -43,7 +43,10 @@
batch_schema/1
]).
-export([urldecode/1]).
-export([
urldecode/1,
format_listen_on/1
]).
-define(KB, 1024).
-define(MB, (1024 * 1024)).
@ -86,6 +89,25 @@ merge_maps(Default, New) ->
urldecode(S) ->
emqx_http_lib:uri_decode(S).
-spec format_listen_on(
integer() | {tuple(), integer()} | string() | binary()
) -> io_lib:chars().
format_listen_on(Port) when is_integer(Port) ->
io_lib:format("0.0.0.0:~w", [Port]);
format_listen_on({Addr, Port}) when is_list(Addr) ->
io_lib:format("~ts:~w", [Addr, Port]);
format_listen_on({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]);
format_listen_on(Str) when is_list(Str) ->
case emqx_schema:to_ip_port(Str) of
{ok, {Ip, Port}} ->
format_listen_on({Ip, Port});
{error, _} ->
format_listen_on(list_to_integer(Str))
end;
format_listen_on(Bin) when is_binary(Bin) ->
format_listen_on(binary_to_list(Bin)).
%%%==============================================================================================
%% schema util
schema(Ref) when is_atom(Ref) ->