fix(gateway): make gateway listeners list return act like MQTT listeners

This commit is contained in:
firest 2022-05-06 18:36:29 +08:00
parent f20f05161f
commit 93ab0458d1
4 changed files with 57 additions and 73 deletions

View File

@ -24,6 +24,6 @@
{emqx_delayed,1}. {emqx_delayed,1}.
{emqx_mgmt_cluster,1}. {emqx_mgmt_cluster,1}.
{emqx_retainer,1}. {emqx_retainer,1}.
{emqx_gateway_http_proto_v1,1}. {emqx_gateway_http,1}.
{emqx_gateway_api_listeners_proto_v1,1}. {emqx_gateway_api_listeners,1}.

View File

@ -126,10 +126,10 @@ emqx_gateway_api_listeners {
} }
} }
running { current_connections {
desc { desc {
en: """Is In Listening""" en: """Current Connections"""
zh: """是否正在监听""" zh: """当前连接数"""
} }
} }

View File

@ -85,8 +85,8 @@ listeners(get, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
Listeners = emqx_gateway_conf:listeners(GwName), Listeners = emqx_gateway_conf:listeners(GwName),
ListenOns = lists:map( ListenOns = lists:map(
fun(#{id := Id, <<"bind">> := ListenOn}) -> fun(#{id := Id, <<"bind">> := BinListenOn}) ->
{Id, ListenOn} {Id, erlang:binary_to_integer(BinListenOn)}
end, end,
Listeners Listeners
), ),
@ -95,20 +95,21 @@ listeners(get, #{bindings := #{name := Name0}}) ->
Result = lists:map( Result = lists:map(
fun(#{id := Id} = Listener) -> fun(#{id := Id} = Listener) ->
NodeStatus = lists:foldl(
fun(Info, Acc) ->
Status = maps:get(Id, Info),
[Status | Acc]
end,
[],
ClusterStatus
),
{MaxCons, CurrCons} = emqx_gateway_http:sum_cluster_connections(NodeStatus),
Listener#{ Listener#{
node_status => max_connections => MaxCons,
lists:foldl( current_connections => CurrCons,
fun(Info, Acc) -> node_status => NodeStatus
case maps:get(Id, Info, undefined) of
undefined ->
Acc;
Status ->
[Status | Acc]
end
end,
[],
ClusterStatus
)
} }
end, end,
Listeners Listeners
@ -588,11 +589,12 @@ roots() ->
[listener]. [listener].
fields(listener_node_status) -> fields(listener_node_status) ->
[{node_status, mk(hoconsc:array(ref(node_status)), #{desc => ?DESC(listener_node_status)})}];
fields(node_status) ->
[ [
{node, mk(node, #{desc => ?DESC(node)})}, {current_connections, mk(non_neg_integer(), #{desc => ?DESC(current_connections)})},
{running, mk(boolean(), #{desc => ?DESC(running)})} {node_status,
mk(hoconsc:array(ref(emqx_mgmt_api_listeners, node_status)), #{
desc => ?DESC(listener_node_status)
})}
]; ];
fields(tcp_listener) -> fields(tcp_listener) ->
emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status); emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status);
@ -617,7 +619,19 @@ listener_node_status_schema() ->
%% examples %% examples
examples_listener_list() -> examples_listener_list() ->
[Config || #{value := Config} <- maps:values(examples_listener())]. Convert = fun(Cfg) ->
Cfg#{
current_connections => 0,
node_status => [
#{
node => <<"127.0.0.1">>,
current_connections => 0,
max_connections => 1024000
}
]
}
end,
[Convert(Config) || #{value := Config} <- maps:values(examples_listener())].
examples_listener() -> examples_listener() ->
#{ #{
@ -644,13 +658,7 @@ examples_listener() ->
high_watermark => <<"1MB">>, high_watermark => <<"1MB">>,
nodelay => false, nodelay => false,
reuseaddr => true reuseaddr => true
},
node_status => [
#{
node => <<"node@127.0.0.1">>,
running => true
} }
]
} }
}, },
ssl_listener => ssl_listener =>
@ -683,13 +691,7 @@ examples_listener() ->
#{ #{
active_n => 100, active_n => 100,
backlog => 1024 backlog => 1024
},
node_status => [
#{
node => <<"node@127.0.0.1">>,
running => true
} }
]
} }
}, },
udp_listener => udp_listener =>
@ -708,13 +710,7 @@ examples_listener() ->
buffer => <<"10KB">>, buffer => <<"10KB">>,
reuseaddr => true reuseaddr => true
} }
},
node_status => [
#{
node => <<"node@127.0.0.1">>,
running => true
} }
]
}, },
dtls_listener => dtls_listener =>
#{ #{
@ -741,13 +737,7 @@ examples_listener() ->
#{ #{
active_n => 100, active_n => 100,
backlog => 1024 backlog => 1024
},
node_status => [
#{
node => <<"node@127.0.0.1">>,
running => true
} }
]
} }
}, },
dtls_listener_with_psk_ciphers => dtls_listener_with_psk_ciphers =>
@ -775,13 +765,7 @@ examples_listener() ->
"RSA-PSK-AES128-CBC-SHA256,RSA-PSK-AES256-CBC-SHA,RSA-PSK-AES128-CBC-SHA" "RSA-PSK-AES128-CBC-SHA256,RSA-PSK-AES256-CBC-SHA,RSA-PSK-AES128-CBC-SHA"
>>, >>,
fail_if_no_peer_cert => false fail_if_no_peer_cert => false
},
node_status => [
#{
node => <<"node@127.0.0.1">>,
running => true
} }
]
} }
}, },
lisetner_with_authn => lisetner_with_authn =>
@ -802,13 +786,7 @@ examples_listener() ->
password_hash_algorithm => password_hash_algorithm =>
#{name => <<"sha256">>}, #{name => <<"sha256">>},
user_id_type => <<"username">> user_id_type => <<"username">>
},
node_status => [
#{
node => <<"node@127.0.0.1">>,
running => true
} }
]
} }
} }
}. }.
@ -824,14 +802,19 @@ listeners_cluster_status(Listeners) ->
do_listeners_cluster_status(Listeners) -> do_listeners_cluster_status(Listeners) ->
Node = node(), Node = node(),
maps:from_list( lists:foldl(
lists:map( fun({Id, ListenOn}, Acc) ->
fun({Id, ListenOn}) -> BinId = erlang:atom_to_binary(Id),
{Id, #{ {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId),
Curr = esockd:get_current_connections({Id, ListenOn}),
Acc#{
Id => #{
node => Node, node => Node,
running => emqx_gateway_utils:is_running(Id, ListenOn) current_connections => Curr,
}} max_connections => Max
end, }
Listeners }
) end,
#{},
Listeners
). ).

View File

@ -63,7 +63,8 @@
with_listener_authn/3, with_listener_authn/3,
checks/2, checks/2,
reason2resp/1, reason2resp/1,
reason2msg/1 reason2msg/1,
sum_cluster_connections/1
]). ]).
%% RPC %% RPC
@ -568,11 +569,11 @@ to_list(A) when is_atom(A) ->
to_list(B) when is_binary(B) -> to_list(B) when is_binary(B) ->
binary_to_list(B). binary_to_list(B).
%%--------------------------------------------------------------------
%% Internal funcs
sum_cluster_connections(List) -> sum_cluster_connections(List) ->
sum_cluster_connections(List, 0, 0). sum_cluster_connections(List, 0, 0).
%%--------------------------------------------------------------------
%% Internal funcs
sum_cluster_connections( sum_cluster_connections(
[#{max_connections := Max, current_connections := Current} | T], MaxAcc, CurrAcc [#{max_connections := Max, current_connections := Current} | T], MaxAcc, CurrAcc
) -> ) ->