diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index cc0e96ed6..05e1bdaba 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -310,7 +310,12 @@ consume_n_matching(Map, Pred, N, S) -> consume_n_matching(_Map, _Pred, _N, [], Acc) -> {lists:reverse(Acc), []}; consume_n_matching(_Map, _Pred, 0, S, Acc) -> - {lists:reverse(Acc), S}; + case emqx_utils_stream:next(S) of + [] -> + {lists:reverse(Acc), []}; + _ -> + {lists:reverse(Acc), S} + end; consume_n_matching(Map, Pred, N, S0, Acc) -> case emqx_utils_stream:next(S0) of [] -> @@ -396,11 +401,16 @@ merge_queries(QString0, Q1, Q2) -> Q2Page = ceil(C1 / Limit), case Page =< Q2Page of true -> - #{data := Data, meta := #{hasnext := HN}} = Q1(QString0), - #{ - data => Data, - meta => Meta#{hasnext => HN orelse C2 > 0} - }; + #{data := Data1, meta := #{hasnext := HN1}} = Q1(QString0), + maybe_fetch_from_second_query(#{ + rows1 => Data1, + limit => Limit, + hasnext1 => HN1, + meta => Meta, + count2 => C2, + query2 => Q2, + query_string => QString0 + }); false -> QString = QString0#{<<"page">> => Page - Q2Page}, #{data := Data, meta := #{hasnext := HN}} = Q2(QString), @@ -421,6 +431,31 @@ merge_queries(QString0, Q1, Q2) -> } end. +maybe_fetch_from_second_query(Params) -> + #{ + rows1 := Data1, + limit := Limit, + hasnext1 := HN1, + meta := Meta, + count2 := C2, + query2 := Q2, + query_string := QString0 + } = Params, + NumRows1 = length(Data1), + {Data, HN} = + case NumRows1 >= Limit of + true -> + {Data1, HN1 orelse C2 > 0}; + false -> + #{data := Data2, meta := #{hasnext := HN2}} = + Q2(QString0#{<<"limit">> := Limit - NumRows1}), + {Data1 ++ Data2, HN1 or HN2} + end, + #{ + data => Data, + meta => Meta#{hasnext => HN} + }. + resp_count(Query, QFun) -> #{meta := Meta} = QFun(Query#{<<"limit">> => 1, <<"page">> => 1}), maps:get(count, Meta, undefined). diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 39d775a7a..7a1409038 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -24,7 +24,6 @@ -include_lib("proper/include/proper.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/asserts.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). all() -> AllTCs = emqx_common_test_helpers:all(?MODULE), diff --git a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl index 435a837e3..947ee5f56 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl @@ -20,6 +20,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -define(CLIENTID, <<"api_clientid">>). -define(USERNAME, <<"api_username">>). @@ -42,11 +43,18 @@ all() -> ]. groups() -> - CommonTCs = emqx_common_test_helpers:all(?MODULE), + AllTCs = emqx_common_test_helpers:all(?MODULE), + CommonTCs = AllTCs -- persistent_only_tcs(), [ {mem, CommonTCs}, %% Shared subscriptions are currently not supported: - {persistent, CommonTCs -- [t_list_with_shared_sub, t_subscription_api]} + {persistent, + (CommonTCs -- [t_list_with_shared_sub, t_subscription_api]) ++ persistent_only_tcs()} + ]. + +persistent_only_tcs() -> + [ + t_mixed_persistent_sessions ]. init_per_suite(Config) -> @@ -158,6 +166,51 @@ t_subscription_api(Config) -> SubscriptionsList2 = maps:get(<<"data">>, DataTopic2), ?assertEqual(length(SubscriptionsList2), 1). +%% Checks a few edge cases where persistent and non-persistent client subscriptions exist. +t_mixed_persistent_sessions(Config) -> + ClientConfig = ?config(client_config, Config), + PersistentClient = ?config(client, Config), + {ok, MemClient} = emqtt:start_link(ClientConfig#{clientid => <<"mem">>, properties => #{}}), + {ok, _} = emqtt:connect(MemClient), + + {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(PersistentClient, <<"t/1">>, 1), + {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(MemClient, <<"t/1">>, 1), + + %% First page with sufficient limit should have both mem and DS clients. + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"data">> := [_, _], + <<"meta">> := + #{ + <<"hasnext">> := false, + <<"count">> := 2 + } + }}}, + get_subs(#{page => "1"}) + ), + + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"data">> := [_], + <<"meta">> := #{<<"hasnext">> := true} + }}}, + get_subs(#{page => "1", limit => "1"}) + ), + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"data">> := [_], + <<"meta">> := #{<<"hasnext">> := false} + }}}, + get_subs(#{page => "2", limit => "1"}) + ), + + emqtt:disconnect(MemClient), + + ok. + t_subscription_fuzzy_search(Config) -> Client = proplists:get_value(client, Config), Durable = atom_to_list(?config(durable, Config)), @@ -272,3 +325,42 @@ request_json(Method, Query, Headers) when is_list(Query) -> path() -> emqx_mgmt_api_test_util:api_path(["subscriptions"]). + +get_subs() -> + get_subs(_QueryParams = #{}). + +get_subs(QueryParams = #{}) -> + QS = uri_string:compose_query(maps:to_list(emqx_utils_maps:binary_key_map(QueryParams))), + request(get, path(), [], QS). + +request(Method, Path, Params) -> + request(Method, Path, Params, _QueryParams = ""). + +request(Method, Path, Params, QueryParams) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + case emqx_mgmt_api_test_util:request_api(Method, Path, QueryParams, AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + Body = maybe_json_decode(Body0), + {ok, {Status, Headers, Body}}; + {error, {Status, Headers, Body0}} -> + Body = + case emqx_utils_json:safe_decode(Body0, [return_maps]) of + {ok, Decoded0 = #{<<"message">> := Msg0}} -> + Msg = maybe_json_decode(Msg0), + Decoded0#{<<"message">> := Msg}; + {ok, Decoded0} -> + Decoded0; + {error, _} -> + Body0 + end, + {error, {Status, Headers, Body}}; + Error -> + Error + end. + +maybe_json_decode(X) -> + case emqx_utils_json:safe_decode(X, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> X + end.