%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_subscription_SUITE).

-compile(export_all).
-compile(nowarn_export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").

%% Identity of the MQTT client every test case connects with.
-define(CLIENTID, <<"api_clientid">>).
-define(USERNAME, <<"api_username">>).

%% notice: the topics end in integers so that the API response can be put
%% into a known order (see ?TOPIC_SORT).
-define(TOPIC1, <<"t/0000">>).
%% Subscription options requested for ?TOPIC1.
-define(TOPIC1RH, 1).
-define(TOPIC1RAP, false).
-define(TOPIC1NL, false).
-define(TOPIC1QOS, 1).
-define(TOPIC2, <<"$share/test_group/t/0001">>).
%% ?TOPIC2 without its `$share/test_group/' prefix.
-define(TOPIC2_TOPIC_ONLY, <<"t/0001">>).
%% Rank of each topic, used as the sort key for returned subscriptions.
-define(TOPIC_SORT, #{?TOPIC1 => 1, ?TOPIC2 => 2}).

%% Common Test callback: the suite consists of two groups.
all() ->
    [{group, Name} || Name <- [mem, persistent]].

%% Common Test callback: assigns test cases to the `mem' and `persistent'
%% groups. Cases listed by persistent_only_tcs/0 run only in `persistent'.
groups() ->
    Discovered = emqx_common_test_helpers:all(?MODULE),
    PersistentOnly = persistent_only_tcs(),
    Shared = Discovered -- PersistentOnly,
    %% Shared subscriptions are currently not supported in the persistent
    %% group, so the cases that use them are left out there.
    SharedSubCases = [t_list_with_shared_sub, t_subscription_api],
    [
        {mem, Shared},
        {persistent, (Shared -- SharedSubCases) ++ PersistentOnly}
    ].

%% Test cases that must not run in the `mem' group.
persistent_only_tcs() ->
    [
        t_mixed_persistent_sessions
    ].
%% Common Test callback: boots the applications needed by the suite.
%% `emqx' is started with durable sessions enabled; the started apps are
%% stored under `apps' so end_per_suite/1 can stop them.
init_per_suite(Config) ->
    Apps = emqx_cth_suite:start(
        [
            {emqx,
                "durable_sessions {\n"
                " enable = true\n"
                " renew_streams_interval = 10ms\n"
                "}"},
            emqx_management,
            emqx_mgmt_api_test_util:emqx_dashboard()
        ],
        #{work_dir => emqx_cth_suite:work_dir(Config)}
    ),
    [{apps, Apps} | Config].

%% Common Test callback: stops the applications started by init_per_suite/1.
end_per_suite(Config) ->
    ok = emqx_cth_suite:stop(?config(apps, Config)).

%% Common Test callback: stores the emqtt options for the group under
%% `client_config' and a `durable' flag that the test cases turn into the
%% "durable" query parameter.
%% The `persistent' group asks for a 300 second session expiry interval;
%% the `mem' group sets no connect properties.
init_per_group(persistent, Config) ->
    ClientConfig = #{
        username => ?USERNAME,
        clientid => ?CLIENTID,
        proto_ver => v5,
        clean_start => true,
        properties => #{'Session-Expiry-Interval' => 300}
    },
    [{client_config, ClientConfig}, {durable, true} | Config];
init_per_group(mem, Config) ->
    ClientConfig = #{
        username => ?USERNAME,
        clientid => ?CLIENTID,
        proto_ver => v5,
        clean_start => true
    },
    [{client_config, ClientConfig}, {durable, false} | Config].

%% Common Test callback: nothing to clean up per group.
end_per_group(_, Config) ->
    Config.

%% Common Test callback: connects an MQTT client using the group's
%% `client_config' and stores its pid under `client'. When no client
%% configuration is present, Config is returned untouched.
init_per_testcase(_TC, Config) ->
    case ?config(client_config, Config) of
        ClientConfig when is_map(ClientConfig) ->
            {ok, Client} = emqtt:start_link(ClientConfig),
            {ok, _} = emqtt:connect(Client),
            [{client, Client} | Config];
        _ ->
            Config
    end.

%% Common Test callback: disconnects the client started by
%% init_per_testcase/2. init_per_testcase/2 may legitimately return a Config
%% without a `client' entry, so that case is skipped instead of calling
%% emqtt:disconnect/1 with `undefined'.
end_per_testcase(_TC, Config) ->
    case proplists:get_value(client, Config) of
        undefined ->
            ok;
        Client ->
            emqtt:disconnect(Client)
    end.
%% Subscribes the test client to ?TOPIC1 (with explicit subscription options)
%% and to the shared topic ?TOPIC2, then checks that GET /subscriptions
%% returns both of them, and that a fully qualified query selects exactly the
%% shared subscription.
t_subscription_api(Config) ->
    Client = proplists:get_value(client, Config),
    Durable = atom_to_list(?config(durable, Config)),
    {ok, _, _} = emqtt:subscribe(
        Client,
        [
            {?TOPIC1, [{rh, ?TOPIC1RH}, {rap, ?TOPIC1RAP}, {nl, ?TOPIC1NL}, {qos, ?TOPIC1QOS}]}
        ]
    ),
    {ok, _, _} = emqtt:subscribe(Client, ?TOPIC2),

    Path = path(),
    %% NOTE(review): presumably gives the broker time to register both
    %% subscriptions before they are listed -- confirm.
    timer:sleep(100),
    {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path),
    Data = emqx_utils_json:decode(Response, [return_maps]),
    Meta = maps:get(<<"meta">>, Data),
    ?assertEqual(1, maps:get(<<"page">>, Meta)),
    ?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)),
    ?assertEqual(2, maps:get(<<"count">>, Meta), Data),
    Subscriptions = maps:get(<<"data">>, Data),
    ?assertEqual(2, length(Subscriptions)),
    %% Order the rows by the rank ?TOPIC_SORT assigns to their topic so that
    %% ?TOPIC1 comes first regardless of the order the API returned them in.
    Sort =
        fun(#{<<"topic">> := T1}, #{<<"topic">> := T2}) ->
            maps:get(T1, ?TOPIC_SORT) =< maps:get(T2, ?TOPIC_SORT)
        end,
    [Subscriptions1, Subscriptions2] = lists:sort(Sort, Subscriptions),
    ?assertMatch(
        #{
            <<"topic">> := ?TOPIC1,
            <<"qos">> := ?TOPIC1QOS,
            <<"nl">> := _,
            <<"rap">> := _,
            <<"rh">> := ?TOPIC1RH,
            <<"clientid">> := ?CLIENTID,
            <<"node">> := _
        },
        Subscriptions1
    ),
    ?assertEqual(?TOPIC2, maps:get(<<"topic">>, Subscriptions2)),
    ?assertEqual(?CLIENTID, maps:get(<<"clientid">>, Subscriptions2)),

    %% Every filter below describes the shared subscription, so exactly one
    %% row is expected back.
    QS = [
        {"clientid", ?CLIENTID},
        {"topic", ?TOPIC2_TOPIC_ONLY},
        {"node", atom_to_list(node())},
        {"qos", "0"},
        {"share_group", "test_group"},
        {"match_topic", "t/#"},
        {"durable", Durable}
    ],
    Headers = emqx_mgmt_api_test_util:auth_header_(),

    DataTopic2 = #{<<"meta">> := Meta2} = request_json(get, QS, Headers),
    ?assertEqual(1, maps:get(<<"page">>, Meta2)),
    ?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta2)),
    ?assertEqual(1, maps:get(<<"count">>, Meta2)),
    SubscriptionsList2 = maps:get(<<"data">>, DataTopic2),
    ?assertEqual(1, length(SubscriptionsList2)).

%% Checks a few edge cases where persistent and non-persistent client subscriptions exist.
%% Connects a second client ("mem") next to the group's client, subscribes
%% both to the same topic and checks pagination of GET /subscriptions over
%% the two resulting rows.
t_mixed_persistent_sessions(Config) ->
    ClientConfig = ?config(client_config, Config),
    PersistentClient = ?config(client, Config),
    %% The second client reuses the group's options but with its own client
    %% id and with the connect properties (session expiry) cleared.
    {ok, MemClient} = emqtt:start_link(ClientConfig#{clientid => <<"mem">>, properties => #{}}),
    {ok, _} = emqtt:connect(MemClient),

    {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(PersistentClient, <<"t/1">>, 1),
    {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(MemClient, <<"t/1">>, 1),

    %% First page with sufficient limit should have both mem and DS clients.
    ?assertMatch(
        {ok,
            {{_, 200, _}, _, #{
                <<"data">> := [_, _],
                <<"meta">> :=
                    #{
                        <<"hasnext">> := false,
                        <<"count">> := 2
                    }
            }}},
        get_subs(#{page => "1"})
    ),

    %% With one row per page, page 1 must announce a following page...
    ?assertMatch(
        {ok,
            {{_, 200, _}, _, #{
                <<"data">> := [_],
                <<"meta">> := #{<<"hasnext">> := true}
            }}},
        get_subs(#{page => "1", limit => "1"})
    ),
    %% ...and page 2 must be the last one.
    ?assertMatch(
        {ok,
            {{_, 200, _}, _, #{
                <<"data">> := [_],
                <<"meta">> := #{<<"hasnext">> := false}
            }}},
        get_subs(#{page => "2", limit => "1"})
    ),

    emqtt:disconnect(MemClient),

    ok.

%% Subscribes to five topics and checks `match_topic' (wildcard) queries:
%% result size, paging metadata and the absence of `count'.
t_subscription_fuzzy_search(Config) ->
    Client = proplists:get_value(client, Config),
    Durable = atom_to_list(?config(durable, Config)),
    Topics = [
        <<"t/foo">>,
        <<"t/foo/bar">>,
        <<"t/foo/baz">>,
        <<"topic/foo/bar">>,
        <<"topic/foo/baz">>
    ],
    _ = [{ok, _, _} = emqtt:subscribe(Client, T) || T <- Topics],

    Headers = emqx_mgmt_api_test_util:auth_header_(),
    %% "t/#" covers the three topics under "t/".
    MatchQs = [
        {"clientid", ?CLIENTID},
        {"node", atom_to_list(node())},
        {"match_topic", "t/#"},
        {"durable", Durable}
    ],

    MatchData1 = #{<<"meta">> := MatchMeta1} = request_json(get, MatchQs, Headers),
    ?assertEqual(1, maps:get(<<"page">>, MatchMeta1)),
    ?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, MatchMeta1)),
    %% count is undefined in fuzzy searching
    ?assertNot(maps:is_key(<<"count">>, MatchMeta1)),
    ?assertMatch(3, length(maps:get(<<"data">>, MatchData1))),
    ?assertEqual(false, maps:get(<<"hasnext">>, MatchMeta1)),

    %% "+/+/+" covers the four three-level topics; with a limit of 3 they
    %% are split over two pages (3 + 1).
    LimitMatchQuery = [
        {"clientid", ?CLIENTID},
        {"match_topic", "+/+/+"},
        {"limit", "3"},
        {"durable", Durable}
    ],

    MatchData2 =
        #{<<"meta">> := MatchMeta2} =
        request_json(get, LimitMatchQuery, Headers),
    ?assertEqual(#{<<"page">> => 1, <<"limit">> => 3, <<"hasnext">> => true}, MatchMeta2),
    ?assertEqual(3, length(maps:get(<<"data">>, MatchData2)), MatchData2),

    MatchData2P2 =
        #{<<"meta">> := MatchMeta2P2} =
        request_json(get, [{"page", "2"} | LimitMatchQuery], Headers),
    ?assertEqual(#{<<"page">> => 2, <<"limit">> => 3, <<"hasnext">> => false}, MatchMeta2P2),
    ?assertEqual(1, length(maps:get(<<"data">>, MatchData2P2))).

%% checks that we can list when there are subscriptions made by
%% `emqx:subscribe'.
t_list_with_internal_subscription(_Config) ->
    emqx:subscribe(<<"some/topic">>),
    QS = [],
    Headers = emqx_mgmt_api_test_util:auth_header_(),
    %% The only listed subscription is expected to carry a `null' client id.
    ?assertMatch(
        #{<<"data">> := [#{<<"clientid">> := null}]},
        request_json(get, QS, Headers)
    ),
    ok.

%% Subscribes to a topic filter both directly and through a share group and
%% checks that a `match_topic' query lists both subscriptions.
t_list_with_shared_sub(Config) ->
    Client = proplists:get_value(client, Config),
    RealTopic = <<"t/+">>,
    Topic = <<"$share/g1/", RealTopic/binary>>,

    {ok, _, _} = emqtt:subscribe(Client, Topic),
    {ok, _, _} = emqtt:subscribe(Client, RealTopic),

    QS = [
        {"clientid", ?CLIENTID},
        {"match_topic", "t/#"}
    ],
    Headers = emqx_mgmt_api_test_util:auth_header_(),

    ?assertMatch(
        #{<<"data">> := [#{<<"clientid">> := ?CLIENTID}, #{<<"clientid">> := ?CLIENTID}]},
        request_json(get, QS, Headers)
    ),

    ok.

%% Checks that a `match_topic' value carrying a `$share/...' prefix is
%% rejected with HTTP 400 and the `INVALID_PARAMETER' error code.
t_list_with_invalid_match_topic(Config) ->
    Client = proplists:get_value(client, Config),
    RealTopic = <<"t/+">>,
    Topic = <<"$share/g1/", RealTopic/binary>>,

    {ok, _, _} = emqtt:subscribe(Client, Topic),
    {ok, _, _} = emqtt:subscribe(Client, RealTopic),

    QS = [
        {"clientid", ?CLIENTID},
        {"match_topic", "$share/g1/t/1"}
    ],
    Headers = emqx_mgmt_api_test_util:auth_header_(),

    ?assertMatch(
        {error,
            {{_, 400, _}, _, #{
                <<"message">> := <<"match_topic_invalid">>,
                <<"code">> := <<"INVALID_PARAMETER">>
            }}},
        begin
            %% Ask for the full response and decode the JSON error body so
            %% that it can be matched as a map.
            {error, {Status, RespHeaders, Body}} = emqx_mgmt_api_test_util:request_api(
                get, path(), uri_string:compose_query(QS), Headers, [], #{return_all => true}
            ),
            {error, {Status, RespHeaders, emqx_utils_json:decode(Body, [return_maps])}}
        end
    ),
    ok.
%% Sends a request to the subscriptions endpoint with the given query list
%% (as accepted by uri_string:compose_query/1) and returns the decoded JSON
%% body. Crashes unless the request succeeds.
request_json(Method, Query, Headers) when is_list(Query) ->
    QueryString = uri_string:compose_query(Query),
    {ok, RawBody} = emqx_mgmt_api_test_util:request_api(Method, path(), QueryString, Headers),
    emqx_utils_json:decode(RawBody, [return_maps]).

%% API path of the subscriptions endpoint.
path() ->
    emqx_mgmt_api_test_util:api_path(["subscriptions"]).

%% GET /subscriptions without query parameters.
get_subs() ->
    get_subs(#{}).

%% GET /subscriptions with the query parameters given as a map; returns the
%% full response as produced by request/4.
get_subs(#{} = QueryParams) ->
    Pairs = maps:to_list(emqx_utils_maps:binary_key_map(QueryParams)),
    request(get, path(), [], uri_string:compose_query(Pairs)).

%% Same as request/4 with an empty query string.
request(Method, Path, Params) ->
    request(Method, Path, Params, "").

%% Performs an authenticated API request asking for the full response and
%% JSON-decodes the body where possible. Returns
%% `{ok | error, {Status, Headers, Body}}', or whatever else the request
%% helper returned.
request(Method, Path, Params, QueryParams) ->
    Auth = emqx_mgmt_api_test_util:auth_header_(),
    Result = emqx_mgmt_api_test_util:request_api(
        Method, Path, QueryParams, Auth, Params, #{return_all => true}
    ),
    decode_response(Result).

%% Decodes the body of a full response; anything that is not a full
%% `ok'/`error' response is passed through unchanged.
decode_response({ok, {Status, Headers, RawBody}}) ->
    {ok, {Status, Headers, maybe_json_decode(RawBody)}};
decode_response({error, {Status, Headers, RawBody}}) ->
    {error, {Status, Headers, decode_error_body(RawBody)}};
decode_response(Other) ->
    Other.

%% Decodes an error body. When the decoded body has a `message' field, that
%% field is itself run through maybe_json_decode/1. A body that is not JSON
%% is returned as is.
decode_error_body(RawBody) ->
    case emqx_utils_json:safe_decode(RawBody, [return_maps]) of
        {ok, #{<<"message">> := RawMsg} = Decoded} ->
            Decoded#{<<"message">> := maybe_json_decode(RawMsg)};
        {ok, Decoded} ->
            Decoded;
        {error, _} ->
            RawBody
    end.

%% Returns the JSON-decoded term when the input decodes successfully,
%% otherwise the input itself.
maybe_json_decode(Raw) ->
    case emqx_utils_json:safe_decode(Raw, [return_maps]) of
        {error, _} -> Raw;
        {ok, Term} -> Term
    end.