From 9642bcce885734f3d948a478123ed09e013b960d Mon Sep 17 00:00:00 2001 From: DDDHuang <904897578@qq.com> Date: Tue, 20 Jul 2021 09:31:46 +0800 Subject: [PATCH] refactor: listeners api; add: listeners list function; fix: listener already start error --- apps/emqx/src/emqx_listeners.erl | 57 ++- apps/emqx_management/src/emqx_mgmt.erl | 57 +-- .../src/emqx_mgmt_api_listeners.erl | 345 +++++++++++++++--- .../test/emqx_mgmt_listeners_api_SUITE.erl | 120 ++++++ 4 files changed, 510 insertions(+), 69 deletions(-) create mode 100644 apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index e2d9c698c..faad35c3d 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -20,9 +20,11 @@ -include("emqx_mqtt.hrl"). %% APIs --export([ start/0 +-export([ list/0 + , start/0 , restart/0 , stop/0 + , is_running/1 ]). -export([ start_listener/1 @@ -33,6 +35,57 @@ , restart_listener/3 ]). +-spec(list() -> [{ListenerId :: atom(), ListenerConf :: map()}]). +list() -> + Zones = maps:to_list(emqx_config:get([zones], #{})), + lists:append([list(ZoneName, ZoneConf) || {ZoneName, ZoneConf} <- Zones]). + +list(ZoneName, ZoneConf) -> + Listeners = maps:to_list(maps:get(listeners, ZoneConf, #{})), + [ + begin + ListenerId = listener_id(ZoneName, LName), + Running = is_running(ListenerId), + Conf = merge_zone_and_listener_confs(ZoneConf, LConf), + {ListenerId, maps:put(running, Running, Conf)} + end + || {LName, LConf} <- Listeners]. + +-spec is_running(ListenerId :: atom()) -> boolean() | {error, no_found}. +is_running(ListenerId) -> + Zones = maps:to_list(emqx_config:get([zones], #{})), + Listeners = lists:append( + [ + [{listener_id(ZoneName, LName),merge_zone_and_listener_confs(ZoneConf, LConf)} + || {LName, LConf} <- maps:to_list(maps:get(listeners, ZoneConf, #{}))] + || {ZoneName, ZoneConf} <- Zones]), + case proplists:get_value(ListenerId, Listeners, undefined) of + undefined -> + {error, no_found}; + Conf -> + is_running(ListenerId, Conf) + end. + +is_running(ListenerId, #{type := tcp, bind := ListenOn})-> + try esockd:listener({ListenerId, ListenOn}) of + Pid when is_pid(Pid)-> + true + catch _:_ -> + false + end; + +is_running(ListenerId, #{type := ws})-> + try + Info = ranch:info(ListenerId), + proplists:get_value(status, Info) =:= running + catch _:_ -> + false + end; + +is_running(_ListenerId, #{type := quic})-> +%% TODO: quic support + {error, no_found}. + %% @doc Start all listeners. -spec(start() -> ok). start() -> @@ -52,6 +105,8 @@ start_listener(ZoneName, ListenerName, #{type := Type, bind := Bind} = Conf) -> {ok, _} -> console_print("Start ~s listener ~s on ~s successfully.~n", [Type, listener_id(ZoneName, ListenerName), format(Bind)]); + {error, {already_started, Pid}} -> + {error, {already_started, Pid}}; {error, Reason} -> io:format(standard_error, "Failed to start ~s listener ~s on ~s: ~0p~n", [Type, listener_id(ZoneName, ListenerName), format(Bind), Reason]), diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index b539b223b..eae5563ba 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -85,7 +85,10 @@ %% Listeners -export([ list_listeners/0 , list_listeners/1 - , restart_listener/2 + , list_listeners/2 + , list_listeners_by_id/1 + , get_listener/2 + , manage_listener/2 ]). %% Alarms @@ -451,37 +454,39 @@ reload_plugin(Node, Plugin) -> %%-------------------------------------------------------------------- list_listeners() -> - [{Node, list_listeners(Node)} || Node <- ekka_mnesia:running_nodes()]. + lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]). + +list_listeners(Node, Identifier) -> + listener_id_filter(Identifier, list_listeners(Node)). list_listeners(Node) when Node =:= node() -> - Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) -> - #{protocol => Protocol, - listen_on => ListenOn, - identifier => Protocol, - acceptors => esockd:get_acceptors({Protocol, ListenOn}), - max_conns => esockd:get_max_connections({Protocol, ListenOn}), - current_conns => esockd:get_current_connections({Protocol, ListenOn}), - shutdown_count => esockd:get_shutdown_count({Protocol, ListenOn})} - end, esockd:listeners()), - Http = lists:map(fun({Protocol, Opts}) -> - #{protocol => Protocol, - listen_on => proplists:get_value(port, Opts), - acceptors => maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0), - max_conns => proplists:get_value(max_connections, Opts), - current_conns => proplists:get_value(all_connections, Opts), - shutdown_count => []} - end, ranch:info()), - Tcp ++ Http; + [{Id, maps:put(node, Node, Conf)} || {Id, Conf} <- emqx_listeners:list()]; list_listeners(Node) -> rpc_call(Node, list_listeners, [Node]). --spec restart_listener(node(), atom()) -> ok | {error, term()}. -restart_listener(Node, Identifier) when Node =:= node() -> - emqx_listeners:restart_listener(Identifier); +list_listeners_by_id(Identifier) -> + listener_id_filter(Identifier, list_listeners()). -restart_listener(Node, Identifier) -> - rpc_call(Node, restart_listener, [Node, Identifier]). +get_listener(Node, Identifier) -> + case listener_id_filter(Identifier, list_listeners(Node)) of + [] -> + {error, not_found}; + [Listener] -> + Listener + end. + +listener_id_filter(Identifier, Listeners) -> + Filter = + fun({Id, _}) -> Id =:= Identifier end, + lists:filter(Filter, Listeners). + +-spec manage_listener(Operation :: start_listener|stop_listener|restart_listener, Param :: map()) -> + ok | {error, Reason :: term()}. +manage_listener(Operation, #{identifier := Identifier, node := Node}) when Node =:= node()-> + erlang:apply(emqx_listeners, Operation, [Identifier]); +manage_listener(Operation, Param = #{node := Node}) -> + rpc_call(Node, restart_listener, [Operation, Param]). %%-------------------------------------------------------------------- %% Get Alarms @@ -542,7 +547,7 @@ item(route, {Topic, Node}) -> #{topic => Topic, node => Node}. %%-------------------------------------------------------------------- -%% Internel Functions. +%% Internal Functions. %%-------------------------------------------------------------------- rpc_call(Node, Fun, Args) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index a2bc7309d..34e16bc53 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -16,58 +16,319 @@ -module(emqx_mgmt_api_listeners). --rest_api(#{name => list_listeners, - method => 'GET', - path => "/listeners/", - func => list, - descr => "A list of listeners in the cluster"}). +-behavior(minirest_api). --rest_api(#{name => list_node_listeners, - method => 'GET', - path => "/nodes/:atom:node/listeners", - func => list, - descr => "A list of listeners on the node"}). +-export([api_spec/0]). --rest_api(#{name => restart_listener, - method => 'PUT', - path => "/listeners/:atom:identifier/restart", - func => restart, - descr => "Restart a listener in the cluster"}). +-export([ listeners/2 + , listener/2 + , node_listener/2 + , node_listeners/2 + , manage_listeners/2 + , manage_nodes_listeners/2]). --rest_api(#{name => restart_node_listener, - method => 'PUT', - path => "/nodes/:atom:node/listeners/:atom:identifier/restart", - func => restart, - descr => "Restart a listener on a node"}). +-export([format/1]). --export([list/2, restart/2]). +-include_lib("emqx/include/emqx.hrl"). -%% List listeners on a node. -list(#{node := Node}, _Params) -> - emqx_mgmt:return({ok, format(emqx_mgmt:list_listeners(Node))}); +api_spec() -> + { + [ + listeners_api(), + restart_listeners_api(), + nodes_listeners_api(), + nodes_listener_api(), + manage_listeners_api(), + manage_nodes_listeners_api() + ], + [listener_schema()] + }. + +listener_schema() -> + #{ + listener => #{ + type => object, + properties => #{ + node => #{ + type => string, + description => <<"Node">>, + example => node()}, + identifier => #{ + type => string, + description => <<"Identifier">>}, + acceptors => #{ + type => integer, + description => <<"Number of Acceptor proce">>}, + max_conn => #{ + type => integer, + description => <<"Maximum number of allowed connection">>}, + type => #{ + type => string, + description => <<"Plugin decription">>}, + listen_on => #{ + type => string, + description => <<"Litening port">>}, + running => #{ + type => boolean, + description => <<"Open or close">>}, + auth => #{ + type => boolean, + description => <<"Has auth">>}}}}. + +listeners_api() -> + Metadata = #{ + get => #{ + description => "List listeners in cluster", + responses => #{ + <<"200">> => + emqx_mgmt_util:response_array_schema(<<"List all listeners">>, <<"listener">>)}}}, + {"/listeners", Metadata, listeners}. + +restart_listeners_api() -> + Metadata = #{ + get => #{ + description => "List listeners by listener ID", + parameters => [param_path_identifier()], + responses => #{ + <<"404">> => + emqx_mgmt_util:response_error_schema(<<"Listener id not found">>, ['BAD_LISTENER_ID']), + <<"200">> => + emqx_mgmt_util:response_array_schema(<<"List listener info ok">>, <<"listener">>)}}}, + {"/listeners/:identifier", Metadata, listener}. + +manage_listeners_api() -> + Metadata = #{ + get => #{ + description => "Restart listeners in cluster", + parameters => [ + param_path_identifier(), + param_path_operation()], + responses => #{ + <<"500">> => + emqx_mgmt_util:response_error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']), + <<"404">> => + emqx_mgmt_util:response_error_schema(<<"Listener id not found">>, + ['BAD_LISTENER_ID']), + <<"400">> => + emqx_mgmt_util:response_error_schema(<<"Listener id not found">>, + ['BAD_REQUEST']), + <<"200">> => + emqx_mgmt_util:response_schema(<<"Operation success">>)}}}, + {"/listeners/:identifier/:operation", Metadata, manage_listeners}. + +manage_nodes_listeners_api() -> + Metadata = #{ + get => #{ + description => "Restart listeners in cluster", + parameters => [ + param_path_node(), + param_path_identifier(), + param_path_operation()], + responses => #{ + <<"500">> => + emqx_mgmt_util:response_error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']), + <<"404">> => + emqx_mgmt_util:response_error_schema(<<"Bad node or Listener id not found">>, + ['BAD_NODE_NAME','BAD_LISTENER_ID']), + <<"400">> => + emqx_mgmt_util:response_error_schema(<<"Listener id not found">>, + ['BAD_REQUEST']), + <<"200">> => + emqx_mgmt_util:response_schema(<<"Operation success">>)}}}, + {"/node/:node/listeners/:identifier/:operation", Metadata, manage_nodes_listeners}. + +nodes_listeners_api() -> + Metadata = #{ + get => #{ + description => "Get listener info in one node", + parameters => [param_path_node(), param_path_identifier()], + responses => #{ + <<"404">> => + emqx_mgmt_util:response_error_schema(<<"Node name or listener id not found">>, + ['BAD_NODE_NAME', 'BAD_LISTENER_ID']), + <<"200">> => + emqx_mgmt_util:response_schema(<<"Get listener info ok">>, <<"listener">>)}}}, + {"/nodes/:node/listeners/:identifier", Metadata, node_listener}. + +nodes_listener_api() -> + Metadata = #{ + get => #{ + description => "List listeners in one node", + parameters => [param_path_node()], + responses => #{ + <<"404">> => + emqx_mgmt_util:response_error_schema(<<"Listener id not found">>), + <<"200">> => + emqx_mgmt_util:response_schema(<<"Get listener info ok">>, <<"listener">>)}}}, + {"/nodes/:node/listeners", Metadata, node_listeners}. +%%%============================================================================================== +%% parameters +param_path_node() -> + #{ + name => node, + in => path, + schema => #{type => string}, + required => true, + example => node() + }. + +param_path_identifier() -> + {Example,_} = hd(emqx_mgmt:list_listeners(node())), + #{ + name => identifier, + in => path, + schema => #{type => string}, + required => true, + example => Example + }. + +param_path_operation()-> + #{ + name => operation, + in => path, + required => true, + schema => #{ + type => string, + enum => [start, stop, restart]}, + example => restart + }. + +%%%============================================================================================== +%% api +listeners(get, _Request) -> + list(). + +listener(get, Request) -> + ListenerID = binary_to_atom(cowboy_req:binding(identifier, Request)), + get_listeners(#{identifier => ListenerID}). + +node_listeners(get, Request) -> + Node = binary_to_atom(cowboy_req:binding(node, Request)), + get_listeners(#{node => Node}). + +node_listener(get, Request) -> + Node = binary_to_atom(cowboy_req:binding(node, Request)), + ListenerID = binary_to_atom(cowboy_req:binding(identifier, Request)), + get_listeners(#{node => Node, identifier => ListenerID}). + +manage_listeners(_, Request) -> + Identifier = binary_to_atom(cowboy_req:binding(identifier, Request)), + Operation = binary_to_atom(cowboy_req:binding(operation, Request)), + manage(Operation, #{identifier => Identifier}). + +manage_nodes_listeners(_, Request) -> + Node = binary_to_atom(cowboy_req:binding(node, Request)), + Identifier = binary_to_atom(cowboy_req:binding(identifier, Request)), + Operation = binary_to_atom(cowboy_req:binding(operation, Request)), + manage(Operation, #{identifier => Identifier, node => Node}). + +%%%============================================================================================== %% List listeners in the cluster. -list(_Binding, _Params) -> - emqx_mgmt:return({ok, [#{node => Node, listeners => format(Listeners)} - || {Node, Listeners} <- emqx_mgmt:list_listeners()]}). +list() -> + {200, format(emqx_mgmt:list_listeners())}. -%% Restart listeners on a node. -restart(#{node := Node, identifier := Identifier}, _Params) -> - case emqx_mgmt:restart_listener(Node, Identifier) of - ok -> emqx_mgmt:return({ok, "Listener restarted."}); - {error, Error} -> emqx_mgmt:return({error, Error}) - end; -%% Restart listeners on all nodes in the cluster. -restart(#{identifier := Identifier}, _Params) -> - Results = [{Node, emqx_mgmt:restart_listener(Node, Identifier)} || {Node, _Info} <- emqx_mgmt:list_nodes()], - case lists:filter(fun({_, Result}) -> Result =/= ok end, Results) of - [] -> emqx_mgmt:return(ok); - Errors -> emqx_mgmt:return({error, {restart, Errors}}) +get_listeners(Param) -> + case list_listener(Param) of + {error, not_found} -> + Identifier = maps:get(identifier, Param), + Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])), + {404, #{code => 'BAD_LISTENER_ID', message => Reason}}; + {error, nodedown} -> + Node = maps:get(node, Param), + Reason = list_to_binary(io_lib:format("Node ~p rpc failed", [Node])), + Response = #{code => 'BAD_NODE_NAME', message => Reason}, + {404, Response}; + [] -> + Identifier = maps:get(identifier, Param), + Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])), + {404, #{code => 'BAD_LISTENER_ID', message => Reason}}; + Data -> + {200, Data} end. +manage(Operation0, Param) -> + OperationMap = #{start => start_listener, stop => stop_listener, restart => restart_listener}, + Operation = maps:get(Operation0, OperationMap), + case list_listener(Param) of + {error, not_found} -> + Identifier = maps:get(identifier, Param), + Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])), + {404, #{code => 'BAD_LISTENER_ID', message => Reason}}; + {error, nodedown} -> + Node = maps:get(node, Param), + Reason = list_to_binary(io_lib:format("Node ~p rpc failed", [Node])), + Response = #{code => 'BAD_NODE_NAME', message => Reason}, + {404, Response}; + [] -> + Identifier = maps:get(identifier, Param), + Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])), + {404, #{code => 'RESOURCE_NOT_FOUND', message => Reason}}; + ListenersOrSingleListener -> + manage_(Operation, ListenersOrSingleListener) + end. + +manage_(Operation, Listener) when is_map(Listener) -> + manage_(Operation, [Listener]); +manage_(Operation, Listeners) when is_list(Listeners) -> + Results = [emqx_mgmt:manage_listener(Operation, Listener) || Listener <- Listeners], + case lists:filter(fun(Result) -> Result =/= ok end, Results) of + [] -> + {200}; + Errors -> + case lists:filter(fun({error, {already_started, _}}) -> false; (_) -> true end, Results) of + [] -> + Identifier = maps:get(identifier, hd(Listeners)), + Message = list_to_binary(io_lib:format("Already Started: ~s", [Identifier])), + {400, #{code => 'BAD_REQUEST', message => Message}}; + _ -> + case lists:filter(fun({error,not_found}) -> false; (_) -> true end, Results) of + [] -> + Identifier = maps:get(identifier, hd(Listeners)), + Message = list_to_binary(io_lib:format("Already Stoped: ~s", [Identifier])), + {400, #{code => 'BAD_REQUEST', message => Message}}; + _ -> + Reason = list_to_binary(io_lib:format("~p", [Errors])), + {500, #{code => 'UNKNOW_ERROR', message => Reason}} + end + end + end. + +%%%============================================================================================== +%% util function +list_listener(Params) -> + format(list_listener_(Params)). + +list_listener_(#{node := Node, identifier := Identifier}) -> + emqx_mgmt:get_listener(Node, Identifier); +list_listener_(#{identifier := Identifier}) -> + emqx_mgmt:list_listeners_by_id(Identifier); +list_listener_(#{node := Node}) -> + emqx_mgmt:list_listeners(Node); +list_listener_(#{}) -> + emqx_mgmt:list_listeners(). + format(Listeners) when is_list(Listeners) -> - [ Info#{listen_on => list_to_binary(esockd:to_string(ListenOn))} - || Info = #{listen_on := ListenOn} <- Listeners ]; + [format(Listener) || Listener <- Listeners]; -format({error, Reason}) -> [{error, Reason}]. +format({error, Reason}) -> + {error, Reason}; +format({Identifier, Conf}) -> + #{ + identifier => Identifier, + node => maps:get(node, Conf), + acceptors => maps:get(acceptors, Conf), + max_conn => maps:get(max_connections, Conf), + type => maps:get(type, Conf), + listen_on => list_to_binary(esockd:to_string(maps:get(bind, Conf))), + running => trans_running(Conf), + auth => maps:get(enable, maps:get(auth, Conf)) + }. +trans_running(Conf) -> + case maps:get(running, Conf) of + {error, _} -> + false; + Running -> + Running + end. diff --git a/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl new file mode 100644 index 000000000..e3d50f57c --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl @@ -0,0 +1,120 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_listeners_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + ekka_mnesia:start(), + emqx_mgmt_auth:mnesia(boot), + emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1), + Config. + +end_per_suite(_) -> + emqx_ct_helpers:stop_apps([emqx_management]). + +set_special_configs(emqx_management) -> + emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], + applications =>[#{id => "admin", secret => "public"}]}), + ok; +set_special_configs(_App) -> + ok. + +t_list_listeners(_) -> + Path = emqx_mgmt_api_test_util:api_path(["listeners"]), + get_api(Path). + +t_list_node_listeners(_) -> + Path = emqx_mgmt_api_test_util:api_path(["nodes", atom_to_binary(node(), utf8), "listeners"]), + get_api(Path). + +t_get_listeners(_) -> + LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())), + Identifier = maps:get(identifier, LocalListener), + Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier)]), + get_api(Path). + +t_get_node_listeners(_) -> + LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())), + Identifier = maps:get(identifier, LocalListener), + Path = emqx_mgmt_api_test_util:api_path( + ["nodes", atom_to_binary(node(), utf8), "listeners", atom_to_list(Identifier)]), + get_api(Path). + +t_stop_listener(_) -> + LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())), + Identifier = maps:get(identifier, LocalListener), + Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier), "stop"]), + {ok, _} = emqx_mgmt_api_test_util:request_api(get, Path), + GetPath = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier)]), + {ok, ListenersResponse} = emqx_mgmt_api_test_util:request_api(get, GetPath), + Listeners = emqx_json:decode(ListenersResponse, [return_maps]), + [listener_stats(Listener, false) || Listener <- Listeners]. + +get_api(Path) -> + {ok, ListenersData} = emqx_mgmt_api_test_util:request_api(get, Path), + LocalListeners = emqx_mgmt_api_listeners:format(emqx_mgmt:list_listeners()), + case emqx_json:decode(ListenersData, [return_maps]) of + [Listener] -> + Identifier = binary_to_atom(maps:get(<<"identifier">>, Listener), utf8), + Filter = + fun(Local) -> + maps:get(identifier, Local) =:= Identifier + end, + LocalListener = hd(lists:filter(Filter, LocalListeners)), + comparison_listener(LocalListener, Listener); + Listeners when is_list(Listeners) -> + ?assertEqual(erlang:length(LocalListeners), erlang:length(Listeners)), + Fun = + fun(LocalListener) -> + Identifier = maps:get(identifier, LocalListener), + IdentifierBinary = atom_to_binary(Identifier, utf8), + Filter = + fun(Listener) -> + maps:get(<<"identifier">>, Listener) =:= IdentifierBinary + end, + Listener = hd(lists:filter(Filter, Listeners)), + comparison_listener(LocalListener, Listener) + end, + lists:foreach(Fun, LocalListeners); + Listener when is_map(Listener) -> + Identifier = binary_to_atom(maps:get(<<"identifier">>, Listener), utf8), + Filter = + fun(Local) -> + maps:get(identifier, Local) =:= Identifier + end, + LocalListener = hd(lists:filter(Filter, LocalListeners)), + comparison_listener(LocalListener, Listener) + end. + +comparison_listener(Local, Response) -> + ?assertEqual(maps:get(identifier, Local), binary_to_atom(maps:get(<<"identifier">>, Response))), + ?assertEqual(maps:get(node, Local), binary_to_atom(maps:get(<<"node">>, Response))), + ?assertEqual(maps:get(acceptors, Local), maps:get(<<"acceptors">>, Response)), + ?assertEqual(maps:get(max_conn, Local), maps:get(<<"max_conn">>, Response)), + ?assertEqual(maps:get(listen_on, Local), maps:get(<<"listen_on">>, Response)), + ?assertEqual(maps:get(running, Local), maps:get(<<"running">>, Response)), + ?assertEqual(maps:get(auth, Local), maps:get(<<"auth">>, Response)). + + +listener_stats(Listener, Stats) -> + ?assertEqual(maps:get(<<"running">>, Listener), Stats).