fix(subs mgmt api): attempt to return mixed clients ds/non-ds in the same page

Fixes https://emqx.atlassian.net/browse/EMQX-12294
This commit is contained in:
Thales Macedo Garitezi 2024-05-10 12:04:58 -03:00
parent c491b83857
commit e25fe62cbc
3 changed files with 135 additions and 9 deletions

View File

@ -310,7 +310,12 @@ consume_n_matching(Map, Pred, N, S) ->
consume_n_matching(_Map, _Pred, _N, [], Acc) ->
{lists:reverse(Acc), []};
consume_n_matching(_Map, _Pred, 0, S, Acc) ->
{lists:reverse(Acc), S};
case emqx_utils_stream:next(S) of
[] ->
{lists:reverse(Acc), []};
_ ->
{lists:reverse(Acc), S}
end;
consume_n_matching(Map, Pred, N, S0, Acc) ->
case emqx_utils_stream:next(S0) of
[] ->
@ -396,11 +401,16 @@ merge_queries(QString0, Q1, Q2) ->
Q2Page = ceil(C1 / Limit),
case Page =< Q2Page of
true ->
#{data := Data, meta := #{hasnext := HN}} = Q1(QString0),
#{
data => Data,
meta => Meta#{hasnext => HN orelse C2 > 0}
};
#{data := Data1, meta := #{hasnext := HN1}} = Q1(QString0),
maybe_fetch_from_second_query(#{
rows1 => Data1,
limit => Limit,
hasnext1 => HN1,
meta => Meta,
count2 => C2,
query2 => Q2,
query_string => QString0
});
false ->
QString = QString0#{<<"page">> => Page - Q2Page},
#{data := Data, meta := #{hasnext := HN}} = Q2(QString),
@ -421,6 +431,31 @@ merge_queries(QString0, Q1, Q2) ->
}
end.
maybe_fetch_from_second_query(Params) ->
#{
rows1 := Data1,
limit := Limit,
hasnext1 := HN1,
meta := Meta,
count2 := C2,
query2 := Q2,
query_string := QString0
} = Params,
NumRows1 = length(Data1),
{Data, HN} =
case NumRows1 >= Limit of
true ->
{Data1, HN1 orelse C2 > 0};
false ->
#{data := Data2, meta := #{hasnext := HN2}} =
Q2(QString0#{<<"limit">> := Limit - NumRows1}),
{Data1 ++ Data2, HN1 or HN2}
end,
#{
data => Data,
meta => Meta#{hasnext => HN}
}.
resp_count(Query, QFun) ->
#{meta := Meta} = QFun(Query#{<<"limit">> => 1, <<"page">> => 1}),
maps:get(count, Meta, undefined).

View File

@ -24,7 +24,6 @@
-include_lib("proper/include/proper.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
all() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),

View File

@ -20,6 +20,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-define(CLIENTID, <<"api_clientid">>).
-define(USERNAME, <<"api_username">>).
@ -42,11 +43,18 @@ all() ->
].
groups() ->
CommonTCs = emqx_common_test_helpers:all(?MODULE),
AllTCs = emqx_common_test_helpers:all(?MODULE),
CommonTCs = AllTCs -- persistent_only_tcs(),
[
{mem, CommonTCs},
%% Shared subscriptions are currently not supported:
{persistent, CommonTCs -- [t_list_with_shared_sub, t_subscription_api]}
{persistent,
(CommonTCs -- [t_list_with_shared_sub, t_subscription_api]) ++ persistent_only_tcs()}
].
persistent_only_tcs() ->
[
t_mixed_persistent_sessions
].
init_per_suite(Config) ->
@ -158,6 +166,51 @@ t_subscription_api(Config) ->
SubscriptionsList2 = maps:get(<<"data">>, DataTopic2),
?assertEqual(length(SubscriptionsList2), 1).
%% Checks a few edge cases where persistent and non-persistent client subscriptions exist.
t_mixed_persistent_sessions(Config) ->
ClientConfig = ?config(client_config, Config),
PersistentClient = ?config(client, Config),
{ok, MemClient} = emqtt:start_link(ClientConfig#{clientid => <<"mem">>, properties => #{}}),
{ok, _} = emqtt:connect(MemClient),
{ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(PersistentClient, <<"t/1">>, 1),
{ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(MemClient, <<"t/1">>, 1),
%% First page with sufficient limit should have both mem and DS clients.
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"data">> := [_, _],
<<"meta">> :=
#{
<<"hasnext">> := false,
<<"count">> := 2
}
}}},
get_subs(#{page => "1"})
),
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"data">> := [_],
<<"meta">> := #{<<"hasnext">> := true}
}}},
get_subs(#{page => "1", limit => "1"})
),
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"data">> := [_],
<<"meta">> := #{<<"hasnext">> := false}
}}},
get_subs(#{page => "2", limit => "1"})
),
emqtt:disconnect(MemClient),
ok.
t_subscription_fuzzy_search(Config) ->
Client = proplists:get_value(client, Config),
Durable = atom_to_list(?config(durable, Config)),
@ -272,3 +325,42 @@ request_json(Method, Query, Headers) when is_list(Query) ->
path() ->
emqx_mgmt_api_test_util:api_path(["subscriptions"]).
get_subs() ->
get_subs(_QueryParams = #{}).
get_subs(QueryParams = #{}) ->
QS = uri_string:compose_query(maps:to_list(emqx_utils_maps:binary_key_map(QueryParams))),
request(get, path(), [], QS).
request(Method, Path, Params) ->
request(Method, Path, Params, _QueryParams = "").
request(Method, Path, Params, QueryParams) ->
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
case emqx_mgmt_api_test_util:request_api(Method, Path, QueryParams, AuthHeader, Params, Opts) of
{ok, {Status, Headers, Body0}} ->
Body = maybe_json_decode(Body0),
{ok, {Status, Headers, Body}};
{error, {Status, Headers, Body0}} ->
Body =
case emqx_utils_json:safe_decode(Body0, [return_maps]) of
{ok, Decoded0 = #{<<"message">> := Msg0}} ->
Msg = maybe_json_decode(Msg0),
Decoded0#{<<"message">> := Msg};
{ok, Decoded0} ->
Decoded0;
{error, _} ->
Body0
end,
{error, {Status, Headers, Body}};
Error ->
Error
end.
maybe_json_decode(X) ->
case emqx_utils_json:safe_decode(X, [return_maps]) of
{ok, Decoded} -> Decoded;
{error, _} -> X
end.