From 9998b613c85ac6fbb54dfb26ad68f53f31b7a7d7 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 18 Apr 2022 19:36:12 +0800 Subject: [PATCH 1/2] refactor: mgmt rm `generate_response/1` --- apps/emqx/src/emqx_listeners.erl | 31 +++--- apps/emqx_authn/src/emqx_authn_api.erl | 13 ++- apps/emqx_authz/src/emqx_authz_api_mnesia.erl | 50 +++++++--- .../src/emqx_gateway_api_clients.erl | 53 +++++----- .../src/emqx_mgmt_api_alarms.erl | 11 ++- .../src/emqx_mgmt_api_clients.erl | 29 +++--- .../src/emqx_mgmt_api_listeners.erl | 98 +++++++++++++++++-- .../src/emqx_mgmt_api_subscriptions.erl | 26 +++-- .../src/emqx_mgmt_api_topics.erl | 13 ++- apps/emqx_management/src/emqx_mgmt_util.erl | 17 ---- 10 files changed, 236 insertions(+), 105 deletions(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 02cb91900..faf6ba8ec 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -87,13 +87,21 @@ do_list_raw() -> Listeners = maps:to_list(RawWithDefault), lists:flatmap(fun format_raw_listeners/1, Listeners). -format_raw_listeners({Type, Conf}) -> +format_raw_listeners({Type0, Conf}) -> + Type = binary_to_atom(Type0), lists:map( fun({LName, LConf0}) when is_map(LConf0) -> - Running = is_running(binary_to_atom(Type), listener_id(Type, LName), LConf0), + Bind = parse_bind(LConf0), + Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}), LConf1 = maps:remove(<<"authentication">>, LConf0), LConf2 = maps:put(<<"running">>, Running, LConf1), - {Type, LName, LConf2} + CurrConn = + case Running of + true -> current_conns(Type, LName, Bind); + false -> 0 + end, + LConf3 = maps:put(<<"current_connections">>, CurrConn, LConf2), + {Type0, LName, LConf3} end, maps:to_list(Conf) ). @@ -112,16 +120,7 @@ is_running(ListenerId) -> end. is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl -> - ListenOn = - case Conf of - #{bind := Bind} -> - Bind; - #{<<"bind">> := Bind} -> - case emqx_schema:to_ip_port(binary_to_list(Bind)) of - {ok, L} -> L; - {error, _} -> binary_to_integer(Bind) - end - end, + #{bind := ListenOn} = Conf, try esockd:listener({ListenerId, ListenOn}) of Pid when is_pid(Pid) -> true @@ -545,3 +544,9 @@ str(B) when is_binary(B) -> binary_to_list(B); str(S) when is_list(S) -> S. + +parse_bind(#{<<"bind">> := Bind}) -> + case emqx_schema:to_ip_port(binary_to_list(Bind)) of + {ok, L} -> L; + {error, _} -> binary_to_integer(Bind) + end. diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl index 5c2cc8a6a..ed0117b96 100644 --- a/apps/emqx_authn/src/emqx_authn_api.erl +++ b/apps/emqx_authn/src/emqx_authn_api.erl @@ -1162,8 +1162,17 @@ delete_user(ChainName, AuthenticatorID, UserID) -> end. list_users(ChainName, AuthenticatorID, QueryString) -> - Response = emqx_authentication:list_users(ChainName, AuthenticatorID, QueryString), - emqx_mgmt_util:generate_response(Response). + case emqx_authentication:list_users(ChainName, AuthenticatorID, QueryString) of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Reason} -> + {400, #{ + code => <<"INVALID_PARAMETER">>, + message => list_to_binary(io_lib:format("Reason ~p", [Reason])) + }}; + Result -> + {200, Result} + end. update_config(Path, ConfigRequest) -> emqx_conf:update(Path, ConfigRequest, #{ diff --git a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl index 2f06ccdbc..dcdcf6878 100644 --- a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl @@ -405,14 +405,23 @@ fields(meta) -> %%-------------------------------------------------------------------- users(get, #{query_string := QueryString}) -> - Response = emqx_mgmt_api:node_query( - node(), - QueryString, - ?ACL_TABLE, - ?ACL_USERNAME_QSCHEMA, - ?QUERY_USERNAME_FUN - ), - emqx_mgmt_util:generate_response(Response); + case + emqx_mgmt_api:node_query( + node(), + QueryString, + ?ACL_TABLE, + ?ACL_USERNAME_QSCHEMA, + ?QUERY_USERNAME_FUN + ) + of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Result -> + {200, Result} + end; users(post, #{body := Body}) when is_list(Body) -> lists:foreach( fun(#{<<"username">> := Username, <<"rules">> := Rules}) -> @@ -423,14 +432,23 @@ users(post, #{body := Body}) when is_list(Body) -> {204}. clients(get, #{query_string := QueryString}) -> - Response = emqx_mgmt_api:node_query( - node(), - QueryString, - ?ACL_TABLE, - ?ACL_CLIENTID_QSCHEMA, - ?QUERY_CLIENTID_FUN - ), - emqx_mgmt_util:generate_response(Response); + case + emqx_mgmt_api:node_query( + node(), + QueryString, + ?ACL_TABLE, + ?ACL_CLIENTID_QSCHEMA, + ?QUERY_CLIENTID_FUN + ) + of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Result -> + {200, Result} + end; clients(post, #{body := Body}) when is_list(Body) -> lists:foreach( fun(#{<<"clientid">> := ClientID, <<"rules">> := Rules}) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 0c5120818..6eccdc045 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -101,30 +101,39 @@ clients(get, #{ bindings := #{name := Name0}, query_string := QString }) -> - with_gateway(Name0, fun(GwName, _) -> + Fun = fun(GwName, _) -> TabName = emqx_gateway_cm:tabname(info, GwName), - case maps:get(<<"node">>, QString, undefined) of - undefined -> - Response = emqx_mgmt_api:cluster_query( - QString, - TabName, - ?CLIENT_QSCHEMA, - ?QUERY_FUN - ), - emqx_mgmt_util:generate_response(Response); - Node1 -> - Node = binary_to_atom(Node1, utf8), - QStringWithoutNode = maps:without([<<"node">>], QString), - Response = emqx_mgmt_api:node_query( - Node, - QStringWithoutNode, - TabName, - ?CLIENT_QSCHEMA, - ?QUERY_FUN - ), - emqx_mgmt_util:generate_response(Response) + Result = + case maps:get(<<"node">>, QString, undefined) of + undefined -> + emqx_mgmt_api:cluster_query( + QString, + TabName, + ?CLIENT_QSCHEMA, + ?QUERY_FUN + ); + Node0 -> + Node1 = binary_to_atom(Node0, utf8), + QStringWithoutNode = maps:without([<<"node">>], QString), + emqx_mgmt_api:node_query( + Node1, + QStringWithoutNode, + TabName, + ?CLIENT_QSCHEMA, + ?QUERY_FUN + ) + end, + case Result of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Response -> + {200, Response} end - end). + end, + with_gateway(Name0, Fun). clients_insta(get, #{ bindings := #{ diff --git a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl index d6b2280de..f5685993a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl @@ -91,8 +91,15 @@ alarms(get, #{query_string := QString}) -> true -> ?ACTIVATED_ALARM; false -> ?DEACTIVATED_ALARM end, - Response = emqx_mgmt_api:cluster_query(QString, Table, [], {?MODULE, query}), - emqx_mgmt_util:generate_response(Response); + case emqx_mgmt_api:cluster_query(QString, Table, [], {?MODULE, query}) of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Response -> + {200, Response} + end; alarms(delete, _Params) -> _ = emqx_mgmt:delete_all_deactivated_alarms(), diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 36baf041f..7b6b1cf40 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -454,17 +454,24 @@ set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) -> %% api apply list_clients(QString) -> - case maps:get(<<"node">>, QString, undefined) of - undefined -> - Response = emqx_mgmt_api:cluster_query(QString, ?CLIENT_QTAB, - ?CLIENT_QSCHEMA, ?QUERY_FUN), - emqx_mgmt_util:generate_response(Response); - Node1 -> - Node = binary_to_atom(Node1, utf8), - QStringWithoutNode = maps:without([<<"node">>], QString), - Response = emqx_mgmt_api:node_query(Node, QStringWithoutNode, - ?CLIENT_QTAB, ?CLIENT_QSCHEMA, ?QUERY_FUN), - emqx_mgmt_util:generate_response(Response) + Result = case maps:get(<<"node">>, QString, undefined) of + undefined -> + emqx_mgmt_api:cluster_query(QString, ?CLIENT_QTAB, + ?CLIENT_QSCHEMA, ?QUERY_FUN); + Node0 -> + Node1 = binary_to_atom(Node0, utf8), + QStringWithoutNode = maps:without([<<"node">>], QString), + emqx_mgmt_api:node_query(Node1, QStringWithoutNode, + ?CLIENT_QTAB, ?CLIENT_QSCHEMA, ?QUERY_FUN) + end, + case Result of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Response -> + {200, Response} end. lookup(#{clientid := ClientID}) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index a2dbce1aa..30b8d61bd 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -20,8 +20,11 @@ -export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]). -import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]). +-define(LISTENER_TYPE, [quic, wss, ws, ssl, tcp]). +-define(LISTENER_STATUS, [enable, disable]). -export([ + listener_status/2, list_listeners/2, crud_listeners_by_id/2, list_listeners_on_node/2, @@ -55,6 +58,7 @@ api_spec() -> paths() -> [ + "/listener/status", "/listeners", "/listeners/:id", "/listeners/:id/:action", @@ -63,13 +67,29 @@ paths() -> "/nodes/:node/listeners/:id/:action" ]. + +schema("/listener/status") -> + #{ + 'operationId' => listener_status, + get => #{ + tags => [<<"listeners">>], + desc => <<"List all running node's listeners live status.">>, + %% responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))} + %% Current we only support all node's listeners is the same, + %% so we don't return the node information right now. + responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_status)))} + } + }; schema("/listeners") -> #{ 'operationId' => list_listeners, get => #{ tags => [<<"listeners">>], desc => <<"List all running node's listeners.">>, - responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))} + %% responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))} + %% Current we only support all node's listeners is the same, + %% so we don't return the node information right now. + responses => #{200 => ?HOCON(?ARRAY(listener_schema()))} } }; schema("/listeners/:id") -> @@ -80,17 +100,29 @@ schema("/listeners/:id") -> desc => <<"List all running node's listeners for the specified id.">>, parameters => [?R_REF(listener_id)], responses => #{ - 200 => ?HOCON(?ARRAY(?R_REF(listeners))) + %% 200 => ?HOCON(?ARRAY(?R_REF(listeners))) + 200 => ?HOCON(listener_schema()) } }, put => #{ tags => [<<"listeners">>], - desc => <<"Create or update the specified listener on all nodes.">>, + desc => <<"Update the specified listener on all nodes.">>, parameters => [?R_REF(listener_id)], 'requestBody' => ?HOCON(listener_schema(), #{}), responses => #{ 200 => ?HOCON(listener_schema(), #{}), - 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND) + 400 => error_codes(['BAD_REQUEST']), + 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND) + } + }, + post => #{ + tags => [<<"listeners">>], + desc => <<"Create the specified listener on all nodes.">>, + parameters => [?R_REF(listener_id)], + 'requestBody' => ?HOCON(listener_schema(), #{}), + responses => #{ + 200 => ?HOCON(listener_schema(), #{}), + 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST']) } }, delete => #{ @@ -230,6 +262,23 @@ fields(node) -> in => path })} ]; +fields(listener_status) -> + [ + {type, ?HOCON(?ENUM(?LISTENER_TYPE), #{desc => "Listener type", required => true})}, + {enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})}, + {number, ?HOCON(non_neg_integer(), #{desc => "Listener number", required => true})}, + {status, ?HOCON(?R_REF(status))}, + {node_status, ?HOCON(?ARRAY(?R_REF(node_status)))} + ]; +fields(status) -> + [ + {max_connections, + ?HOCON(hoconsc:union([infinity, integer()]), #{desc => "Max connections"})}, + {current_connections, + ?HOCON(non_neg_integer(), #{desc => "Current connections"})} + ]; +fields(node_status) -> + fields(node) ++ fields(status); fields(Type) -> Listeners = listeners_info(), [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type], @@ -244,6 +293,7 @@ listeners_info() -> fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) -> Fields0 = hocon_schema:fields(Mod, Field), Fields1 = lists:keydelete("authentication", 1, Fields0), + Fields2 = lists:keydelete("limiter", 1, Fields1), TypeAtom = list_to_existing_atom(Type), #{ ref => ?R_REF(TypeAtom), @@ -256,7 +306,7 @@ listeners_info() -> required => true, validator => fun validate_id/1 })} - | Fields1 + | Fields2 ] } end, @@ -270,6 +320,10 @@ validate_id(Id) -> end. %% api +listener_status(get, _Request) -> + + {200, []}. + list_listeners(get, _Request) -> {200, list_listeners()}. @@ -278,11 +332,35 @@ crud_listeners_by_id(get, #{bindings := #{id := Id}}) -> crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) -> case parse_listener_conf(Body0) of {Id, Type, Name, Conf} -> - case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of - {ok, #{raw_config := _RawConf}} -> - crud_listeners_by_id(get, #{bindings => #{id => Id}}); - {error, Reason} -> - {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + Key = [listeners, Type, Name], + case emqx_conf:get(Key, undefined) of + undefined -> {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}}; + _PrevConf -> + case emqx_conf:update(Key, Conf, ?OPTS(cluster)) of + {ok, #{raw_config := _RawConf}} -> + crud_listeners_by_id(get, #{bindings => #{id => Id}}); + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + end + end; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; + _ -> + {400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}} + end; +crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) -> + case parse_listener_conf(Body0) of + {Id, Type, Name, Conf} -> + Key = [listeners, Type, Name], + case emqx_conf:get(Key, undefined) of + undefined -> + case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of + {ok, #{raw_config := _RawConf}} -> + crud_listeners_by_id(get, #{bindings => #{id => Id}}); + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + end; + _ -> {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}} end; {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 92c702043..cd766f956 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -113,15 +113,23 @@ parameters() -> ]. subscriptions(get, #{query_string := QString}) -> - case maps:get(<<"node">>, QString, undefined) of - undefined -> - Response = emqx_mgmt_api:cluster_query(QString, ?SUBS_QTABLE, - ?SUBS_QSCHEMA, ?QUERY_FUN), - emqx_mgmt_util:generate_response(Response); - Node -> - Response = emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), QString, - ?SUBS_QTABLE, ?SUBS_QSCHEMA, ?QUERY_FUN), - emqx_mgmt_util:generate_response(Response) + Response = + case maps:get(<<"node">>, QString, undefined) of + undefined -> + emqx_mgmt_api:cluster_query(QString, ?SUBS_QTABLE, + ?SUBS_QSCHEMA, ?QUERY_FUN); + Node0 -> + emqx_mgmt_api:node_query(binary_to_atom(Node0, utf8), QString, + ?SUBS_QTABLE, ?SUBS_QSCHEMA, ?QUERY_FUN) + end, + case Response of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Result -> + {200, Result} end. format(Items) when is_list(Items) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index 249879c62..99533eed4 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -103,9 +103,16 @@ topic(get, #{bindings := Bindings}) -> %%%============================================================================================== %% api apply do_list(Params) -> - Response = emqx_mgmt_api:node_query( - node(), Params, emqx_route, ?TOPICS_QUERY_SCHEMA, {?MODULE, query}), - emqx_mgmt_util:generate_response(Response). + case emqx_mgmt_api:node_query( + node(), Params, emqx_route, ?TOPICS_QUERY_SCHEMA, {?MODULE, query}) of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Response -> + {200, Response} + end. lookup(#{topic := Topic}) -> case emqx_router:lookup_routes(Topic) of diff --git a/apps/emqx_management/src/emqx_mgmt_util.erl b/apps/emqx_management/src/emqx_mgmt_util.erl index 7c7d2c316..173000b2a 100644 --- a/apps/emqx_management/src/emqx_mgmt_util.erl +++ b/apps/emqx_management/src/emqx_mgmt_util.erl @@ -43,9 +43,6 @@ , batch_schema/1 ]). --export([generate_response/1]). - - -export([urldecode/1]). -define(KB, 1024). @@ -262,17 +259,3 @@ bad_request() -> bad_request(<<"Bad Request">>). bad_request(Desc) -> object_schema(properties([{message, string}, {code, string}]), Desc). - -%%%============================================================================================== -%% Response util - -generate_response(QueryResult) -> - case QueryResult of - {error, page_limit_invalid} -> - {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; - {error, Node, {badrpc, R}} -> - Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), - {500, #{code => <<"NODE_DOWN">>, message => Message}}; - Response -> - {200, Response} - end. From 0de367dc634849c0f2c9501bb75b791a78711853 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Tue, 19 Apr 2022 10:42:08 +0800 Subject: [PATCH 2/2] feat: Make api_listener align with gateway and remove unused APIs. --- apps/emqx/src/emqx_listeners.erl | 8 +- .../src/emqx_dashboard_schema.erl | 2 +- .../src/emqx_mgmt_api_listeners.erl | 420 +++++++++--------- .../test/emqx_mgmt_api_listeners_SUITE.erl | 66 +-- 4 files changed, 229 insertions(+), 267 deletions(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index faf6ba8ec..a745f76fb 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -94,14 +94,15 @@ format_raw_listeners({Type0, Conf}) -> Bind = parse_bind(LConf0), Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}), LConf1 = maps:remove(<<"authentication">>, LConf0), - LConf2 = maps:put(<<"running">>, Running, LConf1), + LConf2 = maps:remove(<<"limiter">>, LConf1), + LConf3 = maps:put(<<"running">>, Running, LConf2), CurrConn = case Running of true -> current_conns(Type, LName, Bind); false -> 0 end, - LConf3 = maps:put(<<"current_connections">>, CurrConn, LConf2), - {Type0, LName, LConf3} + LConf4 = maps:put(<<"current_connections">>, CurrConn, LConf3), + {Type0, LName, LConf4} end, maps:to_list(Conf) ). @@ -545,6 +546,7 @@ str(B) when is_binary(B) -> str(S) when is_list(S) -> S. +parse_bind(#{<<"bind">> := Bind}) when is_integer(Bind) -> Bind; parse_bind(#{<<"bind">> := Bind}) -> case emqx_schema:to_ip_port(binary_to_list(Bind)) of {ok, L} -> L; diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl index bbe198f46..193ebf2bc 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl @@ -195,7 +195,7 @@ cors(_) -> undefined. i18n_lang(type) -> ?ENUM([en, zh]); -i18n_lang(default) -> zh; +i18n_lang(default) -> en; i18n_lang('readOnly') -> true; i18n_lang(desc) -> "Internationalization language support."; i18n_lang(_) -> undefined. diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 30b8d61bd..9cdb26ed6 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -20,17 +20,12 @@ -export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]). -import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]). --define(LISTENER_TYPE, [quic, wss, ws, ssl, tcp]). --define(LISTENER_STATUS, [enable, disable]). -export([ - listener_status/2, + listener_type_status/2, list_listeners/2, crud_listeners_by_id/2, - list_listeners_on_node/2, - crud_listener_by_id_on_node/2, - action_listeners_by_id/2, - action_listeners_by_id_on_node/2 + action_listeners_by_id/2 ]). %% for rpc call @@ -58,26 +53,19 @@ api_spec() -> paths() -> [ - "/listener/status", + "/listeners_status", "/listeners", "/listeners/:id", - "/listeners/:id/:action", - "/nodes/:node/listeners", - "/nodes/:node/listeners/:id", - "/nodes/:node/listeners/:id/:action" + "/listeners/:id/:action" ]. - -schema("/listener/status") -> +schema("/listeners_status") -> #{ - 'operationId' => listener_status, + 'operationId' => listener_type_status, get => #{ tags => [<<"listeners">>], - desc => <<"List all running node's listeners live status.">>, - %% responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))} - %% Current we only support all node's listeners is the same, - %% so we don't return the node information right now. - responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_status)))} + desc => <<"List all running node's listeners live status. group by listener type">>, + responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_type_status)))} } }; schema("/listeners") -> @@ -85,11 +73,15 @@ schema("/listeners") -> 'operationId' => list_listeners, get => #{ tags => [<<"listeners">>], - desc => <<"List all running node's listeners.">>, - %% responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))} - %% Current we only support all node's listeners is the same, - %% so we don't return the node information right now. - responses => #{200 => ?HOCON(?ARRAY(listener_schema()))} + desc => <<"List all running node's listeners for the specified type.">>, + parameters => [ + {type, + ?HOCON( + ?ENUM(listeners_type()), + #{desc => "Listener type", in => query, required => false} + )} + ], + responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_id_status)))} } }; schema("/listeners/:id") -> @@ -100,17 +92,17 @@ schema("/listeners/:id") -> desc => <<"List all running node's listeners for the specified id.">>, parameters => [?R_REF(listener_id)], responses => #{ - %% 200 => ?HOCON(?ARRAY(?R_REF(listeners))) - 200 => ?HOCON(listener_schema()) + 200 => ?HOCON(listener_schema(#{bind => true})), + 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND) } }, put => #{ tags => [<<"listeners">>], desc => <<"Update the specified listener on all nodes.">>, parameters => [?R_REF(listener_id)], - 'requestBody' => ?HOCON(listener_schema(), #{}), + 'requestBody' => ?HOCON(listener_schema(#{bind => false}), #{}), responses => #{ - 200 => ?HOCON(listener_schema(), #{}), + 200 => ?HOCON(listener_schema(#{bind => true}), #{}), 400 => error_codes(['BAD_REQUEST']), 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND) } @@ -119,9 +111,9 @@ schema("/listeners/:id") -> tags => [<<"listeners">>], desc => <<"Create the specified listener on all nodes.">>, parameters => [?R_REF(listener_id)], - 'requestBody' => ?HOCON(listener_schema(), #{}), + 'requestBody' => ?HOCON(listener_schema(#{bind => true}), #{}), responses => #{ - 200 => ?HOCON(listener_schema(), #{}), + 200 => ?HOCON(listener_schema(#{bind => true}), #{}), 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST']) } }, @@ -150,90 +142,8 @@ schema("/listeners/:id/:action") -> 400 => error_codes(['BAD_REQUEST']) } } - }; -schema("/nodes/:node/listeners") -> - #{ - 'operationId' => list_listeners_on_node, - get => #{ - tags => [<<"listeners">>], - desc => <<"List all listeners on the specified node.">>, - parameters => [?R_REF(node)], - responses => #{ - 200 => ?HOCON(?ARRAY(listener_schema())), - 400 => error_codes(['BAD_NODE', 'BAD_REQUEST'], ?NODE_NOT_FOUND_OR_DOWN) - } - } - }; -schema("/nodes/:node/listeners/:id") -> - #{ - 'operationId' => crud_listener_by_id_on_node, - get => #{ - tags => [<<"listeners">>], - desc => <<"Get the specified listener on the specified node.">>, - parameters => [ - ?R_REF(listener_id), - ?R_REF(node) - ], - responses => #{ - 200 => ?HOCON(listener_schema()), - 400 => error_codes(['BAD_REQUEST']), - 404 => error_codes(['BAD_LISTEN_ID'], ?NODE_LISTENER_NOT_FOUND) - } - }, - put => #{ - tags => [<<"listeners">>], - desc => <<"Create or update the specified listener on the specified node.">>, - parameters => [ - ?R_REF(listener_id), - ?R_REF(node) - ], - 'requestBody' => ?HOCON(listener_schema()), - responses => #{ - 200 => ?HOCON(listener_schema()), - 400 => error_codes(['BAD_REQUEST']) - } - }, - delete => #{ - tags => [<<"listeners">>], - desc => <<"Delete the specified listener on the specified node.">>, - parameters => [ - ?R_REF(listener_id), - ?R_REF(node) - ], - responses => #{ - 204 => <<"Listener deleted">>, - 400 => error_codes(['BAD_REQUEST']) - } - } - }; -schema("/nodes/:node/listeners/:id/:action") -> - #{ - 'operationId' => action_listeners_by_id_on_node, - post => #{ - tags => [<<"listeners">>], - desc => <<"Start/stop/restart listeners on a specified node.">>, - parameters => [ - ?R_REF(node), - ?R_REF(listener_id), - ?R_REF(action) - ], - responses => #{ - 200 => <<"Updated">>, - 400 => error_codes(['BAD_REQUEST']) - } - } }. -fields(listeners) -> - [ - {"node", - ?HOCON(atom(), #{ - desc => "Node name", - example => "emqx@127.0.0.1", - required => true - })}, - {"listeners", ?ARRAY(listener_schema())} - ]; fields(listener_id) -> [ {id, @@ -262,41 +172,56 @@ fields(node) -> in => path })} ]; -fields(listener_status) -> +fields(listener_type_status) -> [ - {type, ?HOCON(?ENUM(?LISTENER_TYPE), #{desc => "Listener type", required => true})}, + {type, ?HOCON(?ENUM(listeners_type()), #{desc => "Listener type", required => true})}, {enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})}, - {number, ?HOCON(non_neg_integer(), #{desc => "Listener number", required => true})}, + {ids, ?HOCON(?ARRAY(string()), #{desc => "Listener Ids", required => true})}, {status, ?HOCON(?R_REF(status))}, {node_status, ?HOCON(?ARRAY(?R_REF(node_status)))} ]; +fields(listener_id_status) -> + fields(listener_id) ++ + [ + {enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})}, + {number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId number"})}, + {status, ?HOCON(?R_REF(status))}, + {node_status, ?HOCON(?ARRAY(?R_REF(node_status)))} + ]; fields(status) -> [ {max_connections, ?HOCON(hoconsc:union([infinity, integer()]), #{desc => "Max connections"})}, - {current_connections, - ?HOCON(non_neg_integer(), #{desc => "Current connections"})} + {current_connections, ?HOCON(non_neg_integer(), #{desc => "Current connections"})} ]; fields(node_status) -> fields(node) ++ fields(status); fields(Type) -> - Listeners = listeners_info(), + Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}), [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type], Schema. -listener_schema() -> - ?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info())). +listener_schema(Opts) -> + ?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info(Opts))). -listeners_info() -> +listeners_type() -> + lists:map( + fun({Type, _}) -> list_to_existing_atom(Type) end, + hocon_schema:fields(emqx_schema, "listeners") + ). + +listeners_info(Opts) -> Listeners = hocon_schema:fields(emqx_schema, "listeners"), lists:map( fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) -> Fields0 = hocon_schema:fields(Mod, Field), Fields1 = lists:keydelete("authentication", 1, Fields0), Fields2 = lists:keydelete("limiter", 1, Fields1), + Fields3 = required_bind(Fields2, Opts), + Ref = listeners_ref(Type, Opts), TypeAtom = list_to_existing_atom(Type), #{ - ref => ?R_REF(TypeAtom), + ref => ?R_REF(Ref), schema => [ {type, ?HOCON(?ENUM([TypeAtom]), #{desc => "Listener type", required => true})}, {running, ?HOCON(boolean(), #{desc => "Listener status", required => false})}, @@ -305,14 +230,33 @@ listeners_info() -> desc => "Listener id", required => true, validator => fun validate_id/1 - })} - | Fields2 + })}, + {current_connections, + ?HOCON( + non_neg_integer(), + #{desc => "Current connections", required => false} + )} + | Fields3 ] } end, Listeners ). +required_bind(Fields, #{bind := true}) -> + Fields; +required_bind(Fields, #{bind := false}) -> + {value, {_, Hocon}, Fields1} = lists:keytake("bind", 1, Fields), + [{"bind", Hocon#{required => false}} | Fields1]. + +listeners_ref(Type, #{bind := Bind}) -> + Suffix = + case Bind of + true -> "_required_bind"; + false -> "_not_required_bind" + end, + Type ++ Suffix. + validate_id(Id) -> case emqx_listeners:parse_listener_id(Id) of {error, Reason} -> {error, Reason}; @@ -320,23 +264,40 @@ validate_id(Id) -> end. %% api -listener_status(get, _Request) -> +listener_type_status(get, _Request) -> + Listeners = maps:to_list(listener_status_by_type(list_listeners(), #{})), + List = lists:map(fun({Type, L}) -> L#{type => Type} end, Listeners), + {200, List}. - {200, []}. +list_listeners(get, #{query_string := Query}) -> + Listeners = list_listeners(), + NodeL = + case maps:find(<<"type">>, Query) of + {ok, Type} -> listener_type_filter(atom_to_binary(Type), Listeners); + error -> Listeners + end, + {200, listener_status_by_id(NodeL)}. -list_listeners(get, _Request) -> - {200, list_listeners()}. - -crud_listeners_by_id(get, #{bindings := #{id := Id}}) -> - {200, list_listeners_by_id(Id)}; +crud_listeners_by_id(get, #{bindings := #{id := Id0}}) -> + Listeners = [ + Conf#{<<"id">> => Id, <<"type">> => Type} + || {Id, Type, Conf} <- emqx_listeners:list_raw(), + Id =:= Id0 + ], + case Listeners of + [] -> {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}}; + [L] -> {200, L} + end; crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) -> case parse_listener_conf(Body0) of {Id, Type, Name, Conf} -> Key = [listeners, Type, Name], - case emqx_conf:get(Key, undefined) of - undefined -> {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}}; - _PrevConf -> - case emqx_conf:update(Key, Conf, ?OPTS(cluster)) of + case emqx_conf:get_raw(Key, undefined) of + undefined -> + {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}}; + PrevConf -> + MergeConf = emqx_map_lib:deep_merge(PrevConf, Conf), + case emqx_conf:update(Key, MergeConf, ?OPTS(cluster)) of {ok, #{raw_config := _RawConf}} -> crud_listeners_by_id(get, #{bindings => #{id => Id}}); {error, Reason} -> @@ -360,7 +321,8 @@ crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) -> {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end; - _ -> {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}} + _ -> + {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}} end; {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; @@ -376,64 +338,16 @@ crud_listeners_by_id(delete, #{bindings := #{id := Id}}) -> parse_listener_conf(Conf0) -> Conf1 = maps:remove(<<"running">>, Conf0), - {IdBin, Conf2} = maps:take(<<"id">>, Conf1), - {TypeBin, Conf3} = maps:take(<<"type">>, Conf2), + Conf2 = maps:remove(<<"current_connections">>, Conf1), + {IdBin, Conf3} = maps:take(<<"id">>, Conf2), + {TypeBin, Conf4} = maps:take(<<"type">>, Conf3), {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin), TypeAtom = binary_to_existing_atom(TypeBin), case Type =:= TypeAtom of - true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3}; + true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf4}; false -> {error, listener_type_inconsistent} end. -list_listeners_on_node(get, #{bindings := #{node := Node}}) -> - case list_listeners(Node) of - {error, nodedown} -> - {400, #{code => 'BAD_NODE', message => ?NODE_NOT_FOUND_OR_DOWN}}; - {error, Reason} -> - {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; - #{<<"listeners">> := Listener} -> - {200, Listener} - end. - -crud_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) -> - case get_listener(Node, Id) of - {error, not_found} -> - {404, #{code => 'BAD_LISTEN_ID', message => ?NODE_LISTENER_NOT_FOUND}}; - {error, Reason} -> - {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; - Listener -> - {200, Listener} - end; -crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Body}) -> - case parse_listener_conf(Body) of - {error, Reason} -> - {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; - {Id, Type, _Name, Conf} -> - case update_listener(Node, Id, Conf) of - {error, nodedown} -> - {400, #{code => 'BAD_REQUEST', message => ?NODE_NOT_FOUND_OR_DOWN}}; - %% TODO - {error, {eaddrinuse, _}} -> - {400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}}; - {error, Reason} -> - {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; - {ok, Listener} -> - {200, Listener#{<<"id">> => Id, <<"type">> => Type, <<"running">> => true}} - end; - _ -> - {400, #{code => 'BAD_REQUEST', message => ?LISTENER_ID_INCONSISTENT}} - end; -crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) -> - case remove_listener(Node, Id) of - ok -> {204}; - {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} - end. - -action_listeners_by_id_on_node(post, - #{bindings := #{id := Id, action := Action, node := Node}}) -> - {_, Result} = action_listeners(Node, Id, Action), - Result. - action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) -> Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()], case @@ -496,31 +410,49 @@ list_listeners() -> list_listeners(Node) -> wrap_rpc(emqx_management_proto_v1:list_listeners(Node)). -list_listeners_by_id(Id) -> - listener_id_filter(Id, list_listeners()). - -get_listener(Node, Id) -> - case listener_id_filter(Id, [list_listeners(Node)]) of - [#{<<"listeners">> := []}] -> {error, not_found}; - [#{<<"listeners">> := [Listener]}] -> Listener - end. - -listener_id_filter(Id, Listeners) -> +listener_status_by_id(NodeL) -> + Listeners = maps:to_list(listener_status_by_id(NodeL, #{})), lists:map( - fun(Conf = #{<<"listeners">> := Listeners0}) -> - Conf#{ - <<"listeners">> => - [C || C = #{<<"id">> := Id0} <- Listeners0, Id =:= Id0] - } + fun({Id, L}) -> + L1 = maps:remove(ids, L), + #{node_status := Nodes} = L1, + L1#{number => maps:size(Nodes), id => Id} end, Listeners ). -update_listener(Node, Id, Config) -> - wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)). +listener_status_by_type([], Acc) -> + Acc; +listener_status_by_type([NodeL | Rest], Acc) -> + #{<<"node">> := Node, <<"listeners">> := Listeners} = NodeL, + Acc1 = lists:foldl( + fun(L, Acc0) -> format_status(<<"type">>, Node, L, Acc0) end, + Acc, + Listeners + ), + listener_status_by_type(Rest, Acc1). -remove_listener(Node, Id) -> - wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)). +listener_status_by_id([], Acc) -> + Acc; +listener_status_by_id([NodeL | Rest], Acc) -> + #{<<"node">> := Node, <<"listeners">> := Listeners} = NodeL, + Acc1 = lists:foldl( + fun(L, Acc0) -> format_status(<<"id">>, Node, L, Acc0) end, + Acc, + Listeners + ), + listener_status_by_id(Rest, Acc1). + +listener_type_filter(Type0, Listeners) -> + lists:map( + fun(Conf = #{<<"listeners">> := Listeners0}) -> + Conf#{ + <<"listeners">> => + [C || C = #{<<"type">> := Type} <- Listeners0, Type =:= Type0] + } + end, + Listeners + ). -spec do_list_listeners() -> map(). do_list_listeners() -> @@ -554,3 +486,75 @@ wrap_rpc({badrpc, Reason}) -> {error, Reason}; wrap_rpc(Res) -> Res. + +format_status(Key, Node, Listener, Acc) -> + #{ + <<"id">> := Id, + <<"running">> := Running, + <<"max_connections">> := MaxConnections, + <<"current_connections">> := CurrentConnections + } = Listener, + GroupKey = maps:get(Key, Listener), + case maps:find(GroupKey, Acc) of + error -> + Acc#{ + GroupKey => #{ + enable => Running, + ids => [Id], + status => #{ + max_connections => MaxConnections, + current_connections => CurrentConnections + }, + node_status => #{ + Node => #{ + max_connections => MaxConnections, + current_connections => CurrentConnections + } + } + } + }; + {ok, GroupValue} -> + #{ + ids := Ids, + status := #{ + max_connections := MaxConnections0, + current_connections := CurrentConnections0 + }, + node_status := NodeStatus0 + } = GroupValue, + NodeStatus = + case maps:find(Node, NodeStatus0) of + error -> + #{ + Node => #{ + max_connections => MaxConnections, + current_connections => CurrentConnections + } + }; + {ok, #{ + max_connections := PrevMax, + current_connections := PrevCurr + }} -> + NodeStatus0#{ + Node => #{ + max_connections => max_conn(MaxConnections, PrevMax), + current_connections => CurrentConnections + PrevCurr + } + } + end, + Acc#{ + GroupKey => + GroupValue#{ + ids => lists:usort([Id | Ids]), + status => #{ + max_connections => max_conn(MaxConnections0, MaxConnections), + current_connections => CurrentConnections0 + CurrentConnections + }, + node_status => NodeStatus + } + } + end. + +max_conn(_Int1, infinity) -> infinity; +max_conn(infinity, _Int) -> infinity; +max_conn(Int1, Int2) -> Int1 + Int2. diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index a146f48ef..e0c2586ec 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -35,8 +35,8 @@ end_per_suite(_) -> t_list_listeners(_) -> Path = emqx_mgmt_api_test_util:api_path(["listeners"]), Res = request(get, Path, [], []), - Expect = emqx_mgmt_api_listeners:do_list_listeners(), - ?assertEqual(emqx_json:encode([Expect]), emqx_json:encode(Res)), + #{<<"listeners">> := Expect} = emqx_mgmt_api_listeners:do_list_listeners(), + ?assertEqual(length(Expect), length(Res)), ok. t_crud_listeners_by_id(_) -> @@ -44,19 +44,18 @@ t_crud_listeners_by_id(_) -> NewListenerId = <<"tcp:new">>, TcpPath = emqx_mgmt_api_test_util:api_path(["listeners", TcpListenerId]), NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]), - [#{<<"listeners">> := [TcpListener], <<"node">> := Node}] = request(get, TcpPath, [], []), - ?assertEqual(atom_to_binary(node()), Node), + TcpListener = request(get, TcpPath, [], []), %% create ?assertEqual({error, not_found}, is_running(NewListenerId)), - ?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])), + ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])), NewConf = TcpListener#{ <<"id">> => NewListenerId, <<"bind">> => <<"0.0.0.0:2883">> }, - [#{<<"listeners">> := [Create]}] = request(put, NewPath, [], NewConf), + Create = request(post, NewPath, [], NewConf), ?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))), - [#{<<"listeners">> := [Get1]}] = request(get, NewPath, [], []), + Get1 = request(get, NewPath, [], []), ?assertMatch(Create, Get1), ?assert(is_running(NewListenerId)), @@ -67,64 +66,21 @@ t_crud_listeners_by_id(_) -> <<"id">> => BadId, <<"bind">> => <<"0.0.0.0:2883">> }, - ?assertEqual({error, {"HTTP/1.1", 400, "Bad Request"}}, request(put, BadPath, [], BadConf)), + ?assertMatch({error, {"HTTP/1.1", 400, _}}, request(post, BadPath, [], BadConf)), %% update #{<<"acceptors">> := Acceptors} = Create, Acceptors1 = Acceptors + 10, - [#{<<"listeners">> := [Update]}] = + Update = request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}), ?assertMatch(#{<<"acceptors">> := Acceptors1}, Update), - [#{<<"listeners">> := [Get2]}] = request(get, NewPath, [], []), - ?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2), - - %% delete - ?assertEqual([], delete(NewPath)), - ?assertEqual({error, not_found}, is_running(NewListenerId)), - ?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])), - ?assertEqual([], delete(NewPath)), - ok. - -t_list_listeners_on_node(_) -> - Node = atom_to_list(node()), - Path = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners"]), - Listeners = request(get, Path, [], []), - #{<<"listeners">> := Expect} = emqx_mgmt_api_listeners:do_list_listeners(), - ?assertEqual(emqx_json:encode(Expect), emqx_json:encode(Listeners)), - ok. - -t_crud_listener_by_id_on_node(_) -> - TcpListenerId = <<"tcp:default">>, - NewListenerId = <<"tcp:new1">>, - Node = atom_to_list(node()), - TcpPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", TcpListenerId]), - NewPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", NewListenerId]), - TcpListener = request(get, TcpPath, [], []), - - %% create - ?assertEqual({error, not_found}, is_running(NewListenerId)), - ?assertMatch({error,{"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])), - Create = request(put, NewPath, [], TcpListener#{ - <<"id">> => NewListenerId, - <<"bind">> => <<"0.0.0.0:3883">> - }), - ?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))), - Get1 = request(get, NewPath, [], []), - ?assertMatch(Create, Get1), - ?assert(is_running(NewListenerId)), - - %% update - #{<<"acceptors">> := Acceptors} = Create, - Acceptors1 = Acceptors + 10, - Update = request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}), - ?assertMatch(#{<<"acceptors">> := Acceptors1}, Update), Get2 = request(get, NewPath, [], []), ?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2), %% delete ?assertEqual([], delete(NewPath)), ?assertEqual({error, not_found}, is_running(NewListenerId)), - ?assertMatch({error, {"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])), + ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])), ?assertEqual([], delete(NewPath)), ok. @@ -139,8 +95,8 @@ action_listener(ID, Action, Running) -> {ok, _} = emqx_mgmt_api_test_util:request_api(post, Path), timer:sleep(500), GetPath = emqx_mgmt_api_test_util:api_path(["listeners", ID]), - [#{<<"listeners">> := Listeners}] = request(get, GetPath, [], []), - [listener_stats(Listener, Running) || Listener <- Listeners]. + Listener = request(get, GetPath, [], []), + listener_stats(Listener, Running). request(Method, Url, QueryParams, Body) -> AuthHeader = emqx_mgmt_api_test_util:auth_header_(),