Merge pull request #12326 from zmstone/0111-unregister-session-with-timestamp

0111 unregister session with timestamp
This commit is contained in:
Zaiming (Stone) Shi 2024-02-02 17:00:23 +01:00 committed by GitHub
commit b1a05c7b59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 623 additions and 65 deletions

View File

@ -23,7 +23,7 @@
-define(CHAN_INFO_TAB, emqx_channel_info).
-define(CHAN_LIVE_TAB, emqx_channel_live).
%% Mria/Mnesia Tables for channel management.
%% Mria table for session registration.
-define(CHAN_REG_TAB, emqx_channel_registry).
-define(T_KICK, 5_000).
@ -32,4 +32,11 @@
-define(CM_POOL, emqx_cm_pool).
%% Registered sessions.
-record(channel, {
chid :: emqx_types:clientid() | '_',
%% pid field is extended in 5.6.0 to support recording unregistration timestamp.
pid :: pid() | non_neg_integer() | '$1'
}).
-endif.

View File

@ -124,7 +124,8 @@
{?CHAN_TAB, 'channels.count', 'channels.max'},
{?CHAN_TAB, 'sessions.count', 'sessions.max'},
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'},
{?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'}
{?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'},
{?CHAN_REG_TAB, 'cluster_sessions.count', 'cluster_sessions.max'}
]).
%% Batch drain

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -19,18 +19,15 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("emqx_cm.hrl").
-include("logger.hrl").
-include("types.hrl").
-export([start_link/0]).
-export([is_enabled/0]).
-export([is_enabled/0, is_hist_enabled/0]).
-export([
register_channel/1,
unregister_channel/1
register_channel2/1,
unregister_channel/1,
unregister_channel2/1
]).
-export([lookup_channels/1]).
@ -50,10 +47,13 @@
do_cleanup_channels/1
]).
-define(REGISTRY, ?MODULE).
-define(LOCK, {?MODULE, cleanup_down}).
-include("emqx.hrl").
-include("emqx_cm.hrl").
-include("logger.hrl").
-include("types.hrl").
-record(channel, {chid, pid}).
-define(REGISTRY, ?MODULE).
-define(NODE_DOWN_CLEANUP_LOCK, {?MODULE, cleanup_down}).
%% @doc Start the global channel registry.
-spec start_link() -> startlink_ret().
@ -69,6 +69,11 @@ start_link() ->
is_enabled() ->
emqx:get_config([broker, enable_session_registry]).
%% @doc Tell whether the session registration history is kept,
%% i.e. whether the configured retain duration is greater than zero.
-spec is_hist_enabled() -> boolean().
is_hist_enabled() ->
    Retain = retain_duration(),
    Retain > 0.
%% @doc Register a global channel.
-spec register_channel(
emqx_types:clientid()
@ -77,11 +82,21 @@ is_enabled() ->
register_channel(ClientId) when is_binary(ClientId) ->
register_channel({ClientId, self()});
register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
IsHistEnabled = is_hist_enabled(),
case is_enabled() of
true -> mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid));
false -> ok
true when IsHistEnabled ->
mria:async_dirty(?CM_SHARD, fun ?MODULE:register_channel2/1, [record(ClientId, ChanPid)]);
true ->
mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid));
false ->
ok
end.
%% @private
%% Remove the history (timestamp) entries stored for the client id,
%% then dirty-write the channel record into the registry table.
register_channel2(Record = #channel{chid = Id}) ->
    _HasPid = delete_hist_d(Id),
    mria:dirty_write(?CHAN_REG_TAB, Record).
%% @doc Unregister a global channel.
-spec unregister_channel(
emqx_types:clientid()
@ -90,19 +105,54 @@ register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid)
unregister_channel(ClientId) when is_binary(ClientId) ->
unregister_channel({ClientId, self()});
unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
IsHistEnabled = is_hist_enabled(),
case is_enabled() of
true -> mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid));
false -> ok
true when IsHistEnabled ->
mria:async_dirty(?CM_SHARD, fun ?MODULE:unregister_channel2/1, [
record(ClientId, ChanPid)
]);
true ->
mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid));
false ->
ok
end.
%% @private
%% Dirty-delete the exact channel record, then record an unregistration
%% history entry for the client id (asserting that it returns `ok').
unregister_channel2(Record = #channel{chid = Id}) ->
    mria:dirty_delete_object(?CHAN_REG_TAB, Record),
    ok = insert_hist_d(Id).
%% @doc Lookup the global channels.
-spec lookup_channels(emqx_types:clientid()) -> list(pid()).
lookup_channels(ClientId) ->
[ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?CHAN_REG_TAB, ClientId)].
lists:filtermap(
fun
(#channel{pid = ChanPid}) when is_pid(ChanPid) ->
case is_pid_down(ChanPid) of
true ->
false;
_ ->
{true, ChanPid}
end;
(_) ->
false
end,
mnesia:dirty_read(?CHAN_REG_TAB, ClientId)
).
%% Liveness probe for a channel pid.
%% For a pid living on this node the answer is definite: `true' when the
%% process is dead, `false' when it is alive.
%% For anything else (e.g. a pid on another node) the answer is `unknown'.
is_pid_down(LocalPid) when node(LocalPid) =:= node() ->
    erlang:is_process_alive(LocalPid) =:= false;
is_pid_down(_RemoteOrOther) ->
    unknown.
%% Build the registry record that binds a client id to a channel pid.
record(ClientId, ChanPid) ->
    #channel{pid = ChanPid, chid = ClientId}.
%% Build a history record for a client id: the `pid' field carries the
%% current timestamp (see now_ts/0) instead of a process identifier.
hist(ClientId) ->
    UnregisteredAt = now_ts(),
    #channel{chid = ClientId, pid = UnregisteredAt}.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
@ -158,15 +208,95 @@ code_change(_OldVsn, State, _Extra) ->
cleanup_channels(Node) ->
global:trans(
{?LOCK, self()},
{?NODE_DOWN_CLEANUP_LOCK, self()},
fun() ->
mria:transaction(?CM_SHARD, fun ?MODULE:do_cleanup_channels/1, [Node])
end
).
do_cleanup_channels(Node) ->
Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)).
Pat = [
{
#channel{pid = '$1', _ = '_'},
_Match = [{'andalso', {is_pid, '$1'}, {'==', {node, '$1'}, Node}}],
_Return = ['$_']
}
],
IsHistEnabled = is_hist_enabled(),
lists:foreach(
fun(Chan) -> delete_channel(IsHistEnabled, Chan) end,
mnesia:select(?CHAN_REG_TAB, Pat, write)
).
delete_channel(Chan) ->
mnesia:delete_object(?CHAN_REG_TAB, Chan, write).
%% Delete one channel record (write lock); when history is enabled, also
%% record an unregistration history entry for the same client id.
%% Uses the mnesia (non-dirty) API, as does insert_hist_t/1.
delete_channel(IsHistEnabled, Chan) ->
    mnesia:delete_object(?CHAN_REG_TAB, Chan, write),
    case IsHistEnabled of
        false ->
            ok;
        true ->
            ClientId = Chan#channel.chid,
            insert_hist_t(ClientId)
    end.
%%--------------------------------------------------------------------
%% History entry operations
%% Transactional insert of an unregistration history entry.
%% Existing history entries of the client id are removed first; the new entry
%% is written only when no pid-bearing record remains for that client id,
%% i.e. when the last channel of the client id has been unregistered.
insert_hist_t(ClientId) ->
    HasLivePid = delete_hist_t(ClientId),
    case HasLivePid of
        false ->
            mnesia:write(?CHAN_REG_TAB, hist(ClientId), write);
        true ->
            ok
    end.
%% Dirty insert of an unregistration history entry.
%% Because dirty operations are used, async pool workers may race deletes
%% against inserts, so more than one history record may exist for a clientid.
%% This should become consistent eventually, either when the client
%% re-registers or when the periodic cleanup runs.
insert_hist_d(ClientId) ->
    %% old history records are deleted before deciding whether to write a new one
    HasLivePid = delete_hist_d(ClientId),
    case HasLivePid of
        false ->
            mria:dirty_write(?CHAN_REG_TAB, hist(ClientId));
        true ->
            ok
    end.
%% Current Erlang system time as an integer number of seconds.
%% Uses the time unit `second'; the plural `seconds' is a deprecated
%% symbolic time unit.
now_ts() ->
    erlang:system_time(second).
%% Transactional variant: read every record of the client id under a write
%% lock and delete the history (timestamp) ones.
%% Returns `true' when at least one record holds a pid, `false' otherwise.
delete_hist_t(ClientId) ->
    Delete = fun(Hist) -> mnesia:delete_object(?CHAN_REG_TAB, Hist, write) end,
    Records = mnesia:read(?CHAN_REG_TAB, ClientId, write),
    fold_hist(Delete, Records).
%% Dirty variant: dirty-read every record of the client id and dirty-delete
%% the history (timestamp) ones.
%% Returns `true' when at least one record holds a pid, `false' otherwise.
delete_hist_d(ClientId) ->
    Delete = fun(Hist) -> mria:dirty_delete_object(?CHAN_REG_TAB, Hist) end,
    Records = mnesia:dirty_read(?CHAN_REG_TAB, ClientId),
    fold_hist(Delete, Records).
%% Walk the records of one client id.
%% A record whose `pid' field is an integer is a history entry: `F' is applied
%% to it and must return `ok'. Any other record flips the result to `true'.
%% The result is therefore `true' iff a non-history (pid) record was seen.
fold_hist(F, List) ->
    Step =
        fun
            (#channel{pid = Ts} = Hist, SeenPid) when is_integer(Ts) ->
                ok = F(Hist),
                SeenPid;
            (#channel{}, _SeenPid) ->
                true
        end,
    lists:foldl(Step, false, List).
%% Read the configured session registration history retain duration
%% (config path `broker.session_history_retain').
-spec retain_duration() -> non_neg_integer().
retain_duration() ->
    ConfPath = [broker, session_history_retain],
    emqx:get_config(ConfPath).

View File

@ -0,0 +1,194 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc This module implements the global session registry history cleaner.
-module(emqx_cm_registry_keeper).
-behaviour(gen_server).
-export([
start_link/0,
count/1
]).
%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-include_lib("stdlib/include/ms_transform.hrl").
-include("emqx_cm.hrl").
-define(CACHE_COUNT_THRESHOLD, 1000).
-define(MIN_COUNT_INTERVAL_SECONDS, 5).
-define(CLEANUP_CHUNK_SIZE, 10000).
-define(IS_HIST_ENABLED(RETAIN), (RETAIN > 0)).
%% @doc Start the keeper as a gen_server registered locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%% gen_server init: the process is not started (`ignore') when this node
%% reports itself as a replicant; otherwise the first delayed `start' message
%% is scheduled and the scan cursor starts as `undefined'.
init(_Args) ->
    case mria_config:whoami() of
        replicant ->
            ignore;
        _NotReplicant ->
            ok = send_delay_start(),
            {ok, #{next_clientid => undefined}}
    end.
%% @doc Count the number of sessions.
%% History entries are filtered by `Since' only when history is enabled and
%% `Since' is later than the earliest timestamp that can still be retained;
%% in every other case the plain table size is returned.
-spec count(non_neg_integer()) -> non_neg_integer().
count(Since) ->
    Retain = retain_duration(),
    EarliestRetained = now_ts() - Retain,
    NeedsFilteredCount = ?IS_HIST_ENABLED(Retain) andalso (EarliestRetained < Since),
    case NeedsFilteredCount of
        false ->
            mnesia:table_info(?CHAN_REG_TAB, size);
        true ->
            %% routed through the gen_server so that concurrent callers
            %% do not all run the same count at once
            gen_server:call(?MODULE, {count, Since}, infinity)
    end.
%% Serve `{count, Since}' requests; any other call is answered with `ok'.
%%
%% A fresh count is made when the table is small (below
%% ?CACHE_COUNT_THRESHOLD entries), or when no usable cached result exists.
%% A cached result is usable only if it is at most ?MIN_COUNT_INTERVAL_SECONDS
%% old AND it was computed for the very same `Since': a count made for another
%% timestamp would answer a different question.
handle_call({count, Since}, _From, State) ->
    Now = now_ts(),
    Total = mnesia:table_info(?CHAN_REG_TAB, size),
    IsTableSmall = (Total < ?CACHE_COUNT_THRESHOLD),
    case IsTableSmall orelse not is_cached_count_usable(Since, Now, State) of
        true ->
            Count = do_count(Since),
            %% stamp with the finish time so that a slow count
            %% does not make the cache look older than it is
            CountFinishedAt = now_ts(),
            NewState = State#{
                last_count_time => CountFinishedAt,
                last_count => Count,
                last_count_since => Since
            },
            {reply, Count, NewState};
        false ->
            #{last_count := LastCount} = State,
            {reply, LastCount, State}
    end;
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%% True when the state holds a count for exactly this `Since' which is
%% not older than ?MIN_COUNT_INTERVAL_SECONDS.
is_cached_count_usable(
    Since,
    Now,
    #{last_count_time := LastCountTime, last_count := _, last_count_since := Since}
) ->
    Now - LastCountTime =< ?MIN_COUNT_INTERVAL_SECONDS;
is_cached_count_usable(_Since, _Now, _State) ->
    false.
%% No casts are expected; any cast is ignored and the state is kept as is.
handle_cast(_Msg, State) ->
{noreply, State}.
%% `start' triggers the cleanup of one chunk of the registry table, resuming
%% from the cursor kept in the state under `next_clientid'.
%% Every branch re-arms the timer, otherwise no further `start' message
%% would ever arrive and the cleanup would stop for good:
%%  - table fully scanned: wait for the regular delay, restart from the beginning;
%%  - chunk limit reached: continue right away (delay 0) from the returned
%%    cursor; going through the mailbox lets queued calls be served in between;
%%  - history disabled: wait for the regular delay and check again.
%% Any other message is ignored.
handle_info(start, #{next_clientid := NextClientId} = State) ->
    case is_hist_enabled() of
        true ->
            NewNext =
                case cleanup_one_chunk(NextClientId) of
                    '$end_of_table' ->
                        ok = send_delay_start(),
                        undefined;
                    Id ->
                        _ = erlang:garbage_collect(),
                        %% more keys remain: schedule the next chunk immediately
                        ok = send_delay_start(0),
                        Id
                end,
            {noreply, State#{next_clientid := NewNext}};
        false ->
            %% if not enabled, delay and check again
            %% because it might be enabled from online config change while waiting
            ok = send_delay_start(),
            {noreply, State}
    end;
handle_info(_Info, State) ->
    {noreply, State}.
%% Nothing to release on termination.
terminate(_Reason, _State) ->
ok.
%% The state is carried over unchanged on code change.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% Clean up at most ?CLEANUP_CHUNK_SIZE keys starting at `NextClientId'.
%% A record is expired when its `pid' field is an integer timestamp older
%% than (now - retain duration). Returns whatever cleanup_loop/3 returns:
%% the key to resume from, or '$end_of_table'.
cleanup_one_chunk(NextClientId) ->
    Retain = retain_duration(),
    Now = now_ts(),
    Cutoff = Now - Retain,
    IsExpired =
        fun
            (#channel{pid = Ts}) when is_integer(Ts) -> Ts < Cutoff;
            (#channel{}) -> false
        end,
    cleanup_loop(NextClientId, ?CLEANUP_CHUNK_SIZE, IsExpired).
%% Walk the registry table key by key, deleting the records for which
%% `IsExpired' returns true.
%% Arguments: the key to process next (`undefined' means start from the first
%% key), the number of keys still allowed in this chunk, and the predicate.
%% Returns the key at which the next chunk should resume, or '$end_of_table'
%% when the whole table has been traversed.
%% The clause order matters: the exhausted-budget clause comes first, so the
%% current key (possibly '$end_of_table') is returned as is.
cleanup_loop(ClientId, 0, _IsExpired) ->
ClientId;
cleanup_loop('$end_of_table', _Count, _IsExpired) ->
'$end_of_table';
cleanup_loop(undefined, Count, IsExpired) ->
%% no cursor yet: begin at the first key of the table
cleanup_loop(mnesia:dirty_first(?CHAN_REG_TAB), Count, IsExpired);
cleanup_loop(ClientId, Count, IsExpired) ->
Records = mnesia:dirty_read(?CHAN_REG_TAB, ClientId),
%% the next key is fetched before deleting anything under the current key
%% NOTE(review): dirty_next may fail if ClientId was removed concurrently,
%% depending on the table type -- confirm
Next = mnesia:dirty_next(?CHAN_REG_TAB, ClientId),
lists:foreach(
fun(R) ->
case IsExpired(R) of
true ->
mria:dirty_delete_object(?CHAN_REG_TAB, R);
false ->
ok
end
end,
Records
),
cleanup_loop(Next, Count - 1, IsExpired).
%% History is enabled when the configured retain duration is greater than zero.
is_hist_enabled() ->
    Retain = retain_duration(),
    Retain > 0.
%% Read the session registration history retain duration, in seconds,
%% from config path `broker.session_history_retain'.
-spec retain_duration() -> non_neg_integer().
retain_duration() ->
    ConfPath = [broker, session_history_retain],
    emqx:get_config(ConfPath).
%% Delay (milliseconds) before the next cleanup pass.
%% With history disabled (retain = 0) the 2-minute ceiling is used, so that a
%% later online config change is picked up. Otherwise the delay is a quarter
%% of the retain duration, clamped between 1 second and 2 minutes.
cleanup_delay() ->
    Ceiling = timer:minutes(2),
    case retain_duration() of
        0 ->
            Ceiling;
        RetainSeconds ->
            Floor = timer:seconds(1),
            QuarterRetain = timer:seconds(RetainSeconds) div 4,
            min(max(Floor, QuarterRetain), Ceiling)
    end.
%% Schedule a `start' message to self after the delay given by cleanup_delay/0.
send_delay_start() ->
    ok = send_delay_start(cleanup_delay()).

%% Schedule a `start' message to self after `Delay' milliseconds.
%% The timer reference is discarded.
send_delay_start(Delay) ->
    _TimerRef = erlang:send_after(Delay, self(), start),
    ok.
%% Current Erlang system time as an integer number of seconds.
%% Uses the time unit `second'; the plural `seconds' is a deprecated
%% symbolic time unit.
now_ts() ->
    erlang:system_time(second).
%% Count the records whose `pid' field is either a pid, or an integer
%% timestamp which is greater than or equal to `Since'.
%% The fun is turned into a match specification at compile time by the
%% ms_transform parse transform; it is never called as a regular fun.
do_count(Since) ->
Ms = ets:fun2ms(fun(#channel{pid = V}) ->
is_pid(V) orelse (is_integer(V) andalso (V >= Since))
end),
ets:select_count(?CHAN_REG_TAB, Ms).

View File

@ -49,6 +49,7 @@ init([]) ->
Locker = child_spec(emqx_cm_locker, 5000, worker),
CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]),
Registry = child_spec(emqx_cm_registry, 5000, worker),
RegistryKeeper = child_spec(emqx_cm_registry_keeper, 5000, worker),
Manager = child_spec(emqx_cm, 5000, worker),
DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
Children =
@ -58,6 +59,7 @@ init([]) ->
Locker,
CmPool,
Registry,
RegistryKeeper,
Manager,
DSSessionGCSup
],

View File

@ -182,7 +182,7 @@
-define(DEFAULT_MULTIPLIER, 1.5).
-define(DEFAULT_BACKOFF, 0.75).
namespace() -> broker.
namespace() -> emqx.
tags() ->
[<<"EMQX">>].
@ -230,7 +230,7 @@ roots(high) ->
);
roots(medium) ->
[
{"broker",
{broker,
sc(
ref("broker"),
#{
@ -1347,24 +1347,43 @@ fields("deflate_opts") ->
];
fields("broker") ->
[
{"enable_session_registry",
{enable_session_registry,
sc(
boolean(),
#{
default => true,
importance => ?IMPORTANCE_HIGH,
desc => ?DESC(broker_enable_session_registry)
}
)},
{"session_locking_strategy",
{session_history_retain,
sc(
duration_s(),
#{
default => <<"0s">>,
importance => ?IMPORTANCE_LOW,
desc => ?DESC("broker_session_history_retain")
}
)},
{session_locking_strategy,
sc(
hoconsc:enum([local, leader, quorum, all]),
#{
default => quorum,
importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC(broker_session_locking_strategy)
}
)},
shared_subscription_strategy(),
{"shared_dispatch_ack_enabled",
%% moved to under mqtt root
{shared_subscription_strategy,
sc(
string(),
#{
deprecated => {since, "5.1.0"},
importance => ?IMPORTANCE_HIDDEN
}
)},
{shared_dispatch_ack_enabled,
sc(
boolean(),
#{
@ -1374,7 +1393,7 @@ fields("broker") ->
desc => ?DESC(broker_shared_dispatch_ack_enabled)
}
)},
{"route_batch_clean",
{route_batch_clean,
sc(
boolean(),
#{
@ -1383,18 +1402,18 @@ fields("broker") ->
importance => ?IMPORTANCE_HIDDEN
}
)},
{"perf",
{perf,
sc(
ref("broker_perf"),
#{importance => ?IMPORTANCE_HIDDEN}
)},
{"routing",
{routing,
sc(
ref("broker_routing"),
#{importance => ?IMPORTANCE_HIDDEN}
)},
%% FIXME: Need new design for shared subscription group
{"shared_subscription_group",
{shared_subscription_group,
sc(
map(name, ref("shared_subscription_group")),
#{
@ -3640,7 +3659,22 @@ mqtt_general() ->
desc => ?DESC(mqtt_shared_subscription)
}
)},
shared_subscription_strategy(),
{"shared_subscription_strategy",
sc(
hoconsc:enum([
random,
round_robin,
round_robin_per_group,
sticky,
local,
hash_topic,
hash_clientid
]),
#{
default => round_robin,
desc => ?DESC(mqtt_shared_subscription_strategy)
}
)},
{"exclusive_subscription",
sc(
boolean(),
@ -3846,24 +3880,6 @@ mqtt_session() ->
)}
].
shared_subscription_strategy() ->
{"shared_subscription_strategy",
sc(
hoconsc:enum([
random,
round_robin,
round_robin_per_group,
sticky,
local,
hash_topic,
hash_clientid
]),
#{
default => round_robin,
desc => ?DESC(broker_shared_subscription_strategy)
}
)}.
default_mem_check_interval() ->
case emqx_os_mon:is_os_check_supported() of
true -> <<"60s">>;

View File

@ -99,7 +99,11 @@
[
'sessions.count',
%% Maximum Number of Concurrent Sessions
'sessions.max'
'sessions.max',
%% Count of Sessions in the cluster
'cluster_sessions.count',
%% Maximum Number of Sessions in the cluster
'cluster_sessions.max'
]
).
@ -164,6 +168,8 @@ names() ->
emqx_connections_max,
emqx_live_connections_count,
emqx_live_connections_max,
emqx_cluster_sessions_count,
emqx_cluster_sessions_max,
emqx_sessions_count,
emqx_sessions_max,
emqx_channels_count,

View File

@ -0,0 +1,100 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_cm_registry_keeper_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_cm.hrl").
%%--------------------------------------------------------------------
%% CT callbacks
%%--------------------------------------------------------------------
%% Test case list, obtained from emqx_common_test_helpers:all/1 for this module.
all() -> emqx_common_test_helpers:all(?MODULE).
%% Start the `emqx' application with a 2 seconds history retain duration and
%% keep the started apps in the CT config under the `apps' key.
init_per_suite(Config) ->
    AppSpecs = [{emqx, #{config => "broker.session_history_retain = 2s"}}],
    SuiteOpts = #{work_dir => emqx_cth_suite:work_dir(Config)},
    Started = emqx_cth_suite:start(AppSpecs, SuiteOpts),
    [{apps, Started} | Config].
%% Stop the apps recorded by init_per_suite/1 under the `apps' key.
end_per_suite(Config) ->
    Started = proplists:get_value(apps, Config),
    emqx_cth_suite:stop(Started).
%% No per-testcase setup; the config is passed through unchanged.
init_per_testcase(_TestCase, Config) ->
Config.
%% No per-testcase teardown; the config is passed through unchanged.
end_per_testcase(_TestCase, Config) ->
Config.
%% Register two client ids on one process, kill it, and verify that
%% the dead channels are no longer returned by lookup, that they are still
%% counted right after clean_down, and that the counts drop to zero once
%% the retain duration configured in init_per_suite/1 has passed.
t_cleanup_after_retain(_) ->
Pid = spawn(fun() ->
receive
stop -> ok
end
end),
ClientId = <<"clientid">>,
ClientId2 = <<"clientid2">>,
emqx_cm_registry:register_channel({ClientId, Pid}),
emqx_cm_registry:register_channel({ClientId2, Pid}),
?assertEqual([Pid], emqx_cm_registry:lookup_channels(ClientId)),
?assertEqual([Pid], emqx_cm_registry:lookup_channels(ClientId2)),
?assertEqual(2, emqx_cm_registry_keeper:count(0)),
T0 = erlang:system_time(seconds),
exit(Pid, kill),
%% lookup_channels checks if the channel is still alive
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)),
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)),
%% simulate a DOWN message which causes emqx_cm to call clean_down
%% to clean the channels for real
ok = emqx_cm:clean_down({Pid, ClientId}),
ok = emqx_cm:clean_down({Pid, ClientId2}),
%% both sessions are still counted at this point
?assertEqual(2, emqx_cm_registry_keeper:count(T0)),
?assertEqual(2, emqx_cm_registry_keeper:count(0)),
%% retried until the counts drop to zero
?retry(_Interval = 1000, _Attempts = 4, begin
?assertEqual(0, emqx_cm_registry_keeper:count(T0)),
?assertEqual(0, emqx_cm_registry_keeper:count(0))
end),
ok.
%% The filtered count is served from cache once the table holds 1000 entries
%% or more (the keeper always recounts while the table size is below 1000).
t_count_cache(_) ->
Pid = self(),
ClientsCount = 999,
ClientIds = lists:map(fun erlang:integer_to_binary/1, lists:seq(1, ClientsCount)),
Channels = lists:map(fun(ClientId) -> {ClientId, Pid} end, ClientIds),
lists:foreach(
fun emqx_cm_registry:register_channel/1,
Channels
),
T0 = erlang:system_time(seconds),
?assertEqual(ClientsCount, emqx_cm_registry_keeper:count(0)),
?assertEqual(ClientsCount, emqx_cm_registry_keeper:count(T0)),
%% insert another one to trigger the cache threshold
emqx_cm_registry:register_channel({<<"-1">>, Pid}),
%% count(0) reflects the new entry
?assertEqual(ClientsCount + 1, emqx_cm_registry_keeper:count(0)),
%% count(T0) still reports the previous value
?assertEqual(ClientsCount, emqx_cm_registry_keeper:count(T0)),
mnesia:clear_table(?CHAN_REG_TAB),
ok.
%% Build a #channel{} record from a client id and a pid.
%% NOTE(review): not called in the visible part of this suite -- confirm it is still needed.
channel(Id, Pid) ->
#channel{chid = Id, pid = Pid}.

View File

@ -76,8 +76,7 @@ t_fill_default_values(C) when is_list(C) ->
<<"trie_compaction">> := true
},
<<"route_batch_clean">> := false,
<<"session_locking_strategy">> := <<"quorum">>,
<<"shared_subscription_strategy">> := <<"round_robin">>
<<"session_history_retain">> := <<"0s">>
}
},
WithDefaults

View File

@ -415,6 +415,7 @@ getstats(Key) ->
stats(connections) -> emqx_stats:getstat('connections.count');
stats(live_connections) -> emqx_stats:getstat('live_connections.count');
stats(cluster_sessions) -> emqx_stats:getstat('cluster_sessions.count');
stats(topics) -> emqx_stats:getstat('topics.count');
stats(subscriptions) -> emqx_stats:getstat('subscriptions.count');
stats(received) -> emqx_metrics:val('messages.received');

View File

@ -194,6 +194,12 @@ swagger_desc(live_connections) ->
"Connections at the time of sampling."
" Can only represent the approximate state"
>>;
swagger_desc(cluster_sessions) ->
<<
"Total number of sessions in the cluster at the time of sampling. "
"It includes expired sessions when `broker.session_history_retain` is set to a duration greater than `0s`. "
"Can only represent the approximate state"
>>;
swagger_desc(received_msg_rate) ->
swagger_desc_format("Dropped messages ", per);
%swagger_desc(received_bytes_rate) -> swagger_desc_format("Received bytes ", per);

View File

@ -145,6 +145,7 @@ node_info() ->
),
connections => ets:info(?CHAN_TAB, size),
live_connections => ets:info(?CHAN_LIVE_TAB, size),
cluster_sessions => ets:info(?CHAN_REG_TAB, size),
node_status => 'running',
uptime => proplists:get_value(uptime, BrokerInfo),
version => iolist_to_binary(proplists:get_value(version, BrokerInfo)),

View File

@ -45,7 +45,8 @@
subscribe_batch/2,
unsubscribe/2,
unsubscribe_batch/2,
set_keepalive/2
set_keepalive/2,
sessions_count/2
]).
-export([
@ -96,7 +97,8 @@ paths() ->
"/clients/:clientid/subscribe/bulk",
"/clients/:clientid/unsubscribe",
"/clients/:clientid/unsubscribe/bulk",
"/clients/:clientid/keepalive"
"/clients/:clientid/keepalive",
"/sessions_count"
].
schema("/clients") ->
@ -385,6 +387,30 @@ schema("/clients/:clientid/keepalive") ->
)
}
}
};
schema("/sessions_count") ->
#{
'operationId' => sessions_count,
get => #{
description => ?DESC(get_sessions_count),
tags => ?TAGS,
parameters => [
{since,
hoconsc:mk(non_neg_integer(), #{
in => query,
required => false,
default => 0,
desc =>
<<"Include sessions expired after this time (UNIX Epoch in seconds precision)">>,
example => 1705391625
})}
],
responses => #{
200 => hoconsc:mk(binary(), #{
desc => <<"Number of sessions">>
})
}
}
}.
fields(clients) ->
@ -1059,3 +1085,8 @@ client_example() ->
<<"recv_cnt">> => 4,
<<"recv_msg.qos0">> => 0
}.
%% GET /sessions_count handler.
%% Reads the optional `since' query parameter (0 when absent), asks the
%% registry keeper for the count and replies 200 with the count as a binary.
sessions_count(get, #{query_string := Query}) ->
    Count = emqx_cm_registry_keeper:count(maps:get(<<"since">>, Query, 0)),
    Body = integer_to_binary(Count),
    {200, Body}.

View File

@ -53,7 +53,8 @@
<<"alarm">>,
<<"sys_topics">>,
<<"sysmon">>,
<<"log">>
<<"log">>,
<<"broker">>
| ?ROOT_KEYS_EE
]).

View File

@ -160,6 +160,19 @@ fields(node_info) ->
non_neg_integer(),
#{desc => <<"Number of clients currently connected to this node">>, example => 0}
)},
{cluster_sessions,
mk(
non_neg_integer(),
#{
desc =>
<<
"By default, it includes only those sessions that have not expired. "
"If the `broker.session_history_retain` config is set to a duration greater than `0s`, "
"this count will also include sessions that expired within the specified retain time"
>>,
example => 0
}
)},
{load1,
mk(
float(),

View File

@ -89,6 +89,10 @@ fields(node_stats_data) ->
stats_schema('delayed.max', <<"Historical maximum number of delayed messages">>),
stats_schema('live_connections.count', <<"Number of current live connections">>),
stats_schema('live_connections.max', <<"Historical maximum number of live connections">>),
stats_schema('cluster_sessions.count', <<"Number of sessions in the cluster">>),
stats_schema(
'cluster_sessions.max', <<"Historical maximum number of sessions in the cluster">>
),
stats_schema('retained.count', <<"Number of currently retained messages">>),
stats_schema('retained.max', <<"Historical maximum number of retained messages">>),
stats_schema('sessions.count', <<"Number of current sessions">>),

View File

@ -251,7 +251,7 @@ add_collect_family(Name, Data, Callback, Type) ->
%% behaviour
fetch_from_local_node(Mode) ->
{node(self()), #{
{node(), #{
stats_data => stats_data(Mode),
vm_data => vm_data(Mode),
cluster_data => cluster_data(Mode),
@ -308,6 +308,8 @@ emqx_collect(K = emqx_sessions_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_sessions_max, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_channels_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_channels_max, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_cluster_sessions_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_cluster_sessions_max, D) -> gauge_metrics(?MG(K, D));
%% pub/sub stats
emqx_collect(K = emqx_topics_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_topics_max, D) -> gauge_metrics(?MG(K, D));
@ -500,6 +502,8 @@ stats_metric_meta() ->
{emqx_sessions_max, gauge, 'sessions.max'},
{emqx_channels_count, gauge, 'channels.count'},
{emqx_channels_max, gauge, 'channels.max'},
{emqx_cluster_sessions_count, gauge, 'cluster_sessions.count'},
{emqx_cluster_sessions_max, gauge, 'cluster_sessions.max'},
%% pub/sub stats
{emqx_suboptions_count, gauge, 'suboptions.count'},
{emqx_suboptions_max, gauge, 'suboptions.max'},

View File

@ -1,6 +1,6 @@
{application, emqx_telemetry, [
{description, "Report telemetry data for EMQX Opensource edition"},
{vsn, "0.1.3"},
{vsn, "0.2.0"},
{registered, [emqx_telemetry_sup, emqx_telemetry]},
{mod, {emqx_telemetry_app, []}},
{applications, [

View File

@ -303,6 +303,9 @@ active_plugins() ->
num_clients() ->
emqx_stats:getstat('live_connections.count').
%% Current value of the 'cluster_sessions.count' stat.
num_cluster_sessions() ->
    StatName = 'cluster_sessions.count',
    emqx_stats:getstat(StatName).
messages_sent() ->
emqx_metrics:val('messages.sent').
@ -348,6 +351,7 @@ get_telemetry(State0 = #state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID})
{nodes_uuid, nodes_uuid()},
{active_plugins, active_plugins()},
{num_clients, num_clients()},
{num_cluster_sessions, num_cluster_sessions()},
{messages_received, messages_received()},
{messages_sent, messages_sent()},
{build_info, build_info()},

View File

@ -0,0 +1,14 @@
Add session registration history.
Setting config `broker.session_history_retain` allows EMQX to keep track of expired sessions for the retained period.
API `GET /api/v5/sessions_count?since=1705682238` can be called to count the cluster-wide sessions which were alive (unexpired) since the provided timestamp (UNIX epoch at seconds precision).
A new gauge `cluster_sessions` is added to the metrics collection. Exposed to prometheus as
```
# TYPE emqx_cluster_sessions_count gauge
emqx_cluster_sessions_count 1234
```
The counter can only be used for an approximate estimation as the collection and calculations are async.

View File

@ -60,4 +60,14 @@ set_keepalive_seconds.desc:
set_keepalive_seconds.label:
"""Set the online client keepalive by seconds"""
get_sessions_count.desc:
"""Get the total number of sessions in the cluster.
By default, it includes only those sessions that have not expired.
If the `broker.session_history_retain` config is set to a duration greater than 0s,
this count will also include sessions that expired within the specified retain time.
When the `since` parameter is specified, expired sessions are counted only if they expired at or after that timestamp (UNIX epoch, in seconds)."""
get_sessions_count.label:
"""Count number of sessions"""
}

View File

@ -22,9 +22,9 @@ install_dir.label:
"""Install Directory"""
name_vsn.desc:
"""The {name}-{version} of the plugin.<br/>
It should match the plugin application name-version as the for the plugin release package name<br/>
For example: my_plugin-0.1.0."""
"""The `{name}-{version}` of the plugin.<br/>
It should match the plugin application name-version, which is the same as the plugin release package name.<br/>
For example: `my_plugin-0.1.0`."""
name_vsn.label:
"""Name-Version"""

View File

@ -1022,7 +1022,7 @@ fields_ws_opts_supported_subprotocols.desc:
fields_ws_opts_supported_subprotocols.label:
"""Supported subprotocols"""
broker_shared_subscription_strategy.desc:
mqtt_shared_subscription_strategy.desc:
"""Dispatch strategy for shared subscription.
- `random`: Randomly select a subscriber for dispatch;
- `round_robin`: Messages from a single publisher are dispatched to subscribers in turn;
@ -1420,7 +1420,21 @@ force_shutdown_enable.label:
"""Enable `force_shutdown` feature"""
broker_enable_session_registry.desc:
"""Enable session registry"""
"""The Global Session Registry is a cluster-wide mechanism designed to maintain the uniqueness of client IDs within the cluster.
Recommendations for Use<br/>
- Default Setting: It is generally advisable to enable. This feature is crucial for session takeover to work properly. For example if a client reconnected to another node in the cluster, the new connection will need to find the old session and take it over.
- Disabling the Feature: Disabling is an option for scenarios when all sessions expire immediately after client is disconnected (i.e. session expiry interval is zero). This can be relevant in certain specialized use cases.
Advantages of Disabling<br/>
- Reduced Memory Usage: Turning off the session registry can lower the overall memory footprint of the system.
- Improved Performance: Without the overhead of maintaining a global registry, the node can process client connections faster."""
broker_session_history_retain.desc:
"""The duration to retain the session registration history. Setting this to a value greater than `0s` will increase memory usage and impact performance.
This retained history can be used to monitor how many sessions were registered in the past configured duration.
Note: This config has no effect if `enable_session_registry` is set to `false`.<br/>
Note: If the clients are using random client IDs, it's not recommended to enable this feature, at least not for a long retention period.<br/>
Note: When clustered, the lowest (but greater than `0s`) value among the nodes in the cluster will take effect."""
overload_protection_backoff_delay.desc:
"""The maximum duration of delay for background task execution during high load conditions."""