diff --git a/apps/emqx/src/emqx_persistent_session_ds_router.erl b/apps/emqx/src/emqx_persistent_session_ds_router.erl index 46e45233d..c80bbd456 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_router.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_router.erl @@ -33,7 +33,10 @@ ]). %% Topics API --export([stream/1]). +-export([ + stream/1, + stats/1 +]). -export([cleanup_routes/1]). -export([print_routes/1]). @@ -211,6 +214,14 @@ foldr_routes(FoldFun, AccIn) -> stream(MTopic) -> emqx_utils_stream:chain(stream(?PS_ROUTER_TAB, MTopic), stream(?PS_FILTERS_TAB, MTopic)). +%% @doc Retrieve router stats. +%% n_routes: total number of routes, should be equal to the length of `stream('_')`. +-spec stats(n_routes) -> non_neg_integer(). +stats(n_routes) -> + NTopics = ets:info(?PS_ROUTER_TAB, size), + NFilters = ets:info(?PS_FILTERS_TAB, size), + emqx_maybe:define(NTopics, 0) + emqx_maybe:define(NFilters, 0). + %%-------------------------------------------------------------------- %% Internal fns %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index 845ac5af2..1cb12f8f3 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -179,6 +179,17 @@ mk_persistent_topic_stream(Spec) -> emqx_utils_stream:empty() end. +eval_count() -> + emqx_router:stats(n_routes) + eval_persistent_count(). + +eval_persistent_count() -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + emqx_persistent_session_ds_router:stats(n_routes); + false -> + 0 + end. + eval_topic_query(Stream, QState = #{limit := Limit}, QResult) -> case emqx_utils_stream:consume(Limit, Stream) of {Rows, NStream} -> @@ -209,7 +220,7 @@ format_list_response(Meta, Query, QResult = #{rows := RowsAcc}) -> format_response_meta(Meta, _Query, #{hasnext := HasNext, complete := true, cursor := Cursor}) -> Meta#{hasnext => HasNext, count => Cursor}; format_response_meta(Meta, _Query = {[], []}, #{hasnext := HasNext}) -> - Meta#{hasnext => HasNext, count => emqx_router:stats(n_routes)}; + Meta#{hasnext => HasNext, count => eval_count()}; format_response_meta(Meta, _Query, #{hasnext := HasNext}) -> Meta#{hasnext => HasNext}. diff --git a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl index 7426f9734..55113c9e2 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl @@ -246,13 +246,21 @@ t_persistent_topics(_Config) -> lists:sort(maps:get(<<"data">>, Matched)) ), %% Are results the same when paginating? - #{<<"data">> := Page1} = request_json(get, ["topics"], [{"page", "1"}, {"limit", "3"}]), + #{<<"data">> := Page1} = R1 = request_json(get, ["topics"], [{"page", "1"}, {"limit", "3"}]), #{<<"data">> := Page2} = request_json(get, ["topics"], [{"page", "2"}, {"limit", "3"}]), #{<<"data">> := Page3} = request_json(get, ["topics"], [{"page", "3"}, {"limit", "3"}]), ?assertEqual( lists:sort(Expected), lists:sort(Page1 ++ Page2 ++ Page3) ), + %% Count respects persistent sessions. + ?assertMatch( + #{ + <<"meta">> := #{<<"page">> := 1, <<"limit">> := 3, <<"count">> := 8}, + <<"data">> := [_, _, _] + }, + R1 + ), %% Filtering by node makes no sense for persistent sessions. ?assertMatch( #{