Merge branch 'master' into emqx_config

This commit is contained in:
Shawn 2021-07-16 16:49:50 +08:00 committed by GitHub
commit bb6d7c4e0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 881 additions and 59 deletions

View File

@ -13,7 +13,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.3"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.10.1"}}}

View File

@ -175,7 +175,7 @@ broker_info(Node) ->
%%--------------------------------------------------------------------
get_metrics() ->
[{Node, get_metrics(Node)} || Node <- ekka_mnesia:running_nodes()].
nodes_info_count([get_metrics(Node) || Node <- ekka_mnesia:running_nodes()]).
get_metrics(Node) when Node =:= node() ->
emqx_metrics:all();
@ -183,13 +183,44 @@ get_metrics(Node) ->
rpc_call(Node, get_metrics, [Node]).
get_stats() ->
[{Node, get_stats(Node)} || Node <- ekka_mnesia:running_nodes()].
GlobalStatsKeys =
[ 'retained.count'
, 'retained.max'
, 'routes.count'
, 'routes.max'
, 'subscriptions.shared.count'
, 'subscriptions.shared.max'
],
CountStats = nodes_info_count([
begin
Stats = get_stats(Node),
delete_keys(Stats, GlobalStatsKeys)
end || Node <- ekka_mnesia:running_nodes()]),
GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))),
maps:merge(CountStats, GlobalStats).
delete_keys(List, []) ->
List;
delete_keys(List, [Key | Keys]) ->
delete_keys(proplists:delete(Key, List), Keys).
get_stats(Node) when Node =:= node() ->
emqx_stats:getstats();
get_stats(Node) ->
rpc_call(Node, get_stats, [Node]).
nodes_info_count(PropList) ->
NodeCount =
fun({Key, Value}, Result) ->
Count = maps:get(Key, Result, 0),
Result#{Key => Count + Value}
end,
AllCount =
fun(StatsMap, Result) ->
lists:foldl(NodeCount, Result, StatsMap)
end,
lists:foldl(AllCount, #{}, PropList).
%%--------------------------------------------------------------------
%% Clients
%%--------------------------------------------------------------------

View File

@ -313,6 +313,7 @@ subscribe_api() ->
#{
name => topic,
in => query,
type => string,
required => true,
default => <<"topic_1">>
}

View File

@ -16,27 +16,277 @@
-module(emqx_mgmt_api_metrics).
-rest_api(#{name => list_all_metrics,
method => 'GET',
path => "/metrics",
func => list,
descr => "A list of metrics of all nodes in the cluster"}).
-behavior(minirest_api).
-rest_api(#{name => list_node_metrics,
method => 'GET',
path => "/nodes/:atom:node/metrics",
func => list,
descr => "A list of metrics of a node"}).
-export([api_spec/0]).
-export([list/2]).
list(Bindings, _Params) when map_size(Bindings) == 0 ->
emqx_mgmt:return({ok, [#{node => Node, metrics => maps:from_list(Metrics)}
|| {Node, Metrics} <- emqx_mgmt:get_metrics()]});
api_spec() ->
{[metrics_api()], [metrics_schema()]}.
list(#{node := Node}, _Params) ->
case emqx_mgmt:get_metrics(Node) of
{error, Reason} -> emqx_mgmt:return({error, Reason});
Metrics -> emqx_mgmt:return({ok, maps:from_list(Metrics)})
end.
metrics_schema() ->
DefinitionName = <<"metrics">>,
DefinitionProperties = #{
<<"actions.failure">> => #{
type => <<"integer">>,
description => <<"Number of failure executions of the rule engine action">>},
<<"actions.success">> => #{
type => <<"integer">>,
description => <<"Number of successful executions of the rule engine action">>},
<<"bytes.received">> => #{
type => <<"integer">>,
description => <<"Number of bytes received by EMQ X Broker">>},
<<"bytes.sent">> => #{
type => <<"integer">>,
description => <<"Number of bytes sent by EMQ X Broker on this connection">>},
<<"client.authenticate">> => #{
type => <<"integer">>,
description => <<"Number of client authentications">>},
<<"client.auth.anonymous">> => #{
type => <<"integer">>,
description => <<"Number of clients who log in anonymously">>},
<<"client.connect">> => #{
type => <<"integer">>,
description => <<"Number of client connections">>},
<<"client.connack">> => #{
type => <<"integer">>,
description => <<"Number of CONNACK packet sent">>},
<<"client.connected">> => #{
type => <<"integer">>,
description => <<"Number of successful client connections">>},
<<"client.disconnected">> => #{
type => <<"integer">>,
description => <<"Number of client disconnects">>},
<<"client.check_acl">> => #{
type => <<"integer">>,
description => <<"Number of ACL rule checks">>},
<<"client.subscribe">> => #{
type => <<"integer">>,
description => <<"Number of client subscriptions">>},
<<"client.unsubscribe">> => #{
type => <<"integer">>,
description => <<"Number of client unsubscriptions">>},
<<"delivery.dropped.too_large">> => #{
type => <<"integer">>,
description => <<"The number of messages that were dropped because the length exceeded the limit when sending">>},
<<"delivery.dropped.queue_full">> => #{
type => <<"integer">>,
description => <<"Number of messages with a non-zero QoS that were dropped because the message queue was full when sending">>},
<<"delivery.dropped.qos0_msg">> => #{
type => <<"integer">>,
description => <<"Number of messages with QoS 0 that were dropped because the message queue was full when sending">>},
<<"delivery.dropped.expired">> => #{
type => <<"integer">>,
description => <<"Number of messages dropped due to message expiration on sending">>},
<<"delivery.dropped.no_local">> => #{
type => <<"integer">>,
description => <<"Number of messages that were dropped due to the No Local subscription option when sending">>},
<<"delivery.dropped">> => #{
type => <<"integer">>,
description => <<"Total number of discarded messages when sending">>},
<<"messages.delayed">> => #{
type => <<"integer">>,
description => <<"Number of delay- published messages stored by EMQ X Broker">>},
<<"messages.delivered">> => #{
type => <<"integer">>,
description => <<"Number of messages forwarded to the subscription process internally by EMQ X Broker">>},
<<"messages.dropped">> => #{
type => <<"integer">>,
description => <<"Total number of messages dropped by EMQ X Broker before forwarding to the subscription process">>},
<<"messages.dropped.expired">> => #{
type => <<"integer">>,
description => <<"Number of messages dropped due to message expiration when receiving">>},
<<"messages.dropped.no_subscribers">> => #{
type => <<"integer">>,
description => <<"Number of messages dropped due to no subscribers">>},
<<"messages.forward">> => #{
type => <<"integer">>,
description => <<"Number of messages forwarded to other nodes">>},
<<"messages.publish">> => #{
type => <<"integer">>,
description => <<"Number of messages published in addition to system messages">>},
<<"messages.qos0.received">> => #{
type => <<"integer">>,
description => <<"Number of QoS 0 messages received from clients">>},
<<"messages.qos1.received">> => #{
type => <<"integer">>,
description => <<"Number of QoS 1 messages received from clients">>},
<<"messages.qos2.received">> => #{
type => <<"integer">>,
description => <<"Number of QoS 2 messages received from clients">>},
<<"messages.qos0.sent">> => #{
type => <<"integer">>,
description => <<"Number of QoS 0 messages sent to clients">>},
<<"messages.qos1.sent">> => #{
type => <<"integer">>,
description => <<"Number of QoS 1 messages sent to clients">>},
<<"messages.qos2.sent">> => #{
type => <<"integer">>,
description => <<"Number of QoS 2 messages sent to clients">>},
<<"messages.received">> => #{
type => <<"integer">>,
description => <<"Number of messages received from the client, equal to the sum of messages.qos0.receivedmessages.qos1.received and messages.qos2.received">>},
<<"messages.sent">> => #{
type => <<"integer">>,
description => <<"Number of messages sent to the client, equal to the sum of messages.qos0.sentmessages.qos1.sent and messages.qos2.sent">>},
<<"messages.retained">> => #{
type => <<"integer">>,
description => <<"Number of retained messages stored by EMQ X Broker">>},
<<"messages.acked">> => #{
type => <<"integer">>,
description => <<"Number of received PUBACK and PUBREC packet">>},
<<"packets.received">> => #{
type => <<"integer">>,
description => <<"Number of received packet">>},
<<"packets.sent">> => #{
type => <<"integer">>,
description => <<"Number of sent packet">>},
<<"packets.connect.received">> => #{
type => <<"integer">>,
description => <<"Number of received CONNECT packet">>},
<<"packets.connack.auth_error">> => #{
type => <<"integer">>,
description => <<"Number of received CONNECT packet with failed authentication">>},
<<"packets.connack.error">> => #{
type => <<"integer">>,
description => <<"Number of received CONNECT packet with unsuccessful connections">>},
<<"packets.connack.sent">> => #{
type => <<"integer">>,
description => <<"Number of sent CONNACK packet">>},
<<"packets.publish.received">> => #{
type => <<"integer">>,
description => <<"Number of received PUBLISH packet">>},
<<"packets.publish.sent">> => #{
type => <<"integer">>,
description => <<"Number of sent PUBLISH packet">>},
<<"packets.publish.inuse">> => #{
type => <<"integer">>,
description => <<"Number of received PUBLISH packet with occupied identifiers">>},
<<"packets.publish.auth_error">> => #{
type => <<"integer">>,
description => <<"Number of received PUBLISH packets with failed the ACL check">>},
<<"packets.publish.error">> => #{
type => <<"integer">>,
description => <<"Number of received PUBLISH packet that cannot be published">>},
<<"packets.publish.dropped">> => #{
type => <<"integer">>,
description => <<"Number of messages discarded due to the receiving limit">>},
<<"packets.puback.received">> => #{
type => <<"integer">>,
description => <<"Number of received PUBACK packet">>},
<<"packets.puback.sent">> => #{
type => <<"integer">>,
description => <<"Number of sent PUBACK packet">>},
<<"packets.puback.inuse">> => #{
type => <<"integer">>,
description => <<"Number of received PUBACK packet with occupied identifiers">>},
<<"packets.puback.missed">> => #{
type => <<"integer">>,
description => <<"Number of received packet with identifiers.">>},
<<"packets.pubrec.received">> => #{
type => <<"integer">>,
description => <<"Number of received PUBREC packet">>},
<<"packets.pubrec.sent">> => #{
type => <<"integer">>,
description => <<"Number of sent PUBREC packet">>},
<<"packets.pubrec.inuse">> => #{
type => <<"integer">>,
description => <<"Number of received PUBREC packet with occupied identifiers">>},
<<"packets.pubrec.missed">> => #{
type => <<"integer">>,
description => <<"Number of received PUBREC packet with unknown identifiers">>},
<<"packets.pubrel.received">> => #{
type => <<"integer">>,
description => <<"Number of received PUBREL packet">>},
<<"packets.pubrel.sent">> => #{
type => <<"integer">>,
description => <<"Number of sent PUBREL packet">>},
<<"packets.pubrel.missed">> => #{
type => <<"integer">>,
description => <<"Number of received PUBREC packet with unknown identifiers">>},
<<"packets.pubcomp.received">> => #{
type => <<"integer">>,
description => <<"Number of received PUBCOMP packet">>},
<<"packets.pubcomp.sent">> => #{
type => <<"integer">>,
description => <<"Number of sent PUBCOMP packet">>},
<<"packets.pubcomp.inuse">> => #{
type => <<"integer">>,
description => <<"Number of received PUBCOMP packet with occupied identifiers">>},
<<"packets.pubcomp.missed">> => #{
type => <<"integer">>,
description => <<"Number of missed PUBCOMP packet">>},
<<"packets.subscribe.received">> => #{
type => <<"integer">>,
description => <<"Number of received SUBSCRIBE packet">>},
<<"packets.subscribe.error">> => #{
type => <<"integer">>,
description => <<"Number of received SUBSCRIBE packet with failed subscriptions">>},
<<"packets.subscribe.auth_error">> => #{
type => <<"integer">>,
description => <<"Number of received SUBACK packet with failed ACL check">>},
<<"packets.suback.sent">> => #{
type => <<"integer">>,
description => <<"Number of sent SUBACK packet">>},
<<"packets.unsubscribe.received">> => #{
type => <<"integer">>,
description => <<"Number of received UNSUBSCRIBE packet">>},
<<"packets.unsubscribe.error">> => #{
type => <<"integer">>,
description => <<"Number of received UNSUBSCRIBE packet with failed unsubscriptions">>},
<<"packets.unsuback.sent">> => #{
type => <<"integer">>,
description => <<"Number of sent UNSUBACK packet">>},
<<"packets.pingreq.received">> => #{
type => <<"integer">>,
description => <<"Number of received PINGREQ packet">>},
<<"packets.pingresp.sent">> => #{
type => <<"integer">>,
description => <<"Number of sent PUBRESP packet">>},
<<"packets.disconnect.received">> => #{
type => <<"integer">>,
description => <<"Number of received DISCONNECT packet">>},
<<"packets.disconnect.sent">> => #{
type => <<"integer">>,
description => <<"Number of sent DISCONNECT packet">>},
<<"packets.auth.received">> => #{
type => <<"integer">>,
description => <<"Number of received AUTH packet">>},
<<"packets.auth.sent">> => #{
type => <<"integer">>,
description => <<"Number of sent AUTH packet">>},
<<"rules.matched">> => #{
type => <<"integer">>,
description => <<"Number of rule matched">>},
<<"session.created">> => #{
type => <<"integer">>,
description => <<"Number of sessions created">>},
<<"session.discarded">> => #{
type => <<"integer">>,
description => <<"Number of sessions dropped because Clean Session or Clean Start is true">>},
<<"session.resumed">> => #{
type => <<"integer">>,
description => <<"Number of sessions resumed because Clean Session or Clean Start is false">>},
<<"session.takeovered">> => #{
type => <<"integer">>,
description => <<"Number of sessions takeovered because Clean Session or Clean Start is false">>},
<<"session.terminated">> => #{
type => <<"integer">>,
description => <<"Number of terminated sessions">>}},
{DefinitionName, DefinitionProperties}.
metrics_api() ->
Metadata = #{
get => #{
description => "EMQ X metrics",
responses => #{
<<"200">> => #{
schema => cowboy_swagger:schema(<<"metrics">>)}}}},
{"/metrics", Metadata, list}.
%%%==============================================================================================
%% api apply
list(get, _) ->
Response = emqx_json:encode(emqx_mgmt:get_metrics()),
{200, Response}.

View File

@ -20,7 +20,9 @@
-export([api_spec/0]).
-export([ nodes/2
, node/2]).
, node/2
, node_metrics/2
, node_stats/2]).
-include_lib("emqx/include/emqx.hrl").
@ -29,9 +31,13 @@ api_spec() ->
apis() ->
[ nodes_api()
, node_api()].
, node_api()
, node_metrics_api()
, node_stats_api()].
schemas() ->
%% notice: node api used schema metrics and stats
%% see these schema in emqx_mgmt_api_metrics emqx_mgmt_api_status
[node_schema()].
node_schema() ->
@ -121,15 +127,60 @@ node_api() ->
schema => cowboy_swagger:schema(<<"node">>)}}}},
{"/nodes/:node_name", Metadata, node}.
node_metrics_api() ->
Metadata = #{
get => #{
description => "Get node metrics",
parameters => [#{
name => node_name,
in => path,
description => "node name",
type => string,
required => true,
default => node()}],
responses => #{
<<"400">> =>
emqx_mgmt_util:not_found_schema(<<"Node error">>, [<<"SOURCE_ERROR">>]),
<<"200">> => #{
description => <<"Get EMQ X Node Metrics">>,
schema => cowboy_swagger:schema(<<"metrics">>)}}}},
{"/nodes/:node_name/metrics", Metadata, node_metrics}.
node_stats_api() ->
Metadata = #{
get => #{
description => "Get node stats",
parameters => [#{
name => node_name,
in => path,
description => "node name",
type => string,
required => true,
default => node()}],
responses => #{
<<"400">> =>
emqx_mgmt_util:not_found_schema(<<"Node error">>, [<<"SOURCE_ERROR">>]),
<<"200">> => #{
description => <<"Get EMQ X Node Stats">>,
schema => cowboy_swagger:schema(<<"stats">>)}}}},
{"/nodes/:node_name/stats", Metadata, node_metrics}.
%%%==============================================================================================
%% parameters trans
nodes(get, _Request) ->
list(#{}).
node(get, Request) ->
NodeName = cowboy_req:binding(node_name, Request),
Node = binary_to_atom(NodeName, utf8),
get_node(#{node => Node}).
Params = node_name_path_parameter(Request),
get_node(Params).
node_metrics(get, Request) ->
Params = node_name_path_parameter(Request),
get_metrics(Params).
node_stats(get, Request) ->
Params = node_name_path_parameter(Request),
get_stats(Params).
%%%==============================================================================================
%% api apply
@ -147,8 +198,29 @@ get_node(#{node := Node}) ->
{200, Response}
end.
get_metrics(#{node := Node}) ->
case emqx_mgmt:get_metrics(Node) of
{error, _} ->
{400, emqx_json:encode(#{code => 'SOURCE_ERROR', reason => <<"rpc_failed">>})};
Metrics ->
{200, emqx_json:encode(Metrics)}
end.
get_stats(#{node := Node}) ->
case emqx_mgmt:get_stats(Node) of
{error, _} ->
{400, emqx_json:encode(#{code => 'SOURCE_ERROR', reason => <<"rpc_failed">>})};
Stats ->
{200, emqx_json:encode(Stats)}
end.
%%============================================================================================================
%% internal function
node_name_path_parameter(Request) ->
NodeName = cowboy_req:binding(node_name, Request),
Node = binary_to_atom(NodeName, utf8),
#{node => Node}.
format(_Node, Info = #{memory_total := Total, memory_used := Used}) ->
{ok, SysPathBinary} = file:get_cwd(),
SysPath = list_to_binary(SysPathBinary),

View File

@ -0,0 +1,161 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_publish).
%% API
-include_lib("emqx/include/emqx.hrl").
-behavior(minirest_api).
-export([api_spec/0]).
-export([ publish/2
, publish_batch/2]).
api_spec() ->
{
[publish_api(), publish_batch_api()],
[request_message_schema(), mqtt_message_schema()]
}.
publish_api() ->
MeteData = #{
post => #{
description => "publish",
parameters => [#{
name => message,
in => body,
required => true,
schema => minirest:ref(<<"request_message">>)
}],
responses => #{
<<"200">> => #{
description => <<"publish ok">>,
schema => minirest:ref(<<"message">>)}}}},
{"/publish", MeteData, publish}.
publish_batch_api() ->
MeteData = #{
post => #{
description => "publish",
parameters => [#{
name => message,
in => body,
required => true,
schema =>#{
type => array,
items => minirest:ref(<<"request_message">>)}
}],
responses => #{
<<"200">> => #{
description => <<"publish result">>,
schema => #{
type => array,
items => minirest:ref(<<"message">>)}}}}},
{"/publish_batch", MeteData, publish_batch}.
request_message_schema() ->
{<<"request_message">>, maps:without([<<"id">>], message_def())}.
mqtt_message_schema() ->
{<<"message">>, message_def()}.
message_def() ->
#{
<<"id">> => #{
type => <<"string">>,
description => <<"Message ID">>},
<<"topic">> => #{
type => <<"string">>,
description => <<"Topic">>},
<<"qos">> => #{
type => <<"integer">>,
enum => [0, 1, 2],
description => <<"Qos">>},
<<"payload">> => #{
type => <<"string">>,
description => <<"Topic">>},
<<"from">> => #{
type => <<"string">>,
description => <<"Message from">>},
<<"flag">> => #{
type => <<"object">>,
description => <<"Message flag">>,
properties => #{
<<"sys">> => #{
type => <<"boolean">>,
default => false,
description => <<"System message flag, nullable, default false">>},
<<"dup">> => #{
type => <<"boolean">>,
default => false,
description => <<"Dup message flag, nullable, default false">>},
<<"retain">> => #{
type => <<"boolean">>,
default => false,
description => <<"Retain message flag, nullable, default false">>}}}
}.
publish(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Message = message(emqx_json:decode(Body, [return_maps])),
_ = emqx_mgmt:publish(Message),
{200, emqx_json:encode(format_message(Message))}.
publish_batch(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Messages = messages(emqx_json:decode(Body, [return_maps])),
_ = [emqx_mgmt:publish(Message) || Message <- Messages],
ResponseBody = emqx_json:encode(format_message(Messages)),
{200, ResponseBody}.
message(Map) ->
From = maps:get(<<"from">>, Map, http_api),
QoS = maps:get(<<"qos">>, Map, 0),
Topic = maps:get(<<"topic">>, Map),
Payload = maps:get(<<"payload">>, Map),
Flags = flags(Map),
emqx_message:make(From, QoS, Topic, Payload, Flags, #{}).
flags(Map) ->
Flags = maps:get(<<"flags">>, Map, #{}),
Retain = maps:get(<<"retain">>, Flags, false),
Sys = maps:get(<<"sys">>, Flags, false),
Dup = maps:get(<<"dup">>, Flags, false),
#{
retain => Retain,
sys => Sys,
dup => Dup
}.
messages(List) ->
[message(MessageMap) || MessageMap <- List].
format_message(Messages) when is_list(Messages)->
[format_message(Message) || Message <- Messages];
format_message(#message{id = ID, qos = Qos, from = From, topic = Topic, payload = Payload, flags = Flags}) ->
#{
id => emqx_guid:to_hexstr(ID),
qos => Qos,
topic => Topic,
payload => Payload,
flag => Flags,
from => to_binary(From)
}.
to_binary(Data) when is_binary(Data) ->
Data;
to_binary(Data) ->
list_to_binary(io_lib:format("~p", [Data])).

View File

@ -13,33 +13,93 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_stats).
-rest_api(#{name => list_stats,
method => 'GET',
path => "/stats/",
func => list,
descr => "A list of stats of all nodes in the cluster"}).
-behavior(minirest_api).
-rest_api(#{name => lookup_node_stats,
method => 'GET',
path => "/nodes/:atom:node/stats/",
func => lookup,
descr => "A list of stats of a node"}).
-export([api_spec/0]).
-export([ list/2
, lookup/2
]).
-export([list/2]).
%% List stats of all nodes
list(Bindings, _Params) when map_size(Bindings) == 0 ->
emqx_mgmt:return({ok, [#{node => Node, stats => maps:from_list(Stats)}
|| {Node, Stats} <- emqx_mgmt:get_stats()]}).
api_spec() ->
{stats_api(), stats_schema()}.
%% List stats of a node
lookup(#{node := Node}, _Params) ->
case emqx_mgmt:get_stats(Node) of
{error, Reason} -> emqx_mgmt:return({error, Reason});
Stats -> emqx_mgmt:return({ok, maps:from_list(Stats)})
end.
stats_schema() ->
DefinitionName = <<"stats">>,
DefinitionProperties = #{
<<"connections.count">> => #{
type => <<"integer">>,
description => <<"Number of current connections">>},
<<"connections.max">> => #{
type => <<"integer">>,
description => <<"Historical maximum number of connections">>},
<<"channels.count">> => #{
type => <<"integer">>,
description => <<"sessions.count">>},
<<"channels.max">> => #{
type => <<"integer">>,
description => <<"session.max">>},
<<"sessions.count">> => #{
type => <<"integer">>,
description => <<"Number of current sessions">>},
<<"sessions.max">> => #{
type => <<"integer">>,
description => <<"Historical maximum number of sessions">>},
<<"topics.count">> => #{
type => <<"integer">>,
description => <<"Number of current topics">>},
<<"topics.max">> => #{
type => <<"integer">>,
description => <<"Historical maximum number of topics">>},
<<"suboptions.count">> => #{
type => <<"integer">>,
description => <<"subscriptions.count">>},
<<"suboptions.max">> => #{
type => <<"integer">>,
description => <<"subscriptions.max">>},
<<"subscribers.count">> => #{
type => <<"integer">>,
description => <<"Number of current subscribers">>},
<<"subscribers.max">> => #{
type => <<"integer">>,
description => <<"Historical maximum number of subscribers">>},
<<"subscriptions.count">> => #{
type => <<"integer">>,
description => <<"Number of current subscriptions, including shared subscriptions">>},
<<"subscriptions.max">> => #{
type => <<"integer">>,
description => <<"Historical maximum number of subscriptions">>},
<<"subscriptions.shared.count">> => #{
type => <<"integer">>,
description => <<"Number of current shared subscriptions">>},
<<"subscriptions.shared.max">> => #{
type => <<"integer">>,
description => <<"Historical maximum number of shared subscriptions">>},
<<"routes.count">> => #{
type => <<"integer">>,
description => <<"Number of current routes">>},
<<"routes.max">> => #{
type => <<"integer">>,
description => <<"Historical maximum number of routes">>},
<<"retained.count">> => #{
type => <<"integer">>,
description => <<"Number of currently retained messages">>},
<<"retained.max">> => #{
type => <<"integer">>,
description => <<"Historical maximum number of retained messages">>}},
[{DefinitionName, DefinitionProperties}].
stats_api() ->
Metadata = #{
get => #{
description => "EMQ X stats",
responses => #{
<<"200">> => #{
schema => cowboy_swagger:schema(<<"stats">>)}}}},
[{"/stats", Metadata, list}].
%%%==============================================================================================
%% api apply
list(get, _Request) ->
Response = emqx_json:encode(emqx_mgmt:get_stats()),
{200, Response}.

View File

@ -21,11 +21,13 @@
, kmg/1
, ntoa/1
, merge_maps/2
, not_found_schema/1
, not_found_schema/2
, batch_operation/3
]).
-export([ not_found_schema/1
, not_found_schema/2
, batch_response_schema/1]).
-export([urldecode/1]).
-define(KB, 1024).
@ -80,6 +82,8 @@ merge_maps(Default, New) ->
urldecode(S) ->
emqx_http_lib:uri_decode(S).
%%%==============================================================================================
%% schema util
not_found_schema(Description) ->
not_found_schema(Description, ["RESOURCE_NOT_FOUND"]).
@ -96,6 +100,34 @@ not_found_schema(Description, Enum) ->
type => string}}}
}.
batch_response_schema(DefName) ->
#{
type => object,
properties => #{
success => #{
type => integer,
description => <<"Success count">>},
failed => #{
type => integer,
description => <<"Failed count">>},
detail => #{
type => array,
description => <<"Failed object & reason">>,
items => #{
type => object,
properties =>
#{
data => minirest:ref(DefName),
reason => #{
type => <<"string">>
}
}
}
}
}
}.
%%%==============================================================================================
batch_operation(Module, Function, ArgsList) ->
Failed = batch_operation(Module, Function, ArgsList, []),
Len = erlang:length(Failed),

View File

@ -0,0 +1,52 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_metrics_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1),
Config.
end_per_suite(_) ->
emqx_ct_helpers:stop_apps([emqx_management]).
set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
applications =>[#{id => "admin", secret => "public"}]}),
ok;
set_special_configs(_App) ->
ok.
t_metrics_api(_) ->
MetricsPath = emqx_mgmt_api_test_util:api_path(["metrics"]),
SystemMetrics = emqx_mgmt:get_metrics(),
{ok, MetricsResponse} = emqx_mgmt_api_test_util:request_api(get, MetricsPath),
Metrics = emqx_json:decode(MetricsResponse, [return_maps]),
?assertEqual(erlang:length(maps:keys(SystemMetrics)), erlang:length(maps:keys(Metrics))),
Fun =
fun(Key) ->
?assertEqual(maps:get(Key, SystemMetrics), maps:get(atom_to_binary(Key, utf8), Metrics))
end,
lists:foreach(Fun, maps:keys(SystemMetrics)).

View File

@ -20,11 +20,6 @@
-include_lib("eunit/include/eunit.hrl").
-define(APP, emqx_management).
-define(SERVER, "http://127.0.0.1:8081").
-define(BASE_PATH, "/api/v5").
all() ->
emqx_ct:all(?MODULE).
@ -54,5 +49,29 @@ t_nodes_api(_) ->
NodePath = emqx_mgmt_api_test_util:api_path(["nodes", atom_to_list(node())]),
{ok, NodeInfo} = emqx_mgmt_api_test_util:request_api(get, NodePath),
NodeNameResponse = binary_to_atom(maps:get(<<"node">>, emqx_json:decode(NodeInfo, [return_maps])), utf8),
NodeNameResponse =
binary_to_atom(maps:get(<<"node">>, emqx_json:decode(NodeInfo, [return_maps])), utf8),
?assertEqual(node(), NodeNameResponse).
t_node_stats_api() ->
StatsPath = emqx_mgmt_api_test_util:api_path(["nodes", atom_to_binary(node(), utf8), "stats"]),
SystemStats= emqx_mgmt:get_stats(),
{ok, StatsResponse} = emqx_mgmt_api_test_util:request_api(get, StatsPath),
Stats = emqx_json:decode(StatsResponse, [return_maps]),
Fun =
fun(Key) ->
?assertEqual(maps:get(Key, SystemStats), maps:get(atom_to_binary(Key, utf8), Stats))
end,
lists:foreach(Fun, maps:keys(SystemStats)).
t_node_metrics_api() ->
MetricsPath =
emqx_mgmt_api_test_util:api_path(["nodes", atom_to_binary(node(), utf8), "metrics"]),
SystemMetrics= emqx_mgmt:get_metrics(),
{ok, MetricsResponse} = emqx_mgmt_api_test_util:request_api(get, MetricsPath),
Metrics = emqx_json:decode(MetricsResponse, [return_maps]),
Fun =
fun(Key) ->
?assertEqual(maps:get(Key, SystemMetrics), maps:get(atom_to_binary(Key, utf8), Metrics))
end,
lists:foreach(Fun, maps:keys(SystemMetrics)).

View File

@ -0,0 +1,92 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_publish_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(CLIENTID, <<"api_clientid">>).
-define(USERNAME, <<"api_username">>).
-define(TOPIC1, <<"api_topic1">>).
-define(TOPIC2, <<"api_topic2">>).
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1),
Config.
end_per_suite(_) ->
emqx_ct_helpers:stop_apps([emqx_management]).
set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
applications =>[#{id => "admin", secret => "public"}]}),
ok;
set_special_configs(_App) ->
ok.
t_publish_api(_) ->
{ok, Client} = emqtt:start_link(#{username => <<"api_username">>, clientid => <<"api_clientid">>}),
{ok, _} = emqtt:connect(Client),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
Payload = <<"hello">>,
Path = emqx_mgmt_api_test_util:api_path(["publish"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
Body = #{topic => ?TOPIC1, payload => Payload},
{ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
?assertEqual(receive_assert(?TOPIC1, 0, Payload), ok),
emqtt:disconnect(Client).
t_publish_batch_api(_) ->
{ok, Client} = emqtt:start_link(#{username => <<"api_username">>, clientid => <<"api_clientid">>}),
{ok, _} = emqtt:connect(Client),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
Payload = <<"hello">>,
Path = emqx_mgmt_api_test_util:api_path(["publish_batch"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
Body =[#{topic => ?TOPIC1, payload => Payload}, #{topic => ?TOPIC2, payload => Payload}],
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
ResponseMap = emqx_json:decode(Response, [return_maps]),
?assertEqual(2, erlang:length(ResponseMap)),
?assertEqual(receive_assert(?TOPIC1, 0, Payload), ok),
?assertEqual(receive_assert(?TOPIC2, 0, Payload), ok),
emqtt:disconnect(Client).
receive_assert(Topic, Qos, Payload) ->
receive
{publish, Message} ->
ReceiveTopic = maps:get(topic, Message),
ReceiveQos = maps:get(qos, Message),
ReceivePayload = maps:get(payload, Message),
?assertEqual(ReceiveTopic , Topic),
?assertEqual(ReceiveQos , Qos),
?assertEqual(ReceivePayload , Payload),
ok
after 5000 ->
timeout
end.

View File

@ -0,0 +1,52 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_stats_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1),
Config.
end_per_suite(_) ->
emqx_ct_helpers:stop_apps([emqx_management]).
set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
applications =>[#{id => "admin", secret => "public"}]}),
ok;
set_special_configs(_App) ->
ok.
t_stats_api(_) ->
StatsPath = emqx_mgmt_api_test_util:api_path(["stats"]),
SystemStats = emqx_mgmt:get_stats(),
{ok, StatsResponse} = emqx_mgmt_api_test_util:request_api(get, StatsPath),
Stats = emqx_json:decode(StatsResponse, [return_maps]),
?assertEqual(erlang:length(maps:keys(SystemStats)), erlang:length(maps:keys(Stats))),
Fun =
fun(Key) ->
?assertEqual(maps:get(Key, SystemStats), maps:get(atom_to_binary(Key, utf8), Stats))
end,
lists:foreach(Fun, maps:keys(SystemStats)).

View File

@ -48,7 +48,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.3"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} % TODO: delete when all apps moved to hocon
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.1.1"}}}