diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 37b769655..be827510e 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -25,17 +25,27 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/asserts.hrl"). +-define(HTTP200, {"HTTP/1.1", 200, "OK"}). +-define(HTTP201, {"HTTP/1.1", 201, "Created"}). +-define(HTTP204, {"HTTP/1.1", 204, "No Content"}). +-define(HTTP400, {"HTTP/1.1", 400, "Bad Request"}). +-define(HTTP404, {"HTTP/1.1", 404, "Not Found"}). + all() -> - AllTCs = emqx_common_test_helpers:all(?MODULE), [ - {group, persistent_sessions}, - {group, msgs_base64_encoding}, - {group, msgs_plain_encoding} - | AllTCs -- (persistent_session_testcases() ++ client_msgs_testcases()) + {group, general}, + {group, persistent_sessions} ]. groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + GeneralTCs = AllTCs -- (persistent_session_testcases() ++ client_msgs_testcases()), [ + {general, [ + {group, msgs_base64_encoding}, + {group, msgs_plain_encoding} + | GeneralTCs + ]}, {persistent_sessions, persistent_session_testcases()}, {msgs_base64_encoding, client_msgs_testcases()}, {msgs_plain_encoding, client_msgs_testcases()} @@ -60,7 +70,12 @@ client_msgs_testcases() -> ]. init_per_suite(Config) -> - ok = snabbkaffe:start_trace(), + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(general, Config) -> Apps = emqx_cth_suite:start( [ emqx, @@ -70,14 +85,11 @@ init_per_suite(Config) -> ], #{work_dir => emqx_cth_suite:work_dir(Config)} ), - {ok, _} = emqx_common_test_http:create_default_app(), - [{apps, Apps} | Config]. - -end_per_suite(Config) -> - Apps = ?config(apps, Config), - emqx_cth_suite:stop(Apps), - ok. - + [ + {apps, Apps}, + {api_auth_header, emqx_mgmt_api_test_util:auth_header_()} + | Config + ]; init_per_group(persistent_sessions, Config) -> AppSpecs = [ {emqx, @@ -85,18 +97,21 @@ init_per_group(persistent_sessions, Config) -> "durable_sessions.disconnected_session_count_refresh_interval = 100ms"}, emqx_management ], - Dashboard = emqx_mgmt_api_test_util:emqx_dashboard( - "dashboard.listeners.http { enable = true, bind = 18084 }" - ), + Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(), Cluster = [ {emqx_mgmt_api_clients_SUITE1, #{role => core, apps => AppSpecs ++ [Dashboard]}}, {emqx_mgmt_api_clients_SUITE2, #{role => core, apps => AppSpecs}} ], - Nodes = emqx_cth_cluster:start( - Cluster, - #{work_dir => emqx_cth_suite:work_dir(Config)} - ), - [{nodes, Nodes} | Config]; + Nodes = + [N1 | _] = emqx_cth_cluster:start( + Cluster, + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [ + {nodes, Nodes}, + {api_auth_header, erpc:call(N1, emqx_mgmt_api_test_util, auth_header_, [])} + | Config + ]; init_per_group(msgs_base64_encoding, Config) -> [{payload_encoding, base64} | Config]; init_per_group(msgs_plain_encoding, Config) -> @@ -104,14 +119,26 @@ init_per_group(msgs_plain_encoding, Config) -> init_per_group(_Group, Config) -> Config. +end_per_group(general, Config) -> + Apps = ?config(apps, Config), + ok = emqx_cth_suite:stop(Apps); end_per_group(persistent_sessions, Config) -> Nodes = ?config(nodes, Config), - emqx_cth_cluster:stop(Nodes), - ok; + ok = emqx_cth_cluster:stop(Nodes); end_per_group(_Group, _Config) -> ok. init_per_testcase(_TC, Config) -> + %% NOTE + %% Wait until there are no stale clients data before running the testcase. + ?retry( + _Timeout = 100, + _N = 10, + ?assertMatch( + {ok, {?HTTP200, _, #{<<"data">> := []}}}, + list_request(Config) + ) + ), ok = snabbkaffe:start_trace(), Config. @@ -134,9 +161,7 @@ end_per_testcase(_TC, _Config) -> ok = snabbkaffe:stop(), ok. -t_clients(_) -> - process_flag(trap_exit, true), - +t_clients(Config) -> Username1 = <<"user1">>, ClientId1 = <<"client1">>, @@ -146,8 +171,6 @@ t_clients(_) -> Topic = <<"topic_1">>, Qos = 0, - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - {ok, C1} = emqtt:start_link(#{ username => Username1, clientid => ClientId1, @@ -162,9 +185,8 @@ t_clients(_) -> %% get /clients ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]), - {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), - ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]), - ClientsMeta = maps:get(<<"meta">>, ClientsResponse), + {ok, {?HTTP200, _, Clients}} = request(get, ClientsPath, Config), + ClientsMeta = maps:get(<<"meta">>, Clients), ClientsPage = maps:get(<<"page">>, ClientsMeta), ClientsLimit = maps:get(<<"limit">>, ClientsMeta), ClientsCount = maps:get(<<"count">>, ClientsMeta), @@ -173,130 +195,100 @@ t_clients(_) -> ?assertEqual(ClientsCount, 2), %% get /clients/:clientid - Client1Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1)]), - {ok, Client1} = emqx_mgmt_api_test_util:request_api(get, Client1Path), - Client1Response = emqx_utils_json:decode(Client1, [return_maps]), - ?assertEqual(Username1, maps:get(<<"username">>, Client1Response)), - ?assertEqual(ClientId1, maps:get(<<"clientid">>, Client1Response)), - ?assertEqual(120, maps:get(<<"expiry_interval">>, Client1Response)), + Client1Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId1]), + {ok, {?HTTP200, _, Client1}} = request(get, Client1Path, Config), + ?assertEqual(Username1, maps:get(<<"username">>, Client1)), + ?assertEqual(ClientId1, maps:get(<<"clientid">>, Client1)), + ?assertEqual(120, maps:get(<<"expiry_interval">>, Client1)), %% delete /clients/:clientid kickout - Client2Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId2)]), - {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client2Path), - Kick = - receive - {'EXIT', C2, _} -> - ok - after 300 -> - timeout - end, - ?assertEqual(ok, Kick), + true = erlang:unlink(C2), + MRef = erlang:monitor(process, C2), + Client2Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId2]), + {ok, {?HTTP204, _, _}} = request(delete, Client2Path, [], Config), + ?assertReceive({'DOWN', MRef, process, C2, _}), %% Client info is cleared after DOWN event ?retry(_Interval = 100, _Attempts = 5, begin - AfterKickoutResponse2 = emqx_mgmt_api_test_util:request_api(get, Client2Path), - ?assertEqual(AfterKickoutResponse2, {error, {"HTTP/1.1", 404, "Not Found"}}) + ?assertMatch( + {error, {?HTTP404, _, _}}, + request(get, Client2Path, Config) + ) end), %% get /clients/:clientid/authorization/cache should have no authz cache Client1AuthzCachePath = emqx_mgmt_api_test_util:api_path([ "clients", - binary_to_list(ClientId1), + ClientId1, "authorization", "cache" ]), - {ok, Client1AuthzCache0} = emqx_mgmt_api_test_util:request_api(get, Client1AuthzCachePath), - ?assertEqual("[]", Client1AuthzCache0), + ?assertMatch( + {ok, {?HTTP200, _, []}}, + request(get, Client1AuthzCachePath, Config) + ), %% post /clients/:clientid/subscribe SubscribeBody = #{topic => Topic, qos => Qos, nl => 1, rh => 1}, - SubscribePath = emqx_mgmt_api_test_util:api_path([ - "clients", - binary_to_list(ClientId1), - "subscribe" - ]), - {ok, _} = emqx_mgmt_api_test_util:request_api( - post, - SubscribePath, - "", - AuthHeader, - SubscribeBody - ), + SubscribePath = emqx_mgmt_api_test_util:api_path(["clients", ClientId1, "subscribe"]), + {ok, {?HTTP200, _, _}} = request(post, SubscribePath, SubscribeBody, Config), timer:sleep(100), {_, [{AfterSubTopic, #{qos := AfterSubQos}}]} = emqx_mgmt:list_client_subscriptions(ClientId1), ?assertEqual(AfterSubTopic, Topic), ?assertEqual(AfterSubQos, Qos), %% get /clients/:clientid/subscriptions - SubscriptionsPath = emqx_mgmt_api_test_util:api_path([ - "clients", - binary_to_list(ClientId1), - "subscriptions" - ]), - {ok, SubscriptionsRes} = emqx_mgmt_api_test_util:request_api( - get, - SubscriptionsPath, - "", - AuthHeader - ), - [SubscriptionsData] = emqx_utils_json:decode(SubscriptionsRes, [return_maps]), + SubscriptionsPath = emqx_mgmt_api_test_util:api_path(["clients", ClientId1, "subscriptions"]), ?assertMatch( - #{ - <<"clientid">> := ClientId1, - <<"nl">> := 1, - <<"rap">> := 0, - <<"rh">> := 1, - <<"node">> := _, - <<"qos">> := Qos, - <<"topic">> := Topic - }, - SubscriptionsData + {ok, + {?HTTP200, _, [ + #{ + <<"clientid">> := ClientId1, + <<"nl">> := 1, + <<"rap">> := 0, + <<"rh">> := 1, + <<"node">> := _, + <<"qos">> := Qos, + <<"topic">> := Topic + } + ]}}, + request(get, SubscriptionsPath, Config) ), %% post /clients/:clientid/unsubscribe - UnSubscribePath = emqx_mgmt_api_test_util:api_path([ - "clients", - binary_to_list(ClientId1), - "unsubscribe" - ]), + UnSubscribePath = emqx_mgmt_api_test_util:api_path(["clients", ClientId1, "unsubscribe"]), UnSubscribeBody = #{topic => Topic}, - {ok, _} = emqx_mgmt_api_test_util:request_api( - post, - UnSubscribePath, - "", - AuthHeader, - UnSubscribeBody + ?assertMatch( + {ok, {?HTTP204, _, _}}, + request(post, UnSubscribePath, UnSubscribeBody, Config) ), timer:sleep(100), ?assertEqual([], emqx_mgmt:list_client_subscriptions(ClientId1)), %% testcase cleanup, kickout client1 - {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client1Path), - timer:sleep(300), - AfterKickoutResponse1 = emqx_mgmt_api_test_util:request_api(get, Client1Path), - ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse1). + disconnect_and_destroy_session(C1). t_persistent_sessions1(Config) -> [N1, _N2] = ?config(nodes, Config), - APIPort = 18084, Port1 = get_mqtt_port(N1, tcp), - ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + ?assertMatch({ok, {?HTTP200, _, #{<<"data">> := []}}}, list_request(Config)), ?check_trace( begin %% Scenario 1 %% 1) Client connects and is listed as connected. ?tp(notice, "scenario 1", #{}), - O = #{api_port => APIPort}, ClientId = <<"c1">>, C1 = connect_client(#{port => Port1, clientid => ClientId}), - assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + assert_single_client(#{node => N1, clientid => ClientId, status => connected}, Config), %% 2) Client disconnects and is listed as disconnected. ok = emqtt:disconnect(C1), - assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}), + assert_single_client( + #{node => N1, clientid => ClientId, status => disconnected}, Config + ), %% 3) Client reconnects and is listed as connected. C2 = connect_client(#{port => Port1, clientid => ClientId}), - assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + assert_single_client(#{node => N1, clientid => ClientId, status => connected}, Config), %% 4) Client disconnects. ok = emqtt:stop(C2), %% 5) Session is GC'ed, client is removed from list. @@ -307,8 +299,8 @@ t_persistent_sessions1(Config) -> 100, 20, ?assertMatch( - {ok, {{_, 200, _}, _, #{<<"data">> := []}}}, - list_request(APIPort) + {ok, {?HTTP200, _, #{<<"data">> := []}}}, + list_request(Config) ) ), ok @@ -319,33 +311,23 @@ t_persistent_sessions1(Config) -> t_persistent_sessions2(Config) -> [N1, _N2] = ?config(nodes, Config), - APIPort = 18084, Port1 = get_mqtt_port(N1, tcp), - ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + ?assertMatch({ok, {?HTTP200, _, #{<<"data">> := []}}}, list_request(Config)), ?check_trace( begin %% Scenario 2 %% 1) Client connects and is listed as connected. ?tp(notice, "scenario 2", #{}), - O = #{api_port => APIPort}, ClientId = <<"c2">>, C1 = connect_client(#{port => Port1, clientid => ClientId}), - assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + assert_single_client(#{node => N1, clientid => ClientId, status => connected}, Config), unlink(C1), %% 2) Client connects to the same node and takes over, listed only once. C2 = connect_client(#{port => Port1, clientid => ClientId}), - assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), - disconnect_and_destroy_session(C2), - ?retry( - 100, - 20, - ?assertMatch( - {ok, {{_, 200, _}, _, #{<<"data">> := []}}}, - list_request(APIPort) - ) - ) + assert_single_client(#{node => N1, clientid => ClientId, status => connected}, Config), + disconnect_and_destroy_session(C2) end, [] ), @@ -353,32 +335,30 @@ t_persistent_sessions2(Config) -> t_persistent_sessions3(Config) -> [N1, N2] = ?config(nodes, Config), - APIPort = 18084, Port1 = get_mqtt_port(N1, tcp), Port2 = get_mqtt_port(N2, tcp), - ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + ?assertMatch({ok, {?HTTP200, _, #{<<"data">> := []}}}, list_request(Config)), ?check_trace( begin %% Scenario 3 %% 1) Client connects and is listed as connected. ?tp(notice, "scenario 3", #{}), - O = #{api_port => APIPort}, ClientId = <<"c3">>, C1 = connect_client(#{port => Port1, clientid => ClientId}), - assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + assert_single_client(#{node => N1, clientid => ClientId, status => connected}, Config), unlink(C1), %% 2) Client connects to *another node* and takes over, listed only once. C2 = connect_client(#{port => Port2, clientid => ClientId}), - assert_single_client(O#{node => N2, clientid => ClientId, status => connected}), + assert_single_client(#{node => N2, clientid => ClientId, status => connected}, Config), %% Doesn't show up in the other node while alive ?retry( 100, 20, ?assertMatch( - {ok, {{_, 200, _}, _, #{<<"data">> := []}}}, - list_request(APIPort, "node=" ++ atom_to_list(N1)) + {ok, {?HTTP200, _, #{<<"data">> := []}}}, + list_request(#{node => N1}, Config) ) ), disconnect_and_destroy_session(C2) @@ -389,36 +369,38 @@ t_persistent_sessions3(Config) -> t_persistent_sessions4(Config) -> [N1, N2] = ?config(nodes, Config), - APIPort = 18084, Port1 = get_mqtt_port(N1, tcp), Port2 = get_mqtt_port(N2, tcp), - ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + ?assertMatch({ok, {?HTTP200, _, #{<<"data">> := []}}}, list_request(Config)), ?check_trace( begin %% Scenario 4 %% 1) Client connects and is listed as connected. ?tp(notice, "scenario 4", #{}), - O = #{api_port => APIPort}, ClientId = <<"c4">>, C1 = connect_client(#{port => Port1, clientid => ClientId}), - assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + assert_single_client(#{node => N1, clientid => ClientId, status => connected}, Config), %% 2) Client disconnects and is listed as disconnected. ok = emqtt:stop(C1), %% While disconnected, shows up in both nodes. - assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}), - assert_single_client(O#{node => N2, clientid => ClientId, status => disconnected}), + assert_single_client( + #{node => N1, clientid => ClientId, status => disconnected}, Config + ), + assert_single_client( + #{node => N2, clientid => ClientId, status => disconnected}, Config + ), %% 3) Client reconnects to *another node* and is listed as connected once. C2 = connect_client(#{port => Port2, clientid => ClientId}), - assert_single_client(O#{node => N2, clientid => ClientId, status => connected}), + assert_single_client(#{node => N2, clientid => ClientId, status => connected}, Config), %% Doesn't show up in the other node while alive ?retry( 100, 20, ?assertMatch( - {ok, {{_, 200, _}, _, #{<<"data">> := []}}}, - list_request(APIPort, "node=" ++ atom_to_list(N1)) + {ok, {?HTTP200, _, #{<<"data">> := []}}}, + list_request(#{node => N1}, Config) ) ), disconnect_and_destroy_session(C2) @@ -429,11 +411,10 @@ t_persistent_sessions4(Config) -> t_persistent_sessions5(Config) -> [N1, N2] = ?config(nodes, Config), - APIPort = 18084, Port1 = get_mqtt_port(N1, tcp), Port2 = get_mqtt_port(N2, tcp), - ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + ?assertMatch({ok, {?HTTP200, _, #{<<"data">> := []}}}, list_request(Config)), ?check_trace( begin @@ -442,6 +423,7 @@ t_persistent_sessions5(Config) -> ClientId2 = <<"c6">>, ClientId3 = <<"c7">>, ClientId4 = <<"c8">>, + ClientIds = [ClientId1, ClientId2, ClientId3, ClientId4], %% persistent C1 = connect_client(#{port => Port1, clientid => ClientId1}), C2 = connect_client(#{port => Port2, clientid => ClientId2}), @@ -453,11 +435,11 @@ t_persistent_sessions5(Config) -> port => Port2, clientid => ClientId4, expiry => 0, clean_start => true }), - P1 = list_request(APIPort, "limit=3&page=1"), - P2 = list_request(APIPort, "limit=3&page=2"), + P1 = list_request(#{limit => 3, page => 1}, Config), + P2 = list_request(#{limit => 3, page => 2}, Config), ?assertMatch( {ok, - {{_, 200, _}, _, #{ + {?HTTP200, _, #{ <<"data">> := [_, _, _], <<"meta">> := #{ <<"count">> := 4, @@ -468,7 +450,7 @@ t_persistent_sessions5(Config) -> ), ?assertMatch( {ok, - {{_, 200, _}, _, #{ + {?HTTP200, _, #{ <<"data">> := [_], <<"meta">> := #{ <<"count">> := 4, @@ -480,29 +462,29 @@ t_persistent_sessions5(Config) -> {ok, {_, _, #{<<"data">> := R1}}} = P1, {ok, {_, _, #{<<"data">> := R2}}} = P2, ?assertEqual( - lists:sort([ClientId1, ClientId2, ClientId3, ClientId4]), + lists:sort(ClientIds), lists:sort(lists:map(fun(#{<<"clientid">> := CId}) -> CId end, R1 ++ R2)) ), ?assertMatch( {ok, - {{_, 200, _}, _, #{ + {?HTTP200, _, #{ <<"data">> := [_, _], <<"meta">> := #{ <<"count">> := 4, <<"hasnext">> := true } }}}, - list_request(APIPort, "limit=2&page=1") + list_request(#{limit => 2, page => 1}, Config) ), %% Disconnect persistent sessions lists:foreach(fun emqtt:stop/1, [C1, C2]), P3 = ?retry(200, 10, begin - P3_ = list_request(APIPort, "limit=3&page=1"), + P3_ = list_request(#{limit => 3, page => 1}, Config), ?assertMatch( {ok, - {{_, 200, _}, _, #{ + {?HTTP200, _, #{ <<"data">> := [_, _, _], <<"meta">> := #{ <<"count">> := 4, @@ -515,10 +497,10 @@ t_persistent_sessions5(Config) -> end), P4 = ?retry(200, 10, begin - P4_ = list_request(APIPort, "limit=3&page=2"), + P4_ = list_request(#{limit => 3, page => 2}, Config), ?assertMatch( {ok, - {{_, 200, _}, _, #{ + {?HTTP200, _, #{ <<"data">> := [_], <<"meta">> := #{ <<"count">> := 4, @@ -532,7 +514,7 @@ t_persistent_sessions5(Config) -> {ok, {_, _, #{<<"data">> := R3}}} = P3, {ok, {_, _, #{<<"data">> := R4}}} = P4, ?assertEqual( - lists:sort([ClientId1, ClientId2, ClientId3, ClientId4]), + lists:sort(ClientIds), lists:sort(lists:map(fun(#{<<"clientid">> := CId}) -> CId end, R3 ++ R4)) ), @@ -541,7 +523,7 @@ t_persistent_sessions5(Config) -> fun(ClientId) -> ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]) end, - [ClientId1, ClientId2, ClientId3, ClientId4] + ClientIds ), ok @@ -553,37 +535,37 @@ t_persistent_sessions5(Config) -> %% Checks that expired durable sessions are returned with `is_expired => true'. t_persistent_sessions6(Config) -> [N1, _N2] = ?config(nodes, Config), - APIPort = 18084, Port1 = get_mqtt_port(N1, tcp), - ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + ?assertMatch({ok, {?HTTP200, _, #{<<"data">> := []}}}, list_request(Config)), ?check_trace( begin - O = #{api_port => APIPort}, ClientId = <<"c1">>, C1 = connect_client(#{port => Port1, clientid => ClientId, expiry => 1}), - assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + assert_single_client(#{node => N1, clientid => ClientId, status => connected}, Config), ?retry( 100, 20, ?assertMatch( - {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"is_expired">> := false}]}}}, - list_request(APIPort) + {ok, {?HTTP200, _, #{<<"data">> := [#{<<"is_expired">> := false}]}}}, + list_request(Config) ) ), ok = emqtt:disconnect(C1), %% Wait for session to be considered expired but not GC'ed ct:sleep(2_000), - assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}), + assert_single_client( + #{node => N1, clientid => ClientId, status => disconnected}, Config + ), N1Bin = atom_to_binary(N1), ?retry( 100, 20, ?assertMatch( {ok, - {{_, 200, _}, _, #{ + {?HTTP200, _, #{ <<"data">> := [ #{ <<"is_expired">> := true, @@ -592,17 +574,17 @@ t_persistent_sessions6(Config) -> } ] }}}, - list_request(APIPort) + list_request(Config) ) ), ?assertMatch( {ok, - {{_, 200, _}, _, #{ + {?HTTP200, _, #{ <<"is_expired">> := true, <<"node">> := N1Bin, <<"disconnected_at">> := <<_/binary>> }}}, - get_client_request(APIPort, ClientId) + get_client_request(ClientId, Config) ), C2 = connect_client(#{port => Port1, clientid => ClientId}), @@ -617,10 +599,9 @@ t_persistent_sessions6(Config) -> %% Check that the output of `/clients/:clientid/subscriptions' has the expected keys. t_persistent_sessions_subscriptions1(Config) -> [N1, _N2] = ?config(nodes, Config), - APIPort = 18084, Port1 = get_mqtt_port(N1, tcp), - ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + ?assertMatch({ok, {?HTTP200, _, #{<<"data">> := []}}}, list_request(Config)), ?check_trace( begin @@ -629,7 +610,7 @@ t_persistent_sessions_subscriptions1(Config) -> {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(C1, <<"topic/1">>, 1), ?assertMatch( {ok, - {{_, 200, _}, _, [ + {?HTTP200, _, [ #{ <<"durable">> := true, <<"node">> := <<_/binary>>, @@ -641,14 +622,14 @@ t_persistent_sessions_subscriptions1(Config) -> <<"topic">> := <<"topic/1">> } ]}}, - get_subscriptions_request(APIPort, ClientId) + get_subscriptions_request(ClientId, Config) ), %% Just disconnect ok = emqtt:disconnect(C1), ?assertMatch( {ok, - {{_, 200, _}, _, [ + {?HTTP200, _, [ #{ <<"durable">> := true, <<"node">> := null, @@ -660,7 +641,7 @@ t_persistent_sessions_subscriptions1(Config) -> <<"topic">> := <<"topic/1">> } ]}}, - get_subscriptions_request(APIPort, ClientId) + get_subscriptions_request(ClientId, Config) ), C2 = connect_client(#{port => Port1, clientid => ClientId}), @@ -671,24 +652,21 @@ t_persistent_sessions_subscriptions1(Config) -> ), ok. -t_clients_bad_value_type(_) -> +t_clients_bad_value_type(Config) -> %% get /clients - AuthHeader = [emqx_common_test_http:default_auth_header()], ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]), - QsString = cow_qs:qs([{<<"ip_address">>, <<"127.0.0.1:8080">>}]), - {ok, 400, Resp} = emqx_mgmt_api_test_util:request_api( - get, ClientsPath, QsString, AuthHeader, [], #{compatible_mode => true} - ), + QS = cow_qs:qs([{<<"ip_address">>, <<"127.0.0.1:8080">>}]), ?assertMatch( - #{ - <<"code">> := <<"INVALID_PARAMETER">>, - <<"message">> := - <<"the ip_address parameter expected type is ip, but the value is 127.0.0.1:8080">> - }, - emqx_utils_json:decode(Resp, [return_maps]) + {error, + {?HTTP400, _, #{ + <<"code">> := <<"INVALID_PARAMETER">>, + <<"message">> := + <<"the ip_address parameter expected type is ip, but the value is 127.0.0.1:8080">> + }}}, + request(get, ClientsPath, [], QS, Config) ). -t_authz_cache(_) -> +t_authz_cache(Config) -> ClientId = <<"client_authz">>, {ok, C} = emqtt:start_link(#{clientid => ClientId}), @@ -701,25 +679,22 @@ t_authz_cache(_) -> "authorization", "cache" ]), - {ok, ClientAuthzCache} = emqx_mgmt_api_test_util:request_api(get, ClientAuthzCachePath), ?assertMatch( - [ - #{ - <<"access">> := - #{<<"action_type">> := <<"subscribe">>, <<"qos">> := 1}, - <<"result">> := <<"allow">>, - <<"topic">> := <<"topic/1">>, - <<"updated_time">> := _ - } - ], - emqx_utils_json:decode(ClientAuthzCache, [return_maps]) + {ok, + {{_HTTP, 200, _}, _, [ + #{ + <<"access">> := #{<<"action_type">> := <<"subscribe">>, <<"qos">> := 1}, + <<"result">> := <<"allow">>, + <<"topic">> := <<"topic/1">>, + <<"updated_time">> := _ + } + ]}}, + request(get, ClientAuthzCachePath, Config) ), ok = emqtt:stop(C). -t_kickout_clients(_) -> - process_flag(trap_exit, true), - +t_kickout_clients(Config) -> ClientId1 = <<"client1">>, ClientId2 = <<"client2">>, ClientId3 = <<"client3">>, @@ -735,6 +710,8 @@ t_kickout_clients(_) -> {ok, C3} = emqtt:start_link(#{clientid => ClientId3}), {ok, _} = emqtt:connect(C3), + _MRefs = [erlang:unlink(C) andalso erlang:monitor(process, C) || C <- [C1, C2, C3]], + emqx_common_test_helpers:wait_for( ?FUNCTION_NAME, ?LINE, @@ -754,9 +731,8 @@ t_kickout_clients(_) -> %% get /clients ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]), - {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), - ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]), - ClientsMeta = maps:get(<<"meta">>, ClientsResponse), + {ok, {{_HTTP, 200, _}, _, Clients}} = request(get, ClientsPath, Config), + ClientsMeta = maps:get(<<"meta">>, Clients), ClientsPage = maps:get(<<"page">>, ClientsMeta), ClientsLimit = maps:get(<<"limit">>, ClientsMeta), ClientsCount = maps:get(<<"count">>, ClientsMeta), @@ -767,24 +743,19 @@ t_kickout_clients(_) -> %% kickout clients KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]), KickoutBody = [ClientId1, ClientId2, ClientId3], - {ok, 204, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody), - - ReceiveExit = fun({ClientPid, ClientId}) -> - receive - {'EXIT', Pid, _} when Pid =:= ClientPid -> - ok - after 1000 -> - error({timeout, ClientId}) - end - end, - lists:foreach(ReceiveExit, [{C1, ClientId1}, {C2, ClientId2}, {C3, ClientId3}]), - {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), - ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]), - ?assertMatch(#{<<"meta">> := #{<<"count">> := 0}}, ClientsResponse2). - -t_query_clients_with_time(_) -> - process_flag(trap_exit, true), + ?assertMatch( + {ok, {{_HTTP, 204, _}, _, _}}, + request(post, KickoutPath, KickoutBody, Config) + ), + ?assertReceive({'DOWN', _MRef, process, C1, _}), + ?assertReceive({'DOWN', _MRef, process, C2, _}), + ?assertReceive({'DOWN', _MRef, process, C3, _}), + ?assertMatch( + {ok, {_200, _, #{<<"meta">> := #{<<"count">> := 0}}}}, + request(get, ClientsPath, Config) + ). +t_query_clients_with_time(Config) -> Username1 = <<"user1">>, ClientId1 = <<"client1">>, @@ -798,7 +769,6 @@ t_query_clients_with_time(_) -> timer:sleep(100), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]), %% get /clients with time(rfc3339) NowTimeStampInt = erlang:system_time(millisecond), @@ -817,18 +787,14 @@ t_query_clients_with_time(_) -> GteParamRfc3339 = [Param ++ Rfc3339String || Param <- GteKeys], GteParamStamp = [Param ++ TimeStampString || Param <- GteKeys], - RequestResults = - [ - emqx_mgmt_api_test_util:request_api(get, ClientsPath, Param, AuthHeader) - || Param <- - LteParamRfc3339 ++ LteParamStamp ++ - GteParamRfc3339 ++ GteParamStamp - ], - DecodedResults = [ - emqx_utils_json:decode(Response, [return_maps]) - || {ok, Response} <- RequestResults + RequestResults = [ + begin + {ok, {?HTTP200, _, Result}} = request(get, ClientsPath, [], Param, Config), + Result + end + || Param <- LteParamRfc3339 ++ LteParamStamp ++ GteParamRfc3339 ++ GteParamStamp ], - {LteResponseDecodeds, GteResponseDecodeds} = lists:split(4, DecodedResults), + {LteResponseDecodeds, GteResponseDecodeds} = lists:split(4, RequestResults), %% EachData :: list() [ ?assert(time_string_to_epoch_millisecond(CreatedAt) < NowTimeStampInt) @@ -845,14 +811,11 @@ t_query_clients_with_time(_) -> || #{<<"data">> := EachData} <- GteResponseDecodeds ], - %% testcase cleanup, kickout client1 and client2 - Client1Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1)]), - Client2Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId2)]), - {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client1Path), - {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client2Path). + %% testcase cleanup, stop client1 and client2 + ok = emqtt:stop(C1), + ok = emqtt:stop(C2). -t_query_multiple_clients(_) -> - process_flag(trap_exit, true), +t_query_multiple_clients(Config) -> ClientIdsUsers = [ {<<"multi_client1">>, <<"multi_user1">>}, {<<"multi_client1-1">>, <<"multi_user1">>}, @@ -873,7 +836,7 @@ t_query_multiple_clients(_) -> ), timer:sleep(100), - Auth = emqx_mgmt_api_test_util:auth_header_(), + Auth = ?config(api_auth_header, Config), %% Not found clients/users ?assertEqual([], get_clients(Auth, "clientid=no_such_client")), @@ -1007,8 +970,7 @@ t_query_multiple_clients(_) -> ?assertEqual([], get_clients(Auth, AllClientsQs ++ "&like_username=nouser")), ?assertEqual([], get_clients(Auth, AllUsersQs ++ "&like_clientid=nouser")). -t_query_multiple_clients_urlencode(_) -> - process_flag(trap_exit, true), +t_query_multiple_clients_urlencode(Config) -> ClientIdsUsers = [ {<<"multi_client=a?">>, <<"multi_user=a?">>}, {<<"mutli_client=b?">>, <<"multi_user=b?">>} @@ -1023,31 +985,29 @@ t_query_multiple_clients_urlencode(_) -> ), timer:sleep(100), - Auth = emqx_mgmt_api_test_util:auth_header_(), + Auth = ?config(api_auth_header, Config), ClientsQs = uri_string:compose_query([{<<"clientid">>, C} || {C, _} <- ClientIdsUsers]), UsersQs = uri_string:compose_query([{<<"username">>, U} || {_, U} <- ClientIdsUsers]), ExpectedClients = lists:sort([C || {C, _} <- ClientIdsUsers]), ?assertEqual(ExpectedClients, lists:sort(get_clients(Auth, ClientsQs))), ?assertEqual(ExpectedClients, lists:sort(get_clients(Auth, UsersQs))). -t_query_clients_with_fields(_) -> - process_flag(trap_exit, true), +t_query_clients_with_fields(Config) -> TCBin = atom_to_binary(?FUNCTION_NAME), - APIPort = 18083, ClientId = <>, Username = <>, {ok, C} = emqtt:start_link(#{clientid => ClientId, username => Username}), {ok, _} = emqtt:connect(C), timer:sleep(100), - Auth = emqx_mgmt_api_test_util:auth_header_(), + Auth = ?config(api_auth_header, Config), ?assertEqual([#{<<"clientid">> => ClientId}], get_clients_all_fields(Auth, "fields=clientid")), ?assertMatch( {ok, - {{_, 200, _}, _, #{ + {?HTTP200, _, #{ <<"data">> := [#{<<"client_attrs">> := #{}}] }}}, - list_request(APIPort, "fields=client_attrs") + list_request(#{fields => client_attrs}, Config) ), ?assertEqual( [#{<<"clientid">> => ClientId, <<"username">> => Username}], @@ -1092,14 +1052,15 @@ get_clients(Auth, Qs, ExpectError, ClientIdOnly) -> Resp end. -t_keepalive(_Config) -> +t_keepalive(Config) -> Username = "user_keepalive", ClientId = "client_keepalive", - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "keepalive"]), Body = #{interval => 11}, - {error, {"HTTP/1.1", 404, "Not Found"}} = - emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body), + ?assertMatch( + {error, {?HTTP404, _, _}}, + request(put, Path, Body, Config) + ), %% 65535 is the max value of keepalive MaxKeepalive = 65535, InitKeepalive = round(MaxKeepalive / 1.5 + 1), @@ -1110,72 +1071,86 @@ t_keepalive(_Config) -> [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)), %% will reset to max keepalive if keepalive > max keepalive #{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid), - ?assertMatch({keepalive, 65535000, _}, element(5, element(9, sys:get_state(Pid)))), + ?assertMatch( + #{interval := 65535000}, + emqx_connection:info({channel, keepalive}, sys:get_state(Pid)) + ), - {ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body), - #{<<"keepalive">> := 11} = emqx_utils_json:decode(NewClient, [return_maps]), - #{conninfo := #{keepalive := Keepalive}} = emqx_connection:info(Pid), - ?assertEqual(11, Keepalive), + ?assertMatch( + {ok, {?HTTP200, _, #{<<"keepalive">> := 11}}}, + request(put, Path, Body, Config) + ), + ?assertMatch( + #{conninfo := #{keepalive := 11}}, + emqx_connection:info(Pid) + ), %% Disable keepalive - Body1 = #{interval => 0}, - {ok, NewClient1} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body1), - #{<<"keepalive">> := 0} = emqx_utils_json:decode(NewClient1, [return_maps]), - ?assertMatch(#{conninfo := #{keepalive := 0}}, emqx_connection:info(Pid)), + ?assertMatch( + {ok, {?HTTP200, _, #{<<"keepalive">> := 0}}}, + request(put, Path, #{interval => 0}, Config) + ), + ?assertMatch( + #{conninfo := #{keepalive := 0}}, + emqx_connection:info(Pid) + ), %% Maximal keepalive - Body2 = #{interval => 65536}, - {error, {"HTTP/1.1", 400, _}} = - emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body2), - emqtt:disconnect(C1), - ok. + ?assertMatch( + {error, {?HTTP400, _, #{<<"code">> := <<"BAD_REQUEST">>}}}, + request(put, Path, #{interval => 65536}, Config) + ), + ok = emqtt:disconnect(C1). -t_client_id_not_found(_Config) -> - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - Http = {"HTTP/1.1", 404, "Not Found"}, - Body = "{\"code\":\"CLIENTID_NOT_FOUND\",\"message\":\"Client ID not found\"}", +t_client_id_not_found(Config) -> + Body = #{ + <<"code">> => <<"CLIENTID_NOT_FOUND">>, + <<"message">> => <<"Client ID not found">> + }, PathFun = fun(Suffix) -> emqx_mgmt_api_test_util:api_path(["clients", "no_existed_clientid"] ++ Suffix) end, ReqFun = fun(Method, Path) -> - emqx_mgmt_api_test_util:request_api( - Method, Path, "", AuthHeader, [], #{return_all => true} - ) + request(Method, Path, [], [], Config) end, - PostFun = fun(Method, Path, Data) -> - emqx_mgmt_api_test_util:request_api( - Method, Path, "", AuthHeader, Data, #{return_all => true} - ) + request(Method, Path, Data, [], Config) end, %% Client lookup - ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun([]))), + ?assertMatch({error, {?HTTP404, _, Body}}, ReqFun(get, PathFun([]))), %% Client kickout - ?assertMatch({error, {Http, _, Body}}, ReqFun(delete, PathFun([]))), + ?assertMatch({error, {?HTTP404, _, Body}}, ReqFun(delete, PathFun([]))), %% Client Subscription list - ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["subscriptions"]))), + ?assertMatch({error, {?HTTP404, _, Body}}, ReqFun(get, PathFun(["subscriptions"]))), %% AuthZ Cache lookup - ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["authorization", "cache"]))), + ?assertMatch({error, {?HTTP404, _, Body}}, ReqFun(get, PathFun(["authorization", "cache"]))), %% AuthZ Cache clean - ?assertMatch({error, {Http, _, Body}}, ReqFun(delete, PathFun(["authorization", "cache"]))), + ?assertMatch( + {error, {?HTTP404, _, Body}}, + ReqFun(delete, PathFun(["authorization", "cache"])) + ), %% Client Subscribe SubBody = #{topic => <<"testtopic">>, qos => 1, nl => 1, rh => 1}, - ?assertMatch({error, {Http, _, Body}}, PostFun(post, PathFun(["subscribe"]), SubBody)), + ?assertMatch({error, {?HTTP404, _, Body}}, PostFun(post, PathFun(["subscribe"]), SubBody)), ?assertMatch( - {error, {Http, _, Body}}, PostFun(post, PathFun(["subscribe", "bulk"]), [SubBody]) + {error, {?HTTP404, _, Body}}, PostFun(post, PathFun(["subscribe", "bulk"]), [SubBody]) ), %% Client Unsubscribe UnsubBody = #{topic => <<"testtopic">>}, - ?assertMatch({error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe"]), UnsubBody)), ?assertMatch( - {error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe", "bulk"]), [UnsubBody]) + {error, {?HTTP404, _, Body}}, + PostFun(post, PathFun(["unsubscribe"]), UnsubBody) + ), + ?assertMatch( + {error, {?HTTP404, _, Body}}, + PostFun(post, PathFun(["unsubscribe", "bulk"]), [UnsubBody]) ), %% Mqueue messages - ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["mqueue_messages"]))), + ?assertMatch({error, {?HTTP404, _, Body}}, ReqFun(get, PathFun(["mqueue_messages"]))), %% Inflight messages - ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["inflight_messages"]))). + ?assertMatch({error, {?HTTP404, _, Body}}, ReqFun(get, PathFun(["inflight_messages"]))). -t_sessions_count(_Config) -> +t_sessions_count(Config) -> ClientId = atom_to_binary(?FUNCTION_NAME), Topic = <<"t/test_sessions_count">>, Conf0 = emqx_config:get([broker]), @@ -1192,21 +1167,16 @@ t_sessions_count(_Config) -> {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, Topic, 1), Path = emqx_mgmt_api_test_util:api_path(["sessions_count"]), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), ?assertMatch( - {ok, "1"}, - emqx_mgmt_api_test_util:request_api( - get, Path, "since=" ++ integer_to_list(Since), AuthHeader - ) + {ok, {?HTTP200, _, 1}}, + request(get, Path, [], "since=" ++ integer_to_list(Since), Config) ), ok = emqtt:disconnect(Client), %% simulate the situation in which the process is not running ok = supervisor:terminate_child(emqx_cm_sup, emqx_cm_registry_keeper), ?assertMatch( - {error, {_, 400, _}}, - emqx_mgmt_api_test_util:request_api( - get, Path, "since=" ++ integer_to_list(Since), AuthHeader - ) + {error, {?HTTP400, _, _}}, + request(get, Path, [], "since=" ++ integer_to_list(Since), Config) ), %% restore default value ok = emqx_config:put(#{broker => Conf0}), @@ -1220,21 +1190,18 @@ t_mqueue_messages(Config) -> ok = client_with_mqueue(ClientId, Topic, Count), Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]), ?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - IsMqueue = true, - test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config), IsMqueue), + + AuthHeader = ?config(api_auth_header, Config), + PayloadEncoding = ?config(payload_encoding, Config), + test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, _IsMqueue = true), ?assertMatch( - {error, {_, 400, _}}, - emqx_mgmt_api_test_util:request_api( - get, Path, "limit=10&position=not-valid", AuthHeader - ) + {error, {?HTTP400, _, #{<<"code">> := <<"INVALID_PARAMETER">>}}}, + request(get, Path, [], "limit=10&position=not-valid", Config) ), ?assertMatch( - {error, {_, 400, _}}, - emqx_mgmt_api_test_util:request_api( - get, Path, "limit=-5&position=not-valid", AuthHeader - ) + {error, {?HTTP400, _, #{<<"code">> := <<"BAD_REQUEST">>}}}, + request(get, Path, [], "limit=-5&position=not-valid", Config) ). t_inflight_messages(Config) -> @@ -1244,23 +1211,18 @@ t_inflight_messages(Config) -> {ok, Client} = client_with_inflight(ClientId, Topic, PubCount), Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "inflight_messages"]), InflightLimit = emqx:get_config([mqtt, max_inflight]), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - IsMqueue = false, - test_messages( - Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config), IsMqueue - ), + + AuthHeader = ?config(api_auth_header, Config), + PayloadEncoding = ?config(payload_encoding, Config), + test_messages(Path, Topic, InflightLimit, AuthHeader, PayloadEncoding, _IsMqueue = false), ?assertMatch( - {error, {_, 400, _}}, - emqx_mgmt_api_test_util:request_api( - get, Path, "limit=10&position=not-int", AuthHeader - ) + {error, {?HTTP400, _, #{<<"code">> := <<"INVALID_PARAMETER">>}}}, + request(get, Path, [], "limit=10&position=not-int", Config) ), ?assertMatch( - {error, {_, 400, _}}, - emqx_mgmt_api_test_util:request_api( - get, Path, "limit=-5&position=invalid-int", AuthHeader - ) + {error, {?HTTP400, _, #{<<"code">> := <<"BAD_REQUEST">>}}}, + request(get, Path, [], "limit=-5&position=invalid-int", Config) ), emqtt:stop(Client). @@ -1447,7 +1409,7 @@ decode_payload(Payload, base64) -> decode_payload(Payload, _) -> Payload. -t_subscribe_shared_topic(_Config) -> +t_subscribe_shared_topic(Config) -> ClientId = <<"client_subscribe_shared">>, {ok, C} = emqtt:start_link(#{clientid => ClientId}), @@ -1457,19 +1419,8 @@ t_subscribe_shared_topic(_Config) -> {ok, PC} = emqtt:start_link(#{clientid => ClientPuber}), {ok, _} = emqtt:connect(PC), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - - Http200 = {"HTTP/1.1", 200, "OK"}, - Http204 = {"HTTP/1.1", 204, "No Content"}, - PathFun = fun(Suffix) -> - emqx_mgmt_api_test_util:api_path(["clients", "client_subscribe_shared"] ++ Suffix) - end, - - PostFun = fun(Method, Path, Data) -> - emqx_mgmt_api_test_util:request_api( - Method, Path, "", AuthHeader, Data, #{return_all => true} - ) + emqx_mgmt_api_test_util:api_path(["clients", ClientId] ++ Suffix) end, SharedT = <<"$share/group/testtopic">>, @@ -1481,15 +1432,16 @@ t_subscribe_shared_topic(_Config) -> %% ==================== %% Client Subscribe ?assertMatch( - {ok, {Http200, _, _}}, - PostFun(post, PathFun(["subscribe"]), SubBodyFun(SharedT)) + {ok, {?HTTP200, _, _}}, + request(post, PathFun(["subscribe"]), SubBodyFun(SharedT), Config) ), ?assertMatch( - {ok, {Http200, _, _}}, - PostFun( + {ok, {?HTTP200, _, _}}, + request( post, PathFun(["subscribe", "bulk"]), - [SubBodyFun(T) || T <- [SharedT, NonSharedT]] + [SubBodyFun(T) || T <- [SharedT, NonSharedT]], + Config ) ), @@ -1525,15 +1477,16 @@ t_subscribe_shared_topic(_Config) -> %% ==================== %% Client Unsubscribe ?assertMatch( - {ok, {Http204, _, _}}, - PostFun(post, PathFun(["unsubscribe"]), UnSubBodyFun(SharedT)) + {ok, {?HTTP204, _, _}}, + request(post, PathFun(["unsubscribe"]), UnSubBodyFun(SharedT), Config) ), ?assertMatch( - {ok, {Http204, _, _}}, - PostFun( + {ok, {?HTTP204, _, _}}, + request( post, PathFun(["unsubscribe", "bulk"]), - [UnSubBodyFun(T) || T <- [SharedT, NonSharedT]] + [UnSubBodyFun(T) || T <- [SharedT, NonSharedT]], + Config ) ), @@ -1548,34 +1501,25 @@ t_subscribe_shared_topic(_Config) -> ?assertNotReceive({publish, #{topic := <<"testtopic">>, payload := <<"msg3">>}}), ?assertNotReceive({publish, #{topic := <<"t/1">>, payload := <<"msg4">>}}). -t_subscribe_shared_topic_nl(_Config) -> - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - Http400 = {"HTTP/1.1", 400, "Bad Request"}, - Body = - "{\"code\":\"INVALID_PARAMETER\"," - "\"message\":\"Invalid Subscribe options: `no_local` not allowed for shared-sub. See [MQTT-3.8.3-4]\"}", +t_subscribe_shared_topic_nl(Config) -> ClientId = <<"client_subscribe_shared">>, - {ok, C} = emqtt:start_link(#{clientid => ClientId}), {ok, _} = emqtt:connect(C), - PathFun = fun(Suffix) -> - emqx_mgmt_api_test_util:api_path(["clients", "client_subscribe_shared"] ++ Suffix) - end, - PostFun = fun(Method, Path, Data) -> - emqx_mgmt_api_test_util:request_api( - Method, Path, "", AuthHeader, Data, #{return_all => true} - ) - end, - T = <<"$share/group/testtopic">>, + Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "subscribe"]), + Topic = <<"$share/group/testtopic">>, ?assertMatch( - {error, {Http400, _, Body}}, - PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1}) + {error, + {?HTTP400, _, #{ + <<"code">> := <<"INVALID_PARAMETER">>, + <<"message">> := + <<"Invalid Subscribe options: `no_local` not allowed for shared-sub", _/bytes>> + }}}, + request(post, Path, #{topic => Topic, qos => 1, nl => 1, rh => 1}, Config) ). t_list_clients_v2(Config) -> [N1, N2] = ?config(nodes, Config), - APIPort = 18084, Port1 = get_mqtt_port(N1, tcp), Port2 = get_mqtt_port(N2, tcp), @@ -1612,7 +1556,7 @@ t_list_clients_v2(Config) -> %% one by one QueryParams1 = #{limit => "1"}, - Res1 = list_all_v2(APIPort, QueryParams1), + Res1 = list_all_v2(QueryParams1, Config), ?assertMatch( [ #{ @@ -1674,11 +1618,11 @@ t_list_clients_v2(Config) -> assert_contains_clientids(Res1, AllClientIds), %% Reusing the same cursors yield the same pages - traverse_in_reverse_v2(APIPort, QueryParams1, Res1), + traverse_in_reverse_v2(QueryParams1, Res1, Config), %% paging QueryParams2 = #{limit => "4"}, - Res2 = list_all_v2(APIPort, QueryParams2), + Res2 = list_all_v2(QueryParams2, Config), ?assertMatch( [ #{ @@ -1702,10 +1646,10 @@ t_list_clients_v2(Config) -> Res2 ), assert_contains_clientids(Res2, AllClientIds), - traverse_in_reverse_v2(APIPort, QueryParams2, Res2), + traverse_in_reverse_v2(QueryParams2, Res2, Config), QueryParams3 = #{limit => "2"}, - Res3 = list_all_v2(APIPort, QueryParams3), + Res3 = list_all_v2(QueryParams3, Config), ?assertMatch( [ #{ @@ -1738,11 +1682,11 @@ t_list_clients_v2(Config) -> Res3 ), assert_contains_clientids(Res3, AllClientIds), - traverse_in_reverse_v2(APIPort, QueryParams3, Res3), + traverse_in_reverse_v2(QueryParams3, Res3, Config), %% fuzzy filters QueryParams4 = #{limit => "100", like_clientid => "ca"}, - Res4 = list_all_v2(APIPort, QueryParams4), + Res4 = list_all_v2(QueryParams4, Config), ?assertMatch( [ #{ @@ -1757,9 +1701,9 @@ t_list_clients_v2(Config) -> Res4 ), assert_contains_clientids(Res4, [ClientId1, ClientId4, ClientId5]), - traverse_in_reverse_v2(APIPort, QueryParams4, Res4), + traverse_in_reverse_v2(QueryParams4, Res4, Config), QueryParams5 = #{limit => "1", like_clientid => "ca"}, - Res5 = list_all_v2(APIPort, QueryParams5), + Res5 = list_all_v2(QueryParams5, Config), ?assertMatch( [ #{ @@ -1792,7 +1736,7 @@ t_list_clients_v2(Config) -> Res5 ), assert_contains_clientids(Res5, [ClientId1, ClientId4, ClientId5]), - traverse_in_reverse_v2(APIPort, QueryParams5, Res5), + traverse_in_reverse_v2(QueryParams5, Res5, Config), lists:foreach( fun(C) -> @@ -1812,7 +1756,7 @@ t_list_clients_v2(Config) -> ?assertMatch( {error, {{_, 400, _}, _, #{<<"message">> := <<"bad cursor">>}}}, - list_v2_request(APIPort, #{limit => "1", cursor => EvilAtomBin}) + list_v2_request(#{limit => "1", cursor => EvilAtomBin}, Config) ), %% Verify that the atom was not created erpc:call(N1, fun() -> @@ -1903,11 +1847,14 @@ get_mqtt_port(Node, Type) -> {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), Port. -request(Method, Path, Params) -> - request(Method, Path, Params, _QueryParams = ""). +request(Method, Path, Config) -> + request(Method, Path, _Params = [], _QueryParams = "", Config). -request(Method, Path, Params, QueryParams) -> - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), +request(Method, Path, Params, Config) -> + request(Method, Path, Params, _QueryParams = "", Config). + +request(Method, Path, Params, QueryParams, Config) -> + AuthHeader = ?config(api_auth_header, Config), Opts = #{return_all => true}, case emqx_mgmt_api_test_util:request_api(Method, Path, QueryParams, AuthHeader, Params, Opts) of {ok, {Status, Headers, Body0}} -> @@ -1935,37 +1882,32 @@ maybe_json_decode(X) -> {error, _} -> X end. -get_subscriptions_request(APIPort, ClientId) -> - Host = "http://127.0.0.1:" ++ integer_to_list(APIPort), - Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "subscriptions"]), - request(get, Path, []). +get_subscriptions_request(ClientId, Config) -> + Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "subscriptions"]), + request(get, Path, [], Config). -get_client_request(Port, ClientId) -> - Host = "http://127.0.0.1:" ++ integer_to_list(Port), - Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]), - request(get, Path, []). +get_client_request(ClientId, Config) -> + Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId]), + request(get, Path, [], Config). -list_request(Port) -> - list_request(Port, _QueryParams = ""). +list_request(Config) -> + list_request(_QueryString = "", Config). -list_request(Port, QueryParams) -> - Host = "http://127.0.0.1:" ++ integer_to_list(Port), - Path = emqx_mgmt_api_test_util:api_path(Host, ["clients"]), - request(get, Path, [], QueryParams). +list_request(QueryParams, Config) -> + Path = emqx_mgmt_api_test_util:api_path(["clients"]), + request(get, Path, [], compose_query_string(QueryParams), Config). -list_v2_request(Port, QueryParams = #{}) -> - Host = "http://127.0.0.1:" ++ integer_to_list(Port), - Path = emqx_mgmt_api_test_util:api_path(Host, ["clients_v2"]), - QS = uri_string:compose_query(maps:to_list(emqx_utils_maps:binary_key_map(QueryParams))), - request(get, Path, [], QS). +list_v2_request(QueryParams, Config) -> + Path = emqx_mgmt_api_test_util:api_path(["clients_v2"]), + request(get, Path, [], compose_query_string(QueryParams), Config). -list_all_v2(Port, QueryParams = #{}) -> - do_list_all_v2(Port, QueryParams, _Acc = []). +list_all_v2(QueryParams = #{}, Config) -> + do_list_all_v2(QueryParams, Config, _Acc = []). -do_list_all_v2(Port, QueryParams, Acc) -> - case list_v2_request(Port, QueryParams) of +do_list_all_v2(QueryParams, Config, Acc) -> + case list_v2_request(QueryParams, Config) of {ok, {{_, 200, _}, _, Resp = #{<<"meta">> := #{<<"cursor">> := Cursor}}}} -> - do_list_all_v2(Port, QueryParams#{cursor => Cursor}, [Resp | Acc]); + do_list_all_v2(QueryParams#{cursor => Cursor}, Config, [Resp | Acc]); {ok, {{_, 200, _}, _, Resp = #{<<"meta">> := #{<<"hasnext">> := false}}}} -> lists:reverse([Resp | Acc]); Other -> @@ -1978,17 +1920,20 @@ do_list_all_v2(Port, QueryParams, Acc) -> ) end. -lookup_request(ClientId) -> - lookup_request(ClientId, 18083). +lookup_request(ClientId, Config) -> + Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId]), + request(get, Path, [], Config). -lookup_request(ClientId, Port) -> - Host = "http://127.0.0.1:" ++ integer_to_list(Port), - Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]), - request(get, Path, []). +compose_query_string(QueryParams = #{}) -> + QPList = maps:to_list(QueryParams), + uri_string:compose_query( + [{emqx_utils_conv:bin(K), emqx_utils_conv:bin(V)} || {K, V} <- QPList] + ); +compose_query_string(QueryString) when is_list(QueryString) -> + QueryString. -assert_single_client(Opts) -> +assert_single_client(Opts, Config) -> #{ - api_port := APIPort, clientid := ClientId, node := Node, status := Connected @@ -2007,7 +1952,7 @@ assert_single_client(Opts) -> <<"data">> := [#{<<"connected">> := IsConnected}], <<"meta">> := #{<<"count">> := 1} }}}, - list_request(APIPort) + list_request(Config) ) ), ?retry( @@ -2015,13 +1960,13 @@ assert_single_client(Opts) -> 20, ?assertMatch( {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"connected">> := IsConnected}]}}}, - list_request(APIPort, "node=" ++ atom_to_list(Node)), + list_request("node=" ++ atom_to_list(Node), Config), #{node => Node} ) ), ?assertMatch( {ok, {{_, 200, _}, _, #{<<"connected">> := IsConnected}}}, - lookup_request(ClientId, APIPort) + lookup_request(ClientId, Config) ), ?assertMatch( {ok, @@ -2034,7 +1979,7 @@ assert_single_client(Opts) -> <<"listener">> := _, <<"clean_start">> := _ }}}, - get_client_request(APIPort, ClientId) + get_client_request(ClientId, Config) ), ok. @@ -2071,7 +2016,7 @@ assert_contains_clientids(Results, ExpectedClientIds) -> #{results => Results} ). -traverse_in_reverse_v2(APIPort, QueryParams0, Results) -> +traverse_in_reverse_v2(QueryParams0, Results, Config) -> Cursors0 = lists:map( fun(#{<<"meta">> := Meta}) -> @@ -2087,18 +2032,18 @@ traverse_in_reverse_v2(APIPort, QueryParams0, Results) -> ], ReverseCursors = lists:reverse(Cursors1), do_traverse_in_reverse_v2( - APIPort, QueryParams0, ReverseCursors, DirectOrderClientIds, _Acc = [] + QueryParams0, Config, ReverseCursors, DirectOrderClientIds, _Acc = [] ). -do_traverse_in_reverse_v2(_APIPort, _QueryParams0, _Cursors = [], DirectOrderClientIds, Acc) -> +do_traverse_in_reverse_v2(_QueryParams0, _Config, _Cursors = [], DirectOrderClientIds, Acc) -> ?assertEqual(DirectOrderClientIds, Acc); -do_traverse_in_reverse_v2(APIPort, QueryParams0, [Cursor | Rest], DirectOrderClientIds, Acc) -> +do_traverse_in_reverse_v2(QueryParams0, Config, [Cursor | Rest], DirectOrderClientIds, Acc) -> QueryParams = QueryParams0#{cursor => Cursor}, - Res0 = list_v2_request(APIPort, QueryParams), + Res0 = list_v2_request(QueryParams, Config), ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := _}}}, Res0), {ok, {{_, 200, _}, _, #{<<"data">> := Rows}}} = Res0, ClientIds = [ClientId || #{<<"clientid">> := ClientId} <- Rows], - do_traverse_in_reverse_v2(APIPort, QueryParams0, Rest, DirectOrderClientIds, ClientIds ++ Acc). + do_traverse_in_reverse_v2(QueryParams0, Config, Rest, DirectOrderClientIds, ClientIds ++ Acc). disconnect_and_destroy_session(Client) -> ok = emqtt:disconnect(Client, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}).