From bad82b29ce4941afaf77588dfbfd3880bff6d39b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 26 Jul 2022 11:03:13 +0800 Subject: [PATCH] feat(gw): change the listeners api fields --- .../i18n/emqx_gateway_api_listeners_i18n.conf | 7 ++ .../src/emqx_gateway_api_listeners.erl | 83 ++++++++++++++----- apps/emqx_gateway/src/emqx_gateway_conf.erl | 23 ++--- apps/emqx_gateway/src/emqx_gateway_utils.erl | 8 +- apps/emqx_management/src/emqx_mgmt_cli.erl | 11 +-- apps/emqx_management/src/emqx_mgmt_util.erl | 24 +++++- 6 files changed, 100 insertions(+), 56 deletions(-) diff --git a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf index e4f7413d0..9c5de67c3 100644 --- a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf +++ b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf @@ -112,6 +112,13 @@ emqx_gateway_api_listeners { } } + listener_status { + desc { + en: """listener status """ + zh: """监听器状态""" + } + } + listener_node_status { desc { en: """listener status of each node in the cluster""" diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 0f0ec8606..473305610 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -81,7 +81,7 @@ paths() -> listeners(get, #{bindings := #{name := Name0}}) -> with_gateway(Name0, fun(GwName, _) -> - Result = get_cluster_listeners_info(GwName), + Result = lists:map(fun bind2str/1, get_cluster_listeners_info(GwName)), {200, Result} end); listeners(post, #{bindings := #{name := Name0}, body := LConf}) -> @@ -119,7 +119,7 @@ listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) -> with_gateway(Name0, fun(_GwName, _) -> case emqx_gateway_conf:listener(ListenerId) of {ok, Listener} -> - {200, Listener}; + {200, bind2str(Listener)}; {error, not_found} -> return_http_error(404, "Listener not found"); {error, Reason} -> @@ -266,11 +266,14 @@ get_cluster_listeners_info(GwName) -> ClusterStatus ), - {MaxCons, CurrCons} = emqx_gateway_http:sum_cluster_connections(NodeStatus), + {MaxCons, CurrCons, Running} = aggregate_listener_status(NodeStatus), Listener#{ - max_connections => MaxCons, - current_connections => CurrCons, + status => #{ + running => Running, + max_connections => MaxCons, + current_connections => CurrCons + }, node_status => NodeStatus } end, @@ -292,20 +295,23 @@ do_listeners_cluster_status(Listeners) -> fun({Id, ListenOn}, Acc) -> BinId = erlang:atom_to_binary(Id), {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId), - Curr = + {Running, Curr} = try esockd:get_current_connections({Id, ListenOn}) of - Int -> Int + Int -> {true, Int} catch %% not started error:not_found -> - 0 + {false, 0} end, Acc#{ Id => #{ node => Node, - current_connections => Curr, - %% XXX: Since it is taken from raw-conf, it is possible a string - max_connections => int(Max) + status => #{ + running => Running, + current_connections => Curr, + %% XXX: Since it is taken from raw-conf, it is possible a string + max_connections => int(Max) + } } } end, @@ -317,6 +323,31 @@ int(B) when is_binary(B) -> binary_to_integer(B); int(I) when is_integer(I) -> I. +aggregate_listener_status(NodeStatus) -> + aggregate_listener_status(NodeStatus, 0, 0, undefined). + +aggregate_listener_status( + [ + #{status := #{running := Running, max_connections := Max, current_connections := Current}} + | T + ], + MaxAcc, + CurrAcc, + RunningAcc +) -> + NRunning = aggregate_running(Running, RunningAcc), + aggregate_listener_status(T, MaxAcc + Max, Current + CurrAcc, NRunning); +aggregate_listener_status([], MaxAcc, CurrAcc, RunningAcc) -> + {MaxAcc, CurrAcc, RunningAcc}. + +aggregate_running(R, R) -> R; +aggregate_running(R, undefined) -> R; +aggregate_running(_, _) -> inconsistent. + +bind2str(Listener = #{bind := Bind}) -> + Listener#{bind := iolist_to_binary(emqx_gateway_utils:format_listenon(Bind))}; +bind2str(Listener = #{<<"bind">> := Bind}) -> + Listener#{<<"bind">> := iolist_to_binary(emqx_gateway_utils:format_listenon(Bind))}. %%-------------------------------------------------------------------- %% Swagger defines @@ -590,22 +621,25 @@ params_paging_in_qs() -> roots() -> [listener]. -fields(listener_node_status) -> +fields(listener_status) -> [ - {current_connections, mk(non_neg_integer(), #{desc => ?DESC(current_connections)})}, + {status, + mk(ref(emqx_mgmt_api_listeners, status), #{ + desc => ?DESC(listener_status) + })}, {node_status, mk(hoconsc:array(ref(emqx_mgmt_api_listeners, node_status)), #{ desc => ?DESC(listener_node_status) })} ]; fields(tcp_listener) -> - emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status); + emqx_gateway_api:fields(tcp_listener) ++ fields(listener_status); fields(ssl_listener) -> - emqx_gateway_api:fields(ssl_listener) ++ fields(listener_node_status); + emqx_gateway_api:fields(ssl_listener) ++ fields(listener_status); fields(udp_listener) -> - emqx_gateway_api:fields(udp_listener) ++ fields(listener_node_status); + emqx_gateway_api:fields(udp_listener) ++ fields(listener_status); fields(dtls_listener) -> - emqx_gateway_api:fields(dtls_listener) ++ fields(listener_node_status); + emqx_gateway_api:fields(dtls_listener) ++ fields(listener_status); fields(_) -> []. @@ -623,12 +657,19 @@ listener_node_status_schema() -> examples_listener_list() -> Convert = fun(Cfg) -> Cfg#{ - current_connections => 0, + status => #{ + running => true, + max_connections => 1024000, + current_connections => 10 + }, node_status => [ #{ - node => <<"127.0.0.1">>, - current_connections => 0, - max_connections => 1024000 + node => <<"emqx@127.0.0.1">>, + status => #{ + running => true, + current_connections => 10, + max_connections => 1024000 + } } ] } diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index 50fc069fc..5fe858fa9 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -181,24 +181,11 @@ do_convert_listener(GwName, LType, Conf) -> do_convert_listener2(GwName, LType, LName, LConf) -> ListenerId = emqx_gateway_utils:listener_id(GwName, LType, LName), - Running = emqx_gateway_utils:is_running(ListenerId, LConf), - bind2str( - LConf#{ - id => ListenerId, - type => LType, - name => LName, - running => Running - } - ). - -bind2str(LConf = #{bind := Bind}) when is_integer(Bind) -> - maps:put(bind, integer_to_binary(Bind), LConf); -bind2str(LConf = #{<<"bind">> := Bind}) when is_integer(Bind) -> - maps:put(<<"bind">>, integer_to_binary(Bind), LConf); -bind2str(LConf = #{bind := Bind}) when is_binary(Bind) -> - LConf; -bind2str(LConf = #{<<"bind">> := Bind}) when is_binary(Bind) -> - LConf. + LConf#{ + id => ListenerId, + type => LType, + name => LName + }. get_bind(#{bind := Bind}) -> emqx_gateway_utils:parse_listenon(Bind); diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 6a491de3d..f8c0549bd 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -287,12 +287,8 @@ apply(F, A2) when -> erlang:apply(F, A2). -format_listenon(Port) when is_integer(Port) -> - io_lib:format("0.0.0.0:~w", [Port]); -format_listenon({Addr, Port}) when is_list(Addr) -> - io_lib:format("~ts:~w", [Addr, Port]); -format_listenon({Addr, Port}) when is_tuple(Addr) -> - io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]). +format_listenon(Term) -> + emqx_mgmt_util:format_listen_on(Term). parse_listenon(Port) when is_integer(Port) -> Port; diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index c846c9d58..e373f11cd 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -582,7 +582,7 @@ listeners([]) -> end, Info = [ - {listen_on, {string, format_listen_on(Bind)}}, + {listen_on, {string, emqx_mgmt_util:format_listen_on(Port)}}, {acceptors, Acceptors}, {proxy_protocol, ProxyProtocol}, {running, Running} @@ -802,15 +802,6 @@ indent_print({Key, {string, Val}}) -> indent_print({Key, Val}) -> emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]). -format_listen_on(Port) when is_integer(Port) -> - io_lib:format("0.0.0.0:~w", [Port]); -format_listen_on({Addr, Port}) when is_list(Addr) -> - io_lib:format("~ts:~w", [Addr, Port]); -format_listen_on({Addr, Port}) when is_tuple(Addr) andalso tuple_size(Addr) == 4 -> - io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]); -format_listen_on({Addr, Port}) when is_tuple(Addr) andalso tuple_size(Addr) == 8 -> - io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]). - name(Filter) -> iolist_to_binary(["CLI-", Filter]). diff --git a/apps/emqx_management/src/emqx_mgmt_util.erl b/apps/emqx_management/src/emqx_mgmt_util.erl index 58fd6e952..2802046fb 100644 --- a/apps/emqx_management/src/emqx_mgmt_util.erl +++ b/apps/emqx_management/src/emqx_mgmt_util.erl @@ -43,7 +43,10 @@ batch_schema/1 ]). --export([urldecode/1]). +-export([ + urldecode/1, + format_listen_on/1 +]). -define(KB, 1024). -define(MB, (1024 * 1024)). @@ -86,6 +89,25 @@ merge_maps(Default, New) -> urldecode(S) -> emqx_http_lib:uri_decode(S). +-spec format_listen_on( + integer() | {tuple(), integer()} | string() | binary() +) -> io_lib:chars(). +format_listen_on(Port) when is_integer(Port) -> + io_lib:format("0.0.0.0:~w", [Port]); +format_listen_on({Addr, Port}) when is_list(Addr) -> + io_lib:format("~ts:~w", [Addr, Port]); +format_listen_on({Addr, Port}) when is_tuple(Addr) -> + io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]); +format_listen_on(Str) when is_list(Str) -> + case emqx_schema:to_ip_port(Str) of + {ok, {Ip, Port}} -> + format_listen_on({Ip, Port}); + {error, _} -> + format_listen_on(list_to_integer(Str)) + end; +format_listen_on(Bin) when is_binary(Bin) -> + format_listen_on(binary_to_list(Bin)). + %%%============================================================================================== %% schema util schema(Ref) when is_atom(Ref) ->