From 734a5b94202d6fbf3b20f67d12568aae8413d04b Mon Sep 17 00:00:00 2001 From: DDDHuang <904897578@qq.com> Date: Mon, 12 Jul 2021 18:10:40 +0800 Subject: [PATCH] test: add clients api SUITE & add delete sub api --- .../src/emqx_mgmt_api_clients.erl | 57 ++++++- .../test/emqx_mgmt_clients_api_SUITE.erl | 156 ++++++++++++++++++ 2 files changed, 206 insertions(+), 7 deletions(-) create mode 100644 apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 1afd906f1..74cd995a4 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -269,7 +269,6 @@ clients_acl_cache_api() -> {"/clients/:clientid/acl_cache", Metadata, acl_cache}. subscribe_api() -> - Path = "/clients/:clientid/subscribe", Metadata = #{ post => #{ description => "subscribe", @@ -282,7 +281,7 @@ subscribe_api() -> default => 123456 }, #{ - name => topics, + name => topic_data, in => body, schema => #{ type => object, @@ -300,8 +299,28 @@ subscribe_api() -> ], responses => #{ <<"404">> => emqx_mgmt_util:not_found_schema(<<"Client id not found">>), - <<"200">> => #{description => <<"publish ok">>}}}}, - {Path, Metadata, subscribe}. + <<"200">> => #{description => <<"subscribe ok">>}}}, + delete => #{ + description => "unsubscribe", + parameters => [ + #{ + name => clientid, + in => path, + type => string, + required => true, + default => 123456 + }, + #{ + name => topic, + in => query, + required => true, + default => <<"topic_1">> + } + ], + responses => #{ + <<"404">> => emqx_mgmt_util:not_found_schema(<<"Client id not found">>), + <<"200">> => #{description => <<"unsubscribe ok">>}}}}, + {"/clients/:clientid/subscribe", Metadata, subscribe}. %%%============================================================================================== %% parameters trans @@ -330,7 +349,12 @@ subscribe(post, Request) -> TopicInfo = emqx_json:decode(Body, [return_maps]), Topic = maps:get(<<"topic">>, TopicInfo), Qos = maps:get(<<"qos">>, TopicInfo, 0), - subscribe(#{clientid => ClientID, topic => Topic, qos => Qos}). + subscribe(#{clientid => ClientID, topic => Topic, qos => Qos}); + +subscribe(delete, Request) -> + ClientID = cowboy_req:binding(clientid, Request), + #{topic := Topic} = cowboy_req:match_qs([topic], Request), + unsubscribe(#{clientid => ClientID, topic => Topic}). %% TODO: batch subscribe_batch(post, Request) -> @@ -359,7 +383,7 @@ lookup(#{clientid := ClientID}) -> {404, ?CLIENT_ID_NOT_FOUND}; ClientInfo -> Response = emqx_json:encode(hd(ClientInfo)), - {ok, Response} + {200, Response} end. kickout(#{clientid := ClientID}) -> @@ -393,11 +417,22 @@ subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) -> {404, ?CLIENT_ID_NOT_FOUND}; {error, Reason} -> Body = emqx_json:encode(#{code => <<"UNKNOW_ERROR">>, reason => io_lib:format("~p", [Reason])}), - {200, Body}; + {500, Body}; ok -> {200} end. +unsubscribe(#{clientid := ClientID, topic := Topic}) -> + case do_unsubscribe(ClientID, Topic) of + {error, channel_not_found} -> + {404, ?CLIENT_ID_NOT_FOUND}; + {error, Reason} -> + Body = emqx_json:encode(#{code => <<"UNKNOW_ERROR">>, reason => io_lib:format("~p", [Reason])}), + {500, Body}; + {unsubscribe, [{Topic, #{}}]} -> + {200} + end. + subscribe_batch(#{clientid := ClientID, topics := Topics}) -> ArgList = [[ClientID, Topic, Qos]|| #{topic := Topic, qos := Qos} <- Topics], emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList). @@ -477,6 +512,14 @@ do_subscribe(ClientID, Topic0, Qos) -> {error, unknow_error} end end. + +do_unsubscribe(ClientID, Topic) -> + case emqx_mgmt:unsubscribe(ClientID, Topic) of + {error, Reason} -> + {error, Reason}; + Res -> + Res + end. %%%============================================================================================== %% Query Functions diff --git a/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl new file mode 100644 index 000000000..199028c04 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl @@ -0,0 +1,156 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_clients_api_SUITE). +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-define(APP, emqx_management). + +-define(SERVER, "http://127.0.0.1:8081"). +-define(BASE_PATH, "/api/v5"). + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + ekka_mnesia:start(), + emqx_mgmt_auth:mnesia(boot), + emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1), + Config. + +end_per_suite(_) -> + emqx_ct_helpers:stop_apps([emqx_management]). + +set_special_configs(emqx_management) -> + emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], + applications =>[#{id => "admin", secret => "public"}]}), + ok; +set_special_configs(_App) -> + ok. + +t_clients(_) -> + process_flag(trap_exit, true), + + Username1 = <<"user1">>, + ClientId1 = <<"client1">>, + + Username2 = <<"user2">>, + ClientId2 = <<"client2">>, + + Topic = <<"topic_1">>, + Qos = 0, + + {ok, C1} = emqtt:start_link(#{username => Username1, clientid => ClientId1}), + {ok, _} = emqtt:connect(C1), + {ok, C2} = emqtt:start_link(#{username => Username2, clientid => ClientId2}), + {ok, _} = emqtt:connect(C2), + + timer:sleep(300), + + %% get /clients + {ok, Clients} = request_api(get, api_path(["clients"])), + ClientsResponse = emqx_json:decode(Clients, [return_maps]), + ClientsMeta = maps:get(<<"meta">>, ClientsResponse), + ClientsPage = maps:get(<<"page">>, ClientsMeta), + ClientsLimit = maps:get(<<"limit">>, ClientsMeta), + ClientsCount = maps:get(<<"count">>, ClientsMeta), + ?assertEqual(ClientsPage, 1), + ?assertEqual(ClientsLimit, emqx_mgmt:max_row_limit()), + ?assertEqual(ClientsCount, 2), + + %% get /clients/:clientid + {ok, Client1} = request_api(get, api_path(["clients", binary_to_list(ClientId1)])), + Client1Response = emqx_json:decode(Client1, [return_maps]), + ?assertEqual(Username1, maps:get(<<"username">>, Client1Response)), + ?assertEqual(ClientId1, maps:get(<<"clientid">>, Client1Response)), + + %% delete /clients/:clientid kickout + {ok, _} = request_api(delete, api_path(["clients", binary_to_list(ClientId2)])), + AfterKickoutResponse = request_api(get, api_path(["clients", binary_to_list(ClientId2)])), + ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse), + + %% get /clients/:clientid/acl_cache should has no acl cache + {ok, Client1AclCache} = request_api(get, + api_path(["clients", binary_to_list(ClientId1), "acl_cache"])), + ?assertEqual("[]", Client1AclCache), + + %% get /clients/:clientid/acl_cache should has no acl cache + {ok, Client1AclCache} = request_api(get, + api_path(["clients", binary_to_list(ClientId1), "acl_cache"])), + ?assertEqual("[]", Client1AclCache), + + %% post /clients/:clientid/subscribe + SubscribeBody = #{topic => Topic, qos => Qos}, + SubscribePath = api_path(["clients", binary_to_list(ClientId1), "subscribe"]), + {ok, _} = request_api(post, SubscribePath, "", auth_header_(), SubscribeBody), + [{{_, AfterSubTopic}, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1), + ?assertEqual(AfterSubTopic, Topic), + ?assertEqual(AfterSubQos, Qos), + + %% delete /clients/:clientid/subscribe + UnSubscribeQuery = "topic=" ++ binary_to_list(Topic), + {ok, _} = request_api(delete, SubscribePath, UnSubscribeQuery, auth_header_()), + ?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)). + +%%%============================================================================================== +%% test util function +request_api(Method, Url) -> + request_api(Method, Url, [], auth_header_(), []). + +request_api(Method, Url, Auth) -> + request_api(Method, Url, [], Auth, []). + +request_api(Method, Url, QueryParams, Auth) -> + request_api(Method, Url, QueryParams, Auth, []). + +request_api(Method, Url, QueryParams, Auth, []) -> + NewUrl = case QueryParams of + "" -> Url; + _ -> Url ++ "?" ++ QueryParams + end, + do_request_api(Method, {NewUrl, [Auth]}); +request_api(Method, Url, QueryParams, Auth, Body) -> + NewUrl = case QueryParams of + "" -> Url; + _ -> Url ++ "?" ++ QueryParams + end, + do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}). + +do_request_api(Method, Request)-> + ct:pal("Method: ~p, Request: ~p", [Method, Request]), + case httpc:request(Method, Request, [], []) of + {error, socket_closed_remotely} -> + {error, socket_closed_remotely}; + {ok, {{"HTTP/1.1", Code, _}, _, Return} } + when Code =:= 200 orelse Code =:= 201 -> + {ok, Return}; + {ok, {Reason, _, _}} -> + {error, Reason} + end. + +auth_header_() -> + AppId = <<"admin">>, + AppSecret = <<"public">>, + auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)). + +auth_header_(User, Pass) -> + Encoded = base64:encode_to_string(lists:append([User,":",Pass])), + {"Authorization","Basic " ++ Encoded}. + +api_path(Parts)-> + ?SERVER ++ filename:join([?BASE_PATH | Parts]).