diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index fa483e90e..8fe62918f 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -13,7 +13,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.3"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.10.1"}}} diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index bcd9ee1aa..b539b223b 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -175,7 +175,7 @@ broker_info(Node) -> %%-------------------------------------------------------------------- get_metrics() -> - [{Node, get_metrics(Node)} || Node <- ekka_mnesia:running_nodes()]. + nodes_info_count([get_metrics(Node) || Node <- ekka_mnesia:running_nodes()]). get_metrics(Node) when Node =:= node() -> emqx_metrics:all(); @@ -183,13 +183,44 @@ get_metrics(Node) -> rpc_call(Node, get_metrics, [Node]). get_stats() -> - [{Node, get_stats(Node)} || Node <- ekka_mnesia:running_nodes()]. + GlobalStatsKeys = + [ 'retained.count' + , 'retained.max' + , 'routes.count' + , 'routes.max' + , 'subscriptions.shared.count' + , 'subscriptions.shared.max' + ], + CountStats = nodes_info_count([ + begin + Stats = get_stats(Node), + delete_keys(Stats, GlobalStatsKeys) + end || Node <- ekka_mnesia:running_nodes()]), + GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))), + maps:merge(CountStats, GlobalStats). + +delete_keys(List, []) -> + List; +delete_keys(List, [Key | Keys]) -> + delete_keys(proplists:delete(Key, List), Keys). get_stats(Node) when Node =:= node() -> emqx_stats:getstats(); get_stats(Node) -> rpc_call(Node, get_stats, [Node]). +nodes_info_count(PropList) -> + NodeCount = + fun({Key, Value}, Result) -> + Count = maps:get(Key, Result, 0), + Result#{Key => Count + Value} + end, + AllCount = + fun(StatsMap, Result) -> + lists:foldl(NodeCount, Result, StatsMap) + end, + lists:foldl(AllCount, #{}, PropList). + %%-------------------------------------------------------------------- %% Clients %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 74cd995a4..138b61097 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -313,6 +313,7 @@ subscribe_api() -> #{ name => topic, in => query, + type => string, required => true, default => <<"topic_1">> } diff --git a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl index a4bf652a2..d959b0b17 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl @@ -16,27 +16,277 @@ -module(emqx_mgmt_api_metrics). --rest_api(#{name => list_all_metrics, - method => 'GET', - path => "/metrics", - func => list, - descr => "A list of metrics of all nodes in the cluster"}). +-behavior(minirest_api). --rest_api(#{name => list_node_metrics, - method => 'GET', - path => "/nodes/:atom:node/metrics", - func => list, - descr => "A list of metrics of a node"}). +-export([api_spec/0]). -export([list/2]). -list(Bindings, _Params) when map_size(Bindings) == 0 -> - emqx_mgmt:return({ok, [#{node => Node, metrics => maps:from_list(Metrics)} - || {Node, Metrics} <- emqx_mgmt:get_metrics()]}); +api_spec() -> + {[metrics_api()], [metrics_schema()]}. -list(#{node := Node}, _Params) -> - case emqx_mgmt:get_metrics(Node) of - {error, Reason} -> emqx_mgmt:return({error, Reason}); - Metrics -> emqx_mgmt:return({ok, maps:from_list(Metrics)}) - end. +metrics_schema() -> + DefinitionName = <<"metrics">>, + DefinitionProperties = #{ + <<"actions.failure">> => #{ + type => <<"integer">>, + description => <<"Number of failure executions of the rule engine action">>}, + <<"actions.success">> => #{ + type => <<"integer">>, + description => <<"Number of successful executions of the rule engine action">>}, + <<"bytes.received">> => #{ + type => <<"integer">>, + description => <<"Number of bytes received by EMQ X Broker">>}, + <<"bytes.sent">> => #{ + type => <<"integer">>, + description => <<"Number of bytes sent by EMQ X Broker on this connection">>}, + <<"client.authenticate">> => #{ + type => <<"integer">>, + description => <<"Number of client authentications">>}, + <<"client.auth.anonymous">> => #{ + type => <<"integer">>, + description => <<"Number of clients who log in anonymously">>}, + <<"client.connect">> => #{ + type => <<"integer">>, + description => <<"Number of client connections">>}, + <<"client.connack">> => #{ + type => <<"integer">>, + description => <<"Number of CONNACK packet sent">>}, + <<"client.connected">> => #{ + type => <<"integer">>, + description => <<"Number of successful client connections">>}, + <<"client.disconnected">> => #{ + type => <<"integer">>, + description => <<"Number of client disconnects">>}, + <<"client.check_acl">> => #{ + type => <<"integer">>, + description => <<"Number of ACL rule checks">>}, + <<"client.subscribe">> => #{ + type => <<"integer">>, + description => <<"Number of client subscriptions">>}, + <<"client.unsubscribe">> => #{ + type => <<"integer">>, + description => <<"Number of client unsubscriptions">>}, + <<"delivery.dropped.too_large">> => #{ + type => <<"integer">>, + description => <<"The number of messages that were dropped because the length exceeded the limit when sending">>}, + <<"delivery.dropped.queue_full">> => #{ + type => <<"integer">>, + description => <<"Number of messages with a non-zero QoS that were dropped because the message queue was full when sending">>}, + <<"delivery.dropped.qos0_msg">> => #{ + type => <<"integer">>, + description => <<"Number of messages with QoS 0 that were dropped because the message queue was full when sending">>}, + <<"delivery.dropped.expired">> => #{ + type => <<"integer">>, + description => <<"Number of messages dropped due to message expiration on sending">>}, + <<"delivery.dropped.no_local">> => #{ + type => <<"integer">>, + description => <<"Number of messages that were dropped due to the No Local subscription option when sending">>}, + <<"delivery.dropped">> => #{ + type => <<"integer">>, + description => <<"Total number of discarded messages when sending">>}, + <<"messages.delayed">> => #{ + type => <<"integer">>, + description => <<"Number of delay- published messages stored by EMQ X Broker">>}, + <<"messages.delivered">> => #{ + type => <<"integer">>, + description => <<"Number of messages forwarded to the subscription process internally by EMQ X Broker">>}, + <<"messages.dropped">> => #{ + type => <<"integer">>, + description => <<"Total number of messages dropped by EMQ X Broker before forwarding to the subscription process">>}, + <<"messages.dropped.expired">> => #{ + type => <<"integer">>, + description => <<"Number of messages dropped due to message expiration when receiving">>}, + <<"messages.dropped.no_subscribers">> => #{ + type => <<"integer">>, + description => <<"Number of messages dropped due to no subscribers">>}, + <<"messages.forward">> => #{ + type => <<"integer">>, + description => <<"Number of messages forwarded to other nodes">>}, + <<"messages.publish">> => #{ + type => <<"integer">>, + description => <<"Number of messages published in addition to system messages">>}, + <<"messages.qos0.received">> => #{ + type => <<"integer">>, + description => <<"Number of QoS 0 messages received from clients">>}, + <<"messages.qos1.received">> => #{ + type => <<"integer">>, + description => <<"Number of QoS 1 messages received from clients">>}, + <<"messages.qos2.received">> => #{ + type => <<"integer">>, + description => <<"Number of QoS 2 messages received from clients">>}, + <<"messages.qos0.sent">> => #{ + type => <<"integer">>, + description => <<"Number of QoS 0 messages sent to clients">>}, + <<"messages.qos1.sent">> => #{ + type => <<"integer">>, + description => <<"Number of QoS 1 messages sent to clients">>}, + <<"messages.qos2.sent">> => #{ + type => <<"integer">>, + description => <<"Number of QoS 2 messages sent to clients">>}, + <<"messages.received">> => #{ + type => <<"integer">>, + description => <<"Number of messages received from the client, equal to the sum of messages.qos0.received,messages.qos1.received and messages.qos2.received">>}, + <<"messages.sent">> => #{ + type => <<"integer">>, + description => <<"Number of messages sent to the client, equal to the sum of messages.qos0.sent,messages.qos1.sent and messages.qos2.sent">>}, + <<"messages.retained">> => #{ + type => <<"integer">>, + description => <<"Number of retained messages stored by EMQ X Broker">>}, + <<"messages.acked">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBACK and PUBREC packet">>}, + <<"packets.received">> => #{ + type => <<"integer">>, + description => <<"Number of received packet">>}, + <<"packets.sent">> => #{ + type => <<"integer">>, + description => <<"Number of sent packet">>}, + <<"packets.connect.received">> => #{ + type => <<"integer">>, + description => <<"Number of received CONNECT packet">>}, + <<"packets.connack.auth_error">> => #{ + type => <<"integer">>, + description => <<"Number of received CONNECT packet with failed authentication">>}, + <<"packets.connack.error">> => #{ + type => <<"integer">>, + description => <<"Number of received CONNECT packet with unsuccessful connections">>}, + <<"packets.connack.sent">> => #{ + type => <<"integer">>, + description => <<"Number of sent CONNACK packet">>}, + <<"packets.publish.received">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBLISH packet">>}, + <<"packets.publish.sent">> => #{ + type => <<"integer">>, + description => <<"Number of sent PUBLISH packet">>}, + <<"packets.publish.inuse">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBLISH packet with occupied identifiers">>}, + <<"packets.publish.auth_error">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBLISH packets with failed the ACL check">>}, + <<"packets.publish.error">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBLISH packet that cannot be published">>}, + <<"packets.publish.dropped">> => #{ + type => <<"integer">>, + description => <<"Number of messages discarded due to the receiving limit">>}, + <<"packets.puback.received">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBACK packet">>}, + <<"packets.puback.sent">> => #{ + type => <<"integer">>, + description => <<"Number of sent PUBACK packet">>}, + <<"packets.puback.inuse">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBACK packet with occupied identifiers">>}, + <<"packets.puback.missed">> => #{ + type => <<"integer">>, + description => <<"Number of received packet with identifiers.">>}, + <<"packets.pubrec.received">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBREC packet">>}, + <<"packets.pubrec.sent">> => #{ + type => <<"integer">>, + description => <<"Number of sent PUBREC packet">>}, + <<"packets.pubrec.inuse">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBREC packet with occupied identifiers">>}, + <<"packets.pubrec.missed">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBREC packet with unknown identifiers">>}, + <<"packets.pubrel.received">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBREL packet">>}, + <<"packets.pubrel.sent">> => #{ + type => <<"integer">>, + description => <<"Number of sent PUBREL packet">>}, + <<"packets.pubrel.missed">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBREC packet with unknown identifiers">>}, + <<"packets.pubcomp.received">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBCOMP packet">>}, + <<"packets.pubcomp.sent">> => #{ + type => <<"integer">>, + description => <<"Number of sent PUBCOMP packet">>}, + <<"packets.pubcomp.inuse">> => #{ + type => <<"integer">>, + description => <<"Number of received PUBCOMP packet with occupied identifiers">>}, + <<"packets.pubcomp.missed">> => #{ + type => <<"integer">>, + description => <<"Number of missed PUBCOMP packet">>}, + <<"packets.subscribe.received">> => #{ + type => <<"integer">>, + description => <<"Number of received SUBSCRIBE packet">>}, + <<"packets.subscribe.error">> => #{ + type => <<"integer">>, + description => <<"Number of received SUBSCRIBE packet with failed subscriptions">>}, + <<"packets.subscribe.auth_error">> => #{ + type => <<"integer">>, + description => <<"Number of received SUBACK packet with failed ACL check">>}, + <<"packets.suback.sent">> => #{ + type => <<"integer">>, + description => <<"Number of sent SUBACK packet">>}, + <<"packets.unsubscribe.received">> => #{ + type => <<"integer">>, + description => <<"Number of received UNSUBSCRIBE packet">>}, + <<"packets.unsubscribe.error">> => #{ + type => <<"integer">>, + description => <<"Number of received UNSUBSCRIBE packet with failed unsubscriptions">>}, + <<"packets.unsuback.sent">> => #{ + type => <<"integer">>, + description => <<"Number of sent UNSUBACK packet">>}, + <<"packets.pingreq.received">> => #{ + type => <<"integer">>, + description => <<"Number of received PINGREQ packet">>}, + <<"packets.pingresp.sent">> => #{ + type => <<"integer">>, + description => <<"Number of sent PUBRESP packet">>}, + <<"packets.disconnect.received">> => #{ + type => <<"integer">>, + description => <<"Number of received DISCONNECT packet">>}, + <<"packets.disconnect.sent">> => #{ + type => <<"integer">>, + description => <<"Number of sent DISCONNECT packet">>}, + <<"packets.auth.received">> => #{ + type => <<"integer">>, + description => <<"Number of received AUTH packet">>}, + <<"packets.auth.sent">> => #{ + type => <<"integer">>, + description => <<"Number of sent AUTH packet">>}, + <<"rules.matched">> => #{ + type => <<"integer">>, + description => <<"Number of rule matched">>}, + <<"session.created">> => #{ + type => <<"integer">>, + description => <<"Number of sessions created">>}, + <<"session.discarded">> => #{ + type => <<"integer">>, + description => <<"Number of sessions dropped because Clean Session or Clean Start is true">>}, + <<"session.resumed">> => #{ + type => <<"integer">>, + description => <<"Number of sessions resumed because Clean Session or Clean Start is false">>}, + <<"session.takeovered">> => #{ + type => <<"integer">>, + description => <<"Number of sessions takeovered because Clean Session or Clean Start is false">>}, + <<"session.terminated">> => #{ + type => <<"integer">>, + description => <<"Number of terminated sessions">>}}, + {DefinitionName, DefinitionProperties}. +metrics_api() -> + Metadata = #{ + get => #{ + description => "EMQ X metrics", + responses => #{ + <<"200">> => #{ + schema => cowboy_swagger:schema(<<"metrics">>)}}}}, + {"/metrics", Metadata, list}. + +%%%============================================================================================== +%% api apply +list(get, _) -> + Response = emqx_json:encode(emqx_mgmt:get_metrics()), + {200, Response}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl index 3cb38c3a7..1aaecffdc 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl @@ -20,7 +20,9 @@ -export([api_spec/0]). -export([ nodes/2 - , node/2]). + , node/2 + , node_metrics/2 + , node_stats/2]). -include_lib("emqx/include/emqx.hrl"). @@ -29,9 +31,13 @@ api_spec() -> apis() -> [ nodes_api() - , node_api()]. + , node_api() + , node_metrics_api() + , node_stats_api()]. schemas() -> + %% notice: node api used schema metrics and stats + %% see these schema in emqx_mgmt_api_metrics emqx_mgmt_api_status [node_schema()]. node_schema() -> @@ -121,15 +127,60 @@ node_api() -> schema => cowboy_swagger:schema(<<"node">>)}}}}, {"/nodes/:node_name", Metadata, node}. +node_metrics_api() -> + Metadata = #{ + get => #{ + description => "Get node metrics", + parameters => [#{ + name => node_name, + in => path, + description => "node name", + type => string, + required => true, + default => node()}], + responses => #{ + <<"400">> => + emqx_mgmt_util:not_found_schema(<<"Node error">>, [<<"SOURCE_ERROR">>]), + <<"200">> => #{ + description => <<"Get EMQ X Node Metrics">>, + schema => cowboy_swagger:schema(<<"metrics">>)}}}}, + {"/nodes/:node_name/metrics", Metadata, node_metrics}. + +node_stats_api() -> + Metadata = #{ + get => #{ + description => "Get node stats", + parameters => [#{ + name => node_name, + in => path, + description => "node name", + type => string, + required => true, + default => node()}], + responses => #{ + <<"400">> => + emqx_mgmt_util:not_found_schema(<<"Node error">>, [<<"SOURCE_ERROR">>]), + <<"200">> => #{ + description => <<"Get EMQ X Node Stats">>, + schema => cowboy_swagger:schema(<<"stats">>)}}}}, + {"/nodes/:node_name/stats", Metadata, node_metrics}. + %%%============================================================================================== %% parameters trans nodes(get, _Request) -> list(#{}). node(get, Request) -> - NodeName = cowboy_req:binding(node_name, Request), - Node = binary_to_atom(NodeName, utf8), - get_node(#{node => Node}). + Params = node_name_path_parameter(Request), + get_node(Params). + +node_metrics(get, Request) -> + Params = node_name_path_parameter(Request), + get_metrics(Params). + +node_stats(get, Request) -> + Params = node_name_path_parameter(Request), + get_stats(Params). %%%============================================================================================== %% api apply @@ -147,8 +198,29 @@ get_node(#{node := Node}) -> {200, Response} end. +get_metrics(#{node := Node}) -> + case emqx_mgmt:get_metrics(Node) of + {error, _} -> + {400, emqx_json:encode(#{code => 'SOURCE_ERROR', reason => <<"rpc_failed">>})}; + Metrics -> + {200, emqx_json:encode(Metrics)} + end. + +get_stats(#{node := Node}) -> + case emqx_mgmt:get_stats(Node) of + {error, _} -> + {400, emqx_json:encode(#{code => 'SOURCE_ERROR', reason => <<"rpc_failed">>})}; + Stats -> + {200, emqx_json:encode(Stats)} + end. + %%============================================================================================================ %% internal function +node_name_path_parameter(Request) -> + NodeName = cowboy_req:binding(node_name, Request), + Node = binary_to_atom(NodeName, utf8), + #{node => Node}. + format(_Node, Info = #{memory_total := Total, memory_used := Used}) -> {ok, SysPathBinary} = file:get_cwd(), SysPath = list_to_binary(SysPathBinary), diff --git a/apps/emqx_management/src/emqx_mgmt_api_publish.erl b/apps/emqx_management/src/emqx_mgmt_api_publish.erl new file mode 100644 index 000000000..a3305bc55 --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_api_publish.erl @@ -0,0 +1,161 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_publish). +%% API +-include_lib("emqx/include/emqx.hrl"). + +-behavior(minirest_api). + +-export([api_spec/0]). + +-export([ publish/2 + , publish_batch/2]). + +api_spec() -> + { + [publish_api(), publish_batch_api()], + [request_message_schema(), mqtt_message_schema()] + }. + +publish_api() -> + MeteData = #{ + post => #{ + description => "publish", + parameters => [#{ + name => message, + in => body, + required => true, + schema => minirest:ref(<<"request_message">>) + }], + responses => #{ + <<"200">> => #{ + description => <<"publish ok">>, + schema => minirest:ref(<<"message">>)}}}}, + {"/publish", MeteData, publish}. + +publish_batch_api() -> + MeteData = #{ + post => #{ + description => "publish", + parameters => [#{ + name => message, + in => body, + required => true, + schema =>#{ + type => array, + items => minirest:ref(<<"request_message">>)} + }], + responses => #{ + <<"200">> => #{ + description => <<"publish result">>, + schema => #{ + type => array, + items => minirest:ref(<<"message">>)}}}}}, + {"/publish_batch", MeteData, publish_batch}. + +request_message_schema() -> + {<<"request_message">>, maps:without([<<"id">>], message_def())}. + +mqtt_message_schema() -> + {<<"message">>, message_def()}. + +message_def() -> + #{ + <<"id">> => #{ + type => <<"string">>, + description => <<"Message ID">>}, + <<"topic">> => #{ + type => <<"string">>, + description => <<"Topic">>}, + <<"qos">> => #{ + type => <<"integer">>, + enum => [0, 1, 2], + description => <<"Qos">>}, + <<"payload">> => #{ + type => <<"string">>, + description => <<"Topic">>}, + <<"from">> => #{ + type => <<"string">>, + description => <<"Message from">>}, + <<"flag">> => #{ + type => <<"object">>, + description => <<"Message flag">>, + properties => #{ + <<"sys">> => #{ + type => <<"boolean">>, + default => false, + description => <<"System message flag, nullable, default false">>}, + <<"dup">> => #{ + type => <<"boolean">>, + default => false, + description => <<"Dup message flag, nullable, default false">>}, + <<"retain">> => #{ + type => <<"boolean">>, + default => false, + description => <<"Retain message flag, nullable, default false">>}}} + }. + +publish(post, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + Message = message(emqx_json:decode(Body, [return_maps])), + _ = emqx_mgmt:publish(Message), + {200, emqx_json:encode(format_message(Message))}. + +publish_batch(post, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + Messages = messages(emqx_json:decode(Body, [return_maps])), + _ = [emqx_mgmt:publish(Message) || Message <- Messages], + ResponseBody = emqx_json:encode(format_message(Messages)), + {200, ResponseBody}. + +message(Map) -> + From = maps:get(<<"from">>, Map, http_api), + QoS = maps:get(<<"qos">>, Map, 0), + Topic = maps:get(<<"topic">>, Map), + Payload = maps:get(<<"payload">>, Map), + Flags = flags(Map), + emqx_message:make(From, QoS, Topic, Payload, Flags, #{}). + +flags(Map) -> + Flags = maps:get(<<"flags">>, Map, #{}), + Retain = maps:get(<<"retain">>, Flags, false), + Sys = maps:get(<<"sys">>, Flags, false), + Dup = maps:get(<<"dup">>, Flags, false), + #{ + retain => Retain, + sys => Sys, + dup => Dup + }. + +messages(List) -> + [message(MessageMap) || MessageMap <- List]. + +format_message(Messages) when is_list(Messages)-> + [format_message(Message) || Message <- Messages]; +format_message(#message{id = ID, qos = Qos, from = From, topic = Topic, payload = Payload, flags = Flags}) -> + #{ + id => emqx_guid:to_hexstr(ID), + qos => Qos, + topic => Topic, + payload => Payload, + flag => Flags, + from => to_binary(From) + }. + +to_binary(Data) when is_binary(Data) -> + Data; +to_binary(Data) -> + list_to_binary(io_lib:format("~p", [Data])). diff --git a/apps/emqx_management/src/emqx_mgmt_api_stats.erl b/apps/emqx_management/src/emqx_mgmt_api_stats.erl index e57c3dc0e..55c24189a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_stats.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_stats.erl @@ -13,33 +13,93 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- - -module(emqx_mgmt_api_stats). --rest_api(#{name => list_stats, - method => 'GET', - path => "/stats/", - func => list, - descr => "A list of stats of all nodes in the cluster"}). +-behavior(minirest_api). --rest_api(#{name => lookup_node_stats, - method => 'GET', - path => "/nodes/:atom:node/stats/", - func => lookup, - descr => "A list of stats of a node"}). +-export([api_spec/0]). --export([ list/2 - , lookup/2 - ]). +-export([list/2]). -%% List stats of all nodes -list(Bindings, _Params) when map_size(Bindings) == 0 -> - emqx_mgmt:return({ok, [#{node => Node, stats => maps:from_list(Stats)} - || {Node, Stats} <- emqx_mgmt:get_stats()]}). +api_spec() -> + {stats_api(), stats_schema()}. -%% List stats of a node -lookup(#{node := Node}, _Params) -> - case emqx_mgmt:get_stats(Node) of - {error, Reason} -> emqx_mgmt:return({error, Reason}); - Stats -> emqx_mgmt:return({ok, maps:from_list(Stats)}) - end. +stats_schema() -> + DefinitionName = <<"stats">>, + DefinitionProperties = #{ + <<"connections.count">> => #{ + type => <<"integer">>, + description => <<"Number of current connections">>}, + <<"connections.max">> => #{ + type => <<"integer">>, + description => <<"Historical maximum number of connections">>}, + <<"channels.count">> => #{ + type => <<"integer">>, + description => <<"sessions.count">>}, + <<"channels.max">> => #{ + type => <<"integer">>, + description => <<"session.max">>}, + <<"sessions.count">> => #{ + type => <<"integer">>, + description => <<"Number of current sessions">>}, + <<"sessions.max">> => #{ + type => <<"integer">>, + description => <<"Historical maximum number of sessions">>}, + <<"topics.count">> => #{ + type => <<"integer">>, + description => <<"Number of current topics">>}, + <<"topics.max">> => #{ + type => <<"integer">>, + description => <<"Historical maximum number of topics">>}, + <<"suboptions.count">> => #{ + type => <<"integer">>, + description => <<"subscriptions.count">>}, + <<"suboptions.max">> => #{ + type => <<"integer">>, + description => <<"subscriptions.max">>}, + <<"subscribers.count">> => #{ + type => <<"integer">>, + description => <<"Number of current subscribers">>}, + <<"subscribers.max">> => #{ + type => <<"integer">>, + description => <<"Historical maximum number of subscribers">>}, + <<"subscriptions.count">> => #{ + type => <<"integer">>, + description => <<"Number of current subscriptions, including shared subscriptions">>}, + <<"subscriptions.max">> => #{ + type => <<"integer">>, + description => <<"Historical maximum number of subscriptions">>}, + <<"subscriptions.shared.count">> => #{ + type => <<"integer">>, + description => <<"Number of current shared subscriptions">>}, + <<"subscriptions.shared.max">> => #{ + type => <<"integer">>, + description => <<"Historical maximum number of shared subscriptions">>}, + <<"routes.count">> => #{ + type => <<"integer">>, + description => <<"Number of current routes">>}, + <<"routes.max">> => #{ + type => <<"integer">>, + description => <<"Historical maximum number of routes">>}, + <<"retained.count">> => #{ + type => <<"integer">>, + description => <<"Number of currently retained messages">>}, + <<"retained.max">> => #{ + type => <<"integer">>, + description => <<"Historical maximum number of retained messages">>}}, + [{DefinitionName, DefinitionProperties}]. + +stats_api() -> + Metadata = #{ + get => #{ + description => "EMQ X stats", + responses => #{ + <<"200">> => #{ + schema => cowboy_swagger:schema(<<"stats">>)}}}}, + [{"/stats", Metadata, list}]. + +%%%============================================================================================== +%% api apply +list(get, _Request) -> + Response = emqx_json:encode(emqx_mgmt:get_stats()), + {200, Response}. diff --git a/apps/emqx_management/src/emqx_mgmt_util.erl b/apps/emqx_management/src/emqx_mgmt_util.erl index 4197973e7..6ba7cb93a 100644 --- a/apps/emqx_management/src/emqx_mgmt_util.erl +++ b/apps/emqx_management/src/emqx_mgmt_util.erl @@ -21,11 +21,13 @@ , kmg/1 , ntoa/1 , merge_maps/2 - , not_found_schema/1 - , not_found_schema/2 , batch_operation/3 ]). +-export([ not_found_schema/1 + , not_found_schema/2 + , batch_response_schema/1]). + -export([urldecode/1]). -define(KB, 1024). @@ -80,6 +82,8 @@ merge_maps(Default, New) -> urldecode(S) -> emqx_http_lib:uri_decode(S). +%%%============================================================================================== +%% schema util not_found_schema(Description) -> not_found_schema(Description, ["RESOURCE_NOT_FOUND"]). @@ -96,6 +100,34 @@ not_found_schema(Description, Enum) -> type => string}}} }. +batch_response_schema(DefName) -> + #{ + type => object, + properties => #{ + success => #{ + type => integer, + description => <<"Success count">>}, + failed => #{ + type => integer, + description => <<"Failed count">>}, + detail => #{ + type => array, + description => <<"Failed object & reason">>, + items => #{ + type => object, + properties => + #{ + data => minirest:ref(DefName), + reason => #{ + type => <<"string">> + } + } + } + } + } + }. + +%%%============================================================================================== batch_operation(Module, Function, ArgsList) -> Failed = batch_operation(Module, Function, ArgsList, []), Len = erlang:length(Failed), diff --git a/apps/emqx_management/test/emqx_mgmt_metrics_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_metrics_api_SUITE.erl new file mode 100644 index 000000000..7bd5891c8 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_metrics_api_SUITE.erl @@ -0,0 +1,52 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_metrics_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + ekka_mnesia:start(), + emqx_mgmt_auth:mnesia(boot), + emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1), + Config. + +end_per_suite(_) -> + emqx_ct_helpers:stop_apps([emqx_management]). + +set_special_configs(emqx_management) -> + emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], + applications =>[#{id => "admin", secret => "public"}]}), + ok; +set_special_configs(_App) -> + ok. + +t_metrics_api(_) -> + MetricsPath = emqx_mgmt_api_test_util:api_path(["metrics"]), + SystemMetrics = emqx_mgmt:get_metrics(), + {ok, MetricsResponse} = emqx_mgmt_api_test_util:request_api(get, MetricsPath), + Metrics = emqx_json:decode(MetricsResponse, [return_maps]), + ?assertEqual(erlang:length(maps:keys(SystemMetrics)), erlang:length(maps:keys(Metrics))), + Fun = + fun(Key) -> + ?assertEqual(maps:get(Key, SystemMetrics), maps:get(atom_to_binary(Key, utf8), Metrics)) + end, + lists:foreach(Fun, maps:keys(SystemMetrics)). diff --git a/apps/emqx_management/test/emqx_mgmt_nodes_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_nodes_api_SUITE.erl index 52d2bf626..f0829b7fb 100644 --- a/apps/emqx_management/test/emqx_mgmt_nodes_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_nodes_api_SUITE.erl @@ -20,11 +20,6 @@ -include_lib("eunit/include/eunit.hrl"). --define(APP, emqx_management). - --define(SERVER, "http://127.0.0.1:8081"). --define(BASE_PATH, "/api/v5"). - all() -> emqx_ct:all(?MODULE). @@ -54,5 +49,29 @@ t_nodes_api(_) -> NodePath = emqx_mgmt_api_test_util:api_path(["nodes", atom_to_list(node())]), {ok, NodeInfo} = emqx_mgmt_api_test_util:request_api(get, NodePath), - NodeNameResponse = binary_to_atom(maps:get(<<"node">>, emqx_json:decode(NodeInfo, [return_maps])), utf8), + NodeNameResponse = + binary_to_atom(maps:get(<<"node">>, emqx_json:decode(NodeInfo, [return_maps])), utf8), ?assertEqual(node(), NodeNameResponse). + +t_node_stats_api() -> + StatsPath = emqx_mgmt_api_test_util:api_path(["nodes", atom_to_binary(node(), utf8), "stats"]), + SystemStats= emqx_mgmt:get_stats(), + {ok, StatsResponse} = emqx_mgmt_api_test_util:request_api(get, StatsPath), + Stats = emqx_json:decode(StatsResponse, [return_maps]), + Fun = + fun(Key) -> + ?assertEqual(maps:get(Key, SystemStats), maps:get(atom_to_binary(Key, utf8), Stats)) + end, + lists:foreach(Fun, maps:keys(SystemStats)). + +t_node_metrics_api() -> + MetricsPath = + emqx_mgmt_api_test_util:api_path(["nodes", atom_to_binary(node(), utf8), "metrics"]), + SystemMetrics= emqx_mgmt:get_metrics(), + {ok, MetricsResponse} = emqx_mgmt_api_test_util:request_api(get, MetricsPath), + Metrics = emqx_json:decode(MetricsResponse, [return_maps]), + Fun = + fun(Key) -> + ?assertEqual(maps:get(Key, SystemMetrics), maps:get(atom_to_binary(Key, utf8), Metrics)) + end, + lists:foreach(Fun, maps:keys(SystemMetrics)). diff --git a/apps/emqx_management/test/emqx_mgmt_publish_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_publish_api_SUITE.erl new file mode 100644 index 000000000..9ecb1a11b --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_publish_api_SUITE.erl @@ -0,0 +1,92 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_publish_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-define(CLIENTID, <<"api_clientid">>). +-define(USERNAME, <<"api_username">>). + +-define(TOPIC1, <<"api_topic1">>). +-define(TOPIC2, <<"api_topic2">>). + + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + ekka_mnesia:start(), + emqx_mgmt_auth:mnesia(boot), + emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1), + Config. + + +end_per_suite(_) -> + emqx_ct_helpers:stop_apps([emqx_management]). + +set_special_configs(emqx_management) -> + emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], + applications =>[#{id => "admin", secret => "public"}]}), + ok; +set_special_configs(_App) -> + ok. + +t_publish_api(_) -> + {ok, Client} = emqtt:start_link(#{username => <<"api_username">>, clientid => <<"api_clientid">>}), + {ok, _} = emqtt:connect(Client), + {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1), + {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2), + Payload = <<"hello">>, + Path = emqx_mgmt_api_test_util:api_path(["publish"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Body = #{topic => ?TOPIC1, payload => Payload}, + {ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body), + ?assertEqual(receive_assert(?TOPIC1, 0, Payload), ok), + emqtt:disconnect(Client). + +t_publish_batch_api(_) -> + {ok, Client} = emqtt:start_link(#{username => <<"api_username">>, clientid => <<"api_clientid">>}), + {ok, _} = emqtt:connect(Client), + {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1), + {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2), + Payload = <<"hello">>, + Path = emqx_mgmt_api_test_util:api_path(["publish_batch"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Body =[#{topic => ?TOPIC1, payload => Payload}, #{topic => ?TOPIC2, payload => Payload}], + {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body), + ResponseMap = emqx_json:decode(Response, [return_maps]), + ?assertEqual(2, erlang:length(ResponseMap)), + ?assertEqual(receive_assert(?TOPIC1, 0, Payload), ok), + ?assertEqual(receive_assert(?TOPIC2, 0, Payload), ok), + emqtt:disconnect(Client). + +receive_assert(Topic, Qos, Payload) -> + receive + {publish, Message} -> + ReceiveTopic = maps:get(topic, Message), + ReceiveQos = maps:get(qos, Message), + ReceivePayload = maps:get(payload, Message), + ?assertEqual(ReceiveTopic , Topic), + ?assertEqual(ReceiveQos , Qos), + ?assertEqual(ReceivePayload , Payload), + ok + after 5000 -> + timeout + end. + diff --git a/apps/emqx_management/test/emqx_mgmt_stats_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_stats_api_SUITE.erl new file mode 100644 index 000000000..dbbca9d43 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_stats_api_SUITE.erl @@ -0,0 +1,52 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_stats_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + ekka_mnesia:start(), + emqx_mgmt_auth:mnesia(boot), + emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1), + Config. + +end_per_suite(_) -> + emqx_ct_helpers:stop_apps([emqx_management]). + +set_special_configs(emqx_management) -> + emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], + applications =>[#{id => "admin", secret => "public"}]}), + ok; +set_special_configs(_App) -> + ok. + +t_stats_api(_) -> + StatsPath = emqx_mgmt_api_test_util:api_path(["stats"]), + SystemStats = emqx_mgmt:get_stats(), + {ok, StatsResponse} = emqx_mgmt_api_test_util:request_api(get, StatsPath), + Stats = emqx_json:decode(StatsResponse, [return_maps]), + ?assertEqual(erlang:length(maps:keys(SystemStats)), erlang:length(maps:keys(Stats))), + Fun = + fun(Key) -> + ?assertEqual(maps:get(Key, SystemStats), maps:get(atom_to_binary(Key, utf8), Stats)) + end, + lists:foreach(Fun, maps:keys(SystemStats)). diff --git a/rebar.config b/rebar.config index 68a199d56..b688df419 100644 --- a/rebar.config +++ b/rebar.config @@ -48,7 +48,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.3"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} % TODO: delete when all apps moved to hocon , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.1.1"}}}