feat(metrics): add cluster_session guage

This commit is contained in:
Zaiming (Stone) Shi 2024-01-19 17:31:16 +01:00
parent 509ab6f35a
commit 87a2368e37
13 changed files with 55 additions and 10 deletions

View File

@ -124,7 +124,8 @@
{?CHAN_TAB, 'channels.count', 'channels.max'},
{?CHAN_TAB, 'sessions.count', 'sessions.max'},
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'},
{?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'}
{?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'},
{?CHAN_REG_TAB, 'cluster_sessions.count', 'cluster_sessions.max'}
]).
%% Batch drain

View File

@ -99,7 +99,7 @@ unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid
case is_enabled() of
true ->
mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)),
%% insert unregistration history after unrestration
%% insert unregistration history after unregstration
ok = when_hist_enabled(fun() -> insert_hist_d(ClientId) end);
false ->
ok

View File

@ -172,7 +172,7 @@ cleanup_delay() ->
%% prepare for online config change
Default;
RetainSeconds ->
Min = max(1, timer:seconds(RetainSeconds div 4)),
Min = max(timer:seconds(1), timer:seconds(RetainSeconds) div 4),
min(Min, Default)
end.
@ -188,5 +188,7 @@ now_ts() ->
erlang:system_time(seconds).
do_count(Since) ->
Ms = ets:fun2ms(fun(#channel{pid = V}) -> is_pid(V) orelse (is_integer(V) andalso (V >= Since)) end),
Ms = ets:fun2ms(fun(#channel{pid = V}) ->
is_pid(V) orelse (is_integer(V) andalso (V >= Since))
end),
ets:select_count(?CHAN_REG_TAB, Ms).

View File

@ -99,7 +99,11 @@
[
'sessions.count',
%% Maximum Number of Concurrent Sessions
'sessions.max'
'sessions.max',
%% Count of Sessions in the cluster
'cluster_sessions.count',
%% Maximum Number of Sessions in the cluster
'cluster_sessions.max'
]
).
@ -164,6 +168,8 @@ names() ->
emqx_connections_max,
emqx_live_connections_count,
emqx_live_connections_max,
emqx_cluster_sessions_count,
emqx_cluster_sessions_max,
emqx_sessions_count,
emqx_sessions_max,
emqx_channels_count,

View File

@ -415,6 +415,7 @@ getstats(Key) ->
stats(connections) -> emqx_stats:getstat('connections.count');
stats(live_connections) -> emqx_stats:getstat('live_connections.count');
stats(cluster_sessions) -> emqx_stats:getstat('cluster_sessions.count');
stats(topics) -> emqx_stats:getstat('topics.count');
stats(subscriptions) -> emqx_stats:getstat('subscriptions.count');
stats(received) -> emqx_metrics:val('messages.received');

View File

@ -194,6 +194,12 @@ swagger_desc(live_connections) ->
"Connections at the time of sampling."
" Can only represent the approximate state"
>>;
swagger_desc(cluster_sessions) ->
<<
"Total number of sessions in the cluster at the time of sampling. "
"It includes expired sessions when `broker.session_history_retain` is set to a duration greater than `0s`. "
"Can only represent the approximate state"
>>;
swagger_desc(received_msg_rate) ->
swagger_desc_format("Dropped messages ", per);
%swagger_desc(received_bytes_rate) -> swagger_desc_format("Received bytes ", per);

View File

@ -145,6 +145,7 @@ node_info() ->
),
connections => ets:info(?CHAN_TAB, size),
live_connections => ets:info(?CHAN_LIVE_TAB, size),
cluster_sessions => ets:info(?CHAN_REG_TAB, size),
node_status => 'running',
uptime => proplists:get_value(uptime, BrokerInfo),
version => iolist_to_binary(proplists:get_value(version, BrokerInfo)),

View File

@ -160,6 +160,19 @@ fields(node_info) ->
non_neg_integer(),
#{desc => <<"Number of clients currently connected to this node">>, example => 0}
)},
{cluster_sessions,
mk(
non_neg_integer(),
#{
desc =>
<<
"By default, it includes only those sessions that have not expired. "
"If the `broker.session_history_retain` config is set to a duration greater than `0s`, "
"this count will also include sessions that expired within the specified retain time"
>>,
example => 0
}
)},
{load1,
mk(
float(),

View File

@ -89,6 +89,10 @@ fields(node_stats_data) ->
stats_schema('delayed.max', <<"Historical maximum number of delayed messages">>),
stats_schema('live_connections.count', <<"Number of current live connections">>),
stats_schema('live_connections.max', <<"Historical maximum number of live connections">>),
stats_schema('cluster_sessions.count', <<"Number of sessions in the cluster">>),
stats_schema(
'cluster_sessions.max', <<"Historical maximum number of sessions in the cluster">>
),
stats_schema('retained.count', <<"Number of currently retained messages">>),
stats_schema('retained.max', <<"Historical maximum number of retained messages">>),
stats_schema('sessions.count', <<"Number of current sessions">>),

View File

@ -251,7 +251,7 @@ add_collect_family(Name, Data, Callback, Type) ->
%% behaviour
fetch_from_local_node(Mode) ->
{node(self()), #{
{node(), #{
stats_data => stats_data(Mode),
vm_data => vm_data(Mode),
cluster_data => cluster_data(Mode),
@ -308,6 +308,8 @@ emqx_collect(K = emqx_sessions_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_sessions_max, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_channels_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_channels_max, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_cluster_sessions_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_cluster_sessions_max, D) -> gauge_metrics(?MG(K, D));
%% pub/sub stats
emqx_collect(K = emqx_topics_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_topics_max, D) -> gauge_metrics(?MG(K, D));
@ -500,6 +502,8 @@ stats_metric_meta() ->
{emqx_sessions_max, gauge, 'sessions.max'},
{emqx_channels_count, gauge, 'channels.count'},
{emqx_channels_max, gauge, 'channels.max'},
{emqx_cluster_sessions_count, gauge, 'cluster_sessions.count'},
{emqx_cluster_sessions_max, gauge, 'cluster_sessions.max'},
%% pub/sub stats
{emqx_suboptions_count, gauge, 'suboptions.count'},
{emqx_suboptions_max, gauge, 'suboptions.max'},

View File

@ -1,6 +1,6 @@
{application, emqx_telemetry, [
{description, "Report telemetry data for EMQX Opensource edition"},
{vsn, "0.1.3"},
{vsn, "0.2.0"},
{registered, [emqx_telemetry_sup, emqx_telemetry]},
{mod, {emqx_telemetry_app, []}},
{applications, [

View File

@ -303,6 +303,9 @@ active_plugins() ->
num_clients() ->
emqx_stats:getstat('live_connections.count').
num_cluster_sessions() ->
emqx_stats:getstat('cluster_sessions.count').
messages_sent() ->
emqx_metrics:val('messages.sent').
@ -348,6 +351,7 @@ get_telemetry(State0 = #state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID})
{nodes_uuid, nodes_uuid()},
{active_plugins, active_plugins()},
{num_clients, num_clients()},
{num_cluster_sessions, num_cluster_sessions()},
{messages_received, messages_received()},
{messages_sent, messages_sent()},
{build_info, build_info()},

View File

@ -61,9 +61,12 @@ set_keepalive_seconds.label:
"""Set the online client keepalive by seconds"""
get_sessions_count.desc:
"""Get the number of sessions. By default it returns the number of non-expired sessions.
if `broker.session_history_retain` is set to a duration greater than `0s`,
this API can also count expired sessions by providing the `since` parameter."""
"""Get the total number of sessions in the cluster.
By default, it includes only those sessions that have not expired.
If the `broker.session_history_retain` config is set to a duration greater than 0s,
this count will also include sessions that expired within the specified retain time.
By specifying the `since` parameter, it can return the number of sessions that have expired within the specified time."""
get_sessions_count.label:
"""Count number of sessions"""