diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 9f1373f5a..1c401deb6 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -24,3 +24,6 @@ {emqx_delayed,1}. {emqx_mgmt_cluster,1}. {emqx_retainer,1}. +{emqx_gateway_http,1}. +{emqx_gateway_api_listeners,1}. + diff --git a/apps/emqx_gateway/i18n/emqx_gateway_api_i18n.conf b/apps/emqx_gateway/i18n/emqx_gateway_api_i18n.conf index 468a88b7b..5ab9277b2 100644 --- a/apps/emqx_gateway/i18n/emqx_gateway_api_i18n.conf +++ b/apps/emqx_gateway/i18n/emqx_gateway_api_i18n.conf @@ -118,4 +118,19 @@ emqx_gateway_api { zh: """监听器类型""" } } + + gateway_node_status { + desc { + en: """The status of the gateway on each node in the cluster""" + zh: """网关在集群中每个节点上的状态""" + } + } + + node { + desc { + en: """Node Name""" + zh: """节点名称""" + } + } + } diff --git a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf index 71959c31c..e4f7413d0 100644 --- a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf +++ b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf @@ -111,4 +111,26 @@ emqx_gateway_api_listeners { zh: """监听器 ID""" } } + + listener_node_status { + desc { + en: """listener status of each node in the cluster""" + zh: """监听器在集群中每个节点上的状态""" + } + } + + node { + desc { + en: """Node Name""" + zh: """节点名称""" + } + } + + current_connections { + desc { + en: """Current Connections""" + zh: """当前连接数""" + } + } + } diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index 86e2a946d..162fa2b62 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -302,7 +302,9 @@ fields(gateway_overview) -> required => {false, recursively}, desc => ?DESC(gateway_listeners) } - )} + )}, + {node_status, + mk(hoconsc:array(ref(gateway_node_status)), #{desc => ?DESC(gateway_node_status)})} ]; fields(gateway_listener_overview) -> [ @@ -322,6 +324,25 @@ fields(gateway_listener_overview) -> #{desc => ?DESC(gateway_listener_type)} )} ]; +fields(gateway_node_status) -> + [ + {node, mk(node(), #{desc => ?DESC(node)})}, + {status, + mk( + hoconsc:enum([running, stopped, unloaded]), + #{desc => ?DESC(gateway_status)} + )}, + {max_connections, + mk( + pos_integer(), + #{desc => ?DESC(gateway_max_connections)} + )}, + {current_connections, + mk( + non_neg_integer(), + #{desc => ?DESC(gateway_current_connections)} + )} + ]; fields(Gw) when Gw == stomp; Gw == mqttsn; @@ -472,7 +493,15 @@ examples_gateway_overview() -> } ], created_at => <<"2021-12-08T14:41:26.171+08:00">>, - started_at => <<"2021-12-08T14:41:26.202+08:00">> + started_at => <<"2021-12-08T14:41:26.202+08:00">>, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + status => <<"running">>, + current_connections => 0, + max_connections => 1024000 + } + ] }, #{ name => <<"mqttsn">>, @@ -489,7 +518,15 @@ examples_gateway_overview() -> } ], created_at => <<"2021-12-08T14:41:45.071+08:00">>, - stopped_at => <<"2021-12-08T14:56:35.576+08:00">> + stopped_at => <<"2021-12-08T14:56:35.576+08:00">>, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + status => <<"running">>, + current_connections => 0, + max_connections => 1024000 + } + ] }, #{ name => <<"stomp">>, @@ -506,7 +543,15 @@ examples_gateway_overview() -> } ], created_at => <<"2021-12-08T14:42:15.272+08:00">>, - started_at => <<"2021-12-08T14:42:15.274+08:00">> + started_at => <<"2021-12-08T14:42:15.274+08:00">>, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + status => <<"running">>, + current_connections => 0, + max_connections => 1024000 + } + ] } ]. diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 9eefbb2c5..4064af05b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -58,6 +58,9 @@ import_users/2 ]). +%% RPC +-export([do_listeners_cluster_status/1]). + %%-------------------------------------------------------------------- %% minirest behaviour callbacks %%-------------------------------------------------------------------- @@ -80,7 +83,8 @@ paths() -> listeners(get, #{bindings := #{name := Name0}}) -> with_gateway(Name0, fun(GwName, _) -> - {200, emqx_gateway_conf:listeners(GwName)} + Result = get_cluster_listeners_info(GwName), + {200, Result} end); listeners(post, #{bindings := #{name := Name0}, body := LConf}) -> with_gateway(Name0, fun(GwName, Gateway) -> @@ -265,6 +269,68 @@ import_users(post, #{ page_params(Qs) -> maps:with([<<"page">>, <<"limit">>], Qs). +get_cluster_listeners_info(GwName) -> + Listeners = emqx_gateway_conf:listeners(GwName), + ListenOns = lists:map( + fun(#{id := Id} = Conf) -> + ListenOn = emqx_gateway_conf:get_bind(Conf), + {Id, ListenOn} + end, + Listeners + ), + + ClusterStatus = listeners_cluster_status(ListenOns), + + lists:map( + fun(#{id := Id} = Listener) -> + NodeStatus = lists:foldl( + fun(Info, Acc) -> + Status = maps:get(Id, Info), + [Status | Acc] + end, + [], + ClusterStatus + ), + + {MaxCons, CurrCons} = emqx_gateway_http:sum_cluster_connections(NodeStatus), + + Listener#{ + max_connections => MaxCons, + current_connections => CurrCons, + node_status => NodeStatus + } + end, + Listeners + ). + +listeners_cluster_status(Listeners) -> + Nodes = mria_mnesia:running_nodes(), + case emqx_gateway_api_listeners_proto_v1:listeners_cluster_status(Nodes, Listeners) of + {Results, []} -> + Results; + {_, _BadNodes} -> + error(badrpc) + end. + +do_listeners_cluster_status(Listeners) -> + Node = node(), + lists:foldl( + fun({Id, ListenOn}, Acc) -> + BinId = erlang:atom_to_binary(Id), + {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId), + Curr = esockd:get_current_connections({Id, ListenOn}), + Acc#{ + Id => #{ + node => Node, + current_connections => Curr, + max_connections => Max + } + } + end, + #{}, + Listeners + ). + %%-------------------------------------------------------------------- %% Swagger defines %%-------------------------------------------------------------------- @@ -280,7 +346,7 @@ schema("/gateway/:name/listeners") -> ?STANDARD_RESP( #{ 200 => emqx_dashboard_swagger:schema_with_example( - hoconsc:array(emqx_gateway_api:listener_schema()), + hoconsc:array(listener_node_status_schema()), examples_listener_list() ) } @@ -553,14 +619,50 @@ params_paging_in_qs() -> roots() -> [listener]. +fields(listener_node_status) -> + [ + {current_connections, mk(non_neg_integer(), #{desc => ?DESC(current_connections)})}, + {node_status, + mk(hoconsc:array(ref(emqx_mgmt_api_listeners, node_status)), #{ + desc => ?DESC(listener_node_status) + })} + ]; +fields(tcp_listener) -> + emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status); +fields(ssl_listener) -> + emqx_gateway_api:fields(ssl_listener) ++ fields(listener_node_status); +fields(udp_listener) -> + emqx_gateway_api:fields(udp_listener) ++ fields(listener_node_status); +fields(dtls_listener) -> + emqx_gateway_api:fields(dtls_listener) ++ fields(listener_node_status); fields(_) -> []. +listener_node_status_schema() -> + hoconsc:union([ + ref(tcp_listener), + ref(ssl_listener), + ref(udp_listener), + ref(dtls_listener) + ]). + %%-------------------------------------------------------------------- %% examples examples_listener_list() -> - [Config || #{value := Config} <- maps:values(examples_listener())]. + Convert = fun(Cfg) -> + Cfg#{ + current_connections => 0, + node_status => [ + #{ + node => <<"127.0.0.1">>, + current_connections => 0, + max_connections => 1024000 + } + ] + } + end, + [Convert(Config) || #{value := Config} <- maps:values(examples_listener())]. examples_listener() -> #{ diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index a61754d12..2aba5bff0 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -50,6 +50,8 @@ remove_authn/2 ]). +-export([get_bind/1]). + %% internal exports -export([ unconvert_listeners/1, @@ -196,6 +198,15 @@ bind2str(LConf = #{bind := Bind}) when is_binary(Bind) -> bind2str(LConf = #{<<"bind">> := Bind}) when is_binary(Bind) -> LConf. +get_bind(#{bind := Bind}) when is_integer(Bind) -> + Bind; +get_bind(#{<<"bind">> := Bind}) when is_integer(Bind) -> + Bind; +get_bind(#{bind := Bind}) when is_binary(Bind) -> + erlang:binary_to_integer(Bind); +get_bind(#{<<"bind">> := Bind}) when is_binary(Bind) -> + erlang:binary_to_integer(Bind). + -spec listeners(atom_or_bin()) -> [map()]. listeners(GwName0) -> GwName = bin(GwName0), diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index f104c6629..5216ffc15 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -63,9 +63,13 @@ with_listener_authn/3, checks/2, reason2resp/1, - reason2msg/1 + reason2msg/1, + sum_cluster_connections/1 ]). +%% RPC +-export([gateway_status/1, cluster_gateway_status/1]). + -type gateway_summary() :: #{ name := binary(), @@ -113,10 +117,13 @@ gateways(Status) -> ], GwInfo0 ), + NodeStatus = cluster_gateway_status(GwName), + {MaxCons, CurrCons} = sum_cluster_connections(NodeStatus), GwInfo1#{ - max_connections => max_connections_count(Config), - current_connections => current_connections_count(GwName), - listeners => get_listeners_status(GwName, Config) + max_connections => MaxCons, + current_connections => CurrCons, + listeners => get_listeners_status(GwName, Config), + node_status => NodeStatus } end end, @@ -127,6 +134,28 @@ gateways(Status) -> _ -> [Gw || Gw = #{status := S} <- Gateways, S == Status] end. +gateway_status(GwName) -> + case emqx_gateway:lookup(GwName) of + undefined -> + #{node => node(), status => unloaded}; + #{status := Status, config := Config} -> + #{ + node => node(), + status => Status, + max_connections => max_connections_count(Config), + current_connections => current_connections_count(GwName) + } + end. + +cluster_gateway_status(GwName) -> + Nodes = mria_mnesia:running_nodes(), + case emqx_gateway_http_proto_v1:get_cluster_status(Nodes, GwName) of + {Results, []} -> + Results; + {_, _BadNodes} -> + error(badrpc) + end. + %% @private max_connections_count(Config) -> Listeners = emqx_gateway_utils:normalize_config(Config), @@ -540,5 +569,16 @@ to_list(A) when is_atom(A) -> to_list(B) when is_binary(B) -> binary_to_list(B). +sum_cluster_connections(List) -> + sum_cluster_connections(List, 0, 0). + %%-------------------------------------------------------------------- %% Internal funcs +sum_cluster_connections( + [#{max_connections := Max, current_connections := Current} | T], MaxAcc, CurrAcc +) -> + sum_cluster_connections(T, MaxAcc + Max, Current + CurrAcc); +sum_cluster_connections([_ | T], MaxAcc, CurrAcc) -> + sum_cluster_connections(T, MaxAcc, CurrAcc); +sum_cluster_connections([], MaxAcc, CurrAcc) -> + {MaxAcc, CurrAcc}. diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 4a0fe3695..6a491de3d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -326,7 +326,9 @@ parse_listener_id(Id) -> _:_ -> error({invalid_listener_id, Id}) end. -is_running(ListenerId, #{<<"bind">> := ListenOn0}) -> +is_running(ListenerId, #{<<"bind">> := ListenOn}) -> + is_running(ListenerId, ListenOn); +is_running(ListenerId, ListenOn0) -> ListenOn = emqx_gateway_utils:parse_listenon(ListenOn0), try esockd:listener({ListenerId, ListenOn}) of Pid when is_pid(Pid) -> diff --git a/apps/emqx_gateway/src/proto/emqx_gateway_api_listeners_proto_v1.erl b/apps/emqx_gateway/src/proto/emqx_gateway_api_listeners_proto_v1.erl new file mode 100644 index 000000000..b64db8fe8 --- /dev/null +++ b/apps/emqx_gateway/src/proto/emqx_gateway_api_listeners_proto_v1.erl @@ -0,0 +1,34 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_api_listeners_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + listeners_cluster_status/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec listeners_cluster_status([node()], list()) -> + emqx_rpc:multicall_result([map()]). +listeners_cluster_status(Nodes, Listeners) -> + rpc:multicall(Nodes, emqx_gateway_api_listeners, do_listeners_cluster_status, [Listeners]). diff --git a/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl b/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl new file mode 100644 index 000000000..7bd827915 --- /dev/null +++ b/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl @@ -0,0 +1,34 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_http_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + get_cluster_status/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec get_cluster_status([node()], emqx_gateway_cm:gateway_name()) -> + emqx_rpc:multicall_result([map()]). +get_cluster_status(Nodes, GwName) -> + rpc:multicall(Nodes, emqx_gateway_http, gateway_status, [GwName]).