diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 02cb91900..faf6ba8ec 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -87,13 +87,21 @@ do_list_raw() -> Listeners = maps:to_list(RawWithDefault), lists:flatmap(fun format_raw_listeners/1, Listeners). -format_raw_listeners({Type, Conf}) -> +format_raw_listeners({Type0, Conf}) -> + Type = binary_to_atom(Type0), lists:map( fun({LName, LConf0}) when is_map(LConf0) -> - Running = is_running(binary_to_atom(Type), listener_id(Type, LName), LConf0), + Bind = parse_bind(LConf0), + Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}), LConf1 = maps:remove(<<"authentication">>, LConf0), LConf2 = maps:put(<<"running">>, Running, LConf1), - {Type, LName, LConf2} + CurrConn = + case Running of + true -> current_conns(Type, LName, Bind); + false -> 0 + end, + LConf3 = maps:put(<<"current_connections">>, CurrConn, LConf2), + {Type0, LName, LConf3} end, maps:to_list(Conf) ). @@ -112,16 +120,7 @@ is_running(ListenerId) -> end. is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl -> - ListenOn = - case Conf of - #{bind := Bind} -> - Bind; - #{<<"bind">> := Bind} -> - case emqx_schema:to_ip_port(binary_to_list(Bind)) of - {ok, L} -> L; - {error, _} -> binary_to_integer(Bind) - end - end, + #{bind := ListenOn} = Conf, try esockd:listener({ListenerId, ListenOn}) of Pid when is_pid(Pid) -> true @@ -545,3 +544,9 @@ str(B) when is_binary(B) -> binary_to_list(B); str(S) when is_list(S) -> S. + +parse_bind(#{<<"bind">> := Bind}) -> + case emqx_schema:to_ip_port(binary_to_list(Bind)) of + {ok, L} -> L; + {error, _} -> binary_to_integer(Bind) + end. diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl index 5c2cc8a6a..ed0117b96 100644 --- a/apps/emqx_authn/src/emqx_authn_api.erl +++ b/apps/emqx_authn/src/emqx_authn_api.erl @@ -1162,8 +1162,17 @@ delete_user(ChainName, AuthenticatorID, UserID) -> end. list_users(ChainName, AuthenticatorID, QueryString) -> - Response = emqx_authentication:list_users(ChainName, AuthenticatorID, QueryString), - emqx_mgmt_util:generate_response(Response). + case emqx_authentication:list_users(ChainName, AuthenticatorID, QueryString) of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Reason} -> + {400, #{ + code => <<"INVALID_PARAMETER">>, + message => list_to_binary(io_lib:format("Reason ~p", [Reason])) + }}; + Result -> + {200, Result} + end. update_config(Path, ConfigRequest) -> emqx_conf:update(Path, ConfigRequest, #{ diff --git a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl index 2f06ccdbc..dcdcf6878 100644 --- a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl @@ -405,14 +405,23 @@ fields(meta) -> %%-------------------------------------------------------------------- users(get, #{query_string := QueryString}) -> - Response = emqx_mgmt_api:node_query( - node(), - QueryString, - ?ACL_TABLE, - ?ACL_USERNAME_QSCHEMA, - ?QUERY_USERNAME_FUN - ), - emqx_mgmt_util:generate_response(Response); + case + emqx_mgmt_api:node_query( + node(), + QueryString, + ?ACL_TABLE, + ?ACL_USERNAME_QSCHEMA, + ?QUERY_USERNAME_FUN + ) + of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Result -> + {200, Result} + end; users(post, #{body := Body}) when is_list(Body) -> lists:foreach( fun(#{<<"username">> := Username, <<"rules">> := Rules}) -> @@ -423,14 +432,23 @@ users(post, #{body := Body}) when is_list(Body) -> {204}. clients(get, #{query_string := QueryString}) -> - Response = emqx_mgmt_api:node_query( - node(), - QueryString, - ?ACL_TABLE, - ?ACL_CLIENTID_QSCHEMA, - ?QUERY_CLIENTID_FUN - ), - emqx_mgmt_util:generate_response(Response); + case + emqx_mgmt_api:node_query( + node(), + QueryString, + ?ACL_TABLE, + ?ACL_CLIENTID_QSCHEMA, + ?QUERY_CLIENTID_FUN + ) + of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Result -> + {200, Result} + end; clients(post, #{body := Body}) when is_list(Body) -> lists:foreach( fun(#{<<"clientid">> := ClientID, <<"rules">> := Rules}) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 0c5120818..6eccdc045 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -101,30 +101,39 @@ clients(get, #{ bindings := #{name := Name0}, query_string := QString }) -> - with_gateway(Name0, fun(GwName, _) -> + Fun = fun(GwName, _) -> TabName = emqx_gateway_cm:tabname(info, GwName), - case maps:get(<<"node">>, QString, undefined) of - undefined -> - Response = emqx_mgmt_api:cluster_query( - QString, - TabName, - ?CLIENT_QSCHEMA, - ?QUERY_FUN - ), - emqx_mgmt_util:generate_response(Response); - Node1 -> - Node = binary_to_atom(Node1, utf8), - QStringWithoutNode = maps:without([<<"node">>], QString), - Response = emqx_mgmt_api:node_query( - Node, - QStringWithoutNode, - TabName, - ?CLIENT_QSCHEMA, - ?QUERY_FUN - ), - emqx_mgmt_util:generate_response(Response) + Result = + case maps:get(<<"node">>, QString, undefined) of + undefined -> + emqx_mgmt_api:cluster_query( + QString, + TabName, + ?CLIENT_QSCHEMA, + ?QUERY_FUN + ); + Node0 -> + Node1 = binary_to_atom(Node0, utf8), + QStringWithoutNode = maps:without([<<"node">>], QString), + emqx_mgmt_api:node_query( + Node1, + QStringWithoutNode, + TabName, + ?CLIENT_QSCHEMA, + ?QUERY_FUN + ) + end, + case Result of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Response -> + {200, Response} end - end). + end, + with_gateway(Name0, Fun). clients_insta(get, #{ bindings := #{ diff --git a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl index d6b2280de..f5685993a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl @@ -91,8 +91,15 @@ alarms(get, #{query_string := QString}) -> true -> ?ACTIVATED_ALARM; false -> ?DEACTIVATED_ALARM end, - Response = emqx_mgmt_api:cluster_query(QString, Table, [], {?MODULE, query}), - emqx_mgmt_util:generate_response(Response); + case emqx_mgmt_api:cluster_query(QString, Table, [], {?MODULE, query}) of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Response -> + {200, Response} + end; alarms(delete, _Params) -> _ = emqx_mgmt:delete_all_deactivated_alarms(), diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 36baf041f..7b6b1cf40 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -454,17 +454,24 @@ set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) -> %% api apply list_clients(QString) -> - case maps:get(<<"node">>, QString, undefined) of - undefined -> - Response = emqx_mgmt_api:cluster_query(QString, ?CLIENT_QTAB, - ?CLIENT_QSCHEMA, ?QUERY_FUN), - emqx_mgmt_util:generate_response(Response); - Node1 -> - Node = binary_to_atom(Node1, utf8), - QStringWithoutNode = maps:without([<<"node">>], QString), - Response = emqx_mgmt_api:node_query(Node, QStringWithoutNode, - ?CLIENT_QTAB, ?CLIENT_QSCHEMA, ?QUERY_FUN), - emqx_mgmt_util:generate_response(Response) + Result = case maps:get(<<"node">>, QString, undefined) of + undefined -> + emqx_mgmt_api:cluster_query(QString, ?CLIENT_QTAB, + ?CLIENT_QSCHEMA, ?QUERY_FUN); + Node0 -> + Node1 = binary_to_atom(Node0, utf8), + QStringWithoutNode = maps:without([<<"node">>], QString), + emqx_mgmt_api:node_query(Node1, QStringWithoutNode, + ?CLIENT_QTAB, ?CLIENT_QSCHEMA, ?QUERY_FUN) + end, + case Result of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Response -> + {200, Response} end. lookup(#{clientid := ClientID}) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index a2dbce1aa..30b8d61bd 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -20,8 +20,11 @@ -export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]). -import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]). +-define(LISTENER_TYPE, [quic, wss, ws, ssl, tcp]). +-define(LISTENER_STATUS, [enable, disable]). -export([ + listener_status/2, list_listeners/2, crud_listeners_by_id/2, list_listeners_on_node/2, @@ -55,6 +58,7 @@ api_spec() -> paths() -> [ + "/listener/status", "/listeners", "/listeners/:id", "/listeners/:id/:action", @@ -63,13 +67,29 @@ paths() -> "/nodes/:node/listeners/:id/:action" ]. + +schema("/listener/status") -> + #{ + 'operationId' => listener_status, + get => #{ + tags => [<<"listeners">>], + desc => <<"List all running node's listeners live status.">>, + %% responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))} + %% Current we only support all node's listeners is the same, + %% so we don't return the node information right now. + responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_status)))} + } + }; schema("/listeners") -> #{ 'operationId' => list_listeners, get => #{ tags => [<<"listeners">>], desc => <<"List all running node's listeners.">>, - responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))} + %% responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))} + %% Current we only support all node's listeners is the same, + %% so we don't return the node information right now. + responses => #{200 => ?HOCON(?ARRAY(listener_schema()))} } }; schema("/listeners/:id") -> @@ -80,17 +100,29 @@ schema("/listeners/:id") -> desc => <<"List all running node's listeners for the specified id.">>, parameters => [?R_REF(listener_id)], responses => #{ - 200 => ?HOCON(?ARRAY(?R_REF(listeners))) + %% 200 => ?HOCON(?ARRAY(?R_REF(listeners))) + 200 => ?HOCON(listener_schema()) } }, put => #{ tags => [<<"listeners">>], - desc => <<"Create or update the specified listener on all nodes.">>, + desc => <<"Update the specified listener on all nodes.">>, parameters => [?R_REF(listener_id)], 'requestBody' => ?HOCON(listener_schema(), #{}), responses => #{ 200 => ?HOCON(listener_schema(), #{}), - 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND) + 400 => error_codes(['BAD_REQUEST']), + 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND) + } + }, + post => #{ + tags => [<<"listeners">>], + desc => <<"Create the specified listener on all nodes.">>, + parameters => [?R_REF(listener_id)], + 'requestBody' => ?HOCON(listener_schema(), #{}), + responses => #{ + 200 => ?HOCON(listener_schema(), #{}), + 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST']) } }, delete => #{ @@ -230,6 +262,23 @@ fields(node) -> in => path })} ]; +fields(listener_status) -> + [ + {type, ?HOCON(?ENUM(?LISTENER_TYPE), #{desc => "Listener type", required => true})}, + {enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})}, + {number, ?HOCON(non_neg_integer(), #{desc => "Listener number", required => true})}, + {status, ?HOCON(?R_REF(status))}, + {node_status, ?HOCON(?ARRAY(?R_REF(node_status)))} + ]; +fields(status) -> + [ + {max_connections, + ?HOCON(hoconsc:union([infinity, integer()]), #{desc => "Max connections"})}, + {current_connections, + ?HOCON(non_neg_integer(), #{desc => "Current connections"})} + ]; +fields(node_status) -> + fields(node) ++ fields(status); fields(Type) -> Listeners = listeners_info(), [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type], @@ -244,6 +293,7 @@ listeners_info() -> fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) -> Fields0 = hocon_schema:fields(Mod, Field), Fields1 = lists:keydelete("authentication", 1, Fields0), + Fields2 = lists:keydelete("limiter", 1, Fields1), TypeAtom = list_to_existing_atom(Type), #{ ref => ?R_REF(TypeAtom), @@ -256,7 +306,7 @@ listeners_info() -> required => true, validator => fun validate_id/1 })} - | Fields1 + | Fields2 ] } end, @@ -270,6 +320,10 @@ validate_id(Id) -> end. %% api +listener_status(get, _Request) -> + + {200, []}. + list_listeners(get, _Request) -> {200, list_listeners()}. @@ -278,11 +332,35 @@ crud_listeners_by_id(get, #{bindings := #{id := Id}}) -> crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) -> case parse_listener_conf(Body0) of {Id, Type, Name, Conf} -> - case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of - {ok, #{raw_config := _RawConf}} -> - crud_listeners_by_id(get, #{bindings => #{id => Id}}); - {error, Reason} -> - {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + Key = [listeners, Type, Name], + case emqx_conf:get(Key, undefined) of + undefined -> {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}}; + _PrevConf -> + case emqx_conf:update(Key, Conf, ?OPTS(cluster)) of + {ok, #{raw_config := _RawConf}} -> + crud_listeners_by_id(get, #{bindings => #{id => Id}}); + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + end + end; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; + _ -> + {400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}} + end; +crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) -> + case parse_listener_conf(Body0) of + {Id, Type, Name, Conf} -> + Key = [listeners, Type, Name], + case emqx_conf:get(Key, undefined) of + undefined -> + case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of + {ok, #{raw_config := _RawConf}} -> + crud_listeners_by_id(get, #{bindings => #{id => Id}}); + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + end; + _ -> {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}} end; {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 92c702043..cd766f956 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -113,15 +113,23 @@ parameters() -> ]. subscriptions(get, #{query_string := QString}) -> - case maps:get(<<"node">>, QString, undefined) of - undefined -> - Response = emqx_mgmt_api:cluster_query(QString, ?SUBS_QTABLE, - ?SUBS_QSCHEMA, ?QUERY_FUN), - emqx_mgmt_util:generate_response(Response); - Node -> - Response = emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), QString, - ?SUBS_QTABLE, ?SUBS_QSCHEMA, ?QUERY_FUN), - emqx_mgmt_util:generate_response(Response) + Response = + case maps:get(<<"node">>, QString, undefined) of + undefined -> + emqx_mgmt_api:cluster_query(QString, ?SUBS_QTABLE, + ?SUBS_QSCHEMA, ?QUERY_FUN); + Node0 -> + emqx_mgmt_api:node_query(binary_to_atom(Node0, utf8), QString, + ?SUBS_QTABLE, ?SUBS_QSCHEMA, ?QUERY_FUN) + end, + case Response of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Result -> + {200, Result} end. format(Items) when is_list(Items) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index 249879c62..99533eed4 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -103,9 +103,16 @@ topic(get, #{bindings := Bindings}) -> %%%============================================================================================== %% api apply do_list(Params) -> - Response = emqx_mgmt_api:node_query( - node(), Params, emqx_route, ?TOPICS_QUERY_SCHEMA, {?MODULE, query}), - emqx_mgmt_util:generate_response(Response). + case emqx_mgmt_api:node_query( + node(), Params, emqx_route, ?TOPICS_QUERY_SCHEMA, {?MODULE, query}) of + {error, page_limit_invalid} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Response -> + {200, Response} + end. lookup(#{topic := Topic}) -> case emqx_router:lookup_routes(Topic) of diff --git a/apps/emqx_management/src/emqx_mgmt_util.erl b/apps/emqx_management/src/emqx_mgmt_util.erl index 7c7d2c316..173000b2a 100644 --- a/apps/emqx_management/src/emqx_mgmt_util.erl +++ b/apps/emqx_management/src/emqx_mgmt_util.erl @@ -43,9 +43,6 @@ , batch_schema/1 ]). --export([generate_response/1]). - - -export([urldecode/1]). -define(KB, 1024). @@ -262,17 +259,3 @@ bad_request() -> bad_request(<<"Bad Request">>). bad_request(Desc) -> object_schema(properties([{message, string}, {code, string}]), Desc). - -%%%============================================================================================== -%% Response util - -generate_response(QueryResult) -> - case QueryResult of - {error, page_limit_invalid} -> - {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; - {error, Node, {badrpc, R}} -> - Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), - {500, #{code => <<"NODE_DOWN">>, message => Message}}; - Response -> - {200, Response} - end.