fix(topics-api): respond with correct totals to paged queries

This commit is contained in:
Andrew Mayorov 2024-03-21 16:02:16 +01:00
parent ebd039ecce
commit 7d6fde5960
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 33 additions and 3 deletions

View File

@ -33,7 +33,10 @@
]).
%% Topics API
-export([stream/1]).
-export([
stream/1,
stats/1
]).
-export([cleanup_routes/1]).
-export([print_routes/1]).
@ -211,6 +214,14 @@ foldr_routes(FoldFun, AccIn) ->
stream(MTopic) ->
emqx_utils_stream:chain(stream(?PS_ROUTER_TAB, MTopic), stream(?PS_FILTERS_TAB, MTopic)).
%% @doc Retrieve router stats.
%% n_routes: total number of routes, should be equal to the length of `stream('_')`.
-spec stats(n_routes) -> non_neg_integer().
stats(n_routes) ->
NTopics = ets:info(?PS_ROUTER_TAB, size),
NFilters = ets:info(?PS_FILTERS_TAB, size),
emqx_maybe:define(NTopics, 0) + emqx_maybe:define(NFilters, 0).
%%--------------------------------------------------------------------
%% Internal fns
%%--------------------------------------------------------------------

View File

@ -179,6 +179,17 @@ mk_persistent_topic_stream(Spec) ->
emqx_utils_stream:empty()
end.
eval_count() ->
emqx_router:stats(n_routes) + eval_persistent_count().
eval_persistent_count() ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
emqx_persistent_session_ds_router:stats(n_routes);
false ->
0
end.
eval_topic_query(Stream, QState = #{limit := Limit}, QResult) ->
case emqx_utils_stream:consume(Limit, Stream) of
{Rows, NStream} ->
@ -209,7 +220,7 @@ format_list_response(Meta, Query, QResult = #{rows := RowsAcc}) ->
format_response_meta(Meta, _Query, #{hasnext := HasNext, complete := true, cursor := Cursor}) ->
Meta#{hasnext => HasNext, count => Cursor};
format_response_meta(Meta, _Query = {[], []}, #{hasnext := HasNext}) ->
Meta#{hasnext => HasNext, count => emqx_router:stats(n_routes)};
Meta#{hasnext => HasNext, count => eval_count()};
format_response_meta(Meta, _Query, #{hasnext := HasNext}) ->
Meta#{hasnext => HasNext}.

View File

@ -246,13 +246,21 @@ t_persistent_topics(_Config) ->
lists:sort(maps:get(<<"data">>, Matched))
),
%% Are results the same when paginating?
#{<<"data">> := Page1} = request_json(get, ["topics"], [{"page", "1"}, {"limit", "3"}]),
#{<<"data">> := Page1} = R1 = request_json(get, ["topics"], [{"page", "1"}, {"limit", "3"}]),
#{<<"data">> := Page2} = request_json(get, ["topics"], [{"page", "2"}, {"limit", "3"}]),
#{<<"data">> := Page3} = request_json(get, ["topics"], [{"page", "3"}, {"limit", "3"}]),
?assertEqual(
lists:sort(Expected),
lists:sort(Page1 ++ Page2 ++ Page3)
),
%% Count respects persistent sessions.
?assertMatch(
#{
<<"meta">> := #{<<"page">> := 1, <<"limit">> := 3, <<"count">> := 8},
<<"data">> := [_, _, _]
},
R1
),
%% Filtering by node makes no sense for persistent sessions.
?assertMatch(
#{