diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 8ae113723..20f1aa108 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -43,6 +43,8 @@ , lookup_client/3 , kickout_client/1 , list_authz_cache/1 + , list_client_subscriptions/1 + , client_subscriptions/2 , clean_authz_cache/1 , clean_authz_cache/2 , clean_authz_cache_all/0 @@ -269,6 +271,23 @@ kickout_client(Node, ClientId) -> list_authz_cache(ClientId) -> call_client(ClientId, list_authz_cache). +list_client_subscriptions(ClientId) -> + Results = [client_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], + Expected = lists:filter(fun({error, _}) -> false; + ([]) -> false; + (_) -> true + end, Results), + case Expected of + [] -> []; + [Result|_] -> Result + end. + +client_subscriptions(Node, ClientId) when Node =:= node() -> + emqx_broker:subscriptions(ClientId); + +client_subscriptions(Node, ClientId) -> + rpc_call(Node, client_subscriptions, [Node, ClientId]). + clean_authz_cache(ClientId) -> Results = [clean_authz_cache(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], case lists:any(fun(Item) -> Item =:= ok end, Results) of diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 1f308181d..b99d6c6ef 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -29,6 +29,7 @@ -export([ clients/2 , client/2 + , subscriptions/2 , authz_cache/2 , subscribe/2 , subscribe_batch/2]). @@ -68,6 +69,7 @@ apis() -> [ clients_api() , client_api() , clients_authz_cache_api() + , clients_subscriptions_api() , subscribe_api()]. schemas() -> @@ -209,7 +211,17 @@ schemas() -> } } }, - [Client, AuthzCache]. + Subscription = #{ + subscription => #{ + type => object, + properties => #{ + topic => #{ + type => string}, + qos => #{ + type => integer, + enum => [0,1,2]}}} + }, + [Client, AuthzCache, Subscription]. clients_api() -> Metadata = #{ @@ -271,6 +283,21 @@ clients_authz_cache_api() -> <<"200">> => emqx_mgmt_util:response_schema(<<"Delete clients 200 OK">>)}}}, {"/clients/:clientid/authz_cache", Metadata, authz_cache}. +clients_subscriptions_api() -> + Metadata = #{ + get => #{ + description => <<"Get client subscriptions">>, + parameters => [#{ + name => clientid, + in => path, + schema => #{type => string}, + required => true + }], + responses => #{ + <<"200">> => emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, subscription)}} + }, + {"/clients/:clientid/subscriptions", Metadata, subscriptions}. + subscribe_api() -> Metadata = #{ post => #{ @@ -363,6 +390,14 @@ subscribe_batch(post, Request) -> end || TopicInfo <- TopicInfos], subscribe_batch(#{clientid => ClientID, topics => Topics}). +subscriptions(get, Request) -> + ClientID = cowboy_req:binding(clientid, Request), + Subs0 = emqx_mgmt:list_client_subscriptions(ClientID), + Subs = lists:map(fun({Topic, SubOpts}) -> + #{topic => Topic, qos => maps:get(qos, SubOpts)} + end, Subs0), + {200, Subs}. + %%%============================================================================================== %% api apply diff --git a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl index 9c9b952cd..6f7d7c5f0 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl @@ -26,269 +26,306 @@ api_spec() -> {[metrics_api()], [metrics_schema()]}. metrics_schema() -> - #{ - metrics => #{ + Metric = #{ + type => object, + properties => properties() + }, + Metrics = #{ + type => array, + items => #{ type => object, - properties => #{ - 'actions.failure' => #{ - type => integer, - description => <<"Number of failure executions of the rule engine action">>}, - 'actions.success' => #{ - type => integer, - description => <<"Number of successful executions of the rule engine action">>}, - 'bytes.received' => #{ - type => integer, - description => <<"Number of bytes received by EMQ X Broker">>}, - 'bytes.sent' => #{ - type => integer, - description => <<"Number of bytes sent by EMQ X Broker on this connection">>}, - 'client.authenticate' => #{ - type => integer, - description => <<"Number of client authentications">>}, - 'client.auth.anonymous' => #{ - type => integer, - description => <<"Number of clients who log in anonymously">>}, - 'client.connect' => #{ - type => integer, - description => <<"Number of client connections">>}, - 'client.connack' => #{ - type => integer, - description => <<"Number of CONNACK packet sent">>}, - 'client.connected' => #{ - type => integer, - description => <<"Number of successful client connections">>}, - 'client.disconnected' => #{ - type => integer, - description => <<"Number of client disconnects">>}, - 'client.check_authz' => #{ - type => integer, - description => <<"Number of Authorization rule checks">>}, - 'client.subscribe' => #{ - type => integer, - description => <<"Number of client subscriptions">>}, - 'client.unsubscribe' => #{ - type => integer, - description => <<"Number of client unsubscriptions">>}, - 'delivery.dropped.too_large' => #{ - type => integer, - description => <<"The number of messages that were dropped because the length exceeded the limit when sending">>}, - 'delivery.dropped.queue_full' => #{ - type => integer, - description => <<"Number of messages with a non-zero QoS that were dropped because the message queue was full when sending">>}, - 'delivery.dropped.qos0_msg' => #{ - type => integer, - description => <<"Number of messages with QoS 0 that were dropped because the message queue was full when sending">>}, - 'delivery.dropped.expired' => #{ - type => integer, - description => <<"Number of messages dropped due to message expiration on sending">>}, - 'delivery.dropped.no_local' => #{ - type => integer, - description => <<"Number of messages that were dropped due to the No Local subscription option when sending">>}, - 'delivery.dropped' => #{ - type => integer, - description => <<"Total number of discarded messages when sending">>}, - 'messages.delayed' => #{ - type => integer, - description => <<"Number of delay- published messages stored by EMQ X Broker">>}, - 'messages.delivered' => #{ - type => integer, - description => <<"Number of messages forwarded to the subscription process internally by EMQ X Broker">>}, - 'messages.dropped' => #{ - type => integer, - description => <<"Total number of messages dropped by EMQ X Broker before forwarding to the subscription process">>}, - 'messages.dropped.expired' => #{ - type => integer, - description => <<"Number of messages dropped due to message expiration when receiving">>}, - 'messages.dropped.no_subscribers' => #{ - type => integer, - description => <<"Number of messages dropped due to no subscribers">>}, - 'messages.forward' => #{ - type => integer, - description => <<"Number of messages forwarded to other nodes">>}, - 'messages.publish' => #{ - type => integer, - description => <<"Number of messages published in addition to system messages">>}, - 'messages.qos0.received' => #{ - type => integer, - description => <<"Number of QoS 0 messages received from clients">>}, - 'messages.qos1.received' => #{ - type => integer, - description => <<"Number of QoS 1 messages received from clients">>}, - 'messages.qos2.received' => #{ - type => integer, - description => <<"Number of QoS 2 messages received from clients">>}, - 'messages.qos0.sent' => #{ - type => integer, - description => <<"Number of QoS 0 messages sent to clients">>}, - 'messages.qos1.sent' => #{ - type => integer, - description => <<"Number of QoS 1 messages sent to clients">>}, - 'messages.qos2.sent' => #{ - type => integer, - description => <<"Number of QoS 2 messages sent to clients">>}, - 'messages.received' => #{ - type => integer, - description => <<"Number of messages received from the client, equal to the sum of messages.qos0.received,messages.qos1.received and messages.qos2.received">>}, - 'messages.sent' => #{ - type => integer, - description => <<"Number of messages sent to the client, equal to the sum of messages.qos0.sent,messages.qos1.sent and messages.qos2.sent">>}, - 'messages.retained' => #{ - type => integer, - description => <<"Number of retained messages stored by EMQ X Broker">>}, - 'messages.acked' => #{ - type => integer, - description => <<"Number of received PUBACK and PUBREC packet">>}, - 'packets.received' => #{ - type => integer, - description => <<"Number of received packet">>}, - 'packets.sent' => #{ - type => integer, - description => <<"Number of sent packet">>}, - 'packets.connect.received' => #{ - type => integer, - description => <<"Number of received CONNECT packet">>}, - 'packets.connack.auth_error' => #{ - type => integer, - description => <<"Number of received CONNECT packet with failed authentication">>}, - 'packets.connack.error' => #{ - type => integer, - description => <<"Number of received CONNECT packet with unsuccessful connections">>}, - 'packets.connack.sent' => #{ - type => integer, - description => <<"Number of sent CONNACK packet">>}, - 'packets.publish.received' => #{ - type => integer, - description => <<"Number of received PUBLISH packet">>}, - 'packets.publish.sent' => #{ - type => integer, - description => <<"Number of sent PUBLISH packet">>}, - 'packets.publish.inuse' => #{ - type => integer, - description => <<"Number of received PUBLISH packet with occupied identifiers">>}, - 'packets.publish.auth_error' => #{ - type => integer, - description => <<"Number of received PUBLISH packets with failed the Authorization check">>}, - 'packets.publish.error' => #{ - type => integer, - description => <<"Number of received PUBLISH packet that cannot be published">>}, - 'packets.publish.dropped' => #{ - type => integer, - description => <<"Number of messages discarded due to the receiving limit">>}, - 'packets.puback.received' => #{ - type => integer, - description => <<"Number of received PUBACK packet">>}, - 'packets.puback.sent' => #{ - type => integer, - description => <<"Number of sent PUBACK packet">>}, - 'packets.puback.inuse' => #{ - type => integer, - description => <<"Number of received PUBACK packet with occupied identifiers">>}, - 'packets.puback.missed' => #{ - type => integer, - description => <<"Number of received packet with identifiers.">>}, - 'packets.pubrec.received' => #{ - type => integer, - description => <<"Number of received PUBREC packet">>}, - 'packets.pubrec.sent' => #{ - type => integer, - description => <<"Number of sent PUBREC packet">>}, - 'packets.pubrec.inuse' => #{ - type => integer, - description => <<"Number of received PUBREC packet with occupied identifiers">>}, - 'packets.pubrec.missed' => #{ - type => integer, - description => <<"Number of received PUBREC packet with unknown identifiers">>}, - 'packets.pubrel.received' => #{ - type => integer, - description => <<"Number of received PUBREL packet">>}, - 'packets.pubrel.sent' => #{ - type => integer, - description => <<"Number of sent PUBREL packet">>}, - 'packets.pubrel.missed' => #{ - type => integer, - description => <<"Number of received PUBREC packet with unknown identifiers">>}, - 'packets.pubcomp.received' => #{ - type => integer, - description => <<"Number of received PUBCOMP packet">>}, - 'packets.pubcomp.sent' => #{ - type => integer, - description => <<"Number of sent PUBCOMP packet">>}, - 'packets.pubcomp.inuse' => #{ - type => integer, - description => <<"Number of received PUBCOMP packet with occupied identifiers">>}, - 'packets.pubcomp.missed' => #{ - type => integer, - description => <<"Number of missed PUBCOMP packet">>}, - 'packets.subscribe.received' => #{ - type => integer, - description => <<"Number of received SUBSCRIBE packet">>}, - 'packets.subscribe.error' => #{ - type => integer, - description => <<"Number of received SUBSCRIBE packet with failed subscriptions">>}, - 'packets.subscribe.auth_error' => #{ - type => integer, - description => <<"Number of received SUBACK packet with failed Authorization check">>}, - 'packets.suback.sent' => #{ - type => integer, - description => <<"Number of sent SUBACK packet">>}, - 'packets.unsubscribe.received' => #{ - type => integer, - description => <<"Number of received UNSUBSCRIBE packet">>}, - 'packets.unsubscribe.error' => #{ - type => integer, - description => <<"Number of received UNSUBSCRIBE packet with failed unsubscriptions">>}, - 'packets.unsuback.sent' => #{ - type => integer, - description => <<"Number of sent UNSUBACK packet">>}, - 'packets.pingreq.received' => #{ - type => integer, - description => <<"Number of received PINGREQ packet">>}, - 'packets.pingresp.sent' => #{ - type => integer, - description => <<"Number of sent PUBRESP packet">>}, - 'packets.disconnect.received' => #{ - type => integer, - description => <<"Number of received DISCONNECT packet">>}, - 'packets.disconnect.sent' => #{ - type => integer, - description => <<"Number of sent DISCONNECT packet">>}, - 'packets.auth.received' => #{ - type => integer, - description => <<"Number of received AUTH packet">>}, - 'packets.auth.sent' => #{ - type => integer, - description => <<"Number of sent AUTH packet">>}, - 'rules.matched' => #{ - type => integer, - description => <<"Number of rule matched">>}, - 'session.created' => #{ - type => integer, - description => <<"Number of sessions created">>}, - 'session.discarded' => #{ - type => integer, - description => <<"Number of sessions dropped because Clean Session or Clean Start is true">>}, - 'session.resumed' => #{ - type => integer, - description => <<"Number of sessions resumed because Clean Session or Clean Start is false">>}, - 'session.takeovered' => #{ - type => integer, - description => <<"Number of sessions takeovered because Clean Session or Clean Start is false">>}, - 'session.terminated' => #{ - type => integer, - description => <<"Number of terminated sessions">>} - } + properties => properties() } + }, + MetricsInfo = #{ + oneOf => [ minirest:ref(<<"metric">>) + , minirest:ref(<<"metrics">>) + ] + }, + #{metric => Metric, metrics => Metrics, metrics_info => MetricsInfo}. + +properties() -> + #{ + 'actions.failure' => #{ + type => integer, + description => <<"Number of failure executions of the rule engine action">>}, + 'actions.success' => #{ + type => integer, + description => <<"Number of successful executions of the rule engine action">>}, + 'bytes.received' => #{ + type => integer, + description => <<"Number of bytes received by EMQ X Broker">>}, + 'bytes.sent' => #{ + type => integer, + description => <<"Number of bytes sent by EMQ X Broker on this connection">>}, + 'client.authenticate' => #{ + type => integer, + description => <<"Number of client authentications">>}, + 'client.auth.anonymous' => #{ + type => integer, + description => <<"Number of clients who log in anonymously">>}, + 'client.connect' => #{ + type => integer, + description => <<"Number of client connections">>}, + 'client.connack' => #{ + type => integer, + description => <<"Number of CONNACK packet sent">>}, + 'client.connected' => #{ + type => integer, + description => <<"Number of successful client connections">>}, + 'client.disconnected' => #{ + type => integer, + description => <<"Number of client disconnects">>}, + 'client.check_authz' => #{ + type => integer, + description => <<"Number of Authorization rule checks">>}, + 'client.subscribe' => #{ + type => integer, + description => <<"Number of client subscriptions">>}, + 'client.unsubscribe' => #{ + type => integer, + description => <<"Number of client unsubscriptions">>}, + 'delivery.dropped.too_large' => #{ + type => integer, + description => <<"The number of messages that were dropped because the length exceeded the limit when sending">>}, + 'delivery.dropped.queue_full' => #{ + type => integer, + description => <<"Number of messages with a non-zero QoS that were dropped because the message queue was full when sending">>}, + 'delivery.dropped.qos0_msg' => #{ + type => integer, + description => <<"Number of messages with QoS 0 that were dropped because the message queue was full when sending">>}, + 'delivery.dropped.expired' => #{ + type => integer, + description => <<"Number of messages dropped due to message expiration on sending">>}, + 'delivery.dropped.no_local' => #{ + type => integer, + description => <<"Number of messages that were dropped due to the No Local subscription option when sending">>}, + 'delivery.dropped' => #{ + type => integer, + description => <<"Total number of discarded messages when sending">>}, + 'messages.delayed' => #{ + type => integer, + description => <<"Number of delay- published messages stored by EMQ X Broker">>}, + 'messages.delivered' => #{ + type => integer, + description => <<"Number of messages forwarded to the subscription process internally by EMQ X Broker">>}, + 'messages.dropped' => #{ + type => integer, + description => <<"Total number of messages dropped by EMQ X Broker before forwarding to the subscription process">>}, + 'messages.dropped.expired' => #{ + type => integer, + description => <<"Number of messages dropped due to message expiration when receiving">>}, + 'messages.dropped.no_subscribers' => #{ + type => integer, + description => <<"Number of messages dropped due to no subscribers">>}, + 'messages.forward' => #{ + type => integer, + description => <<"Number of messages forwarded to other nodes">>}, + 'messages.publish' => #{ + type => integer, + description => <<"Number of messages published in addition to system messages">>}, + 'messages.qos0.received' => #{ + type => integer, + description => <<"Number of QoS 0 messages received from clients">>}, + 'messages.qos1.received' => #{ + type => integer, + description => <<"Number of QoS 1 messages received from clients">>}, + 'messages.qos2.received' => #{ + type => integer, + description => <<"Number of QoS 2 messages received from clients">>}, + 'messages.qos0.sent' => #{ + type => integer, + description => <<"Number of QoS 0 messages sent to clients">>}, + 'messages.qos1.sent' => #{ + type => integer, + description => <<"Number of QoS 1 messages sent to clients">>}, + 'messages.qos2.sent' => #{ + type => integer, + description => <<"Number of QoS 2 messages sent to clients">>}, + 'messages.received' => #{ + type => integer, + description => <<"Number of messages received from the client, equal to the sum of messages.qos0.received,messages.qos1.received and messages.qos2.received">>}, + 'messages.sent' => #{ + type => integer, + description => <<"Number of messages sent to the client, equal to the sum of messages.qos0.sent,messages.qos1.sent and messages.qos2.sent">>}, + 'messages.retained' => #{ + type => integer, + description => <<"Number of retained messages stored by EMQ X Broker">>}, + 'messages.acked' => #{ + type => integer, + description => <<"Number of received PUBACK and PUBREC packet">>}, + 'packets.received' => #{ + type => integer, + description => <<"Number of received packet">>}, + 'packets.sent' => #{ + type => integer, + description => <<"Number of sent packet">>}, + 'packets.connect.received' => #{ + type => integer, + description => <<"Number of received CONNECT packet">>}, + 'packets.connack.auth_error' => #{ + type => integer, + description => <<"Number of received CONNECT packet with failed authentication">>}, + 'packets.connack.error' => #{ + type => integer, + description => <<"Number of received CONNECT packet with unsuccessful connections">>}, + 'packets.connack.sent' => #{ + type => integer, + description => <<"Number of sent CONNACK packet">>}, + 'packets.publish.received' => #{ + type => integer, + description => <<"Number of received PUBLISH packet">>}, + 'packets.publish.sent' => #{ + type => integer, + description => <<"Number of sent PUBLISH packet">>}, + 'packets.publish.inuse' => #{ + type => integer, + description => <<"Number of received PUBLISH packet with occupied identifiers">>}, + 'packets.publish.auth_error' => #{ + type => integer, + description => <<"Number of received PUBLISH packets with failed the Authorization check">>}, + 'packets.publish.error' => #{ + type => integer, + description => <<"Number of received PUBLISH packet that cannot be published">>}, + 'packets.publish.dropped' => #{ + type => integer, + description => <<"Number of messages discarded due to the receiving limit">>}, + 'packets.puback.received' => #{ + type => integer, + description => <<"Number of received PUBACK packet">>}, + 'packets.puback.sent' => #{ + type => integer, + description => <<"Number of sent PUBACK packet">>}, + 'packets.puback.inuse' => #{ + type => integer, + description => <<"Number of received PUBACK packet with occupied identifiers">>}, + 'packets.puback.missed' => #{ + type => integer, + description => <<"Number of received packet with identifiers.">>}, + 'packets.pubrec.received' => #{ + type => integer, + description => <<"Number of received PUBREC packet">>}, + 'packets.pubrec.sent' => #{ + type => integer, + description => <<"Number of sent PUBREC packet">>}, + 'packets.pubrec.inuse' => #{ + type => integer, + description => <<"Number of received PUBREC packet with occupied identifiers">>}, + 'packets.pubrec.missed' => #{ + type => integer, + description => <<"Number of received PUBREC packet with unknown identifiers">>}, + 'packets.pubrel.received' => #{ + type => integer, + description => <<"Number of received PUBREL packet">>}, + 'packets.pubrel.sent' => #{ + type => integer, + description => <<"Number of sent PUBREL packet">>}, + 'packets.pubrel.missed' => #{ + type => integer, + description => <<"Number of received PUBREC packet with unknown identifiers">>}, + 'packets.pubcomp.received' => #{ + type => integer, + description => <<"Number of received PUBCOMP packet">>}, + 'packets.pubcomp.sent' => #{ + type => integer, + description => <<"Number of sent PUBCOMP packet">>}, + 'packets.pubcomp.inuse' => #{ + type => integer, + description => <<"Number of received PUBCOMP packet with occupied identifiers">>}, + 'packets.pubcomp.missed' => #{ + type => integer, + description => <<"Number of missed PUBCOMP packet">>}, + 'packets.subscribe.received' => #{ + type => integer, + description => <<"Number of received SUBSCRIBE packet">>}, + 'packets.subscribe.error' => #{ + type => integer, + description => <<"Number of received SUBSCRIBE packet with failed subscriptions">>}, + 'packets.subscribe.auth_error' => #{ + type => integer, + description => <<"Number of received SUBACK packet with failed Authorization check">>}, + 'packets.suback.sent' => #{ + type => integer, + description => <<"Number of sent SUBACK packet">>}, + 'packets.unsubscribe.received' => #{ + type => integer, + description => <<"Number of received UNSUBSCRIBE packet">>}, + 'packets.unsubscribe.error' => #{ + type => integer, + description => <<"Number of received UNSUBSCRIBE packet with failed unsubscriptions">>}, + 'packets.unsuback.sent' => #{ + type => integer, + description => <<"Number of sent UNSUBACK packet">>}, + 'packets.pingreq.received' => #{ + type => integer, + description => <<"Number of received PINGREQ packet">>}, + 'packets.pingresp.sent' => #{ + type => integer, + description => <<"Number of sent PUBRESP packet">>}, + 'packets.disconnect.received' => #{ + type => integer, + description => <<"Number of received DISCONNECT packet">>}, + 'packets.disconnect.sent' => #{ + type => integer, + description => <<"Number of sent DISCONNECT packet">>}, + 'packets.auth.received' => #{ + type => integer, + description => <<"Number of received AUTH packet">>}, + 'packets.auth.sent' => #{ + type => integer, + description => <<"Number of sent AUTH packet">>}, + 'rules.matched' => #{ + type => integer, + description => <<"Number of rule matched">>}, + 'session.created' => #{ + type => integer, + description => <<"Number of sessions created">>}, + 'session.discarded' => #{ + type => integer, + description => <<"Number of sessions dropped because Clean Session or Clean Start is true">>}, + 'session.resumed' => #{ + type => integer, + description => <<"Number of sessions resumed because Clean Session or Clean Start is false">>}, + 'session.takeovered' => #{ + type => integer, + description => <<"Number of sessions takeovered because Clean Session or Clean Start is false">>}, + 'session.terminated' => #{ + type => integer, + description => <<"Number of terminated sessions">>} }. metrics_api() -> Metadata = #{ get => #{ description => <<"EMQ X metrics">>, + parameters => [#{ + name => aggregate, + in => query, + schema => #{type => boolean} + }], responses => #{ - <<"200">> => emqx_mgmt_util:response_schema(<<"List all metrics">>, metrics)}}}, + <<"200">> => #{ + description => <<"List all metrics">>, + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"metrics_info">>) + } + } + } + } + } + }, {"/metrics", Metadata, list}. %%%============================================================================================== %% api apply -list(get, _) -> - {200, emqx_mgmt:get_metrics()}. +list(get, Request) -> + Params = cowboy_req:parse_qs(Request), + case proplists:get_value(<<"aggregate">>, Params, undefined) of + <<"true">> -> + {200, emqx_mgmt:get_metrics()}; + _ -> + Data = [maps:from_list(emqx_mgmt:get_metrics(Node) ++ [{node, Node}]) || + Node <- ekka_mnesia:running_nodes()], + {200, Data} + end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_stats.erl b/apps/emqx_management/src/emqx_mgmt_api_stats.erl index fa93fabfe..706723f8b 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_stats.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_stats.erl @@ -22,86 +22,121 @@ -export([list/2]). api_spec() -> - {[stats_api()], [stats_schema()]}. + {[stats_api()], stats_schema()}. stats_schema() -> - #{ - stats => #{ + Stats = #{ + type => array, + items => #{ type => object, - properties => #{ - 'connections.count' => #{ - type => integer, - description => <<"Number of current connections">>}, - 'connections.max' => #{ - type => integer, - description => <<"Historical maximum number of connections">>}, - 'channels.count' => #{ - type => integer, - description => <<"sessions.count">>}, - 'channels.max' => #{ - type => integer, - description => <<"session.max">>}, - 'sessions.count' => #{ - type => integer, - description => <<"Number of current sessions">>}, - 'sessions.max' => #{ - type => integer, - description => <<"Historical maximum number of sessions">>}, - 'topics.count' => #{ - type => integer, - description => <<"Number of current topics">>}, - 'topics.max' => #{ - type => integer, - description => <<"Historical maximum number of topics">>}, - 'suboptions.count' => #{ - type => integer, - description => <<"subscriptions.count">>}, - 'suboptions.max' => #{ - type => integer, - description => <<"subscriptions.max">>}, - 'subscribers.count' => #{ - type => integer, - description => <<"Number of current subscribers">>}, - 'subscribers.max' => #{ - type => integer, - description => <<"Historical maximum number of subscribers">>}, - 'subscriptions.count' => #{ - type => integer, - description => <<"Number of current subscriptions, including shared subscriptions">>}, - 'subscriptions.max' => #{ - type => integer, - description => <<"Historical maximum number of subscriptions">>}, - 'subscriptions.shared.count' => #{ - type => integer, - description => <<"Number of current shared subscriptions">>}, - 'subscriptions.shared.max' => #{ - type => integer, - description => <<"Historical maximum number of shared subscriptions">>}, - 'routes.count' => #{ - type => integer, - description => <<"Number of current routes">>}, - 'routes.max' => #{ - type => integer, - description => <<"Historical maximum number of routes">>}, - 'retained.count' => #{ - type => integer, - description => <<"Number of currently retained messages">>}, - 'retained.max' => #{ - type => integer, - description => <<"Historical maximum number of retained messages">>} - } + properties => maps:put('node', #{type => string, description => <<"Node">>}, properties()) } + }, + Stat = #{ + type => object, + properties => properties() + }, + StatsInfo =#{ + oneOf => [ minirest:ref(<<"stats">>) + , minirest:ref(<<"stat">>) + ] + }, + [#{stats => Stats, stat => Stat, stats_info => StatsInfo}]. + +properties() -> + #{ + 'connections.count' => #{ + type => integer, + description => <<"Number of current connections">>}, + 'connections.max' => #{ + type => integer, + description => <<"Historical maximum number of connections">>}, + 'channels.count' => #{ + type => integer, + description => <<"sessions.count">>}, + 'channels.max' => #{ + type => integer, + description => <<"session.max">>}, + 'sessions.count' => #{ + type => integer, + description => <<"Number of current sessions">>}, + 'sessions.max' => #{ + type => integer, + description => <<"Historical maximum number of sessions">>}, + 'topics.count' => #{ + type => integer, + description => <<"Number of current topics">>}, + 'topics.max' => #{ + type => integer, + description => <<"Historical maximum number of topics">>}, + 'suboptions.count' => #{ + type => integer, + description => <<"subscriptions.count">>}, + 'suboptions.max' => #{ + type => integer, + description => <<"subscriptions.max">>}, + 'subscribers.count' => #{ + type => integer, + description => <<"Number of current subscribers">>}, + 'subscribers.max' => #{ + type => integer, + description => <<"Historical maximum number of subscribers">>}, + 'subscriptions.count' => #{ + type => integer, + description => <<"Number of current subscriptions, including shared subscriptions">>}, + 'subscriptions.max' => #{ + type => integer, + description => <<"Historical maximum number of subscriptions">>}, + 'subscriptions.shared.count' => #{ + type => integer, + description => <<"Number of current shared subscriptions">>}, + 'subscriptions.shared.max' => #{ + type => integer, + description => <<"Historical maximum number of shared subscriptions">>}, + 'routes.count' => #{ + type => integer, + description => <<"Number of current routes">>}, + 'routes.max' => #{ + type => integer, + description => <<"Historical maximum number of routes">>}, + 'retained.count' => #{ + type => integer, + description => <<"Number of currently retained messages">>}, + 'retained.max' => #{ + type => integer, + description => <<"Historical maximum number of retained messages">>} }. stats_api() -> Metadata = #{ get => #{ description => <<"EMQ X stats">>, + parameters => [#{ + name => aggregate, + in => query, + schema => #{type => boolean} + }], responses => #{ - <<"200">> => emqx_mgmt_util:response_schema(<<"List stats ok">>, stats)}}}, + <<"200">> => #{ + description => <<"List stats ok">>, + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"stats_info">>) + } + } + } + }}}, {"/stats", Metadata, list}. %%%============================================================================================== %% api apply -list(get, _Request) -> - {200, emqx_mgmt:get_stats()}. +list(get, Request) -> + Params = cowboy_req:parse_qs(Request), + case proplists:get_value(<<"aggregate">>, Params, undefined) of + <<"true">> -> + {200, emqx_mgmt:get_stats()}; + _ -> + Data = [maps:from_list(emqx_mgmt:get_stats(Node) ++ [{node, Node}]) || + Node <- ekka_mnesia:running_nodes()], + {200, Data} + end. diff --git a/apps/emqx_management/test/emqx_mgmt_metrics_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_metrics_api_SUITE.erl index 7bd5891c8..b54489e37 100644 --- a/apps/emqx_management/test/emqx_mgmt_metrics_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_metrics_api_SUITE.erl @@ -40,7 +40,7 @@ set_special_configs(_App) -> ok. t_metrics_api(_) -> - MetricsPath = emqx_mgmt_api_test_util:api_path(["metrics"]), + MetricsPath = emqx_mgmt_api_test_util:api_path(["metrics?aggregate=true"]), SystemMetrics = emqx_mgmt:get_metrics(), {ok, MetricsResponse} = emqx_mgmt_api_test_util:request_api(get, MetricsPath), Metrics = emqx_json:decode(MetricsResponse, [return_maps]), diff --git a/apps/emqx_management/test/emqx_mgmt_stats_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_stats_api_SUITE.erl index dbbca9d43..395a0851e 100644 --- a/apps/emqx_management/test/emqx_mgmt_stats_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_stats_api_SUITE.erl @@ -40,7 +40,7 @@ set_special_configs(_App) -> ok. t_stats_api(_) -> - StatsPath = emqx_mgmt_api_test_util:api_path(["stats"]), + StatsPath = emqx_mgmt_api_test_util:api_path(["stats?aggregate=true"]), SystemStats = emqx_mgmt:get_stats(), {ok, StatsResponse} = emqx_mgmt_api_test_util:request_api(get, StatsPath), Stats = emqx_json:decode(StatsResponse, [return_maps]),