diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 9f1373f5a..ea8ceda2e 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -24,3 +24,6 @@ {emqx_delayed,1}. {emqx_mgmt_cluster,1}. {emqx_retainer,1}. +{emqx_gateway_http_proto_v1,1}. +{emqx_gateway_api_listeners_proto_v1,1}. + diff --git a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf index 71959c31c..a2a1c8317 100644 --- a/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf +++ b/apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf @@ -111,4 +111,26 @@ emqx_gateway_api_listeners { zh: """监听器 ID""" } } + + listener_node_status { + desc { + en: """listener status of each node in the cluster""" + zh: """监听器在集群中每个节点上的状态""" + } + } + + node { + desc { + en: """Node Name""" + zh: """节点名称""" + } + } + + running { + desc { + en: """Is In Listening""" + zh: """是否正在监听""" + } + } + } diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 9eefbb2c5..8e5772471 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -58,6 +58,9 @@ import_users/2 ]). +%% RPC +-export([do_listeners_cluster_status/1]). + %%-------------------------------------------------------------------- %% minirest behaviour callbacks %%-------------------------------------------------------------------- @@ -80,7 +83,38 @@ paths() -> listeners(get, #{bindings := #{name := Name0}}) -> with_gateway(Name0, fun(GwName, _) -> - {200, emqx_gateway_conf:listeners(GwName)} + Listeners = emqx_gateway_conf:listeners(GwName), + ListenOns = lists:map( + fun(#{id := Id, <<"bind">> := ListenOn}) -> + {Id, ListenOn} + end, + Listeners + ), + + ClusterStatus = listeners_cluster_status(ListenOns), + + Result = lists:map( + fun(#{id := Id} = Listener) -> + Listener#{ + node_status => + lists:foldl( + fun(Info, Acc) -> + case maps:get(Id, Info, undefined) of + undefined -> + Acc; + Status -> + [Status | Acc] + end + end, + [], + ClusterStatus + ) + } + end, + Listeners + ), + + {200, Result} end); listeners(post, #{bindings := #{name := Name0}, body := LConf}) -> with_gateway(Name0, fun(GwName, Gateway) -> @@ -280,7 +314,7 @@ schema("/gateway/:name/listeners") -> ?STANDARD_RESP( #{ 200 => emqx_dashboard_swagger:schema_with_example( - hoconsc:array(emqx_gateway_api:listener_schema()), + hoconsc:array(listener_node_status_schema()), examples_listener_list() ) } @@ -553,9 +587,32 @@ params_paging_in_qs() -> roots() -> [listener]. +fields(listener_node_status) -> + [{node_status, mk(hoconsc:array(ref(node_status)), #{desc => ?DESC(listener_node_status)})}]; +fields(node_status) -> + [ + {node, mk(node, #{desc => ?DESC(node)})}, + {running, mk(boolean(), #{desc => ?DESC(running)})} + ]; +fields(tcp_listener) -> + emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status); +fields(ssl_listener) -> + emqx_gateway_api:fields(ssl_listener) ++ fields(listener_node_status); +fields(udp_listener) -> + emqx_gateway_api:fields(udp_listener) ++ fields(listener_node_status); +fields(dtls_listener) -> + emqx_gateway_api:fields(dtls_listener) ++ fields(listener_node_status); fields(_) -> []. +listener_node_status_schema() -> + hoconsc:union([ + ref(tcp_listener), + ref(ssl_listener), + ref(udp_listener), + ref(dtls_listener) + ]). + %%-------------------------------------------------------------------- %% examples @@ -587,7 +644,13 @@ examples_listener() -> high_watermark => <<"1MB">>, nodelay => false, reuseaddr => true + }, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + running => true } + ] } }, ssl_listener => @@ -620,7 +683,13 @@ examples_listener() -> #{ active_n => 100, backlog => 1024 + }, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + running => true } + ] } }, udp_listener => @@ -639,7 +708,13 @@ examples_listener() -> buffer => <<"10KB">>, reuseaddr => true } + }, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + running => true } + ] }, dtls_listener => #{ @@ -666,7 +741,13 @@ examples_listener() -> #{ active_n => 100, backlog => 1024 + }, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + running => true } + ] } }, dtls_listener_with_psk_ciphers => @@ -694,7 +775,13 @@ examples_listener() -> "RSA-PSK-AES128-CBC-SHA256,RSA-PSK-AES256-CBC-SHA,RSA-PSK-AES128-CBC-SHA" >>, fail_if_no_peer_cert => false + }, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + running => true } + ] } }, lisetner_with_authn => @@ -715,7 +802,36 @@ examples_listener() -> password_hash_algorithm => #{name => <<"sha256">>}, user_id_type => <<"username">> + }, + node_status => [ + #{ + node => <<"node@127.0.0.1">>, + running => true } + ] } } }. + +listeners_cluster_status(Listeners) -> + Nodes = mria_mnesia:running_nodes(), + case emqx_gateway_api_listeners_proto_v1:listeners_cluster_status(Nodes, Listeners) of + {Results, []} -> + Results; + {_, _BadNodes} -> + error(badrpc) + end. + +do_listeners_cluster_status(Listeners) -> + Node = node(), + maps:from_list( + lists:map( + fun({Id, ListenOn}) -> + {Id, #{ + node => Node, + running => emqx_gateway_utils:is_running(Id, ListenOn) + }} + end, + Listeners + ) + ). diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 48f92b6d3..fc17ff6d8 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -146,7 +146,7 @@ gateway_status(GwName) -> cluster_gateway_status(GwName) -> Nodes = mria_mnesia:running_nodes(), - case emqx_gateway_http_proto_v1:get_node_status(Nodes, GwName) of + case emqx_gateway_http_proto_v1:get_cluster_status(Nodes, GwName) of {Results, []} -> Results; {_, _BadNodes} -> diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 4a0fe3695..6a491de3d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -326,7 +326,9 @@ parse_listener_id(Id) -> _:_ -> error({invalid_listener_id, Id}) end. -is_running(ListenerId, #{<<"bind">> := ListenOn0}) -> +is_running(ListenerId, #{<<"bind">> := ListenOn}) -> + is_running(ListenerId, ListenOn); +is_running(ListenerId, ListenOn0) -> ListenOn = emqx_gateway_utils:parse_listenon(ListenOn0), try esockd:listener({ListenerId, ListenOn}) of Pid when is_pid(Pid) -> diff --git a/apps/emqx_gateway/src/proto/emqx_gateway_api_listeners_proto_v1.erl b/apps/emqx_gateway/src/proto/emqx_gateway_api_listeners_proto_v1.erl new file mode 100644 index 000000000..b64db8fe8 --- /dev/null +++ b/apps/emqx_gateway/src/proto/emqx_gateway_api_listeners_proto_v1.erl @@ -0,0 +1,34 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_api_listeners_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + listeners_cluster_status/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec listeners_cluster_status([node()], list()) -> + emqx_rpc:multicall_result([map()]). +listeners_cluster_status(Nodes, Listeners) -> + rpc:multicall(Nodes, emqx_gateway_api_listeners, do_listeners_cluster_status, [Listeners]). diff --git a/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl b/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl index 6609adba5..7bd827915 100644 --- a/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl +++ b/apps/emqx_gateway/src/proto/emqx_gateway_http_proto_v1.erl @@ -20,7 +20,6 @@ -export([ introduced_in/0, - get_cluster_status/2 ]).