diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 79703bd4a..ad6d7c9ac 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -38,7 +38,7 @@ -type ip_port() :: tuple(). -type cipher() :: map(). -type rfc3339_system_time() :: integer(). --type qos():: integer(). +-type qos() :: integer(). -typerefl_from_string({qos/0, emqx_schema, to_qos}). -typerefl_from_string({duration/0, emqx_schema, to_duration}). diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 16728bc36..952d430cb 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -389,7 +389,8 @@ subscribe(ClientId, TopicTables) -> subscribe([Node | Nodes], ClientId, TopicTables) -> case wrap_rpc(emqx_management_proto_v1:subscribe(Node, ClientId, TopicTables)) of {error, _} -> subscribe(Nodes, ClientId, TopicTables); - Re -> Re + {subscribe, Res} -> + {subscribe, Res, Node} end; subscribe([], _ClientId, _TopicTables) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 5c9747cbf..5bd3f5681 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -18,6 +18,7 @@ -behaviour(minirest_api). +-include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -25,7 +26,10 @@ -include("emqx_mgmt.hrl"). %% API --export([api_spec/0]). +-export([ api_spec/0 + , paths/0 + , schema/1 + , fields/1]). -export([ clients/2 , client/2 @@ -67,419 +71,322 @@ <<"{\"code\": \"RESOURCE_NOT_FOUND\", \"reason\": \"Client id not found\"}">>). api_spec() -> - {apis(), schemas()}. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). -apis() -> - [ clients_api() - , client_api() - , clients_authz_cache_api() - , clients_subscriptions_api() - , subscribe_api() - , unsubscribe_api() - , keepalive_api() +paths() -> + [ "/clients" + , "/clients/:clientid" + , "/clients/:clientid/authz_cache" + , "/clients/:clientid/subscriptions" + , "/clients/:clientid/subscribe" + , "/clients/:clientid/unsubscribe" + , "/clients/:clientid/keepalive" ]. -schemas() -> - Client = #{ - client => #{ - type => object, - properties => emqx_mgmt_util:properties(properties(client)) - } - }, - AuthzCache = #{ - authz_cache => #{ - type => object, - properties => emqx_mgmt_util:properties(properties(authz_cache)) - } - }, - [Client, AuthzCache]. - -properties(client) -> - [ - {awaiting_rel_cnt, integer, - <<"v4 api name [awaiting_rel] Number of awaiting PUBREC packet">>}, - {awaiting_rel_max, integer, - <<"v4 api name [max_awaiting_rel]. Maximum allowed number of awaiting PUBREC packet">>}, - {clean_start, boolean, - <<"Indicate whether the client is using a brand new session">>}, - {clientid, string , - <<"Client identifier">>}, - {connected, boolean, - <<"Whether the client is connected">>}, - {connected_at, string , - <<"Client connection time, rfc3339">>}, - {created_at, string , - <<"Session creation time, rfc3339">>}, - {disconnected_at, string , - <<"Client offline time. It's Only valid and returned when connected is false, rfc3339">>}, - {expiry_interval, integer, - <<"Session expiration interval, with the unit of second">>}, - {heap_size, integer, - <<"Process heap size with the unit of byte">>}, - {inflight_cnt, integer, - <<"Current length of inflight">>}, - {inflight_max, integer, - <<"v4 api name [max_inflight]. Maximum length of inflight">>}, - {ip_address, string , - <<"Client's IP address">>}, - {port, integer, - <<"Client's port">>}, - {is_bridge, boolean, - <<"Indicates whether the client is connectedvia bridge">>}, - {keepalive, integer, - <<"keepalive time, with the unit of second">>}, - {mailbox_len, integer, - <<"Process mailbox size">>}, - {mqueue_dropped, integer, - <<"Number of messages dropped by the message queue due to exceeding the length">>}, - {mqueue_len, integer, - <<"Current length of message queue">>}, - {mqueue_max, integer, - <<"v4 api name [max_mqueue]. Maximum length of message queue">>}, - {node, string , - <<"Name of the node to which the client is connected">>}, - {proto_name, string , - <<"Client protocol name">>}, - {proto_ver, integer, - <<"Protocol version used by the client">>}, - {recv_cnt, integer, - <<"Number of TCP packets received">>}, - {recv_msg, integer, - <<"Number of PUBLISH packets received">>}, - {'recv_msg.qos0', integer, - <<"Number of PUBLISH QoS0 packets received">>}, - {'recv_msg.qos1', integer, - <<"Number of PUBLISH QoS1 packets received">>}, - {'recv_msg.qos2', integer, - <<"Number of PUBLISH QoS2 packets received">>}, - {'recv_msg.dropped', integer, - <<"Number of dropped PUBLISH messages">>}, - {'recv_msg.dropped.await_pubrel_timeout', integer, - <<"Number of dropped PUBLISH messages due to waiting PUBREL timeout">>}, - {recv_oct, integer, - <<"Number of bytes received by EMQ X Broker (the same below)">>}, - {recv_pkt, integer, - <<"Number of MQTT packets received">>}, - {reductions, integer, - <<"Erlang reduction">>}, - {send_cnt, integer, - <<"Number of TCP packets sent">>}, - {send_msg, integer, - <<"Number of PUBLISH messages sent">>}, - {'send_msg.qos0', integer, - <<"Number of PUBLISH QoS0 messages sent">>}, - {'send_msg.qos1', integer, - <<"Number of PUBLISH QoS1 messages sent">>}, - {'send_msg.qos2', integer, - <<"Number of PUBLISH QoS2 messages sent">>}, - {'send_msg.dropped', integer, - <<"Number of dropped PUBLISH messages">>}, - {'send_msg.dropped.expired', integer, - <<"Number of dropped PUBLISH messages due to expired">>}, - {'send_msg.dropped.queue_full', integer, - <<"Number of dropped PUBLISH messages due to queue full">>}, - {'send_msg.dropped.too_large', integer, - <<"Number of dropped PUBLISH messages due to packet length too large">>}, - {send_oct, integer, - <<"Number of bytes sent">>}, - {send_pkt, integer, - <<"Number of MQTT packets sent">>}, - {subscriptions_cnt, integer, - <<"Number of subscriptions established by this client.">>}, - {subscriptions_max, integer, - <<"v4 api name [max_subscriptions] Maximum number of subscriptions allowed by this client">>}, - {username, string , - <<"User name of client when connecting">>}, - {will_msg, string , - <<"Client will message">>}, - {zone, string , - <<"Indicate the configuration group used by the client">>} - ]; -properties(authz_cache) -> - [ - {access, string, <<"Access type">>}, - {result, string, <<"Allow or deny">>}, - {topic, string, <<"Topic name">>}, - {updated_time, integer, <<"Update time">>} - ]. - -clients_api() -> - Metadata = #{ +schema("/clients") -> + #{ + 'operationId' => clients, get => #{ description => <<"List clients">>, parameters => [ - #{ - name => page, + hoconsc:ref(emqx_dashboard_swagger, page), + hoconsc:ref(emqx_dashboard_swagger, limit), + {node, hoconsc:mk(binary(), #{ in => query, - required => false, - description => <<"Page">>, - schema => #{type => integer} - }, - #{ - name => limit, + nullable => true, + desc => <<"Node name">>, + example => atom_to_list(node())})}, + {username, hoconsc:mk(binary(), #{ in => query, - required => false, - description => <<"Page limit">>, - schema => #{type => integer} - }, - #{ - name => node, + nullable => true, + desc => <<"User name">>})}, + {zone, hoconsc:mk(binary(), #{ in => query, - required => false, - description => <<"Node name">>, - schema => #{type => string} - }, - #{ - name => username, + nullable => true})}, + {ip_address, hoconsc:mk(binary(), #{ in => query, - required => false, - description => <<"User name">>, - schema => #{type => string} - }, - #{ - name => zone, + nullable => true, + desc => <<"Client's IP address">>, + example => <<"127.0.0.1">>})}, + {conn_state, hoconsc:mk(hoconsc:enum([connected, idle, disconnected]), #{ in => query, - required => false, - schema => #{type => string} - }, - #{ - name => ip_address, + nullable => true, + desc => <<"The current connection status of the client, ", + "the possible values are connected,idle,disconnected">>})}, + {clean_start, hoconsc:mk(boolean(), #{ in => query, - required => false, - description => <<"Client's IP address">>, - schema => #{type => string} - }, - #{ - name => conn_state, + nullable => true, + description => <<"Whether the client uses a new session">>})}, + {proto_name, hoconsc:mk(hoconsc:enum(['MQTT', 'CoAP', 'LwM2M', 'MQTT-SN']), #{ in => query, - required => false, - description => - <<"The current connection status of the client, ", - "the possible values are connected,idle,disconnected">>, - schema => #{type => string, enum => [connected, idle, disconnected]} - }, - #{ - name => clean_start, + nullable => true, + description => <<"Client protocol name, ", + "the possible values are MQTT,CoAP,LwM2M,MQTT-SN">>})}, + {proto_ver, hoconsc:mk(binary(), #{ in => query, - required => false, - description => <<"Whether the client uses a new session">>, - schema => #{type => boolean} - }, - #{ - name => proto_name, + nullable => true, + desc => <<"Client protocol version">>})}, + {like_clientid, hoconsc:mk(binary(), #{ in => query, - required => false, - description => - <<"Client protocol name, ", - "the possible values are MQTT,CoAP,LwM2M,MQTT-SN">>, - schema => #{type => string, enum => ['MQTT', 'CoAP', 'LwM2M', 'MQTT-SN']} - }, - #{ - name => proto_ver, + nullable => true, + desc => <<"Fuzzy search of client identifier by substring method">>})}, + {like_username, hoconsc:mk(binary(), #{ in => query, - required => false, - description => <<"Client protocol version">>, - schema => #{type => string} - }, - #{ - name => like_clientid, + nullable => true, + desc => <<"Client user name, fuzzy search by substring">>})}, + {gte_created_at, hoconsc:mk(binary(), #{ in => query, - required => false, - description => <<"Fuzzy search of client identifier by substring method">>, - schema => #{type => string} - }, - #{ - name => like_username, + nullable => true, + desc => <<"Search client session creation time by greater", + " than or equal method, rfc3339 or timestamp(millisecond)">>})}, + {lte_created_at, hoconsc:mk(binary(), #{ in => query, - required => false, - description => <<"Client user name, fuzzy search by substring">>, - schema => #{type => string} - }, - #{ - name => gte_created_at, + nullable => true, + desc => <<"Search client session creation time by less", + " than or equal method, rfc3339 or timestamp(millisecond)">>})}, + {gte_connected_at, hoconsc:mk(binary(), #{ in => query, - required => false, - description => - <<"Search client session creation time by greater than or equal method, " - "rfc3339 or timestamp(millisecond)">>, - schema => #{type => string} - }, - #{ - name => lte_created_at, + nullable => true, + desc => <<"Search client connection creation time by greater" + " than or equal method, rfc3339 or timestamp(millisecond)">>})}, + {lte_connected_at, hoconsc:mk(binary(), #{ in => query, - required => false, - description => - <<"Search client session creation time by less than or equal method, ", - "rfc3339 or timestamp(millisecond)">>, - schema => #{type => string} - }, - #{ - name => gte_connected_at, - in => query, - required => false, - description => - <<"Search client connection creation time by greater than or equal method, ", - "rfc3339 or timestamp(millisecond)">>, - schema => #{type => string} - }, - #{ - name => lte_connected_at, - in => query, - required => false, - description => - <<"Search client connection creation time by less than or equal method, ", - "rfc3339 or timestamp(millisecond) ">>, - schema => #{type => string} - } + nullable => true, + desc => <<"Search client connection creation time by less" + " than or equal method, rfc3339 or timestamp(millisecond)">>})} ], responses => #{ - <<"200">> => emqx_mgmt_util:array_schema(client, <<"List clients 200 OK">>), - <<"400">> => emqx_mgmt_util:error_schema( <<"Invalid parameters">> - , ['INVALID_PARAMETER'])}}}, - {"/clients", Metadata, clients}. + 200 => [ + {data, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, client)), #{})}, + {meta, hoconsc:mk(hoconsc:ref(?MODULE, meta), #{})} + ], + 400 => + emqx_dashboard_swagger:error_codes( + ['INVALID_PARAMETER'], <<"Invalid parameters">>)} + } + }; -client_api() -> - Metadata = #{ +schema("/clients/:clientid") -> + #{ + 'operationId' => client, get => #{ description => <<"Get clients info by client ID">>, - parameters => [#{ - name => clientid, - in => path, - schema => #{type => string}, - required => true - }], + parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}], responses => #{ - <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), - <<"200">> => emqx_mgmt_util:schema(client, <<"List clients 200 OK">>)}}, + 200 => hoconsc:mk(hoconsc:ref(?MODULE, client), #{}), + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client id not found">>)}}, delete => #{ description => <<"Kick out client by client ID">>, - parameters => [#{ - name => clientid, - in => path, - schema => #{type => string}, - required => true - }], + parameters => [ + {clientid, hoconsc:mk(binary(), #{in => path})}], responses => #{ - <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), - <<"204">> => emqx_mgmt_util:schema(<<"Kick out client successfully">>)}}}, - {"/clients/:clientid", Metadata, client}. + 204 => <<"Kick out client successfully">>, + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client id not found">>) + } + } + }; -clients_authz_cache_api() -> - Metadata = #{ +schema("/clients/:clientid/authz_cache") -> + #{ + 'operationId' => authz_cache, get => #{ description => <<"Get client authz cache">>, - parameters => [#{ - name => clientid, - in => path, - schema => #{type => string}, - required => true - }], + parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}], responses => #{ - <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), - <<"200">> => emqx_mgmt_util:schema(authz_cache, <<"Get client authz cache">>)}}, + 200 => hoconsc:mk(hoconsc:ref(?MODULE, authz_cache), #{}), + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client id not found">>) + } + }, delete => #{ description => <<"Clean client authz cache">>, - parameters => [#{ - name => clientid, - in => path, - schema => #{type => string}, - required => true - }], + parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}], responses => #{ - <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), - <<"204">> => emqx_mgmt_util:schema(<<"Clean client authz cache successfully">>)}}}, - {"/clients/:clientid/authz_cache", Metadata, authz_cache}. + 204 => <<"Kick out client successfully">>, + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client id not found">>) + } + } + }; -clients_subscriptions_api() -> - Metadata = #{ +schema("/clients/:clientid/subscriptions") -> + #{ + 'operationId' => subscriptions, get => #{ description => <<"Get client subscriptions">>, - parameters => [#{ - name => clientid, - in => path, - schema => #{type => string}, - required => true - }], + parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}], responses => #{ - <<"200">> => - emqx_mgmt_util:array_schema(subscription, <<"Get client subscriptions">>)}} - }, - {"/clients/:clientid/subscriptions", Metadata, subscriptions}. + 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_mgmt_api_subscriptions, subscription)), #{}), + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client id not found">>) + } + } + }; -unsubscribe_api() -> - Metadata = #{ - post => #{ - description => <<"Unsubscribe">>, - parameters => [ - #{ - name => clientid, - in => path, - schema => #{type => string}, - required => true - } - ], - 'requestBody' => emqx_mgmt_util:schema(#{ - type => object, - properties => #{ - topic => #{ - type => string, - description => <<"Topic">>}}}), - responses => #{ - <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), - <<"200">> => emqx_mgmt_util:schema(<<"Unsubscribe ok">>)}}}, - {"/clients/:clientid/unsubscribe", Metadata, unsubscribe}. -subscribe_api() -> - Metadata = #{ +schema("/clients/:clientid/subscribe") -> + #{ + 'operationId' => subscribe, post => #{ description => <<"Subscribe">>, - parameters => [#{ - name => clientid, - in => path, - schema => #{type => string}, - required => true - }], - 'requestBody' => emqx_mgmt_util:schema(#{ - type => object, - properties => #{ - topic => #{ - type => string, - description => <<"Topic">>}, - qos => #{ - type => integer, - enum => [0, 1, 2], - example => 0, - description => <<"QoS">>}}}), + parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}], + 'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, subscribe)), responses => #{ - <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), - <<"200">> => emqx_mgmt_util:schema(<<"Subscribe ok">>)}}}, - {"/clients/:clientid/subscribe", Metadata, subscribe}. + 200 => hoconsc:ref(emqx_mgmt_api_subscriptions, subscription), + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client id not found">>) + } + } + }; -keepalive_api() -> - Metadata = #{ - put => #{ - description => <<"set the online client keepalive by second ">>, - parameters => [#{ - name => clientid, - in => path, - schema => #{type => string}, - required => true - }, - #{ - name => interval, - in => query, - schema => #{type => integer}, - required => true - } - ], +schema("/clients/:clientid/unsubscribe") -> + #{ + 'operationId' => unsubscribe, + post => #{ + description => <<"Unsubscribe">>, + parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}], + 'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, unsubscribe)), responses => #{ - <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), - <<"400">> => emqx_mgmt_util:error_schema(<<"">>, ['PARAMS_ERROR']), - <<"200">> => emqx_mgmt_util:schema(<<"ok">>)}}}, - {"/clients/:clientid/keepalive", Metadata, set_keepalive}. + 204 => <<"Unsubscribe OK">>, + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client id not found">>) + } + } + }; + +schema("/clients/:clientid/keepalive") -> + #{ + 'operationId' => set_keepalive, + put => #{ + description => <<"Set the online client keepalive by seconds">>, + parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}], + 'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, keepalive)), + responses => #{ + 200 => hoconsc:mk(hoconsc:ref(?MODULE, client), #{}), + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client id not found">>) + } + } + }. + +fields(client) -> + [ + {awaiting_rel_cnt, hoconsc:mk(integer(), #{desc => + <<"v4 api name [awaiting_rel] Number of awaiting PUBREC packet">>})}, + {awaiting_rel_max, hoconsc:mk(integer(), #{desc => + <<"v4 api name [max_awaiting_rel]. " + "Maximum allowed number of awaiting PUBREC packet">>})}, + {clean_start, hoconsc:mk(boolean(), #{desc => + <<"Indicate whether the client is using a brand new session">>})}, + {clientid, hoconsc:mk(binary(), #{desc => <<"Client identifier">>})}, + {connected, hoconsc:mk(boolean(), #{desc => <<"Whether the client is connected">>})}, + {connected_at, hoconsc:mk(binary(), #{desc => <<"Client connection time, rfc3339">>})}, + {created_at, hoconsc:mk(binary(), #{desc => <<"Session creation time, rfc3339">>})}, + {disconnected_at, hoconsc:mk(binary(), #{desc => + <<"Client offline time." + " It's Only valid and returned when connected is false, rfc3339">>})}, + {expiry_interval, hoconsc:mk(integer(), #{desc => + <<"Session expiration interval, with the unit of second">>})}, + {heap_size, hoconsc:mk(integer(), #{desc => + <<"Process heap size with the unit of byte">>})}, + {inflight_cnt, hoconsc:mk(integer(), #{desc => <<"Current length of inflight">>})}, + {inflight_max, hoconsc:mk(integer(), #{desc => + <<"v4 api name [max_inflight]. Maximum length of inflight">>})}, + {ip_address, hoconsc:mk(binary(), #{desc => <<"Client's IP address">>})}, + {is_bridge, hoconsc:mk(boolean(), #{desc => + <<"Indicates whether the client is connectedvia bridge">>})}, + {keepalive, hoconsc:mk(integer(), #{desc => + <<"keepalive time, with the unit of second">>})}, + {mailbox_len, hoconsc:mk(integer(), #{desc => <<"Process mailbox size">>})}, + {mqueue_dropped, hoconsc:mk(integer(), #{desc => + <<"Number of messages dropped by the message queue due to exceeding the length">>})}, + {mqueue_len, hoconsc:mk(integer(), #{desc => <<"Current length of message queue">>})}, + {mqueue_max, hoconsc:mk(integer(), #{desc => + <<"v4 api name [max_mqueue]. Maximum length of message queue">>})}, + {node, hoconsc:mk(binary(), #{desc => + <<"Name of the node to which the client is connected">>})}, + {port, hoconsc:mk(integer(), #{desc => <<"Client's port">>})}, + {proto_name, hoconsc:mk(binary(), #{desc => <<"Client protocol name">>})}, + {proto_ver, hoconsc:mk(integer(), #{desc => <<"Protocol version used by the client">>})}, + {recv_cnt, hoconsc:mk(integer(), #{desc => <<"Number of TCP packets received">>})}, + {recv_msg, hoconsc:mk(integer(), #{desc => <<"Number of PUBLISH packets received">>})}, + {'recv_msg.dropped', hoconsc:mk(integer(), #{desc => + <<"Number of dropped PUBLISH packets">>})}, + {'recv_msg.dropped.await_pubrel_timeout', hoconsc:mk(integer(), #{desc => + <<"Number of dropped PUBLISH packets due to expired">>})}, + {'recv_msg.qos0', hoconsc:mk(integer(), #{desc => + <<"Number of PUBLISH QoS0 packets received">>})}, + {'recv_msg.qos1', hoconsc:mk(integer(), #{desc => + <<"Number of PUBLISH QoS1 packets received">>})}, + {'recv_msg.qos2', hoconsc:mk(integer(), #{desc => + <<"Number of PUBLISH QoS2 packets received">>})}, + {recv_oct, hoconsc:mk(integer(), #{desc => + <<"Number of bytes received by EMQ X Broker (the same below)">>})}, + {recv_pkt, hoconsc:mk(integer(), #{desc => <<"Number of MQTT packets received">>})}, + {reductions, hoconsc:mk(integer(), #{desc => <<"Erlang reduction">>})}, + {send_cnt, hoconsc:mk(integer(), #{desc => <<"Number of TCP packets sent">>})}, + {send_msg, hoconsc:mk(integer(), #{desc => <<"Number of PUBLISH packets sent">>})}, + {'send_msg.dropped', hoconsc:mk(integer(), #{desc => + <<"Number of dropped PUBLISH packets">>})}, + {'send_msg.dropped.expired', hoconsc:mk(integer(), #{desc => + <<"Number of dropped PUBLISH packets due to expired">>})}, + {'send_msg.dropped.queue_full', hoconsc:mk(integer(), #{desc => + <<"Number of dropped PUBLISH packets due to queue full">>})}, + {'send_msg.dropped.too_large', hoconsc:mk(integer(), #{desc => + <<"Number of dropped PUBLISH packets due to packet length too large">>})}, + {'send_msg.qos0', hoconsc:mk(integer(), #{desc => + <<"Number of PUBLISH QoS0 packets sent">>})}, + {'send_msg.qos1', hoconsc:mk(integer(), #{desc => + <<"Number of PUBLISH QoS1 packets sent">>})}, + {'send_msg.qos2', hoconsc:mk(integer(), #{desc => + <<"Number of PUBLISH QoS2 packets sent">>})}, + {send_oct, hoconsc:mk(integer(), #{desc => <<"Number of bytes sent">>})}, + {send_pkt, hoconsc:mk(integer(), #{desc => <<"Number of MQTT packets sent">>})}, + {subscriptions_cnt, hoconsc:mk(integer(), #{desc => + <<"Number of subscriptions established by this client.">>})}, + {subscriptions_max, hoconsc:mk(integer(), #{desc => + <<"v4 api name [max_subscriptions]", + " Maximum number of subscriptions allowed by this client">>})}, + {username, hoconsc:mk(binary(), #{desc => <<"User name of client when connecting">>})}, + {will_msg, hoconsc:mk(binary(), #{desc => <<"Client will message">>})}, + {zone, hoconsc:mk(binary(), #{desc => + <<"Indicate the configuration group used by the client">>})} + ]; + +fields(authz_cache) -> + [ + {access, hoconsc:mk(binary(), #{desc => <<"Access type">>})}, + {result, hoconsc:mk(binary(), #{desc => <<"Allow or deny">>})}, + {topic, hoconsc:mk(binary(), #{desc => <<"Topic name">>})}, + {updated_time, hoconsc:mk(integer(), #{desc => <<"Update time">>})} + ]; + +fields(keepalive) -> + [ + {interval, hoconsc:mk(integer(), #{desc => <<"Keepalive time, with the unit of second">>})} + ]; + +fields(subscribe) -> + [ + {topic, hoconsc:mk(binary(), #{desc => <<"Access type">>})}, + {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})} + ]; + +fields(unsubscribe) -> + [ + {topic, hoconsc:mk(binary(), #{desc => <<"Access type">>})} + ]; + +fields(meta) -> + emqx_dashboard_swagger:fields(page) ++ + emqx_dashboard_swagger:fields(limit) ++ + [{count, hoconsc:mk(integer(), #{example => 1})}]. + %%%============================================================================================== %% parameters trans clients(get, #{query_string := Qs}) -> - list(emqx_mgmt_api:ensure_timestamp_format(Qs, time_keys())). + list_clients(emqx_mgmt_api:ensure_timestamp_format(Qs, time_keys())). client(get, #{bindings := Bindings}) -> lookup(Bindings); @@ -529,13 +436,12 @@ subscriptions(get, #{bindings := #{clientid := ClientID}}) -> {200, lists:map(Formatter, Subs)} end. -set_keepalive(put, #{bindings := #{clientid := ClientID}, query_string := Query}) -> - case maps:find(<<"interval">>, Query) of - error -> {404, "Interval Not Found"}; - {ok, Interval0} -> - Interval = binary_to_integer(Interval0), +set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) -> + case maps:find(<<"interval">>, Body) of + error -> {400, 'BAD_REQUEST',"Interval Not Found"}; + {ok, Interval} -> case emqx_mgmt:set_keepalive(emqx_mgmt_util:urldecode(ClientID), Interval) of - ok -> {200}; + ok -> lookup(#{clientid => ClientID}); {error, not_found} ->{404, ?CLIENT_ID_NOT_FOUND}; {error, Reason} -> {400, #{code => 'PARAMS_ERROR', message => Reason}} end @@ -544,7 +450,7 @@ set_keepalive(put, #{bindings := #{clientid := ClientID}, query_string := Query} %%%============================================================================================== %% api apply -list(Params) -> +list_clients(Params) -> {Tab, QuerySchema} = ?CLIENT_QS_SCHEMA, case maps:get(<<"node">>, Params, undefined) of undefined -> @@ -590,7 +496,7 @@ get_authz_cache(#{clientid := ClientID})-> clean_authz_cache(#{clientid := ClientID}) -> case emqx_mgmt:clean_authz_cache(ClientID) of ok -> - {200}; + {204}; {error, not_found} -> {404, ?CLIENT_ID_NOT_FOUND}; {error, Reason} -> @@ -605,8 +511,15 @@ subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) -> {error, Reason} -> Message = list_to_binary(io_lib:format("~p", [Reason])), {500, #{code => <<"UNKNOW_ERROR">>, message => Message}}; - ok -> - {200} + {ok, Node} -> + Response = + #{ + clientid => ClientID, + topic => Topic, + qos => Qos, + node => Node + }, + {200, Response} end. unsubscribe(#{clientid := ClientID, topic := Topic}) -> @@ -614,7 +527,7 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) -> {error, channel_not_found} -> {404, ?CLIENT_ID_NOT_FOUND}; {unsubscribe, [{Topic, #{}}]} -> - {200} + {204} end. subscribe_batch(#{clientid := ClientID, topics := Topics}) -> @@ -630,10 +543,10 @@ do_subscribe(ClientID, Topic0, Qos) -> case emqx_mgmt:subscribe(ClientID, TopicTable) of {error, Reason} -> {error, Reason}; - {subscribe, Subscriptions} -> + {subscribe, Subscriptions, Node} -> case proplists:is_defined(Topic, Subscriptions) of true -> - ok; + {ok, Node}; false -> {error, unknow_error} end diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 6296e0cfb..40e3948a7 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -18,15 +18,14 @@ -behaviour(minirest_api). +-include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). --import(emqx_mgmt_util, [ page_schema/1 - , error_schema/2 - , properties/1 - , page_params/0 - ]). - --export([api_spec/0]). +-export([ api_spec/0 + , paths/0 + , schema/1 + , fields/1]). -export([subscriptions/2]). @@ -46,73 +45,73 @@ -define(format_fun, {?MODULE, format}). api_spec() -> - {subscriptions_api(), subscription_schema()}. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). -subscriptions_api() -> - MetaData = #{ +paths() -> + ["/subscriptions"]. + +schema("/subscriptions") -> + #{ + 'operationId' => subscriptions, get => #{ description => <<"List subscriptions">>, parameters => parameters(), responses => #{ - <<"200">> => page_schema(subscription), - <<"400">> => error_schema(<<"Invalid parameters">>, ['INVALID_PARAMETER']) - } - } - }, - [{"/subscriptions", MetaData, subscriptions}]. + 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, subscription)), #{})}} + }. -subscription_schema() -> - Props = properties([ - {node, string}, - {topic, string}, - {clientid, string}, - {qos, integer, <<>>, [0,1,2]}]), - [#{subscription => #{type => object, properties => Props}}]. +fields(subscription) -> + [ + {node, hoconsc:mk(binary(), #{desc => <<"Access type">>})}, + {topic, hoconsc:mk(binary(), #{desc => <<"Topic name">>})}, + {clientid, hoconsc:mk(binary(), #{desc => <<"Client identifier">>})}, + {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})} + ]. parameters() -> [ - #{ - name => clientid, + hoconsc:ref(emqx_dashboard_swagger, page), + hoconsc:ref(emqx_dashboard_swagger, limit), + { + node, hoconsc:mk(binary(), #{ in => query, - description => <<"Client ID">>, - schema => #{type => string} + nullable => true, + desc => <<"Node name">>, + example => atom_to_list(node())}) }, - #{ - name => node, + { + clientid, hoconsc:mk(binary(), #{ in => query, - description => <<"Node name">>, - schema => #{type => string} + nullable => true, + desc => <<"Client ID">>}) }, - #{ - name => qos, + { + qos, hoconsc:mk(emqx_schema:qos(), #{ in => query, - description => <<"QoS">>, - schema => #{type => integer, enum => [0, 1, 2]} + nullable => true, + desc => <<"QoS">>}) }, - #{ - name => share_group, + { + topic, hoconsc:mk(binary(), #{ in => query, - description => <<"Shared subscription group name">>, - schema => #{type => string} + nullable => true, + desc => <<"Topic, url encoding">>}) }, - #{ - name => topic, + { + match_topic, hoconsc:mk(binary(), #{ in => query, - description => <<"Topic, url encoding">>, - schema => #{type => string} + nullable => true, + desc => <<"Match topic string, url encoding">>}) + }, + { + share_group, hoconsc:mk(binary(), #{ + in => query, + nullable => true, + desc => <<"Shared subscription group name">>}) } - #{ - name => match_topic, - in => query, - description => <<"Match topic string, url encoding">>, - schema => #{type => string} - } | page_params() ]. subscriptions(get, #{query_string := Params}) -> - list(Params). - -list(Params) -> {Tab, QuerySchema} = ?SUBS_QS_SCHEMA, case maps:get(<<"node">>, Params, undefined) of undefined -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 3282ea8c4..148ffdc87 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -96,8 +96,9 @@ t_clients(_) -> %% post /clients/:clientid/unsubscribe UnSubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "unsubscribe"]), + UnSubscribeBody = #{topic => Topic}, {ok, _} = emqx_mgmt_api_test_util:request_api(post, UnSubscribePath, - "", AuthHeader, SubscribeBody), + "", AuthHeader, UnSubscribeBody), timer:sleep(100), ?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)), @@ -165,15 +166,15 @@ t_query_clients_with_time(_) -> t_keepalive(_Config) -> Username = "user_keepalive", ClientId = "client_keepalive", - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "keepalive"]), - Query = "interval=11", + Body = #{interval => 11}, {error,{"HTTP/1.1",404,"Not Found"}} = - emqx_mgmt_api_test_util:request_api(put, Path, Query, AuthHeader, <<"">>), + emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body), {ok, C1} = emqtt:start_link(#{username => Username, clientid => ClientId}), {ok, _} = emqtt:connect(C1), - {ok, Ok} = emqx_mgmt_api_test_util:request_api(put, Path, Query, AuthHeader, <<"">>), - ?assertEqual("", Ok), + {ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body), + #{<<"keepalive">> := 11} = emqx_json:decode(NewClient, [return_maps]), [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)), #{conninfo := #{keepalive := Keepalive}} = emqx_connection:info(Pid), ?assertEqual(11, Keepalive), diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl index 0c54216c2..72611d5cd 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -31,10 +31,8 @@ , stats/2 ]). --define(API_TAG_PROMETHEUS, [<<"premetheus">>]). -define(SCHEMA_MODULE, emqx_prometheus_schema). - api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). @@ -47,7 +45,6 @@ schema("/prometheus") -> #{ 'operationId' => prometheus , get => #{ description => <<"Get Prometheus config info">> - , tags => ?API_TAG_PROMETHEUS , responses => #{200 => prometheus_config_schema()} }