From 93ab0458d179e634d019a3cec9b94a8cc3286616 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 6 May 2022 18:36:29 +0800 Subject: [PATCH] fix(gateway): make gateway listeners list return act like MQTT listeners --- apps/emqx/priv/bpapi.versions | 4 +- .../i18n/emqx_gateway_api_listeners_i18n.conf | 6 +- .../src/emqx_gateway_api_listeners.erl | 113 ++++++++---------- apps/emqx_gateway/src/emqx_gateway_http.erl | 7 +- 4 files changed, 57 insertions(+), 73 deletions(-) diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index ea8ceda2e..1c401deb6 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -24,6 +24,6 @@ {emqx_delayed,1}. {emqx_mgmt_cluster,1}. {emqx_retainer,1}. -{emqx_gateway_http_proto_v1,1}. -{emqx_gateway_api_listeners_proto_v1,1}. +{emqx_gateway_http,1}. +{emqx_gateway_api_listeners,1}. diff --git a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf index a2a1c8317..e4f7413d0 100644 --- a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf +++ b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf @@ -126,10 +126,10 @@ emqx_gateway_api_listeners { } } - running { + current_connections { desc { - en: """Is In Listening""" - zh: """是否正在监听""" + en: """Current Connections""" + zh: """当前连接数""" } } diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 8e5772471..343351311 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -85,8 +85,8 @@ listeners(get, #{bindings := #{name := Name0}}) -> with_gateway(Name0, fun(GwName, _) -> Listeners = emqx_gateway_conf:listeners(GwName), ListenOns = lists:map( - fun(#{id := Id, <<"bind">> := ListenOn}) -> - {Id, ListenOn} + fun(#{id := Id, <<"bind">> := BinListenOn}) -> + {Id, erlang:binary_to_integer(BinListenOn)} end, Listeners ), @@ -95,20 +95,21 @@ listeners(get, #{bindings := #{name := Name0}}) -> Result = lists:map( fun(#{id := Id} = Listener) -> + NodeStatus = lists:foldl( + fun(Info, Acc) -> + Status = maps:get(Id, Info), + [Status | Acc] + end, + [], + ClusterStatus + ), + + {MaxCons, CurrCons} = emqx_gateway_http:sum_cluster_connections(NodeStatus), + Listener#{ - node_status => - lists:foldl( - fun(Info, Acc) -> - case maps:get(Id, Info, undefined) of - undefined -> - Acc; - Status -> - [Status | Acc] - end - end, - [], - ClusterStatus - ) + max_connections => MaxCons, + current_connections => CurrCons, + node_status => NodeStatus } end, Listeners @@ -588,11 +589,12 @@ roots() -> [listener]. fields(listener_node_status) -> - [{node_status, mk(hoconsc:array(ref(node_status)), #{desc => ?DESC(listener_node_status)})}]; -fields(node_status) -> [ - {node, mk(node, #{desc => ?DESC(node)})}, - {running, mk(boolean(), #{desc => ?DESC(running)})} + {current_connections, mk(non_neg_integer(), #{desc => ?DESC(current_connections)})}, + {node_status, + mk(hoconsc:array(ref(emqx_mgmt_api_listeners, node_status)), #{ + desc => ?DESC(listener_node_status) + })} ]; fields(tcp_listener) -> emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status); @@ -617,7 +619,19 @@ listener_node_status_schema() -> %% examples examples_listener_list() -> - [Config || #{value := Config} <- maps:values(examples_listener())]. + Convert = fun(Cfg) -> + Cfg#{ + current_connections => 0, + node_status => [ + #{ + node => <<"127.0.0.1">>, + current_connections => 0, + max_connections => 1024000 + } + ] + } + end, + [Convert(Config) || #{value := Config} <- maps:values(examples_listener())]. examples_listener() -> #{ @@ -644,13 +658,7 @@ examples_listener() -> high_watermark => <<"1MB">>, nodelay => false, reuseaddr => true - }, - node_status => [ - #{ - node => <<"node@127.0.0.1">>, - running => true } - ] } }, ssl_listener => @@ -683,13 +691,7 @@ examples_listener() -> #{ active_n => 100, backlog => 1024 - }, - node_status => [ - #{ - node => <<"node@127.0.0.1">>, - running => true } - ] } }, udp_listener => @@ -708,13 +710,7 @@ examples_listener() -> buffer => <<"10KB">>, reuseaddr => true } - }, - node_status => [ - #{ - node => <<"node@127.0.0.1">>, - running => true } - ] }, dtls_listener => #{ @@ -741,13 +737,7 @@ examples_listener() -> #{ active_n => 100, backlog => 1024 - }, - node_status => [ - #{ - node => <<"node@127.0.0.1">>, - running => true } - ] } }, dtls_listener_with_psk_ciphers => @@ -775,13 +765,7 @@ examples_listener() -> "RSA-PSK-AES128-CBC-SHA256,RSA-PSK-AES256-CBC-SHA,RSA-PSK-AES128-CBC-SHA" >>, fail_if_no_peer_cert => false - }, - node_status => [ - #{ - node => <<"node@127.0.0.1">>, - running => true } - ] } }, lisetner_with_authn => @@ -802,13 +786,7 @@ examples_listener() -> password_hash_algorithm => #{name => <<"sha256">>}, user_id_type => <<"username">> - }, - node_status => [ - #{ - node => <<"node@127.0.0.1">>, - running => true } - ] } } }. @@ -824,14 +802,19 @@ listeners_cluster_status(Listeners) -> do_listeners_cluster_status(Listeners) -> Node = node(), - maps:from_list( - lists:map( - fun({Id, ListenOn}) -> - {Id, #{ + lists:foldl( + fun({Id, ListenOn}, Acc) -> + BinId = erlang:atom_to_binary(Id), + {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId), + Curr = esockd:get_current_connections({Id, ListenOn}), + Acc#{ + Id => #{ node => Node, - running => emqx_gateway_utils:is_running(Id, ListenOn) - }} - end, - Listeners - ) + current_connections => Curr, + max_connections => Max + } + } + end, + #{}, + Listeners ). diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 1ccea1455..5216ffc15 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -63,7 +63,8 @@ with_listener_authn/3, checks/2, reason2resp/1, - reason2msg/1 + reason2msg/1, + sum_cluster_connections/1 ]). %% RPC @@ -568,11 +569,11 @@ to_list(A) when is_atom(A) -> to_list(B) when is_binary(B) -> binary_to_list(B). -%%-------------------------------------------------------------------- -%% Internal funcs sum_cluster_connections(List) -> sum_cluster_connections(List, 0, 0). +%%-------------------------------------------------------------------- +%% Internal funcs sum_cluster_connections( [#{max_connections := Max, current_connections := Current} | T], MaxAcc, CurrAcc ) ->