Merge pull request #12757 from keynslug/fix/EMQX-12050/topics-api-count
fix(topics-api): respond with correct totals to paged queries
This commit is contained in:
commit
25c07425a3
|
@ -33,7 +33,10 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Topics API
|
%% Topics API
|
||||||
-export([stream/1]).
|
-export([
|
||||||
|
stream/1,
|
||||||
|
stats/1
|
||||||
|
]).
|
||||||
|
|
||||||
-export([cleanup_routes/1]).
|
-export([cleanup_routes/1]).
|
||||||
-export([print_routes/1]).
|
-export([print_routes/1]).
|
||||||
|
@ -211,6 +214,14 @@ foldr_routes(FoldFun, AccIn) ->
|
||||||
stream(MTopic) ->
|
stream(MTopic) ->
|
||||||
emqx_utils_stream:chain(stream(?PS_ROUTER_TAB, MTopic), stream(?PS_FILTERS_TAB, MTopic)).
|
emqx_utils_stream:chain(stream(?PS_ROUTER_TAB, MTopic), stream(?PS_FILTERS_TAB, MTopic)).
|
||||||
|
|
||||||
|
%% @doc Retrieve router stats.
|
||||||
|
%% n_routes: total number of routes, should be equal to the length of `stream('_')`.
|
||||||
|
-spec stats(n_routes) -> non_neg_integer().
|
||||||
|
stats(n_routes) ->
|
||||||
|
NTopics = ets:info(?PS_ROUTER_TAB, size),
|
||||||
|
NFilters = ets:info(?PS_FILTERS_TAB, size),
|
||||||
|
emqx_maybe:define(NTopics, 0) + emqx_maybe:define(NFilters, 0).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal fns
|
%% Internal fns
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -179,6 +179,17 @@ mk_persistent_topic_stream(Spec) ->
|
||||||
emqx_utils_stream:empty()
|
emqx_utils_stream:empty()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
eval_count() ->
|
||||||
|
emqx_router:stats(n_routes) + eval_persistent_count().
|
||||||
|
|
||||||
|
eval_persistent_count() ->
|
||||||
|
case emqx_persistent_message:is_persistence_enabled() of
|
||||||
|
true ->
|
||||||
|
emqx_persistent_session_ds_router:stats(n_routes);
|
||||||
|
false ->
|
||||||
|
0
|
||||||
|
end.
|
||||||
|
|
||||||
eval_topic_query(Stream, QState = #{limit := Limit}, QResult) ->
|
eval_topic_query(Stream, QState = #{limit := Limit}, QResult) ->
|
||||||
case emqx_utils_stream:consume(Limit, Stream) of
|
case emqx_utils_stream:consume(Limit, Stream) of
|
||||||
{Rows, NStream} ->
|
{Rows, NStream} ->
|
||||||
|
@ -209,7 +220,7 @@ format_list_response(Meta, Query, QResult = #{rows := RowsAcc}) ->
|
||||||
format_response_meta(Meta, _Query, #{hasnext := HasNext, complete := true, cursor := Cursor}) ->
|
format_response_meta(Meta, _Query, #{hasnext := HasNext, complete := true, cursor := Cursor}) ->
|
||||||
Meta#{hasnext => HasNext, count => Cursor};
|
Meta#{hasnext => HasNext, count => Cursor};
|
||||||
format_response_meta(Meta, _Query = {[], []}, #{hasnext := HasNext}) ->
|
format_response_meta(Meta, _Query = {[], []}, #{hasnext := HasNext}) ->
|
||||||
Meta#{hasnext => HasNext, count => emqx_router:stats(n_routes)};
|
Meta#{hasnext => HasNext, count => eval_count()};
|
||||||
format_response_meta(Meta, _Query, #{hasnext := HasNext}) ->
|
format_response_meta(Meta, _Query, #{hasnext := HasNext}) ->
|
||||||
Meta#{hasnext => HasNext}.
|
Meta#{hasnext => HasNext}.
|
||||||
|
|
||||||
|
|
|
@ -246,13 +246,21 @@ t_persistent_topics(_Config) ->
|
||||||
lists:sort(maps:get(<<"data">>, Matched))
|
lists:sort(maps:get(<<"data">>, Matched))
|
||||||
),
|
),
|
||||||
%% Are results the same when paginating?
|
%% Are results the same when paginating?
|
||||||
#{<<"data">> := Page1} = request_json(get, ["topics"], [{"page", "1"}, {"limit", "3"}]),
|
#{<<"data">> := Page1} = R1 = request_json(get, ["topics"], [{"page", "1"}, {"limit", "3"}]),
|
||||||
#{<<"data">> := Page2} = request_json(get, ["topics"], [{"page", "2"}, {"limit", "3"}]),
|
#{<<"data">> := Page2} = request_json(get, ["topics"], [{"page", "2"}, {"limit", "3"}]),
|
||||||
#{<<"data">> := Page3} = request_json(get, ["topics"], [{"page", "3"}, {"limit", "3"}]),
|
#{<<"data">> := Page3} = request_json(get, ["topics"], [{"page", "3"}, {"limit", "3"}]),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
lists:sort(Expected),
|
lists:sort(Expected),
|
||||||
lists:sort(Page1 ++ Page2 ++ Page3)
|
lists:sort(Page1 ++ Page2 ++ Page3)
|
||||||
),
|
),
|
||||||
|
%% Count respects persistent sessions.
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"meta">> := #{<<"page">> := 1, <<"limit">> := 3, <<"count">> := 8},
|
||||||
|
<<"data">> := [_, _, _]
|
||||||
|
},
|
||||||
|
R1
|
||||||
|
),
|
||||||
%% Filtering by node makes no sense for persistent sessions.
|
%% Filtering by node makes no sense for persistent sessions.
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
|
|
Loading…
Reference in New Issue