refactor: sub api; fix: fuzzy function (#5287)

This commit is contained in:
DDDHuang 2021-07-23 13:49:51 +08:00 committed by GitHub
parent 419036bd9a
commit 0cf4723c73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 215 additions and 97 deletions

View File

@ -65,10 +65,10 @@ count(Table, Nodes) ->
lists:sum([rpc_call(Node, ets, info, [Table, size], 5000) || Node <- Nodes]). lists:sum([rpc_call(Node, ets, info, [Table, size], 5000) || Node <- Nodes]).
page(Params) -> page(Params) ->
binary_to_integer(proplists:get_value(<<"_page">>, Params, <<"1">>)). binary_to_integer(proplists:get_value(<<"page">>, Params, <<"1">>)).
limit(Params) -> limit(Params) ->
case proplists:get_value(<<"_limit">>, Params) of case proplists:get_value(<<"limit">>, Params) of
undefined -> emqx_mgmt:max_row_limit(); undefined -> emqx_mgmt:max_row_limit();
Size -> binary_to_integer(Size) Size -> binary_to_integer(Size)
end. end.
@ -204,7 +204,7 @@ params2qs(Params, QsSchema) ->
{length(Qs) + length(Fuzzy), {Qs, Fuzzy}}. {length(Qs) + length(Fuzzy), {Qs, Fuzzy}}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Intenal funcs %% Internal funcs
pick_params_to_qs([], _, Acc1, Acc2) -> pick_params_to_qs([], _, Acc1, Acc2) ->
NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)], NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)],
@ -215,12 +215,12 @@ pick_params_to_qs([{Key, Value}|Params], QsKits, Acc1, Acc2) ->
undefined -> pick_params_to_qs(Params, QsKits, Acc1, Acc2); undefined -> pick_params_to_qs(Params, QsKits, Acc1, Acc2);
Type -> Type ->
case Key of case Key of
<<Prefix:5/binary, NKey/binary>> <<Prefix:4/binary, NKey/binary>>
when Prefix =:= <<"_gte_">>; when Prefix =:= <<"gte_">>;
Prefix =:= <<"_lte_">> -> Prefix =:= <<"lte_">> ->
OpposeKey = case Prefix of OpposeKey = case Prefix of
<<"_gte_">> -> <<"_lte_", NKey/binary>>; <<"gte_">> -> <<"lte_", NKey/binary>>;
<<"_lte_">> -> <<"_gte_", NKey/binary>> <<"lte_">> -> <<"gte_", NKey/binary>>
end, end,
case lists:keytake(OpposeKey, 1, Params) of case lists:keytake(OpposeKey, 1, Params) of
false -> false ->
@ -252,20 +252,20 @@ qs(K, Value0, Type) ->
throw({bad_value_type, {K, Type, Value0}}) throw({bad_value_type, {K, Type, Value0}})
end. end.
qs(<<"_gte_", Key/binary>>, Value) -> qs(<<"gte_", Key/binary>>, Value) ->
{binary_to_existing_atom(Key, utf8), '>=', Value}; {binary_to_existing_atom(Key, utf8), '>=', Value};
qs(<<"_lte_", Key/binary>>, Value) -> qs(<<"lte_", Key/binary>>, Value) ->
{binary_to_existing_atom(Key, utf8), '=<', Value}; {binary_to_existing_atom(Key, utf8), '=<', Value};
qs(<<"_like_", Key/binary>>, Value) -> qs(<<"like_", Key/binary>>, Value) ->
{binary_to_existing_atom(Key, utf8), like, Value}; {binary_to_existing_atom(Key, utf8), like, Value};
qs(<<"_match_", Key/binary>>, Value) -> qs(<<"match_", Key/binary>>, Value) ->
{binary_to_existing_atom(Key, utf8), match, Value}; {binary_to_existing_atom(Key, utf8), match, Value};
qs(Key, Value) -> qs(Key, Value) ->
{binary_to_existing_atom(Key, utf8), '=:=', Value}. {binary_to_existing_atom(Key, utf8), '=:=', Value}.
is_fuzzy_key(<<"_like_", _/binary>>) -> is_fuzzy_key(<<"like_", _/binary>>) ->
true; true;
is_fuzzy_key(<<"_match_", _/binary>>) -> is_fuzzy_key(<<"match_", _/binary>>) ->
true; true;
is_fuzzy_key(_) -> is_fuzzy_key(_) ->
false. false.
@ -317,18 +317,18 @@ params2qs_test() ->
{<<"int">>, integer}, {<<"int">>, integer},
{<<"atom">>, atom}, {<<"atom">>, atom},
{<<"ts">>, timestamp}, {<<"ts">>, timestamp},
{<<"_gte_range">>, integer}, {<<"gte_range">>, integer},
{<<"_lte_range">>, integer}, {<<"lte_range">>, integer},
{<<"_like_fuzzy">>, binary}, {<<"like_fuzzy">>, binary},
{<<"_match_topic">>, binary}], {<<"match_topic">>, binary}],
Params = [{<<"str">>, <<"abc">>}, Params = [{<<"str">>, <<"abc">>},
{<<"int">>, <<"123">>}, {<<"int">>, <<"123">>},
{<<"atom">>, <<"connected">>}, {<<"atom">>, <<"connected">>},
{<<"ts">>, <<"156000">>}, {<<"ts">>, <<"156000">>},
{<<"_gte_range">>, <<"1">>}, {<<"gte_range">>, <<"1">>},
{<<"_lte_range">>, <<"5">>}, {<<"lte_range">>, <<"5">>},
{<<"_like_fuzzy">>, <<"user">>}, {<<"like_fuzzy">>, <<"user">>},
{<<"_match_topic">>, <<"t/#">>}], {<<"match_topic">>, <<"t/#">>}],
ExpectedQs = [{str, '=:=', <<"abc">>}, ExpectedQs = [{str, '=:=', <<"abc">>},
{int, '=:=', 123}, {int, '=:=', 123},
{atom, '=:=', connected}, {atom, '=:=', connected},

View File

@ -122,8 +122,7 @@ app_api() ->
name => app_id, name => app_id,
in => path, in => path,
required => true, required => true,
schema => #{type => string}, schema => #{type => string}
default => <<"admin">>
}], }],
'requestBody' => emqx_mgmt_util:request_body_schema(app_without_secret_schema()), 'requestBody' => emqx_mgmt_util:request_body_schema(app_without_secret_schema()),
responses => #{ responses => #{

View File

@ -48,12 +48,12 @@
, {<<"clean_start">>, atom} , {<<"clean_start">>, atom}
, {<<"proto_name">>, binary} , {<<"proto_name">>, binary}
, {<<"proto_ver">>, integer} , {<<"proto_ver">>, integer}
, {<<"_like_clientid">>, binary} , {<<"like_clientid">>, binary}
, {<<"_like_username">>, binary} , {<<"like_username">>, binary}
, {<<"_gte_created_at">>, timestamp} , {<<"gte_created_at">>, timestamp}
, {<<"_lte_created_at">>, timestamp} , {<<"lte_created_at">>, timestamp}
, {<<"_gte_connected_at">>, timestamp} , {<<"gte_connected_at">>, timestamp}
, {<<"_lte_connected_at">>, timestamp}]}). , {<<"lte_connected_at">>, timestamp}]}).
-define(query_fun, {?MODULE, query}). -define(query_fun, {?MODULE, query}).
-define(format_fun, {?MODULE, format_channel_info}). -define(format_fun, {?MODULE, format_channel_info}).

View File

@ -54,15 +54,13 @@ routes_api() ->
name => page, name => page,
in => query, in => query,
description => <<"Page">>, description => <<"Page">>,
schema => #{type => integer}, schema => #{type => integer, default => 1}
default => 1
}, },
#{ #{
name => limit, name => limit,
in => query, in => query,
description => <<"Page size">>, description => <<"Page size">>,
schema => #{type => integer}, schema => #{type => integer, default => emqx_mgmt:max_row_limit()}
default => emqx_mgmt:max_row_limit()
}], }],
responses => #{ responses => #{
<<"200">> => <<"200">> =>

View File

@ -16,79 +16,107 @@
-module(emqx_mgmt_api_subscriptions). -module(emqx_mgmt_api_subscriptions).
-behavior(minirest_api).
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-define(SUBS_QS_SCHEMA, {emqx_suboption, -export([api_spec/0]).
[{<<"clientid">>, binary},
{<<"topic">>, binary},
{<<"share">>, binary},
{<<"qos">>, integer},
{<<"_match_topic">>, binary}]}).
-rest_api(#{name => list_subscriptions, -export([subscriptions/2]).
method => 'GET',
path => "/subscriptions/",
func => list,
descr => "A list of subscriptions in the cluster"}).
-rest_api(#{name => list_node_subscriptions,
method => 'GET',
path => "/nodes/:atom:node/subscriptions/",
func => list,
descr => "A list of subscriptions on a node"}).
-rest_api(#{name => lookup_client_subscriptions,
method => 'GET',
path => "/subscriptions/:bin:clientid",
func => lookup,
descr => "A list of subscriptions of a client"}).
-rest_api(#{name => lookup_client_subscriptions_with_node,
method => 'GET',
path => "/nodes/:atom:node/subscriptions/:bin:clientid",
func => lookup,
descr => "A list of subscriptions of a client on the node"}).
-export([ list/2
, lookup/2
]).
-export([ query/3 -export([ query/3
, format/1 , format/1
]). ]).
-define(SUBS_QS_SCHEMA, {emqx_suboption,
[ {<<"clientid">>, binary}
, {<<"topic">>, binary}
, {<<"share">>, binary}
, {<<"qos">>, integer}
, {<<"match_topic">>, binary}]}).
-define(query_fun, {?MODULE, query}). -define(query_fun, {?MODULE, query}).
-define(format_fun, {?MODULE, format}). -define(format_fun, {?MODULE, format}).
list(Bindings, Params) when map_size(Bindings) == 0 -> api_spec() ->
case proplists:get_value(<<"topic">>, Params) of {
undefined -> [subscriptions_api()],
emqx_mgmt:return({ok, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}); [subscription_schema()]
Topic -> }.
emqx_mgmt:return({ok, emqx_mgmt:list_subscriptions_via_topic(emqx_mgmt_util:urldecode(Topic), ?format_fun)})
end;
list(#{node := Node} = Bindings, Params) -> subscriptions_api() ->
case proplists:get_value(<<"topic">>, Params) of MetaData = #{
undefined -> get => #{
case Node =:= node() of description => "List subscriptions",
true -> parameters => [
emqx_mgmt:return({ok, emqx_mgmt_api:node_query(Node, Params, ?SUBS_QS_SCHEMA, ?query_fun)}); #{
false -> name => page,
case rpc:call(Node, ?MODULE, list, [Bindings, Params]) of in => query,
{badrpc, Reason} -> emqx_mgmt:return({error, Reason}); description => <<"Page">>,
Res -> Res schema => #{type => integer}
end },
end; #{
Topic -> name => limit,
emqx_mgmt:return({ok, emqx_mgmt:list_subscriptions_via_topic(Node, emqx_mgmt_util:urldecode(Topic), ?format_fun)}) in => query,
end. description => <<"Page size">>,
schema => #{type => integer}
},
#{
name => clientid,
in => query,
description => <<"Client ID">>,
schema => #{type => string}
},
#{
name => qos,
in => query,
description => <<"QoS">>,
schema => #{type => integer}
},
#{
name => share,
in => query,
description => <<"Shared subscription">>,
schema => #{type => boolean}
},
#{
name => topic,
in => query,
description => <<"Topic">>,
schema => #{type => string}
}
#{
name => match_topic,
in => query,
description => <<"Match topic string">>,
schema => #{type => string}
}
],
responses => #{
<<"200">> => emqx_mgmt_util:response_page_schema(<<"subscription">>)}}},
{"/subscriptions", MetaData, subscriptions}.
lookup(#{node := Node, clientid := ClientId}, _Params) -> subscription_schema() ->
emqx_mgmt:return({ok, format(emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId)))}); #{
subscription => #{
type => object,
properties => #{
topic => #{
type => string},
clientid => #{
type => string},
qos => #{
type => integer,
enum => [0,1,2]}}}
}.
subscriptions(get, Request) ->
Params = cowboy_req:parse_qs(Request),
list(Params).
list(Params) ->
{200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}.
lookup(#{clientid := ClientId}, _Params) ->
emqx_mgmt:return({ok, format(emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId)))}).
format(Items) when is_list(Items) -> format(Items) when is_list(Items) ->
[format(Item) || Item <- Items]; [format(Item) || Item <- Items];
@ -98,10 +126,10 @@ format({{Subscriber, Topic}, Options}) ->
format({_Subscriber, Topic, Options = #{share := Group}}) -> format({_Subscriber, Topic, Options = #{share := Group}}) ->
QoS = maps:get(qos, Options), QoS = maps:get(qos, Options),
#{node => node(), topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS}; #{topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS};
format({_Subscriber, Topic, Options}) -> format({_Subscriber, Topic, Options}) ->
QoS = maps:get(qos, Options), QoS = maps:get(qos, Options),
#{node => node(), topic => Topic, clientid => maps:get(subid, Options), qos => QoS}. #{topic => Topic, clientid => maps:get(subid, Options), qos => QoS}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Query Function %% Query Function

View File

@ -31,7 +31,8 @@
, response_array_schema/2 , response_array_schema/2
, response_error_schema/1 , response_error_schema/1
, response_error_schema/2 , response_error_schema/2
, batch_response_schema/1]). , response_page_schema/1
, response_batch_schema/1]).
-export([urldecode/1]). -export([urldecode/1]).
@ -128,7 +129,25 @@ response_error_schema(Description, Enum) ->
type => string}}}, type => string}}},
json_content_schema(Description, Schema). json_content_schema(Description, Schema).
batch_response_schema(DefName) when is_binary(DefName) -> response_page_schema(Def) when is_binary(Def) ->
Schema = #{
type => object,
properties => #{
meta => #{
type => object,
properties => #{
page => #{
type => integer},
limit => #{
type => integer},
count => #{
type => integer}}},
data => #{
type => array,
items => minirest:ref(Def)}}},
json_content_schema("", Schema).
response_batch_schema(DefName) when is_binary(DefName) ->
Schema = #{ Schema = #{
type => object, type => object,
properties => #{ properties => #{

View File

@ -0,0 +1,74 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_subscription_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(CLIENTID, <<"api_clientid">>).
-define(USERNAME, <<"api_username">>).
%% notice: integer topic for sort response
-define(TOPIC1, <<"0000">>).
-define(TOPIC2, <<"0001">>).
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1),
Config.
end_per_suite(_) ->
emqx_ct_helpers:stop_apps([emqx_management]).
set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
applications =>[#{id => "admin", secret => "public"}]}),
ok;
set_special_configs(_App) ->
ok.
t_subscription_api(_) ->
{ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID}),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, ?TOPIC1),
{ok, _, _} = emqtt:subscribe(Client, ?TOPIC2),
Path = emqx_mgmt_api_test_util:api_path(["subscriptions"]),
{ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path),
Data = emqx_json:decode(Response, [return_maps]),
Meta = maps:get(<<"meta">>, Data),
?assertEqual(1, maps:get(<<"page">>, Meta)),
?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta)),
?assertEqual(2, maps:get(<<"count">>, Meta)),
Subscriptions = maps:get(<<"data">>, Data),
?assertEqual(length(Subscriptions), 2),
Sort =
fun(#{<<"topic">> := T1}, #{<<"topic">> := T2}) ->
binary_to_integer(T1) =< binary_to_integer(T2)
end,
[Subscriptions1, Subscriptions2] = lists:sort(Sort, Subscriptions),
?assertEqual(maps:get(<<"topic">>, Subscriptions1), ?TOPIC1),
?assertEqual(maps:get(<<"topic">>, Subscriptions2), ?TOPIC2),
?assertEqual(maps:get(<<"clientid">>, Subscriptions1), ?CLIENTID),
?assertEqual(maps:get(<<"clientid">>, Subscriptions2), ?CLIENTID),
emqtt:disconnect(Client).