feat(api): add /sessions_count api to count sessions

This commit is contained in:
Zaiming (Stone) Shi 2024-01-16 14:19:31 +01:00
parent 562a2736ae
commit 509ab6f35a
7 changed files with 103 additions and 12 deletions

View File

@ -283,4 +283,4 @@ fold_hist(F, List) ->
%% Return the session registration history retain duration. %% Return the session registration history retain duration.
-spec retain_duration() -> non_neg_integer(). -spec retain_duration() -> non_neg_integer().
retain_duration() -> retain_duration() ->
emqx:get_config([broker, session_registration_history_retain]). emqx:get_config([broker, session_history_retain]).

View File

@ -15,10 +15,13 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc This module implements the global session registry history cleaner. %% @doc This module implements the global session registry history cleaner.
-module(emqx_cm_registry_cleaner). -module(emqx_cm_registry_keeper).
-behaviour(gen_server). -behaviour(gen_server).
-export([start_link/0]). -export([
start_link/0,
count/1
]).
%% gen_server callbacks %% gen_server callbacks
-export([ -export([
@ -30,8 +33,15 @@
code_change/3 code_change/3
]). ]).
-include_lib("stdlib/include/ms_transform.hrl").
-include("emqx_cm.hrl"). -include("emqx_cm.hrl").
-define(CACHE_COUNT_THRESHOLD, 1000).
-define(MIN_COUNT_INTERVAL_SECONDS, 5).
-define(CLEANUP_CHUNK_SIZE, 10000).
-define(IS_HIST_ENABLED(RETAIN), (RETAIN > 0)).
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@ -44,6 +54,45 @@ init(_) ->
ignore ignore
end. end.
%% @doc Count the number of sessions.
%% Include sessions which are expired since the given timestamp if `since' is greater than 0.
-spec count(non_neg_integer()) -> non_neg_integer().
count(Since) ->
Retain = retain_duration(),
Now = now_ts(),
%% Get table size if hist is not enabled or
%% Since is before the earliest possible retention time.
IsCountAll = (not ?IS_HIST_ENABLED(Retain) orelse (Now - Retain >= Since)),
case IsCountAll of
true ->
mnesia:table_info(?CHAN_REG_TAB, size);
false ->
%% make a gen call to avoid many callers doing the same concurrently
gen_server:call(?MODULE, {count, Since}, infinity)
end.
handle_call({count, Since}, _From, State) ->
{LastCountTime, LastCount} =
case State of
#{last_count_time := T, last_count := C} ->
{T, C};
_ ->
{0, 0}
end,
Now = now_ts(),
Total = mnesia:table_info(?CHAN_REG_TAB, size),
%% Always count if the table is small enough
%% or when the last count is too old
IsTableSmall = (Total < ?CACHE_COUNT_THRESHOLD),
IsLastCountOld = (Now - LastCountTime > ?MIN_COUNT_INTERVAL_SECONDS),
case IsTableSmall orelse IsLastCountOld of
true ->
Count = do_count(Since),
CountFinishedAt = now_ts(),
{reply, Count, State#{last_count_time => CountFinishedAt, last_count => Count}};
false ->
{reply, LastCount, State}
end;
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
@ -84,7 +133,7 @@ cleanup_one_chunk(NextClientId) ->
IsExpired = fun(#channel{pid = Ts}) -> IsExpired = fun(#channel{pid = Ts}) ->
is_integer(Ts) andalso (Ts < Now - Retain) is_integer(Ts) andalso (Ts < Now - Retain)
end, end,
cleanup_loop(NextClientId, 10000, IsExpired). cleanup_loop(NextClientId, ?CLEANUP_CHUNK_SIZE, IsExpired).
cleanup_loop(ClientId, 0, _IsExpired) -> cleanup_loop(ClientId, 0, _IsExpired) ->
ClientId; ClientId;
@ -114,7 +163,7 @@ is_hist_enabled() ->
%% Return the session registration history retain duration in seconds. %% Return the session registration history retain duration in seconds.
-spec retain_duration() -> non_neg_integer(). -spec retain_duration() -> non_neg_integer().
retain_duration() -> retain_duration() ->
emqx:get_config([broker, session_registration_history_retain]). emqx:get_config([broker, session_history_retain]).
cleanup_delay() -> cleanup_delay() ->
Default = timer:minutes(2), Default = timer:minutes(2),
@ -137,3 +186,7 @@ send_delay_start(Delay) ->
now_ts() -> now_ts() ->
erlang:system_time(seconds). erlang:system_time(seconds).
do_count(Since) ->
Ms = ets:fun2ms(fun(#channel{pid = V}) -> is_pid(V) orelse (is_integer(V) andalso (V >= Since)) end),
ets:select_count(?CHAN_REG_TAB, Ms).

View File

@ -49,7 +49,7 @@ init([]) ->
Locker = child_spec(emqx_cm_locker, 5000, worker), Locker = child_spec(emqx_cm_locker, 5000, worker),
CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]), CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]),
Registry = child_spec(emqx_cm_registry, 5000, worker), Registry = child_spec(emqx_cm_registry, 5000, worker),
RegistryCleaner = child_spec(emqx_cm_registry_cleaner, 5000, worker), RegistryKeeper = child_spec(emqx_cm_registry_keeper, 5000, worker),
Manager = child_spec(emqx_cm, 5000, worker), Manager = child_spec(emqx_cm, 5000, worker),
DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor), DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
Children = Children =
@ -59,7 +59,7 @@ init([]) ->
Locker, Locker,
CmPool, CmPool,
Registry, Registry,
RegistryCleaner, RegistryKeeper,
Manager, Manager,
DSSessionGCSup DSSessionGCSup
], ],

View File

@ -1356,13 +1356,13 @@ fields("broker") ->
desc => ?DESC(broker_enable_session_registry) desc => ?DESC(broker_enable_session_registry)
} }
)}, )},
{session_registration_history_retain, {session_history_retain,
sc( sc(
duration_s(), duration_s(),
#{ #{
default => <<"0s">>, default => <<"0s">>,
importance => ?IMPORTANCE_LOW, importance => ?IMPORTANCE_LOW,
desc => ?DESC("broker_session_registration_history_retain") desc => ?DESC("broker_session_history_retain")
} }
)}, )},
{session_locking_strategy, {session_locking_strategy,

View File

@ -45,7 +45,8 @@
subscribe_batch/2, subscribe_batch/2,
unsubscribe/2, unsubscribe/2,
unsubscribe_batch/2, unsubscribe_batch/2,
set_keepalive/2 set_keepalive/2,
sessions_count/2
]). ]).
-export([ -export([
@ -96,7 +97,8 @@ paths() ->
"/clients/:clientid/subscribe/bulk", "/clients/:clientid/subscribe/bulk",
"/clients/:clientid/unsubscribe", "/clients/:clientid/unsubscribe",
"/clients/:clientid/unsubscribe/bulk", "/clients/:clientid/unsubscribe/bulk",
"/clients/:clientid/keepalive" "/clients/:clientid/keepalive",
"/sessions_count"
]. ].
schema("/clients") -> schema("/clients") ->
@ -385,6 +387,30 @@ schema("/clients/:clientid/keepalive") ->
) )
} }
} }
};
schema("/sessions_count") ->
#{
'operationId' => sessions_count,
get => #{
description => ?DESC(get_sessions_count),
tags => ?TAGS,
parameters => [
{since,
hoconsc:mk(non_neg_integer(), #{
in => query,
required => false,
default => 0,
desc =>
<<"Include sessions expired after this time (UNIX Epoch in seconds precesion)">>,
example => 1705391625
})}
],
responses => #{
200 => hoconsc:mk(binary(), #{
desc => <<"Number of sessions">>
})
}
}
}. }.
fields(clients) -> fields(clients) ->
@ -1059,3 +1085,8 @@ client_example() ->
<<"recv_cnt">> => 4, <<"recv_cnt">> => 4,
<<"recv_msg.qos0">> => 0 <<"recv_msg.qos0">> => 0
}. }.
sessions_count(get, #{query_string := QString}) ->
Since = maps:get(<<"since">>, QString, undefined),
Count = emqx_cm_registry_keeper:count(Since),
{200, integer_to_binary(Count)}.

View File

@ -60,4 +60,11 @@ set_keepalive_seconds.desc:
set_keepalive_seconds.label: set_keepalive_seconds.label:
"""Set the online client keepalive by seconds""" """Set the online client keepalive by seconds"""
get_sessions_count.desc:
"""Get the number of sessions. By default it returns the number of non-expired sessions.
if `broker.session_history_retain` is set to a duration greater than `0s`,
this API can also count expired sessions by providing the `since` parameter."""
get_sessions_count.label:
"""Count number of sessions"""
} }

View File

@ -1429,7 +1429,7 @@ Advantages of Disabling<br/>
- Reduced Memory Usage: Turning off the session registry can lower the overall memory footprint of the system. - Reduced Memory Usage: Turning off the session registry can lower the overall memory footprint of the system.
- Improved Performance: Without the overhead of maintaining a global registry, the node can process client connections faster.""" - Improved Performance: Without the overhead of maintaining a global registry, the node can process client connections faster."""
broker_session_registration_history_retain.desc: broker_session_history_retain.desc:
"""The duration to retain the session registration history. Setting this to a value greater than `0s` will increase memory usage and impact peformance. """The duration to retain the session registration history. Setting this to a value greater than `0s` will increase memory usage and impact peformance.
This retained history can be used to monitor how many sessions were registered in the past configured duration. This retained history can be used to monitor how many sessions were registered in the past configured duration.
Note: This config has no effect if `enable_session_registry` is set to `false`.<br/> Note: This config has no effect if `enable_session_registry` is set to `false`.<br/>