refactor(mgmt): move persistent session counting function to mgmt module
Follow up to https://github.com/emqx/emqx/pull/12500#discussion_r1496634087
This commit is contained in:
parent
529211b9ac
commit
03138f8345
|
@ -36,11 +36,7 @@
|
||||||
-export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
|
-export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
|
||||||
-export([get_subscriptions/1, put_subscription/4, del_subscription/3]).
|
-export([get_subscriptions/1, put_subscription/4, del_subscription/3]).
|
||||||
|
|
||||||
-export([
|
-export([make_session_iterator/0, session_iterator_next/2]).
|
||||||
make_session_iterator/0,
|
|
||||||
session_iterator_next/2,
|
|
||||||
session_count/0
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export_type([
|
-export_type([
|
||||||
t/0,
|
t/0,
|
||||||
|
@ -369,12 +365,6 @@ del_rank(Key, Rec) ->
|
||||||
fold_ranks(Fun, Acc, Rec) ->
|
fold_ranks(Fun, Acc, Rec) ->
|
||||||
gen_fold(ranks, Fun, Acc, Rec).
|
gen_fold(ranks, Fun, Acc, Rec).
|
||||||
|
|
||||||
-spec session_count() -> non_neg_integer().
|
|
||||||
session_count() ->
|
|
||||||
%% N.B.: this is potentially costly. Should not be called in hot paths.
|
|
||||||
%% `mnesia:table_info(_, size)' is always zero for rocksdb, so we need to traverse...
|
|
||||||
do_session_count(make_session_iterator(), 0).
|
|
||||||
|
|
||||||
-spec make_session_iterator() -> session_iterator().
|
-spec make_session_iterator() -> session_iterator().
|
||||||
make_session_iterator() ->
|
make_session_iterator() ->
|
||||||
mnesia:dirty_first(?session_tab).
|
mnesia:dirty_first(?session_tab).
|
||||||
|
@ -577,16 +567,6 @@ ro_transaction(Fun) ->
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
do_session_count('$end_of_table', N) ->
|
|
||||||
N;
|
|
||||||
do_session_count(Cursor, N) ->
|
|
||||||
case session_iterator_next(Cursor, 1) of
|
|
||||||
{[], _} ->
|
|
||||||
N;
|
|
||||||
{_, NextCursor} ->
|
|
||||||
do_session_count(NextCursor, N + 1)
|
|
||||||
end.
|
|
||||||
|
|
||||||
-compile({inline, check_sequence/1}).
|
-compile({inline, check_sequence/1}).
|
||||||
|
|
||||||
-ifdef(CHECK_SEQNO).
|
-ifdef(CHECK_SEQNO).
|
||||||
|
|
|
@ -916,7 +916,7 @@ add_persistent_session_count(QueryState0 = #{total := Totals0}) ->
|
||||||
%% to traverse the whole table), but also hard to deduplicate live connections
|
%% to traverse the whole table), but also hard to deduplicate live connections
|
||||||
%% from it... So this count will possibly overshoot the true count of
|
%% from it... So this count will possibly overshoot the true count of
|
||||||
%% sessions.
|
%% sessions.
|
||||||
SessionCount = emqx_persistent_session_ds_state:session_count(),
|
SessionCount = persistent_session_count(),
|
||||||
Totals = Totals0#{undefined => SessionCount},
|
Totals = Totals0#{undefined => SessionCount},
|
||||||
QueryState0#{total := Totals};
|
QueryState0#{total := Totals};
|
||||||
false ->
|
false ->
|
||||||
|
@ -965,6 +965,22 @@ no_persistent_sessions() ->
|
||||||
true
|
true
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec persistent_session_count() -> non_neg_integer().
|
||||||
|
persistent_session_count() ->
|
||||||
|
%% N.B.: this is potentially costly. Should not be called in hot paths.
|
||||||
|
%% `mnesia:table_info(_, size)' is always zero for rocksdb, so we need to traverse...
|
||||||
|
do_persistent_session_count(init_persistent_session_iterator(), 0).
|
||||||
|
|
||||||
|
do_persistent_session_count('$end_of_table', N) ->
|
||||||
|
N;
|
||||||
|
do_persistent_session_count(Cursor, N) ->
|
||||||
|
case emqx_persistent_session_ds_state:session_iterator_next(Cursor, 1) of
|
||||||
|
{[], _} ->
|
||||||
|
N;
|
||||||
|
{_, NextCursor} ->
|
||||||
|
do_persistent_session_count(NextCursor, N + 1)
|
||||||
|
end.
|
||||||
|
|
||||||
do_persistent_session_query(ResultAcc, QueryState) ->
|
do_persistent_session_query(ResultAcc, QueryState) ->
|
||||||
case emqx_persistent_message:is_persistence_enabled() of
|
case emqx_persistent_message:is_persistence_enabled() of
|
||||||
true ->
|
true ->
|
||||||
|
|
Loading…
Reference in New Issue