fix(client mgmt api): cache disconnected durable session count for `/clients` api

Fixes https://emqx.atlassian.net/browse/EMQX-12396
This commit is contained in:
Thales Macedo Garitezi 2024-05-20 10:39:34 -03:00
parent 7880353224
commit 8b4a1c3d75
5 changed files with 84 additions and 38 deletions

View File

@ -21,7 +21,8 @@
%% API
-export([
start_link/0,
get_subscription_count/0
get_subscription_count/0,
get_disconnected_session_count/0
]).
%% `gen_server' API
@ -39,7 +40,9 @@
%% call/cast/info events
-record(tally_subs, {}).
-record(tally_disconnected_sessions, {}).
-record(get_subscription_count, {}).
-record(get_disconnected_session_count, {}).
%%------------------------------------------------------------------------------
%% API
@ -59,6 +62,16 @@ get_subscription_count() ->
0
end.
%% @doc Gets a cached view of the cluster-global count of disconnected persistent sessions.
-spec get_disconnected_session_count() -> non_neg_integer().
get_disconnected_session_count() ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
gen_server:call(?MODULE, #get_disconnected_session_count{}, infinity);
false ->
0
end.
%%------------------------------------------------------------------------------
%% `gen_server' API
%%------------------------------------------------------------------------------
@ -66,7 +79,10 @@ get_subscription_count() ->
init(_Opts) ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
State = #{subs_count => 0},
State = #{
subs_count => 0,
disconnected_session_count => 0
},
{ok, State, {continue, #tally_subs{}}};
false ->
ignore
@ -75,11 +91,18 @@ init(_Opts) ->
handle_continue(#tally_subs{}, State0) ->
State = tally_persistent_subscriptions(State0),
ensure_subs_tally_timer(),
{noreply, State, {continue, #tally_disconnected_sessions{}}};
handle_continue(#tally_disconnected_sessions{}, State0) ->
State = tally_disconnected_persistent_sessions(State0),
ensure_disconnected_sessions_tally_timer(),
{noreply, State}.
handle_call(#get_subscription_count{}, _From, State) ->
#{subs_count := N} = State,
{reply, N, State};
handle_call(#get_disconnected_session_count{}, _From, State) ->
#{disconnected_session_count := N} = State,
{reply, N, State};
handle_call(_Call, _From, State) ->
{reply, {error, bad_call}, State}.
@ -90,6 +113,10 @@ handle_info(#tally_subs{}, State0) ->
State = tally_persistent_subscriptions(State0),
ensure_subs_tally_timer(),
{noreply, State};
handle_info(#tally_disconnected_sessions{}, State0) ->
State = tally_disconnected_persistent_sessions(State0),
ensure_disconnected_sessions_tally_timer(),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@ -101,7 +128,38 @@ tally_persistent_subscriptions(State0) ->
N = emqx_persistent_session_ds_state:total_subscription_count(),
State0#{subs_count := N}.
tally_disconnected_persistent_sessions(State0) ->
N = do_tally_disconnected_persistent_sessions(),
State0#{disconnected_session_count := N}.
ensure_subs_tally_timer() ->
Timeout = emqx_config:get([durable_sessions, subscription_count_refresh_interval]),
_ = erlang:send_after(Timeout, self(), #tally_subs{}),
ok.
ensure_disconnected_sessions_tally_timer() ->
Timeout = emqx_config:get([durable_sessions, disconnected_session_count_refresh_interval]),
_ = erlang:send_after(Timeout, self(), #tally_disconnected_sessions{}),
ok.
do_tally_disconnected_persistent_sessions() ->
Iter = emqx_persistent_session_ds_state:make_session_iterator(),
do_tally_disconnected_persistent_sessions(Iter, 0).
do_tally_disconnected_persistent_sessions('$end_of_table', N) ->
N;
do_tally_disconnected_persistent_sessions(Iter0, N) ->
case emqx_persistent_session_ds_state:session_iterator_next(Iter0, 1) of
{[], _} ->
N;
{[{Id, _Meta}], Iter} ->
case is_live_session(Id) of
true ->
do_tally_disconnected_persistent_sessions(Iter, N);
false ->
do_tally_disconnected_persistent_sessions(Iter, N + 1)
end
end.
is_live_session(Id) ->
[] =/= emqx_cm_registry:lookup_channels(Id).

View File

@ -1719,6 +1719,14 @@ fields("durable_sessions") ->
importance => ?IMPORTANCE_HIDDEN
}
)},
{"disconnected_session_count_refresh_interval",
sc(
timeout_duration(),
#{
default => <<"5s">>,
importance => ?IMPORTANCE_HIDDEN
}
)},
{"message_retention_period",
sc(
timeout_duration(),

View File

@ -476,6 +476,7 @@ zone_global_defaults() ->
renew_streams_interval => 5000,
session_gc_batch_size => 100,
session_gc_interval => 600000,
subscription_count_refresh_interval => 5000
subscription_count_refresh_interval => 5000,
disconnected_session_count_refresh_interval => 5000
}
}.

View File

@ -1383,7 +1383,6 @@ do_list_clients_cluster_query(
{Rows, QueryState1 = #{complete := Complete0}} ->
case emqx_mgmt_api:accumulate_query_rows(Node, Rows, QueryState1, ResultAcc) of
{enough, NResultAcc} ->
%% TODO: add persistent session count?
%% TODO: this may return `{error, _, _}'...
QueryState2 = emqx_mgmt_api:maybe_collect_total_from_tail_nodes(
Tail, QueryState1
@ -1427,8 +1426,9 @@ add_persistent_session_count(QueryState0 = #{total := Totals0}) ->
%% to traverse the whole table), but also hard to deduplicate live connections
%% from it... So this count will possibly overshoot the true count of
%% sessions.
SessionCount = persistent_session_count(),
Totals = Totals0#{undefined => SessionCount},
DisconnectedSessionCount =
emqx_persistent_session_bookkeeper:get_disconnected_session_count(),
Totals = Totals0#{undefined => DisconnectedSessionCount},
QueryState0#{total := Totals};
false ->
QueryState0
@ -1476,27 +1476,6 @@ no_persistent_sessions() ->
true
end.
-spec persistent_session_count() -> non_neg_integer().
persistent_session_count() ->
%% N.B.: this is potentially costly. Should not be called in hot paths.
%% `mnesia:table_info(_, size)' is always zero for rocksdb, so we need to traverse...
do_persistent_session_count(init_persistent_session_iterator(), 0).
do_persistent_session_count('$end_of_table', N) ->
N;
do_persistent_session_count(Cursor, N) ->
case emqx_persistent_session_ds_state:session_iterator_next(Cursor, 1) of
{[], _} ->
N;
{[{_Id, Meta}], NextCursor} ->
case is_expired(Meta) of
true ->
do_persistent_session_count(NextCursor, N);
false ->
do_persistent_session_count(NextCursor, N + 1)
end
end.
is_expired(#{last_alive_at := LastAliveAt, expiry_interval := ExpiryInterval}) ->
LastAliveAt + ExpiryInterval < erlang:system_time(millisecond).

View File

@ -79,7 +79,9 @@ end_per_suite(Config) ->
init_per_group(persistent_sessions, Config) ->
AppSpecs = [
{emqx, "durable_sessions.enable = true"},
{emqx,
"durable_sessions.enable = true\n"
"durable_sessions.disconnected_session_count_refresh_interval = 100ms"},
emqx_management
],
Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(
@ -457,9 +459,7 @@ t_persistent_sessions5(Config) ->
{{_, 200, _}, _, #{
<<"data">> := [_, _, _],
<<"meta">> := #{
%% TODO: if/when we fix the persistent session count, this
%% should be 4.
<<"count">> := 6,
<<"count">> := 4,
<<"hasnext">> := true
}
}}},
@ -470,9 +470,7 @@ t_persistent_sessions5(Config) ->
{{_, 200, _}, _, #{
<<"data">> := [_],
<<"meta">> := #{
%% TODO: if/when we fix the persistent session count, this
%% should be 4.
<<"count">> := 6,
<<"count">> := 4,
<<"hasnext">> := false
}
}}},
@ -489,9 +487,7 @@ t_persistent_sessions5(Config) ->
{{_, 200, _}, _, #{
<<"data">> := [_, _],
<<"meta">> := #{
%% TODO: if/when we fix the persistent session count, this
%% should be 4.
<<"count">> := 6,
<<"count">> := 4,
<<"hasnext">> := true
}
}}},
@ -1996,7 +1992,11 @@ assert_single_client(Opts) ->
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"connected">> := IsConnected}]}}},
{ok,
{{_, 200, _}, _, #{
<<"data">> := [#{<<"connected">> := IsConnected}],
<<"meta">> := #{<<"count">> := 1}
}}},
list_request(APIPort)
)
),