feat(api): support stats/metrics API aggregate

This commit is contained in:
Turtle 2021-07-29 20:44:30 +08:00 committed by turtleDeng
parent dcfc705811
commit 740b729778
6 changed files with 450 additions and 324 deletions

View File

@ -43,6 +43,8 @@
, lookup_client/3 , lookup_client/3
, kickout_client/1 , kickout_client/1
, list_authz_cache/1 , list_authz_cache/1
, list_client_subscriptions/1
, client_subscriptions/2
, clean_authz_cache/1 , clean_authz_cache/1
, clean_authz_cache/2 , clean_authz_cache/2
, clean_authz_cache_all/0 , clean_authz_cache_all/0
@ -269,6 +271,23 @@ kickout_client(Node, ClientId) ->
list_authz_cache(ClientId) -> list_authz_cache(ClientId) ->
call_client(ClientId, list_authz_cache). call_client(ClientId, list_authz_cache).
list_client_subscriptions(ClientId) ->
Results = [client_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
Expected = lists:filter(fun({error, _}) -> false;
([]) -> false;
(_) -> true
end, Results),
case Expected of
[] -> [];
[Result|_] -> Result
end.
client_subscriptions(Node, ClientId) when Node =:= node() ->
emqx_broker:subscriptions(ClientId);
client_subscriptions(Node, ClientId) ->
rpc_call(Node, client_subscriptions, [Node, ClientId]).
clean_authz_cache(ClientId) -> clean_authz_cache(ClientId) ->
Results = [clean_authz_cache(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], Results = [clean_authz_cache(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of case lists:any(fun(Item) -> Item =:= ok end, Results) of

View File

@ -29,6 +29,7 @@
-export([ clients/2 -export([ clients/2
, client/2 , client/2
, subscriptions/2
, authz_cache/2 , authz_cache/2
, subscribe/2 , subscribe/2
, subscribe_batch/2]). , subscribe_batch/2]).
@ -68,6 +69,7 @@ apis() ->
[ clients_api() [ clients_api()
, client_api() , client_api()
, clients_authz_cache_api() , clients_authz_cache_api()
, clients_subscriptions_api()
, subscribe_api()]. , subscribe_api()].
schemas() -> schemas() ->
@ -209,7 +211,17 @@ schemas() ->
} }
} }
}, },
[Client, AuthzCache]. Subscription = #{
subscription => #{
type => object,
properties => #{
topic => #{
type => string},
qos => #{
type => integer,
enum => [0,1,2]}}}
},
[Client, AuthzCache, Subscription].
clients_api() -> clients_api() ->
Metadata = #{ Metadata = #{
@ -271,6 +283,21 @@ clients_authz_cache_api() ->
<<"200">> => emqx_mgmt_util:response_schema(<<"Delete clients 200 OK">>)}}}, <<"200">> => emqx_mgmt_util:response_schema(<<"Delete clients 200 OK">>)}}},
{"/clients/:clientid/authz_cache", Metadata, authz_cache}. {"/clients/:clientid/authz_cache", Metadata, authz_cache}.
clients_subscriptions_api() ->
Metadata = #{
get => #{
description => <<"Get client subscriptions">>,
parameters => [#{
name => clientid,
in => path,
schema => #{type => string},
required => true
}],
responses => #{
<<"200">> => emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, subscription)}}
},
{"/clients/:clientid/subscriptions", Metadata, subscriptions}.
subscribe_api() -> subscribe_api() ->
Metadata = #{ Metadata = #{
post => #{ post => #{
@ -363,6 +390,14 @@ subscribe_batch(post, Request) ->
end || TopicInfo <- TopicInfos], end || TopicInfo <- TopicInfos],
subscribe_batch(#{clientid => ClientID, topics => Topics}). subscribe_batch(#{clientid => ClientID, topics => Topics}).
subscriptions(get, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
Subs0 = emqx_mgmt:list_client_subscriptions(ClientID),
Subs = lists:map(fun({Topic, SubOpts}) ->
#{topic => Topic, qos => maps:get(qos, SubOpts)}
end, Subs0),
{200, Subs}.
%%%============================================================================================== %%%==============================================================================================
%% api apply %% api apply

View File

@ -26,10 +26,26 @@ api_spec() ->
{[metrics_api()], [metrics_schema()]}. {[metrics_api()], [metrics_schema()]}.
metrics_schema() -> metrics_schema() ->
#{ Metric = #{
metrics => #{
type => object, type => object,
properties => #{ properties => properties()
},
Metrics = #{
type => array,
items => #{
type => object,
properties => properties()
}
},
MetricsInfo = #{
oneOf => [ minirest:ref(<<"metric">>)
, minirest:ref(<<"metrics">>)
]
},
#{metric => Metric, metrics => Metrics, metrics_info => MetricsInfo}.
properties() ->
#{
'actions.failure' => #{ 'actions.failure' => #{
type => integer, type => integer,
description => <<"Number of failure executions of the rule engine action">>}, description => <<"Number of failure executions of the rule engine action">>},
@ -276,19 +292,40 @@ metrics_schema() ->
'session.terminated' => #{ 'session.terminated' => #{
type => integer, type => integer,
description => <<"Number of terminated sessions">>} description => <<"Number of terminated sessions">>}
}
}
}. }.
metrics_api() -> metrics_api() ->
Metadata = #{ Metadata = #{
get => #{ get => #{
description => <<"EMQ X metrics">>, description => <<"EMQ X metrics">>,
parameters => [#{
name => aggregate,
in => query,
schema => #{type => boolean}
}],
responses => #{ responses => #{
<<"200">> => emqx_mgmt_util:response_schema(<<"List all metrics">>, metrics)}}}, <<"200">> => #{
description => <<"List all metrics">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"metrics_info">>)
}
}
}
}
}
},
{"/metrics", Metadata, list}. {"/metrics", Metadata, list}.
%%%============================================================================================== %%%==============================================================================================
%% api apply %% api apply
list(get, _) -> list(get, Request) ->
{200, emqx_mgmt:get_metrics()}. Params = cowboy_req:parse_qs(Request),
case proplists:get_value(<<"aggregate">>, Params, undefined) of
<<"true">> ->
{200, emqx_mgmt:get_metrics()};
_ ->
Data = [maps:from_list(emqx_mgmt:get_metrics(Node) ++ [{node, Node}]) ||
Node <- ekka_mnesia:running_nodes()],
{200, Data}
end.

View File

@ -22,13 +22,29 @@
-export([list/2]). -export([list/2]).
api_spec() -> api_spec() ->
{[stats_api()], [stats_schema()]}. {[stats_api()], stats_schema()}.
stats_schema() -> stats_schema() ->
#{ Stats = #{
stats => #{ type => array,
items => #{
type => object, type => object,
properties => #{ properties => maps:put('node', #{type => string, description => <<"Node">>}, properties())
}
},
Stat = #{
type => object,
properties => properties()
},
StatsInfo =#{
oneOf => [ minirest:ref(<<"stats">>)
, minirest:ref(<<"stat">>)
]
},
[#{stats => Stats, stat => Stat, stats_info => StatsInfo}].
properties() ->
#{
'connections.count' => #{ 'connections.count' => #{
type => integer, type => integer,
description => <<"Number of current connections">>}, description => <<"Number of current connections">>},
@ -89,19 +105,38 @@ stats_schema() ->
'retained.max' => #{ 'retained.max' => #{
type => integer, type => integer,
description => <<"Historical maximum number of retained messages">>} description => <<"Historical maximum number of retained messages">>}
}
}
}. }.
stats_api() -> stats_api() ->
Metadata = #{ Metadata = #{
get => #{ get => #{
description => <<"EMQ X stats">>, description => <<"EMQ X stats">>,
parameters => [#{
name => aggregate,
in => query,
schema => #{type => boolean}
}],
responses => #{ responses => #{
<<"200">> => emqx_mgmt_util:response_schema(<<"List stats ok">>, stats)}}}, <<"200">> => #{
description => <<"List stats ok">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"stats_info">>)
}
}
}
}}},
{"/stats", Metadata, list}. {"/stats", Metadata, list}.
%%%============================================================================================== %%%==============================================================================================
%% api apply %% api apply
list(get, _Request) -> list(get, Request) ->
{200, emqx_mgmt:get_stats()}. Params = cowboy_req:parse_qs(Request),
case proplists:get_value(<<"aggregate">>, Params, undefined) of
<<"true">> ->
{200, emqx_mgmt:get_stats()};
_ ->
Data = [maps:from_list(emqx_mgmt:get_stats(Node) ++ [{node, Node}]) ||
Node <- ekka_mnesia:running_nodes()],
{200, Data}
end.

View File

@ -40,7 +40,7 @@ set_special_configs(_App) ->
ok. ok.
t_metrics_api(_) -> t_metrics_api(_) ->
MetricsPath = emqx_mgmt_api_test_util:api_path(["metrics"]), MetricsPath = emqx_mgmt_api_test_util:api_path(["metrics?aggregate=true"]),
SystemMetrics = emqx_mgmt:get_metrics(), SystemMetrics = emqx_mgmt:get_metrics(),
{ok, MetricsResponse} = emqx_mgmt_api_test_util:request_api(get, MetricsPath), {ok, MetricsResponse} = emqx_mgmt_api_test_util:request_api(get, MetricsPath),
Metrics = emqx_json:decode(MetricsResponse, [return_maps]), Metrics = emqx_json:decode(MetricsResponse, [return_maps]),

View File

@ -40,7 +40,7 @@ set_special_configs(_App) ->
ok. ok.
t_stats_api(_) -> t_stats_api(_) ->
StatsPath = emqx_mgmt_api_test_util:api_path(["stats"]), StatsPath = emqx_mgmt_api_test_util:api_path(["stats?aggregate=true"]),
SystemStats = emqx_mgmt:get_stats(), SystemStats = emqx_mgmt:get_stats(),
{ok, StatsResponse} = emqx_mgmt_api_test_util:request_api(get, StatsPath), {ok, StatsResponse} = emqx_mgmt_api_test_util:request_api(get, StatsPath),
Stats = emqx_json:decode(StatsResponse, [return_maps]), Stats = emqx_json:decode(StatsResponse, [return_maps]),