Merge pull request #7884 from lafirest/fix/gateway_cluster_status

fix(gateway): add node status for some list APIs
This commit is contained in:
lafirest 2022-05-09 17:41:12 +08:00 committed by GitHub
commit d05e627332
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 320 additions and 12 deletions

View File

@ -24,3 +24,6 @@
{emqx_delayed,1}. {emqx_delayed,1}.
{emqx_mgmt_cluster,1}. {emqx_mgmt_cluster,1}.
{emqx_retainer,1}. {emqx_retainer,1}.
{emqx_gateway_http,1}.
{emqx_gateway_api_listeners,1}.

View File

@ -118,4 +118,19 @@ emqx_gateway_api {
zh: """监听器类型""" zh: """监听器类型"""
} }
} }
gateway_node_status {
desc {
en: """The status of the gateway on each node in the cluster"""
zh: """网关在集群中每个节点上的状态"""
}
}
node {
desc {
en: """Node Name"""
zh: """节点名称"""
}
}
} }

View File

@ -111,4 +111,26 @@ emqx_gateway_api_listeners {
zh: """监听器 ID""" zh: """监听器 ID"""
} }
} }
listener_node_status {
desc {
en: """listener status of each node in the cluster"""
zh: """监听器在集群中每个节点上的状态"""
}
}
node {
desc {
en: """Node Name"""
zh: """节点名称"""
}
}
current_connections {
desc {
en: """Current Connections"""
zh: """当前连接数"""
}
}
} }

View File

@ -302,7 +302,9 @@ fields(gateway_overview) ->
required => {false, recursively}, required => {false, recursively},
desc => ?DESC(gateway_listeners) desc => ?DESC(gateway_listeners)
} }
)} )},
{node_status,
mk(hoconsc:array(ref(gateway_node_status)), #{desc => ?DESC(gateway_node_status)})}
]; ];
fields(gateway_listener_overview) -> fields(gateway_listener_overview) ->
[ [
@ -322,6 +324,25 @@ fields(gateway_listener_overview) ->
#{desc => ?DESC(gateway_listener_type)} #{desc => ?DESC(gateway_listener_type)}
)} )}
]; ];
fields(gateway_node_status) ->
[
{node, mk(node(), #{desc => ?DESC(node)})},
{status,
mk(
hoconsc:enum([running, stopped, unloaded]),
#{desc => ?DESC(gateway_status)}
)},
{max_connections,
mk(
pos_integer(),
#{desc => ?DESC(gateway_max_connections)}
)},
{current_connections,
mk(
non_neg_integer(),
#{desc => ?DESC(gateway_current_connections)}
)}
];
fields(Gw) when fields(Gw) when
Gw == stomp; Gw == stomp;
Gw == mqttsn; Gw == mqttsn;
@ -472,7 +493,15 @@ examples_gateway_overview() ->
} }
], ],
created_at => <<"2021-12-08T14:41:26.171+08:00">>, created_at => <<"2021-12-08T14:41:26.171+08:00">>,
started_at => <<"2021-12-08T14:41:26.202+08:00">> started_at => <<"2021-12-08T14:41:26.202+08:00">>,
node_status => [
#{
node => <<"node@127.0.0.1">>,
status => <<"running">>,
current_connections => 0,
max_connections => 1024000
}
]
}, },
#{ #{
name => <<"mqttsn">>, name => <<"mqttsn">>,
@ -489,7 +518,15 @@ examples_gateway_overview() ->
} }
], ],
created_at => <<"2021-12-08T14:41:45.071+08:00">>, created_at => <<"2021-12-08T14:41:45.071+08:00">>,
stopped_at => <<"2021-12-08T14:56:35.576+08:00">> stopped_at => <<"2021-12-08T14:56:35.576+08:00">>,
node_status => [
#{
node => <<"node@127.0.0.1">>,
status => <<"running">>,
current_connections => 0,
max_connections => 1024000
}
]
}, },
#{ #{
name => <<"stomp">>, name => <<"stomp">>,
@ -506,7 +543,15 @@ examples_gateway_overview() ->
} }
], ],
created_at => <<"2021-12-08T14:42:15.272+08:00">>, created_at => <<"2021-12-08T14:42:15.272+08:00">>,
started_at => <<"2021-12-08T14:42:15.274+08:00">> started_at => <<"2021-12-08T14:42:15.274+08:00">>,
node_status => [
#{
node => <<"node@127.0.0.1">>,
status => <<"running">>,
current_connections => 0,
max_connections => 1024000
}
]
} }
]. ].

View File

@ -58,6 +58,9 @@
import_users/2 import_users/2
]). ]).
%% RPC
-export([do_listeners_cluster_status/1]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% minirest behaviour callbacks %% minirest behaviour callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -80,7 +83,8 @@ paths() ->
listeners(get, #{bindings := #{name := Name0}}) -> listeners(get, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
{200, emqx_gateway_conf:listeners(GwName)} Result = get_cluster_listeners_info(GwName),
{200, Result}
end); end);
listeners(post, #{bindings := #{name := Name0}, body := LConf}) -> listeners(post, #{bindings := #{name := Name0}, body := LConf}) ->
with_gateway(Name0, fun(GwName, Gateway) -> with_gateway(Name0, fun(GwName, Gateway) ->
@ -265,6 +269,68 @@ import_users(post, #{
page_params(Qs) -> page_params(Qs) ->
maps:with([<<"page">>, <<"limit">>], Qs). maps:with([<<"page">>, <<"limit">>], Qs).
get_cluster_listeners_info(GwName) ->
Listeners = emqx_gateway_conf:listeners(GwName),
ListenOns = lists:map(
fun(#{id := Id} = Conf) ->
ListenOn = emqx_gateway_conf:get_bind(Conf),
{Id, ListenOn}
end,
Listeners
),
ClusterStatus = listeners_cluster_status(ListenOns),
lists:map(
fun(#{id := Id} = Listener) ->
NodeStatus = lists:foldl(
fun(Info, Acc) ->
Status = maps:get(Id, Info),
[Status | Acc]
end,
[],
ClusterStatus
),
{MaxCons, CurrCons} = emqx_gateway_http:sum_cluster_connections(NodeStatus),
Listener#{
max_connections => MaxCons,
current_connections => CurrCons,
node_status => NodeStatus
}
end,
Listeners
).
listeners_cluster_status(Listeners) ->
Nodes = mria_mnesia:running_nodes(),
case emqx_gateway_api_listeners_proto_v1:listeners_cluster_status(Nodes, Listeners) of
{Results, []} ->
Results;
{_, _BadNodes} ->
error(badrpc)
end.
do_listeners_cluster_status(Listeners) ->
Node = node(),
lists:foldl(
fun({Id, ListenOn}, Acc) ->
BinId = erlang:atom_to_binary(Id),
{ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId),
Curr = esockd:get_current_connections({Id, ListenOn}),
Acc#{
Id => #{
node => Node,
current_connections => Curr,
max_connections => Max
}
}
end,
#{},
Listeners
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Swagger defines %% Swagger defines
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -280,7 +346,7 @@ schema("/gateway/:name/listeners") ->
?STANDARD_RESP( ?STANDARD_RESP(
#{ #{
200 => emqx_dashboard_swagger:schema_with_example( 200 => emqx_dashboard_swagger:schema_with_example(
hoconsc:array(emqx_gateway_api:listener_schema()), hoconsc:array(listener_node_status_schema()),
examples_listener_list() examples_listener_list()
) )
} }
@ -553,14 +619,50 @@ params_paging_in_qs() ->
roots() -> roots() ->
[listener]. [listener].
fields(listener_node_status) ->
[
{current_connections, mk(non_neg_integer(), #{desc => ?DESC(current_connections)})},
{node_status,
mk(hoconsc:array(ref(emqx_mgmt_api_listeners, node_status)), #{
desc => ?DESC(listener_node_status)
})}
];
fields(tcp_listener) ->
emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status);
fields(ssl_listener) ->
emqx_gateway_api:fields(ssl_listener) ++ fields(listener_node_status);
fields(udp_listener) ->
emqx_gateway_api:fields(udp_listener) ++ fields(listener_node_status);
fields(dtls_listener) ->
emqx_gateway_api:fields(dtls_listener) ++ fields(listener_node_status);
fields(_) -> fields(_) ->
[]. [].
listener_node_status_schema() ->
hoconsc:union([
ref(tcp_listener),
ref(ssl_listener),
ref(udp_listener),
ref(dtls_listener)
]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% examples %% examples
examples_listener_list() -> examples_listener_list() ->
[Config || #{value := Config} <- maps:values(examples_listener())]. Convert = fun(Cfg) ->
Cfg#{
current_connections => 0,
node_status => [
#{
node => <<"127.0.0.1">>,
current_connections => 0,
max_connections => 1024000
}
]
}
end,
[Convert(Config) || #{value := Config} <- maps:values(examples_listener())].
examples_listener() -> examples_listener() ->
#{ #{

View File

@ -50,6 +50,8 @@
remove_authn/2 remove_authn/2
]). ]).
-export([get_bind/1]).
%% internal exports %% internal exports
-export([ -export([
unconvert_listeners/1, unconvert_listeners/1,
@ -196,6 +198,15 @@ bind2str(LConf = #{bind := Bind}) when is_binary(Bind) ->
bind2str(LConf = #{<<"bind">> := Bind}) when is_binary(Bind) -> bind2str(LConf = #{<<"bind">> := Bind}) when is_binary(Bind) ->
LConf. LConf.
get_bind(#{bind := Bind}) when is_integer(Bind) ->
Bind;
get_bind(#{<<"bind">> := Bind}) when is_integer(Bind) ->
Bind;
get_bind(#{bind := Bind}) when is_binary(Bind) ->
erlang:binary_to_integer(Bind);
get_bind(#{<<"bind">> := Bind}) when is_binary(Bind) ->
erlang:binary_to_integer(Bind).
-spec listeners(atom_or_bin()) -> [map()]. -spec listeners(atom_or_bin()) -> [map()].
listeners(GwName0) -> listeners(GwName0) ->
GwName = bin(GwName0), GwName = bin(GwName0),

View File

@ -63,9 +63,13 @@
with_listener_authn/3, with_listener_authn/3,
checks/2, checks/2,
reason2resp/1, reason2resp/1,
reason2msg/1 reason2msg/1,
sum_cluster_connections/1
]). ]).
%% RPC
-export([gateway_status/1, cluster_gateway_status/1]).
-type gateway_summary() :: -type gateway_summary() ::
#{ #{
name := binary(), name := binary(),
@ -113,10 +117,13 @@ gateways(Status) ->
], ],
GwInfo0 GwInfo0
), ),
NodeStatus = cluster_gateway_status(GwName),
{MaxCons, CurrCons} = sum_cluster_connections(NodeStatus),
GwInfo1#{ GwInfo1#{
max_connections => max_connections_count(Config), max_connections => MaxCons,
current_connections => current_connections_count(GwName), current_connections => CurrCons,
listeners => get_listeners_status(GwName, Config) listeners => get_listeners_status(GwName, Config),
node_status => NodeStatus
} }
end end
end, end,
@ -127,6 +134,28 @@ gateways(Status) ->
_ -> [Gw || Gw = #{status := S} <- Gateways, S == Status] _ -> [Gw || Gw = #{status := S} <- Gateways, S == Status]
end. end.
gateway_status(GwName) ->
case emqx_gateway:lookup(GwName) of
undefined ->
#{node => node(), status => unloaded};
#{status := Status, config := Config} ->
#{
node => node(),
status => Status,
max_connections => max_connections_count(Config),
current_connections => current_connections_count(GwName)
}
end.
cluster_gateway_status(GwName) ->
Nodes = mria_mnesia:running_nodes(),
case emqx_gateway_http_proto_v1:get_cluster_status(Nodes, GwName) of
{Results, []} ->
Results;
{_, _BadNodes} ->
error(badrpc)
end.
%% @private %% @private
max_connections_count(Config) -> max_connections_count(Config) ->
Listeners = emqx_gateway_utils:normalize_config(Config), Listeners = emqx_gateway_utils:normalize_config(Config),
@ -540,5 +569,16 @@ to_list(A) when is_atom(A) ->
to_list(B) when is_binary(B) -> to_list(B) when is_binary(B) ->
binary_to_list(B). binary_to_list(B).
sum_cluster_connections(List) ->
sum_cluster_connections(List, 0, 0).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal funcs %% Internal funcs
sum_cluster_connections(
[#{max_connections := Max, current_connections := Current} | T], MaxAcc, CurrAcc
) ->
sum_cluster_connections(T, MaxAcc + Max, Current + CurrAcc);
sum_cluster_connections([_ | T], MaxAcc, CurrAcc) ->
sum_cluster_connections(T, MaxAcc, CurrAcc);
sum_cluster_connections([], MaxAcc, CurrAcc) ->
{MaxAcc, CurrAcc}.

View File

@ -326,7 +326,9 @@ parse_listener_id(Id) ->
_:_ -> error({invalid_listener_id, Id}) _:_ -> error({invalid_listener_id, Id})
end. end.
is_running(ListenerId, #{<<"bind">> := ListenOn0}) -> is_running(ListenerId, #{<<"bind">> := ListenOn}) ->
is_running(ListenerId, ListenOn);
is_running(ListenerId, ListenOn0) ->
ListenOn = emqx_gateway_utils:parse_listenon(ListenOn0), ListenOn = emqx_gateway_utils:parse_listenon(ListenOn0),
try esockd:listener({ListenerId, ListenOn}) of try esockd:listener({ListenerId, ListenOn}) of
Pid when is_pid(Pid) -> Pid when is_pid(Pid) ->

View File

@ -0,0 +1,34 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gateway_api_listeners_proto_v1).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
listeners_cluster_status/2
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.0.0".
-spec listeners_cluster_status([node()], list()) ->
emqx_rpc:multicall_result([map()]).
listeners_cluster_status(Nodes, Listeners) ->
rpc:multicall(Nodes, emqx_gateway_api_listeners, do_listeners_cluster_status, [Listeners]).

View File

@ -0,0 +1,34 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gateway_http_proto_v1).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
get_cluster_status/2
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.0.0".
-spec get_cluster_status([node()], emqx_gateway_cm:gateway_name()) ->
emqx_rpc:multicall_result([map()]).
get_cluster_status(Nodes, GwName) ->
rpc:multicall(Nodes, emqx_gateway_http, gateway_status, [GwName]).