Merge pull request #5378 from DDDHuang/clients_api_fix

fix: add cleints api query params doc & unsubscribe api
This commit is contained in:
DDDHuang 2021-08-04 15:07:51 +08:00 committed by GitHub
commit b3ea7f9cce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 144 additions and 49 deletions

View File

@ -87,8 +87,8 @@ node_query(Node, Params, {Tab, QsSchema}, QueryFun) ->
{_, Rows} = do_query(Node, Qs, QueryFun, Start, Limit+1), {_, Rows} = do_query(Node, Qs, QueryFun, Start, Limit+1),
Meta = #{page => Page, limit => Limit}, Meta = #{page => Page, limit => Limit},
NMeta = case CodCnt =:= 0 of NMeta = case CodCnt =:= 0 of
true -> Meta#{count => count(Tab), hasnext => length(Rows) > Limit}; true -> Meta#{count => count(Tab)};
_ -> Meta#{count => -1, hasnext => length(Rows) > Limit} _ -> Meta#{count => length(Rows)}
end, end,
#{meta => NMeta, data => lists:sublist(Rows, Limit)}. #{meta => NMeta, data => lists:sublist(Rows, Limit)}.
@ -120,8 +120,8 @@ cluster_query(Params, {Tab, QsSchema}, QueryFun) ->
Rows = do_cluster_query(Nodes, Qs, QueryFun, Start, Limit+1, []), Rows = do_cluster_query(Nodes, Qs, QueryFun, Start, Limit+1, []),
Meta = #{page => Page, limit => Limit}, Meta = #{page => Page, limit => Limit},
NMeta = case CodCnt =:= 0 of NMeta = case CodCnt =:= 0 of
true -> Meta#{count => count(Tab, Nodes), hasnext => length(Rows) > Limit}; true -> Meta#{count => count(Tab, Nodes)};
_ -> Meta#{count => -1, hasnext => length(Rows) > Limit} _ -> Meta#{count => length(Rows)}
end, end,
#{meta => NMeta, data => lists:sublist(Rows, Limit)}. #{meta => NMeta, data => lists:sublist(Rows, Limit)}.

View File

@ -32,6 +32,7 @@
, subscriptions/2 , subscriptions/2
, authz_cache/2 , authz_cache/2
, subscribe/2 , subscribe/2
, unsubscribe/2
, subscribe_batch/2]). , subscribe_batch/2]).
-export([ query/3 -export([ query/3
@ -41,7 +42,7 @@
-export([do_subscribe/3]). -export([do_subscribe/3]).
-define(CLIENT_QS_SCHEMA, {emqx_channel_info, -define(CLIENT_QS_SCHEMA, {emqx_channel_info,
[ {<<"clientid">>, binary} [ {<<"node">>, atom}
, {<<"username">>, binary} , {<<"username">>, binary}
, {<<"zone">>, atom} , {<<"zone">>, atom}
, {<<"ip_address">>, ip} , {<<"ip_address">>, ip}
@ -70,7 +71,8 @@ apis() ->
, client_api() , client_api()
, clients_authz_cache_api() , clients_authz_cache_api()
, clients_subscriptions_api() , clients_subscriptions_api()
, subscribe_api()]. , subscribe_api()
, unsubscribe_api()].
schemas() -> schemas() ->
Client = #{ Client = #{
@ -211,22 +213,98 @@ schemas() ->
} }
} }
}, },
Subscription = #{ [Client, AuthzCache].
subscription => #{
type => object,
properties => #{
topic => #{
type => string},
qos => #{
type => integer,
enum => [0,1,2]}}}
},
[Client, AuthzCache, Subscription].
clients_api() -> clients_api() ->
Metadata = #{ Metadata = #{
get => #{ get => #{
description => <<"List clients">>, description => <<"List clients">>,
parameters => [
#{
name => node,
in => query,
required => false,
schema => #{type => string}
},
#{
name => username,
in => query,
required => false,
schema => #{type => string}
},
#{
name => zone,
in => query,
required => false,
schema => #{type => string}
},
#{
name => ip_address,
in => query,
required => false,
schema => #{type => string}
},
#{
name => conn_state,
in => query,
required => false,
schema => #{type => string}
},
#{
name => clean_start,
in => query,
required => false,
schema => #{type => string}
},
#{
name => proto_name,
in => query,
required => false,
schema => #{type => string}
},
#{
name => proto_ver,
in => query,
required => false,
schema => #{type => string}
},
#{
name => like_clientid,
in => query,
required => false,
schema => #{type => string}
},
#{
name => like_username,
in => query,
required => false,
schema => #{type => string}
},
#{
name => gte_created_at,
in => query,
required => false,
schema => #{type => string}
},
#{
name => lte_created_at,
in => query,
required => false,
schema => #{type => string}
},
#{
name => gte_connected_at,
in => query,
required => false,
schema => #{type => string}
},
#{
name => lte_connected_at,
in => query,
required => false,
schema => #{type => string}
}
],
responses => #{ responses => #{
<<"200">> => emqx_mgmt_util:response_array_schema(<<"List clients 200 OK">>, client)}}}, <<"200">> => emqx_mgmt_util:response_array_schema(<<"List clients 200 OK">>, client)}}},
{"/clients", Metadata, clients}. {"/clients", Metadata, clients}.
@ -284,6 +362,15 @@ clients_authz_cache_api() ->
{"/clients/:clientid/authz_cache", Metadata, authz_cache}. {"/clients/:clientid/authz_cache", Metadata, authz_cache}.
clients_subscriptions_api() -> clients_subscriptions_api() ->
SubscriptionSchema = #{
type => object,
properties => #{
topic => #{
type => string},
qos => #{
type => integer,
enum => [0,1,2]}}
},
Metadata = #{ Metadata = #{
get => #{ get => #{
description => <<"Get client subscriptions">>, description => <<"Get client subscriptions">>,
@ -294,10 +381,33 @@ clients_subscriptions_api() ->
required => true required => true
}], }],
responses => #{ responses => #{
<<"200">> => emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, subscription)}} <<"200">> =>
emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, SubscriptionSchema)}}
}, },
{"/clients/:clientid/subscriptions", Metadata, subscriptions}. {"/clients/:clientid/subscriptions", Metadata, subscriptions}.
unsubscribe_api() ->
Metadata = #{
post => #{
description => <<"Unsubscribe">>,
parameters => [
#{
name => clientid,
in => path,
schema => #{type => string},
required => true
}
],
'requestBody' => emqx_mgmt_util:request_body_schema(#{
type => object,
properties => #{
topic => #{
type => string,
description => <<"Topic">>}}}),
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"Unsubscribe ok">>)}}},
{"/clients/:clientid/unsubscribe", Metadata, unsubscribe}.
subscribe_api() -> subscribe_api() ->
Metadata = #{ Metadata = #{
post => #{ post => #{
@ -321,32 +431,14 @@ subscribe_api() ->
description => <<"QoS">>}}}), description => <<"QoS">>}}}),
responses => #{ responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>), <<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"Subscribe ok">>)}}, <<"200">> => emqx_mgmt_util:response_schema(<<"Subscribe ok">>)}}},
delete => #{
description => <<"Unsubscribe">>,
parameters => [
#{
name => clientid,
in => path,
schema => #{type => string},
required => true
},
#{
name => topic,
in => query,
schema => #{type => string},
required => true
}
],
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"Unsubscribe ok">>)}}},
{"/clients/:clientid/subscribe", Metadata, subscribe}. {"/clients/:clientid/subscribe", Metadata, subscribe}.
%%%============================================================================================== %%%==============================================================================================
%% parameters trans %% parameters trans
clients(get, _Request) -> clients(get, Request) ->
list(#{}). Params = cowboy_req:parse_qs(Request),
list(Params).
client(get, Request) -> client(get, Request) ->
ClientID = cowboy_req:binding(clientid, Request), ClientID = cowboy_req:binding(clientid, Request),
@ -370,11 +462,13 @@ subscribe(post, Request) ->
TopicInfo = emqx_json:decode(Body, [return_maps]), TopicInfo = emqx_json:decode(Body, [return_maps]),
Topic = maps:get(<<"topic">>, TopicInfo), Topic = maps:get(<<"topic">>, TopicInfo),
Qos = maps:get(<<"qos">>, TopicInfo, 0), Qos = maps:get(<<"qos">>, TopicInfo, 0),
subscribe(#{clientid => ClientID, topic => Topic, qos => Qos}); subscribe(#{clientid => ClientID, topic => Topic, qos => Qos}).
subscribe(delete, Request) -> unsubscribe(post, Request) ->
ClientID = cowboy_req:binding(clientid, Request), ClientID = cowboy_req:binding(clientid, Request),
#{topic := Topic} = cowboy_req:match_qs([topic], Request), {ok, Body, _} = cowboy_req:read_body(Request),
TopicInfo = emqx_json:decode(Body, [return_maps]),
Topic = maps:get(<<"topic">>, TopicInfo),
unsubscribe(#{clientid => ClientID, topic => Topic}). unsubscribe(#{clientid => ClientID, topic => Topic}).
%% TODO: batch %% TODO: batch
@ -402,7 +496,7 @@ subscriptions(get, Request) ->
%% api apply %% api apply
list(Params) -> list(Params) ->
Response = emqx_mgmt_api:cluster_query(maps:to_list(Params), ?CLIENT_QS_SCHEMA, ?query_fun), Response = emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun),
{200, Response}. {200, Response}.
lookup(#{clientid := ClientID}) -> lookup(#{clientid := ClientID}) ->
@ -486,7 +580,7 @@ format_channel_info({_, ClientInfo, ClientStats}) ->
ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2), ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2),
ClientInfoMap = maps:put(connected, Connected, ClientInfoMap3), ClientInfoMap = maps:put(connected, Connected, ClientInfoMap3),
RemoveList = [ RemoveList = [
auth_result auth_result
, peername , peername
, sockname , sockname
, peerhost , peerhost
@ -495,6 +589,7 @@ format_channel_info({_, ClientInfo, ClientStats}) ->
, conn_props , conn_props
, peercert , peercert
, sockstate , sockstate
, subscriptions
, receive_maximum , receive_maximum
, protocol , protocol
, is_superuser , is_superuser

View File

@ -94,15 +94,15 @@ t_clients(_) ->
%% post /clients/:clientid/subscribe %% post /clients/:clientid/subscribe
SubscribeBody = #{topic => Topic, qos => Qos}, SubscribeBody = #{topic => Topic, qos => Qos},
SubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "subscribe"]), SubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "subscribe"]),
{ok, _} = emqx_mgmt_api_test_util:request_api(post, SubscribePath, "", AuthHeader, SubscribeBody), {ok, _} = emqx_mgmt_api_test_util:request_api(post, SubscribePath, "", AuthHeader, SubscribeBody),
timer:sleep(100), timer:sleep(100),
[{{_, AfterSubTopic}, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1), [{{_, AfterSubTopic}, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1),
?assertEqual(AfterSubTopic, Topic), ?assertEqual(AfterSubTopic, Topic),
?assertEqual(AfterSubQos, Qos), ?assertEqual(AfterSubQos, Qos),
%% delete /clients/:clientid/subscribe %% post /clients/:clientid/unsubscribe
UnSubscribeQuery = "topic=" ++ binary_to_list(Topic), UnSubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "unsubscribe"]),
{ok, _} = emqx_mgmt_api_test_util:request_api(delete, SubscribePath, UnSubscribeQuery, AuthHeader), {ok, _} = emqx_mgmt_api_test_util:request_api(post, UnSubscribePath, "", AuthHeader, SubscribeBody),
timer:sleep(100), timer:sleep(100),
?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)). ?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)).