diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index e068c5384..fbf926540 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -65,10 +65,10 @@ count(Table, Nodes) -> lists:sum([rpc_call(Node, ets, info, [Table, size], 5000) || Node <- Nodes]). page(Params) -> - binary_to_integer(proplists:get_value(<<"_page">>, Params, <<"1">>)). + binary_to_integer(proplists:get_value(<<"page">>, Params, <<"1">>)). limit(Params) -> - case proplists:get_value(<<"_limit">>, Params) of + case proplists:get_value(<<"limit">>, Params) of undefined -> emqx_mgmt:max_row_limit(); Size -> binary_to_integer(Size) end. @@ -204,7 +204,7 @@ params2qs(Params, QsSchema) -> {length(Qs) + length(Fuzzy), {Qs, Fuzzy}}. %%-------------------------------------------------------------------- -%% Intenal funcs +%% Internal funcs pick_params_to_qs([], _, Acc1, Acc2) -> NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)], @@ -215,12 +215,12 @@ pick_params_to_qs([{Key, Value}|Params], QsKits, Acc1, Acc2) -> undefined -> pick_params_to_qs(Params, QsKits, Acc1, Acc2); Type -> case Key of - <> - when Prefix =:= <<"_gte_">>; - Prefix =:= <<"_lte_">> -> + <> + when Prefix =:= <<"gte_">>; + Prefix =:= <<"lte_">> -> OpposeKey = case Prefix of - <<"_gte_">> -> <<"_lte_", NKey/binary>>; - <<"_lte_">> -> <<"_gte_", NKey/binary>> + <<"gte_">> -> <<"lte_", NKey/binary>>; + <<"lte_">> -> <<"gte_", NKey/binary>> end, case lists:keytake(OpposeKey, 1, Params) of false -> @@ -252,20 +252,20 @@ qs(K, Value0, Type) -> throw({bad_value_type, {K, Type, Value0}}) end. -qs(<<"_gte_", Key/binary>>, Value) -> +qs(<<"gte_", Key/binary>>, Value) -> {binary_to_existing_atom(Key, utf8), '>=', Value}; -qs(<<"_lte_", Key/binary>>, Value) -> +qs(<<"lte_", Key/binary>>, Value) -> {binary_to_existing_atom(Key, utf8), '=<', Value}; -qs(<<"_like_", Key/binary>>, Value) -> +qs(<<"like_", Key/binary>>, Value) -> {binary_to_existing_atom(Key, utf8), like, Value}; -qs(<<"_match_", Key/binary>>, Value) -> +qs(<<"match_", Key/binary>>, Value) -> {binary_to_existing_atom(Key, utf8), match, Value}; qs(Key, Value) -> {binary_to_existing_atom(Key, utf8), '=:=', Value}. -is_fuzzy_key(<<"_like_", _/binary>>) -> +is_fuzzy_key(<<"like_", _/binary>>) -> true; -is_fuzzy_key(<<"_match_", _/binary>>) -> +is_fuzzy_key(<<"match_", _/binary>>) -> true; is_fuzzy_key(_) -> false. @@ -317,18 +317,18 @@ params2qs_test() -> {<<"int">>, integer}, {<<"atom">>, atom}, {<<"ts">>, timestamp}, - {<<"_gte_range">>, integer}, - {<<"_lte_range">>, integer}, - {<<"_like_fuzzy">>, binary}, - {<<"_match_topic">>, binary}], + {<<"gte_range">>, integer}, + {<<"lte_range">>, integer}, + {<<"like_fuzzy">>, binary}, + {<<"match_topic">>, binary}], Params = [{<<"str">>, <<"abc">>}, {<<"int">>, <<"123">>}, {<<"atom">>, <<"connected">>}, {<<"ts">>, <<"156000">>}, - {<<"_gte_range">>, <<"1">>}, - {<<"_lte_range">>, <<"5">>}, - {<<"_like_fuzzy">>, <<"user">>}, - {<<"_match_topic">>, <<"t/#">>}], + {<<"gte_range">>, <<"1">>}, + {<<"lte_range">>, <<"5">>}, + {<<"like_fuzzy">>, <<"user">>}, + {<<"match_topic">>, <<"t/#">>}], ExpectedQs = [{str, '=:=', <<"abc">>}, {int, '=:=', 123}, {atom, '=:=', connected}, diff --git a/apps/emqx_management/src/emqx_mgmt_api_apps.erl b/apps/emqx_management/src/emqx_mgmt_api_apps.erl index 396c05696..46dd34432 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_apps.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_apps.erl @@ -122,8 +122,7 @@ app_api() -> name => app_id, in => path, required => true, - schema => #{type => string}, - default => <<"admin">> + schema => #{type => string} }], 'requestBody' => emqx_mgmt_util:request_body_schema(app_without_secret_schema()), responses => #{ diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 0fcd7404a..bd73f381d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -48,12 +48,12 @@ , {<<"clean_start">>, atom} , {<<"proto_name">>, binary} , {<<"proto_ver">>, integer} - , {<<"_like_clientid">>, binary} - , {<<"_like_username">>, binary} - , {<<"_gte_created_at">>, timestamp} - , {<<"_lte_created_at">>, timestamp} - , {<<"_gte_connected_at">>, timestamp} - , {<<"_lte_connected_at">>, timestamp}]}). + , {<<"like_clientid">>, binary} + , {<<"like_username">>, binary} + , {<<"gte_created_at">>, timestamp} + , {<<"lte_created_at">>, timestamp} + , {<<"gte_connected_at">>, timestamp} + , {<<"lte_connected_at">>, timestamp}]}). -define(query_fun, {?MODULE, query}). -define(format_fun, {?MODULE, format_channel_info}). diff --git a/apps/emqx_management/src/emqx_mgmt_api_routes.erl b/apps/emqx_management/src/emqx_mgmt_api_routes.erl index 2b514eb51..73c78b199 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_routes.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_routes.erl @@ -54,15 +54,13 @@ routes_api() -> name => page, in => query, description => <<"Page">>, - schema => #{type => integer}, - default => 1 + schema => #{type => integer, default => 1} }, #{ name => limit, in => query, description => <<"Page size">>, - schema => #{type => integer}, - default => emqx_mgmt:max_row_limit() + schema => #{type => integer, default => emqx_mgmt:max_row_limit()} }], responses => #{ <<"200">> => diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 3f563427b..ef1382caa 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -16,79 +16,107 @@ -module(emqx_mgmt_api_subscriptions). +-behavior(minirest_api). + -include_lib("emqx/include/emqx.hrl"). --define(SUBS_QS_SCHEMA, {emqx_suboption, - [{<<"clientid">>, binary}, - {<<"topic">>, binary}, - {<<"share">>, binary}, - {<<"qos">>, integer}, - {<<"_match_topic">>, binary}]}). +-export([api_spec/0]). --rest_api(#{name => list_subscriptions, - method => 'GET', - path => "/subscriptions/", - func => list, - descr => "A list of subscriptions in the cluster"}). - --rest_api(#{name => list_node_subscriptions, - method => 'GET', - path => "/nodes/:atom:node/subscriptions/", - func => list, - descr => "A list of subscriptions on a node"}). - --rest_api(#{name => lookup_client_subscriptions, - method => 'GET', - path => "/subscriptions/:bin:clientid", - func => lookup, - descr => "A list of subscriptions of a client"}). - --rest_api(#{name => lookup_client_subscriptions_with_node, - method => 'GET', - path => "/nodes/:atom:node/subscriptions/:bin:clientid", - func => lookup, - descr => "A list of subscriptions of a client on the node"}). - --export([ list/2 - , lookup/2 - ]). +-export([subscriptions/2]). -export([ query/3 , format/1 ]). +-define(SUBS_QS_SCHEMA, {emqx_suboption, + [ {<<"clientid">>, binary} + , {<<"topic">>, binary} + , {<<"share">>, binary} + , {<<"qos">>, integer} + , {<<"match_topic">>, binary}]}). + -define(query_fun, {?MODULE, query}). -define(format_fun, {?MODULE, format}). -list(Bindings, Params) when map_size(Bindings) == 0 -> - case proplists:get_value(<<"topic">>, Params) of - undefined -> - emqx_mgmt:return({ok, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}); - Topic -> - emqx_mgmt:return({ok, emqx_mgmt:list_subscriptions_via_topic(emqx_mgmt_util:urldecode(Topic), ?format_fun)}) - end; +api_spec() -> + { + [subscriptions_api()], + [subscription_schema()] + }. -list(#{node := Node} = Bindings, Params) -> - case proplists:get_value(<<"topic">>, Params) of - undefined -> - case Node =:= node() of - true -> - emqx_mgmt:return({ok, emqx_mgmt_api:node_query(Node, Params, ?SUBS_QS_SCHEMA, ?query_fun)}); - false -> - case rpc:call(Node, ?MODULE, list, [Bindings, Params]) of - {badrpc, Reason} -> emqx_mgmt:return({error, Reason}); - Res -> Res - end - end; - Topic -> - emqx_mgmt:return({ok, emqx_mgmt:list_subscriptions_via_topic(Node, emqx_mgmt_util:urldecode(Topic), ?format_fun)}) - end. +subscriptions_api() -> + MetaData = #{ + get => #{ + description => "List subscriptions", + parameters => [ + #{ + name => page, + in => query, + description => <<"Page">>, + schema => #{type => integer} + }, + #{ + name => limit, + in => query, + description => <<"Page size">>, + schema => #{type => integer} + }, + #{ + name => clientid, + in => query, + description => <<"Client ID">>, + schema => #{type => string} + }, + #{ + name => qos, + in => query, + description => <<"QoS">>, + schema => #{type => integer} + }, + #{ + name => share, + in => query, + description => <<"Shared subscription">>, + schema => #{type => boolean} + }, + #{ + name => topic, + in => query, + description => <<"Topic">>, + schema => #{type => string} + } + #{ + name => match_topic, + in => query, + description => <<"Match topic string">>, + schema => #{type => string} + } + ], + responses => #{ + <<"200">> => emqx_mgmt_util:response_page_schema(<<"subscription">>)}}}, + {"/subscriptions", MetaData, subscriptions}. -lookup(#{node := Node, clientid := ClientId}, _Params) -> - emqx_mgmt:return({ok, format(emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId)))}); +subscription_schema() -> + #{ + subscription => #{ + type => object, + properties => #{ + topic => #{ + type => string}, + clientid => #{ + type => string}, + qos => #{ + type => integer, + enum => [0,1,2]}}} + }. + +subscriptions(get, Request) -> + Params = cowboy_req:parse_qs(Request), + list(Params). + +list(Params) -> + {200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}. -lookup(#{clientid := ClientId}, _Params) -> - emqx_mgmt:return({ok, format(emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId)))}). format(Items) when is_list(Items) -> [format(Item) || Item <- Items]; @@ -98,10 +126,10 @@ format({{Subscriber, Topic}, Options}) -> format({_Subscriber, Topic, Options = #{share := Group}}) -> QoS = maps:get(qos, Options), - #{node => node(), topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS}; + #{topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS}; format({_Subscriber, Topic, Options}) -> QoS = maps:get(qos, Options), - #{node => node(), topic => Topic, clientid => maps:get(subid, Options), qos => QoS}. + #{topic => Topic, clientid => maps:get(subid, Options), qos => QoS}. %%-------------------------------------------------------------------- %% Query Function diff --git a/apps/emqx_management/src/emqx_mgmt_util.erl b/apps/emqx_management/src/emqx_mgmt_util.erl index ea6570d79..0220065c9 100644 --- a/apps/emqx_management/src/emqx_mgmt_util.erl +++ b/apps/emqx_management/src/emqx_mgmt_util.erl @@ -31,7 +31,8 @@ , response_array_schema/2 , response_error_schema/1 , response_error_schema/2 - , batch_response_schema/1]). + , response_page_schema/1 + , response_batch_schema/1]). -export([urldecode/1]). @@ -128,7 +129,25 @@ response_error_schema(Description, Enum) -> type => string}}}, json_content_schema(Description, Schema). -batch_response_schema(DefName) when is_binary(DefName) -> +response_page_schema(Def) when is_binary(Def) -> + Schema = #{ + type => object, + properties => #{ + meta => #{ + type => object, + properties => #{ + page => #{ + type => integer}, + limit => #{ + type => integer}, + count => #{ + type => integer}}}, + data => #{ + type => array, + items => minirest:ref(Def)}}}, + json_content_schema("", Schema). + +response_batch_schema(DefName) when is_binary(DefName) -> Schema = #{ type => object, properties => #{ diff --git a/apps/emqx_management/test/emqx_mgmt_subscription_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_subscription_api_SUITE.erl new file mode 100644 index 000000000..6d56f21c4 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_subscription_api_SUITE.erl @@ -0,0 +1,74 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_subscription_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-define(CLIENTID, <<"api_clientid">>). +-define(USERNAME, <<"api_username">>). + +%% notice: integer topic for sort response +-define(TOPIC1, <<"0000">>). +-define(TOPIC2, <<"0001">>). + + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + ekka_mnesia:start(), + emqx_mgmt_auth:mnesia(boot), + emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1), + Config. + + +end_per_suite(_) -> + emqx_ct_helpers:stop_apps([emqx_management]). + +set_special_configs(emqx_management) -> + emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], + applications =>[#{id => "admin", secret => "public"}]}), + ok; +set_special_configs(_App) -> + ok. + +t_subscription_api(_) -> + {ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID}), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, ?TOPIC1), + {ok, _, _} = emqtt:subscribe(Client, ?TOPIC2), + Path = emqx_mgmt_api_test_util:api_path(["subscriptions"]), + {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path), + Data = emqx_json:decode(Response, [return_maps]), + Meta = maps:get(<<"meta">>, Data), + ?assertEqual(1, maps:get(<<"page">>, Meta)), + ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta)), + ?assertEqual(2, maps:get(<<"count">>, Meta)), + Subscriptions = maps:get(<<"data">>, Data), + ?assertEqual(length(Subscriptions), 2), + Sort = + fun(#{<<"topic">> := T1}, #{<<"topic">> := T2}) -> + binary_to_integer(T1) =< binary_to_integer(T2) + end, + [Subscriptions1, Subscriptions2] = lists:sort(Sort, Subscriptions), + ?assertEqual(maps:get(<<"topic">>, Subscriptions1), ?TOPIC1), + ?assertEqual(maps:get(<<"topic">>, Subscriptions2), ?TOPIC2), + ?assertEqual(maps:get(<<"clientid">>, Subscriptions1), ?CLIENTID), + ?assertEqual(maps:get(<<"clientid">>, Subscriptions2), ?CLIENTID), + emqtt:disconnect(Client).