From 2c558152ed27c73355d44089d0822fafb4123a21 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 29 Apr 2022 12:14:07 +0800 Subject: [PATCH 1/5] fix(gateway): add node_status into the result of /gateway API --- .../i18n/emqx_gateway_api_i18n.conf | 15 ++++++ apps/emqx_gateway/src/emqx_gateway_api.erl | 53 +++++++++++++++++-- apps/emqx_gateway/src/emqx_gateway_http.erl | 28 +++++++++- .../src/proto/emqx_gateway_http_proto_v1.erl | 35 ++++++++++++ 4 files changed, 126 insertions(+), 5 deletions(-) create mode 100644 apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl diff --git a/apps/emqx_gateway/i18n/emqx_gateway_api_i18n.conf b/apps/emqx_gateway/i18n/emqx_gateway_api_i18n.conf index 468a88b7b..5ab9277b2 100644 --- a/apps/emqx_gateway/i18n/emqx_gateway_api_i18n.conf +++ b/apps/emqx_gateway/i18n/emqx_gateway_api_i18n.conf @@ -118,4 +118,19 @@ emqx_gateway_api { zh: """监听器类型""" } } + + gateway_node_status { + desc { + en: """The status of the gateway on each node in the cluster""" + zh: """网关在集群中每个节点上的状态""" + } + } + + node { + desc { + en: """Node Name""" + zh: """节点名称""" + } + } + } diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index 86e2a946d..162fa2b62 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -302,7 +302,9 @@ fields(gateway_overview) -> required => {false, recursively}, desc => ?DESC(gateway_listeners) } - )} + )}, + {node_status, + mk(hoconsc:array(ref(gateway_node_status)), #{desc => ?DESC(gateway_node_status)})} ]; fields(gateway_listener_overview) -> [ @@ -322,6 +324,25 @@ fields(gateway_listener_overview) -> #{desc => ?DESC(gateway_listener_type)} )} ]; +fields(gateway_node_status) -> + [ + {node, mk(node(), #{desc => ?DESC(node)})}, + {status, + mk( + hoconsc:enum([running, stopped, unloaded]), + #{desc => ?DESC(gateway_status)} + )}, + {max_connections, + mk( + pos_integer(), + #{desc => ?DESC(gateway_max_connections)} + )}, + {current_connections, + mk( + non_neg_integer(), + #{desc => ?DESC(gateway_current_connections)} + )} + ]; fields(Gw) when Gw == stomp; Gw == mqttsn; @@ -472,7 +493,15 @@ examples_gateway_overview() -> } ], created_at => <<"2021-12-08T14:41:26.171+08:00">>, - started_at => <<"2021-12-08T14:41:26.202+08:00">> + started_at => <<"2021-12-08T14:41:26.202+08:00">>, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + status => <<"running">>, + current_connections => 0, + max_connections => 1024000 + } + ] }, #{ name => <<"mqttsn">>, @@ -489,7 +518,15 @@ examples_gateway_overview() -> } ], created_at => <<"2021-12-08T14:41:45.071+08:00">>, - stopped_at => <<"2021-12-08T14:56:35.576+08:00">> + stopped_at => <<"2021-12-08T14:56:35.576+08:00">>, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + status => <<"running">>, + current_connections => 0, + max_connections => 1024000 + } + ] }, #{ name => <<"stomp">>, @@ -506,7 +543,15 @@ examples_gateway_overview() -> } ], created_at => <<"2021-12-08T14:42:15.272+08:00">>, - started_at => <<"2021-12-08T14:42:15.274+08:00">> + started_at => <<"2021-12-08T14:42:15.274+08:00">>, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + status => <<"running">>, + current_connections => 0, + max_connections => 1024000 + } + ] } ]. diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index f104c6629..48f92b6d3 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -66,6 +66,9 @@ reason2msg/1 ]). +%% RPC +-export([gateway_status/1, cluster_gateway_status/1]). + -type gateway_summary() :: #{ name := binary(), @@ -116,7 +119,8 @@ gateways(Status) -> GwInfo1#{ max_connections => max_connections_count(Config), current_connections => current_connections_count(GwName), - listeners => get_listeners_status(GwName, Config) + listeners => get_listeners_status(GwName, Config), + node_status => cluster_gateway_status(GwName) } end end, @@ -127,6 +131,28 @@ gateways(Status) -> _ -> [Gw || Gw = #{status := S} <- Gateways, S == Status] end. +gateway_status(GwName) -> + case emqx_gateway:lookup(GwName) of + undefined -> + #{node => node(), status => unloaded}; + #{status := Status, config := Config} -> + #{ + node => node(), + status => Status, + max_connections => max_connections_count(Config), + current_connections => current_connections_count(GwName) + } + end. + +cluster_gateway_status(GwName) -> + Nodes = mria_mnesia:running_nodes(), + case emqx_gateway_http_proto_v1:get_node_status(Nodes, GwName) of + {Results, []} -> + Results; + {_, _BadNodes} -> + error(badrpc) + end. + %% @private max_connections_count(Config) -> Listeners = emqx_gateway_utils:normalize_config(Config), diff --git a/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl b/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl new file mode 100644 index 000000000..6609adba5 --- /dev/null +++ b/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_http_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + get_cluster_status/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec get_cluster_status([node()], emqx_gateway_cm:gateway_name()) -> + emqx_rpc:multicall_result([map()]). +get_cluster_status(Nodes, GwName) -> + rpc:multicall(Nodes, emqx_gateway_http, gateway_status, [GwName]). From db9cb6c4a0d34c999fb81d123197c262cf786eaf Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 29 Apr 2022 16:09:19 +0800 Subject: [PATCH 2/5] fix(gateway): add node_status into the result of /gateway/{name}/listeners --- apps/emqx/priv/bpapi.versions | 3 + .../i18n/emqx_gateway_api_listeners_i18n.conf | 22 ++++ .../src/emqx_gateway_api_listeners.erl | 120 +++++++++++++++++- apps/emqx_gateway/src/emqx_gateway_http.erl | 2 +- apps/emqx_gateway/src/emqx_gateway_utils.erl | 4 +- .../emqx_gateway_api_listeners_proto_v1.erl | 34 +++++ .../src/proto/emqx_gateway_http_proto_v1.erl | 1 - 7 files changed, 181 insertions(+), 5 deletions(-) create mode 100644 apps/emqx_gateway/src/proto/emqx_gateway_api_listeners_proto_v1.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 9f1373f5a..ea8ceda2e 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -24,3 +24,6 @@ {emqx_delayed,1}. {emqx_mgmt_cluster,1}. {emqx_retainer,1}. +{emqx_gateway_http_proto_v1,1}. +{emqx_gateway_api_listeners_proto_v1,1}. + diff --git a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf index 71959c31c..a2a1c8317 100644 --- a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf +++ b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf @@ -111,4 +111,26 @@ emqx_gateway_api_listeners { zh: """监听器 ID""" } } + + listener_node_status { + desc { + en: """listener status of each node in the cluster""" + zh: """监听器在集群中每个节点上的状态""" + } + } + + node { + desc { + en: """Node Name""" + zh: """节点名称""" + } + } + + running { + desc { + en: """Is In Listening""" + zh: """是否正在监听""" + } + } + } diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 9eefbb2c5..8e5772471 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -58,6 +58,9 @@ import_users/2 ]). +%% RPC +-export([do_listeners_cluster_status/1]). + %%-------------------------------------------------------------------- %% minirest behaviour callbacks %%-------------------------------------------------------------------- @@ -80,7 +83,38 @@ paths() -> listeners(get, #{bindings := #{name := Name0}}) -> with_gateway(Name0, fun(GwName, _) -> - {200, emqx_gateway_conf:listeners(GwName)} + Listeners = emqx_gateway_conf:listeners(GwName), + ListenOns = lists:map( + fun(#{id := Id, <<"bind">> := ListenOn}) -> + {Id, ListenOn} + end, + Listeners + ), + + ClusterStatus = listeners_cluster_status(ListenOns), + + Result = lists:map( + fun(#{id := Id} = Listener) -> + Listener#{ + node_status => + lists:foldl( + fun(Info, Acc) -> + case maps:get(Id, Info, undefined) of + undefined -> + Acc; + Status -> + [Status | Acc] + end + end, + [], + ClusterStatus + ) + } + end, + Listeners + ), + + {200, Result} end); listeners(post, #{bindings := #{name := Name0}, body := LConf}) -> with_gateway(Name0, fun(GwName, Gateway) -> @@ -280,7 +314,7 @@ schema("/gateway/:name/listeners") -> ?STANDARD_RESP( #{ 200 => emqx_dashboard_swagger:schema_with_example( - hoconsc:array(emqx_gateway_api:listener_schema()), + hoconsc:array(listener_node_status_schema()), examples_listener_list() ) } @@ -553,9 +587,32 @@ params_paging_in_qs() -> roots() -> [listener]. +fields(listener_node_status) -> + [{node_status, mk(hoconsc:array(ref(node_status)), #{desc => ?DESC(listener_node_status)})}]; +fields(node_status) -> + [ + {node, mk(node, #{desc => ?DESC(node)})}, + {running, mk(boolean(), #{desc => ?DESC(running)})} + ]; +fields(tcp_listener) -> + emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status); +fields(ssl_listener) -> + emqx_gateway_api:fields(ssl_listener) ++ fields(listener_node_status); +fields(udp_listener) -> + emqx_gateway_api:fields(udp_listener) ++ fields(listener_node_status); +fields(dtls_listener) -> + emqx_gateway_api:fields(dtls_listener) ++ fields(listener_node_status); fields(_) -> []. +listener_node_status_schema() -> + hoconsc:union([ + ref(tcp_listener), + ref(ssl_listener), + ref(udp_listener), + ref(dtls_listener) + ]). + %%-------------------------------------------------------------------- %% examples @@ -587,7 +644,13 @@ examples_listener() -> high_watermark => <<"1MB">>, nodelay => false, reuseaddr => true + }, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + running => true } + ] } }, ssl_listener => @@ -620,7 +683,13 @@ examples_listener() -> #{ active_n => 100, backlog => 1024 + }, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + running => true } + ] } }, udp_listener => @@ -639,7 +708,13 @@ examples_listener() -> buffer => <<"10KB">>, reuseaddr => true } + }, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + running => true } + ] }, dtls_listener => #{ @@ -666,7 +741,13 @@ examples_listener() -> #{ active_n => 100, backlog => 1024 + }, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + running => true } + ] } }, dtls_listener_with_psk_ciphers => @@ -694,7 +775,13 @@ examples_listener() -> "RSA-PSK-AES128-CBC-SHA256,RSA-PSK-AES256-CBC-SHA,RSA-PSK-AES128-CBC-SHA" >>, fail_if_no_peer_cert => false + }, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + running => true } + ] } }, lisetner_with_authn => @@ -715,7 +802,36 @@ examples_listener() -> password_hash_algorithm => #{name => <<"sha256">>}, user_id_type => <<"username">> + }, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + running => true } + ] } } }. + +listeners_cluster_status(Listeners) -> + Nodes = mria_mnesia:running_nodes(), + case emqx_gateway_api_listeners_proto_v1:listeners_cluster_status(Nodes, Listeners) of + {Results, []} -> + Results; + {_, _BadNodes} -> + error(badrpc) + end. + +do_listeners_cluster_status(Listeners) -> + Node = node(), + maps:from_list( + lists:map( + fun({Id, ListenOn}) -> + {Id, #{ + node => Node, + running => emqx_gateway_utils:is_running(Id, ListenOn) + }} + end, + Listeners + ) + ). diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 48f92b6d3..fc17ff6d8 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -146,7 +146,7 @@ gateway_status(GwName) -> cluster_gateway_status(GwName) -> Nodes = mria_mnesia:running_nodes(), - case emqx_gateway_http_proto_v1:get_node_status(Nodes, GwName) of + case emqx_gateway_http_proto_v1:get_cluster_status(Nodes, GwName) of {Results, []} -> Results; {_, _BadNodes} -> diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 4a0fe3695..6a491de3d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -326,7 +326,9 @@ parse_listener_id(Id) -> _:_ -> error({invalid_listener_id, Id}) end. -is_running(ListenerId, #{<<"bind">> := ListenOn0}) -> +is_running(ListenerId, #{<<"bind">> := ListenOn}) -> + is_running(ListenerId, ListenOn); +is_running(ListenerId, ListenOn0) -> ListenOn = emqx_gateway_utils:parse_listenon(ListenOn0), try esockd:listener({ListenerId, ListenOn}) of Pid when is_pid(Pid) -> diff --git a/apps/emqx_gateway/src/proto/emqx_gateway_api_listeners_proto_v1.erl b/apps/emqx_gateway/src/proto/emqx_gateway_api_listeners_proto_v1.erl new file mode 100644 index 000000000..b64db8fe8 --- /dev/null +++ b/apps/emqx_gateway/src/proto/emqx_gateway_api_listeners_proto_v1.erl @@ -0,0 +1,34 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_api_listeners_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + listeners_cluster_status/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec listeners_cluster_status([node()], list()) -> + emqx_rpc:multicall_result([map()]). +listeners_cluster_status(Nodes, Listeners) -> + rpc:multicall(Nodes, emqx_gateway_api_listeners, do_listeners_cluster_status, [Listeners]). diff --git a/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl b/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl index 6609adba5..7bd827915 100644 --- a/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl +++ b/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl @@ -20,7 +20,6 @@ -export([ introduced_in/0, - get_cluster_status/2 ]). From f20f05161f1f936e9354fd43ee98c45c8e11789c Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 5 May 2022 17:21:13 +0800 Subject: [PATCH 3/5] fix(gateway): sum the cluster connection data --- apps/emqx_gateway/src/emqx_gateway_http.erl | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index fc17ff6d8..1ccea1455 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -116,11 +116,13 @@ gateways(Status) -> ], GwInfo0 ), + NodeStatus = cluster_gateway_status(GwName), + {MaxCons, CurrCons} = sum_cluster_connections(NodeStatus), GwInfo1#{ - max_connections => max_connections_count(Config), - current_connections => current_connections_count(GwName), + max_connections => MaxCons, + current_connections => CurrCons, listeners => get_listeners_status(GwName, Config), - node_status => cluster_gateway_status(GwName) + node_status => NodeStatus } end end, @@ -568,3 +570,14 @@ to_list(B) when is_binary(B) -> %%-------------------------------------------------------------------- %% Internal funcs +sum_cluster_connections(List) -> + sum_cluster_connections(List, 0, 0). + +sum_cluster_connections( + [#{max_connections := Max, current_connections := Current} | T], MaxAcc, CurrAcc +) -> + sum_cluster_connections(T, MaxAcc + Max, Current + CurrAcc); +sum_cluster_connections([_ | T], MaxAcc, CurrAcc) -> + sum_cluster_connections(T, MaxAcc, CurrAcc); +sum_cluster_connections([], MaxAcc, CurrAcc) -> + {MaxAcc, CurrAcc}. From 93ab0458d179e634d019a3cec9b94a8cc3286616 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 6 May 2022 18:36:29 +0800 Subject: [PATCH 4/5] fix(gateway): make gateway listeners list return act like MQTT listeners --- apps/emqx/priv/bpapi.versions | 4 +- .../i18n/emqx_gateway_api_listeners_i18n.conf | 6 +- .../src/emqx_gateway_api_listeners.erl | 113 ++++++++---------- apps/emqx_gateway/src/emqx_gateway_http.erl | 7 +- 4 files changed, 57 insertions(+), 73 deletions(-) diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index ea8ceda2e..1c401deb6 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -24,6 +24,6 @@ {emqx_delayed,1}. {emqx_mgmt_cluster,1}. {emqx_retainer,1}. -{emqx_gateway_http_proto_v1,1}. -{emqx_gateway_api_listeners_proto_v1,1}. +{emqx_gateway_http,1}. +{emqx_gateway_api_listeners,1}. diff --git a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf index a2a1c8317..e4f7413d0 100644 --- a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf +++ b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf @@ -126,10 +126,10 @@ emqx_gateway_api_listeners { } } - running { + current_connections { desc { - en: """Is In Listening""" - zh: """是否正在监听""" + en: """Current Connections""" + zh: """当前连接数""" } } diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 8e5772471..343351311 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -85,8 +85,8 @@ listeners(get, #{bindings := #{name := Name0}}) -> with_gateway(Name0, fun(GwName, _) -> Listeners = emqx_gateway_conf:listeners(GwName), ListenOns = lists:map( - fun(#{id := Id, <<"bind">> := ListenOn}) -> - {Id, ListenOn} + fun(#{id := Id, <<"bind">> := BinListenOn}) -> + {Id, erlang:binary_to_integer(BinListenOn)} end, Listeners ), @@ -95,20 +95,21 @@ listeners(get, #{bindings := #{name := Name0}}) -> Result = lists:map( fun(#{id := Id} = Listener) -> + NodeStatus = lists:foldl( + fun(Info, Acc) -> + Status = maps:get(Id, Info), + [Status | Acc] + end, + [], + ClusterStatus + ), + + {MaxCons, CurrCons} = emqx_gateway_http:sum_cluster_connections(NodeStatus), + Listener#{ - node_status => - lists:foldl( - fun(Info, Acc) -> - case maps:get(Id, Info, undefined) of - undefined -> - Acc; - Status -> - [Status | Acc] - end - end, - [], - ClusterStatus - ) + max_connections => MaxCons, + current_connections => CurrCons, + node_status => NodeStatus } end, Listeners @@ -588,11 +589,12 @@ roots() -> [listener]. fields(listener_node_status) -> - [{node_status, mk(hoconsc:array(ref(node_status)), #{desc => ?DESC(listener_node_status)})}]; -fields(node_status) -> [ - {node, mk(node, #{desc => ?DESC(node)})}, - {running, mk(boolean(), #{desc => ?DESC(running)})} + {current_connections, mk(non_neg_integer(), #{desc => ?DESC(current_connections)})}, + {node_status, + mk(hoconsc:array(ref(emqx_mgmt_api_listeners, node_status)), #{ + desc => ?DESC(listener_node_status) + })} ]; fields(tcp_listener) -> emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status); @@ -617,7 +619,19 @@ listener_node_status_schema() -> %% examples examples_listener_list() -> - [Config || #{value := Config} <- maps:values(examples_listener())]. + Convert = fun(Cfg) -> + Cfg#{ + current_connections => 0, + node_status => [ + #{ + node => <<"127.0.0.1">>, + current_connections => 0, + max_connections => 1024000 + } + ] + } + end, + [Convert(Config) || #{value := Config} <- maps:values(examples_listener())]. examples_listener() -> #{ @@ -644,13 +658,7 @@ examples_listener() -> high_watermark => <<"1MB">>, nodelay => false, reuseaddr => true - }, - node_status => [ - #{ - node => <<"node@127.0.0.1">>, - running => true } - ] } }, ssl_listener => @@ -683,13 +691,7 @@ examples_listener() -> #{ active_n => 100, backlog => 1024 - }, - node_status => [ - #{ - node => <<"node@127.0.0.1">>, - running => true } - ] } }, udp_listener => @@ -708,13 +710,7 @@ examples_listener() -> buffer => <<"10KB">>, reuseaddr => true } - }, - node_status => [ - #{ - node => <<"node@127.0.0.1">>, - running => true } - ] }, dtls_listener => #{ @@ -741,13 +737,7 @@ examples_listener() -> #{ active_n => 100, backlog => 1024 - }, - node_status => [ - #{ - node => <<"node@127.0.0.1">>, - running => true } - ] } }, dtls_listener_with_psk_ciphers => @@ -775,13 +765,7 @@ examples_listener() -> "RSA-PSK-AES128-CBC-SHA256,RSA-PSK-AES256-CBC-SHA,RSA-PSK-AES128-CBC-SHA" >>, fail_if_no_peer_cert => false - }, - node_status => [ - #{ - node => <<"node@127.0.0.1">>, - running => true } - ] } }, lisetner_with_authn => @@ -802,13 +786,7 @@ examples_listener() -> password_hash_algorithm => #{name => <<"sha256">>}, user_id_type => <<"username">> - }, - node_status => [ - #{ - node => <<"node@127.0.0.1">>, - running => true } - ] } } }. @@ -824,14 +802,19 @@ listeners_cluster_status(Listeners) -> do_listeners_cluster_status(Listeners) -> Node = node(), - maps:from_list( - lists:map( - fun({Id, ListenOn}) -> - {Id, #{ + lists:foldl( + fun({Id, ListenOn}, Acc) -> + BinId = erlang:atom_to_binary(Id), + {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId), + Curr = esockd:get_current_connections({Id, ListenOn}), + Acc#{ + Id => #{ node => Node, - running => emqx_gateway_utils:is_running(Id, ListenOn) - }} - end, - Listeners - ) + current_connections => Curr, + max_connections => Max + } + } + end, + #{}, + Listeners ). diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 1ccea1455..5216ffc15 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -63,7 +63,8 @@ with_listener_authn/3, checks/2, reason2resp/1, - reason2msg/1 + reason2msg/1, + sum_cluster_connections/1 ]). %% RPC @@ -568,11 +569,11 @@ to_list(A) when is_atom(A) -> to_list(B) when is_binary(B) -> binary_to_list(B). -%%-------------------------------------------------------------------- -%% Internal funcs sum_cluster_connections(List) -> sum_cluster_connections(List, 0, 0). +%%-------------------------------------------------------------------- +%% Internal funcs sum_cluster_connections( [#{max_connections := Max, current_connections := Current} | T], MaxAcc, CurrAcc ) -> From 81e57f21481704aa6dfd6e734d81492db9166cbc Mon Sep 17 00:00:00 2001 From: firest Date: Sat, 7 May 2022 16:39:59 +0800 Subject: [PATCH 5/5] fix(gateway): make it safer to get the value of bind --- .../src/emqx_gateway_api_listeners.erl | 123 +++++++++--------- apps/emqx_gateway/src/emqx_gateway_conf.erl | 11 ++ 2 files changed, 74 insertions(+), 60 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 343351311..4064af05b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -83,38 +83,7 @@ paths() -> listeners(get, #{bindings := #{name := Name0}}) -> with_gateway(Name0, fun(GwName, _) -> - Listeners = emqx_gateway_conf:listeners(GwName), - ListenOns = lists:map( - fun(#{id := Id, <<"bind">> := BinListenOn}) -> - {Id, erlang:binary_to_integer(BinListenOn)} - end, - Listeners - ), - - ClusterStatus = listeners_cluster_status(ListenOns), - - Result = lists:map( - fun(#{id := Id} = Listener) -> - NodeStatus = lists:foldl( - fun(Info, Acc) -> - Status = maps:get(Id, Info), - [Status | Acc] - end, - [], - ClusterStatus - ), - - {MaxCons, CurrCons} = emqx_gateway_http:sum_cluster_connections(NodeStatus), - - Listener#{ - max_connections => MaxCons, - current_connections => CurrCons, - node_status => NodeStatus - } - end, - Listeners - ), - + Result = get_cluster_listeners_info(GwName), {200, Result} end); listeners(post, #{bindings := #{name := Name0}, body := LConf}) -> @@ -300,6 +269,68 @@ import_users(post, #{ page_params(Qs) -> maps:with([<<"page">>, <<"limit">>], Qs). +get_cluster_listeners_info(GwName) -> + Listeners = emqx_gateway_conf:listeners(GwName), + ListenOns = lists:map( + fun(#{id := Id} = Conf) -> + ListenOn = emqx_gateway_conf:get_bind(Conf), + {Id, ListenOn} + end, + Listeners + ), + + ClusterStatus = listeners_cluster_status(ListenOns), + + lists:map( + fun(#{id := Id} = Listener) -> + NodeStatus = lists:foldl( + fun(Info, Acc) -> + Status = maps:get(Id, Info), + [Status | Acc] + end, + [], + ClusterStatus + ), + + {MaxCons, CurrCons} = emqx_gateway_http:sum_cluster_connections(NodeStatus), + + Listener#{ + max_connections => MaxCons, + current_connections => CurrCons, + node_status => NodeStatus + } + end, + Listeners + ). + +listeners_cluster_status(Listeners) -> + Nodes = mria_mnesia:running_nodes(), + case emqx_gateway_api_listeners_proto_v1:listeners_cluster_status(Nodes, Listeners) of + {Results, []} -> + Results; + {_, _BadNodes} -> + error(badrpc) + end. + +do_listeners_cluster_status(Listeners) -> + Node = node(), + lists:foldl( + fun({Id, ListenOn}, Acc) -> + BinId = erlang:atom_to_binary(Id), + {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId), + Curr = esockd:get_current_connections({Id, ListenOn}), + Acc#{ + Id => #{ + node => Node, + current_connections => Curr, + max_connections => Max + } + } + end, + #{}, + Listeners + ). + %%-------------------------------------------------------------------- %% Swagger defines %%-------------------------------------------------------------------- @@ -790,31 +821,3 @@ examples_listener() -> } } }. - -listeners_cluster_status(Listeners) -> - Nodes = mria_mnesia:running_nodes(), - case emqx_gateway_api_listeners_proto_v1:listeners_cluster_status(Nodes, Listeners) of - {Results, []} -> - Results; - {_, _BadNodes} -> - error(badrpc) - end. - -do_listeners_cluster_status(Listeners) -> - Node = node(), - lists:foldl( - fun({Id, ListenOn}, Acc) -> - BinId = erlang:atom_to_binary(Id), - {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId), - Curr = esockd:get_current_connections({Id, ListenOn}), - Acc#{ - Id => #{ - node => Node, - current_connections => Curr, - max_connections => Max - } - } - end, - #{}, - Listeners - ). diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index a61754d12..2aba5bff0 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -50,6 +50,8 @@ remove_authn/2 ]). +-export([get_bind/1]). + %% internal exports -export([ unconvert_listeners/1, @@ -196,6 +198,15 @@ bind2str(LConf = #{bind := Bind}) when is_binary(Bind) -> bind2str(LConf = #{<<"bind">> := Bind}) when is_binary(Bind) -> LConf. +get_bind(#{bind := Bind}) when is_integer(Bind) -> + Bind; +get_bind(#{<<"bind">> := Bind}) when is_integer(Bind) -> + Bind; +get_bind(#{bind := Bind}) when is_binary(Bind) -> + erlang:binary_to_integer(Bind); +get_bind(#{<<"bind">> := Bind}) when is_binary(Bind) -> + erlang:binary_to_integer(Bind). + -spec listeners(atom_or_bin()) -> [map()]. listeners(GwName0) -> GwName = bin(GwName0),