feat(stats): add `live_connections.{count,max}` stats to `emqx_stats`

This commit is contained in:
Thales Macedo Garitezi 2021-11-04 09:26:51 -03:00
parent 2d29ba8550
commit 8f853982a6
No known key found for this signature in database
GPG Key ID: DD279F8152A9B6DD
3 changed files with 75 additions and 4 deletions

View File

@ -92,6 +92,8 @@
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'}
]).
-define(CONNECTED_CLIENT_STATS, {'live_connections.count', 'live_connections.max'}).
%% Batch drain
-define(BATCH_SIZE, 100000).
@ -516,7 +518,8 @@ clean_down({ChanPid, ClientId}) ->
do_unregister_channel({ClientId, ChanPid}).
stats_fun() ->
lists:foreach(fun update_stats/1, ?CHAN_STATS).
lists:foreach(fun update_stats/1, ?CHAN_STATS),
update_connected_client_stats().
update_stats({Tab, Stat, MaxStat}) ->
case ets:info(Tab, size) of
@ -524,6 +527,12 @@ update_stats({Tab, Stat, MaxStat}) ->
Size -> emqx_stats:setstat(Stat, MaxStat, Size)
end.
update_connected_client_stats() ->
{CountStat, MaxStat} = ?CONNECTED_CLIENT_STATS,
CCCount = get_connected_client_count(),
emqx_stats:setstat(CountStat, MaxStat, CCCount),
ok.
get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod

View File

@ -21,6 +21,7 @@
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-logger_header("[Stats]").
@ -67,8 +68,10 @@
%% Connection stats
-define(CONNECTION_STATS,
['connections.count', %% Count of Concurrent Connections
'connections.max' %% Maximum Number of Concurrent Connections
[ 'connections.count' %% Count of Concurrent Connections
, 'connections.max' %% Maximum Number of Concurrent Connections
, 'live_connections.count' %% Count of connected clients
, 'live_connections.max' %% Maximum number of connected clients
]).
%% Channel stats
@ -216,6 +219,11 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) ->
ets:insert(?TAB, {MaxStat, Val})
end,
safe_update_element(Stat, Val),
?tp(emqx_stats_setstat,
#{ count_stat => Stat
, max_stat => MaxStat
, value => Val
}),
{noreply, State};
handle_cast({update_interval, Update = #update{name = Name}},
@ -274,4 +282,3 @@ safe_update_element(Key, Val) ->
error:badarg ->
?LOG(warning, "Failed to update ~0p to ~0p", [Key, Val])
end.

View File

@ -364,6 +364,40 @@ t_connected_client_count_refresh({'end', Config}) ->
{ok, _} = supervisor:restart_child(emqx_cm_sup, manager),
ok.
t_connected_client_stats({init, Config}) ->
ok = snabbkaffe:start_trace(),
Config;
t_connected_client_stats(Config) when is_list(Config) ->
?assertEqual(0, emqx_cm:get_connected_client_count()),
?assertEqual(0, emqx_stats:getstat('live_connections.count')),
?assertEqual(0, emqx_stats:getstat('live_connections.max')),
{ok, ConnPid} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]),
{{ok, _}, _} = wait_for_events(
fun() -> emqtt:connect(ConnPid) end,
[emqx_cm_connected_client_count_inc]
),
%% ensure stats are synchronized
wait_for_stats(
fun emqx_cm:stats_fun/0,
[#{count_stat => 'live_connections.count',
max_stat => 'live_connections.max'}]
),
?assertEqual(1, emqx_stats:getstat('live_connections.count')),
?assertEqual(1, emqx_stats:getstat('live_connections.max')),
ok = emqtt:disconnect(ConnPid),
%% ensure stats are synchronized
wait_for_stats(
fun emqx_cm:stats_fun/0,
[#{count_stat => 'live_connections.count',
max_stat => 'live_connections.max'}]
),
?assertEqual(0, emqx_stats:getstat('live_connections.count')),
?assertEqual(1, emqx_stats:getstat('live_connections.max')),
ok;
t_connected_client_stats({'end', _Config}) ->
ok = snabbkaffe:stop(),
ok.
wait_for_events(Action, Kinds) ->
Predicate = fun(#{?snk_kind := K}) ->
lists:member(K, Kinds)
@ -379,6 +413,27 @@ wait_for_events(Action, Kinds) ->
{Res, {ok, Events}}
end.
wait_for_stats(Action, Stats) ->
Predicate = fun(Event = #{?snk_kind := emqx_stats_setstat}) ->
Stat = maps:take(
[ count_stat
, max_stat
], Event),
lists:member(Stat, Stats);
(_) ->
false
end,
N = length(Stats),
Timeout = 100,
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0),
Res = Action(),
case snabbkaffe_collector:receive_events(Sub) of
{timeout, []} ->
{Res, timeout};
{ok, Events} ->
{Res, {ok, Events}}
end.
insert_fake_channels() ->
%% Insert copies to simulate missed counts
Tab = emqx_channel_info,