Merge pull request #6985 from DDDHuang/ws_client_metrics44
feat(ws): more client metrics
This commit is contained in:
commit
5985b7cb09
|
@ -1,11 +1,12 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[{"4.4.0",
|
||||
[ {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
||||
[ {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_connection,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
||||
, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}
|
||||
, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_connection,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}
|
||||
|
@ -17,11 +18,12 @@
|
|||
{<<".*">>,[]}
|
||||
],
|
||||
[{"4.4.0",
|
||||
[ {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
||||
[ {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_connection,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}
|
||||
, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}
|
||||
, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_connection,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}
|
||||
|
|
|
@ -118,7 +118,32 @@
|
|||
quota_timer => expire_quota_limit
|
||||
}).
|
||||
|
||||
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
|
||||
-define(INFO_KEYS,
|
||||
[ conninfo
|
||||
, conn_state
|
||||
, clientinfo
|
||||
, session
|
||||
, will_msg
|
||||
]).
|
||||
|
||||
-define(CHANNEL_METRICS,
|
||||
[ recv_pkt
|
||||
, recv_msg
|
||||
, 'recv_msg.qos0'
|
||||
, 'recv_msg.qos1'
|
||||
, 'recv_msg.qos2'
|
||||
, 'recv_msg.dropped'
|
||||
, 'recv_msg.dropped.await_pubrel_timeout'
|
||||
, send_pkt
|
||||
, send_msg
|
||||
, 'send_msg.qos0'
|
||||
, 'send_msg.qos1'
|
||||
, 'send_msg.qos2'
|
||||
, 'send_msg.dropped'
|
||||
, 'send_msg.dropped.expired'
|
||||
, 'send_msg.dropped.queue_full'
|
||||
, 'send_msg.dropped.too_large'
|
||||
]).
|
||||
|
||||
-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
|
||||
|
||||
|
@ -181,10 +206,9 @@ get_session(#channel{session = Session}) ->
|
|||
set_session(Session, Channel) ->
|
||||
Channel#channel{session = Session}.
|
||||
|
||||
%% TODO: Add more stats.
|
||||
-spec(stats(channel()) -> emqx_types:stats()).
|
||||
stats(#channel{session = Session})->
|
||||
emqx_session:stats(Session).
|
||||
lists:append(emqx_session:stats(Session), emqx_pd:get_counters(?CHANNEL_METRICS)).
|
||||
|
||||
-spec(caps(channel()) -> emqx_types:caps()).
|
||||
caps(#channel{clientinfo = #{zone := Zone}}) ->
|
||||
|
|
|
@ -109,38 +109,6 @@
|
|||
|
||||
-define(ACTIVE_N, 100).
|
||||
|
||||
-define(INFO_KEYS, [ socktype
|
||||
, peername
|
||||
, sockname
|
||||
, sockstate
|
||||
, active_n
|
||||
]).
|
||||
|
||||
-define(CONN_STATS, [ recv_pkt
|
||||
, recv_msg
|
||||
, 'recv_msg.qos0'
|
||||
, 'recv_msg.qos1'
|
||||
, 'recv_msg.qos2'
|
||||
, 'recv_msg.dropped'
|
||||
, 'recv_msg.dropped.await_pubrel_timeout'
|
||||
, send_pkt
|
||||
, send_msg
|
||||
, 'send_msg.qos0'
|
||||
, 'send_msg.qos1'
|
||||
, 'send_msg.qos2'
|
||||
, 'send_msg.dropped'
|
||||
, 'send_msg.dropped.expired'
|
||||
, 'send_msg.dropped.queue_full'
|
||||
, 'send_msg.dropped.too_large'
|
||||
]).
|
||||
|
||||
-define(SOCK_STATS, [ recv_oct
|
||||
, recv_cnt
|
||||
, send_oct
|
||||
, send_cnt
|
||||
, send_pend
|
||||
]).
|
||||
|
||||
-define(ENABLED(X), (X =/= undefined)).
|
||||
|
||||
-define(ALARM_TCP_CONGEST(Channel),
|
||||
|
@ -148,12 +116,48 @@
|
|||
[emqx_channel:info(clientid, Channel),
|
||||
emqx_channel:info(username, Channel)]))).
|
||||
|
||||
-define(ALARM_CONN_INFO_KEYS, [
|
||||
socktype, sockname, peername,
|
||||
clientid, username, proto_name, proto_ver, connected_at
|
||||
]).
|
||||
-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
|
||||
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
|
||||
-define(INFO_KEYS,
|
||||
[ socktype
|
||||
, peername
|
||||
, sockname
|
||||
, sockstate
|
||||
, active_n
|
||||
]).
|
||||
|
||||
-define(SOCK_STATS,
|
||||
[ recv_oct
|
||||
, recv_cnt
|
||||
, send_oct
|
||||
, send_cnt
|
||||
, send_pend
|
||||
]).
|
||||
|
||||
-define(ALARM_CONN_INFO_KEYS,
|
||||
[ socktype
|
||||
, sockname
|
||||
, peername
|
||||
, clientid
|
||||
, username
|
||||
, proto_name
|
||||
, proto_ver
|
||||
, connected_at
|
||||
]).
|
||||
|
||||
-define(ALARM_SOCK_STATS_KEYS,
|
||||
[ send_pend
|
||||
, recv_cnt
|
||||
, recv_oct
|
||||
, send_cnt
|
||||
, send_oct
|
||||
]).
|
||||
|
||||
-define(ALARM_SOCK_OPTS_KEYS,
|
||||
[ high_watermark
|
||||
, high_msgq_watermark
|
||||
, sndbuf
|
||||
, recbuf
|
||||
, buffer
|
||||
]).
|
||||
|
||||
-dialyzer({no_match, [info/2]}).
|
||||
-dialyzer({nowarn_function, [ init/4
|
||||
|
@ -214,10 +218,9 @@ stats(#state{transport = Transport,
|
|||
{ok, Ss} -> Ss;
|
||||
{error, _} -> []
|
||||
end,
|
||||
ConnStats = emqx_pd:get_counters(?CONN_STATS),
|
||||
ChanStats = emqx_channel:stats(Channel),
|
||||
ProcStats = emqx_misc:proc_stats(),
|
||||
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
|
||||
lists:append([SockStats, ChanStats, ProcStats]).
|
||||
|
||||
%% @doc Set TCP keepalive socket options to override system defaults.
|
||||
%% Idle: The number of seconds a connection needs to be idle before
|
||||
|
|
|
@ -93,9 +93,21 @@
|
|||
-type(ws_cmd() :: {active, boolean()}|close).
|
||||
|
||||
-define(ACTIVE_N, 100).
|
||||
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
|
||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
||||
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
|
||||
|
||||
-define(INFO_KEYS,
|
||||
[ socktype
|
||||
, peername
|
||||
, sockname
|
||||
, sockstate
|
||||
, active_n
|
||||
]).
|
||||
|
||||
-define(SOCK_STATS,
|
||||
[ recv_oct
|
||||
, recv_cnt
|
||||
, send_oct
|
||||
, send_cnt
|
||||
]).
|
||||
|
||||
-define(ENABLED(X), (X =/= undefined)).
|
||||
|
||||
|
@ -146,10 +158,9 @@ stats(WsPid) when is_pid(WsPid) ->
|
|||
call(WsPid, stats);
|
||||
stats(#state{channel = Channel}) ->
|
||||
SockStats = emqx_pd:get_counters(?SOCK_STATS),
|
||||
ConnStats = emqx_pd:get_counters(?CONN_STATS),
|
||||
ChanStats = emqx_channel:stats(Channel),
|
||||
ProcStats = emqx_misc:proc_stats(),
|
||||
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
|
||||
lists:append([SockStats, ChanStats, ProcStats]).
|
||||
|
||||
%% kick|discard|takeover
|
||||
-spec(call(pid(), Req :: term()) -> Reply :: term()).
|
||||
|
@ -615,6 +626,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|||
[emqx_packet:format(Packet)]),
|
||||
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
||||
ok = emqx_metrics:inc('delivery.dropped'),
|
||||
ok = inc_outgoing_stats({error, message_too_large}),
|
||||
<<>>;
|
||||
Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
|
||||
ok = inc_outgoing_stats(Packet),
|
||||
|
@ -641,19 +653,28 @@ inc_recv_stats(Cnt, Oct) ->
|
|||
|
||||
inc_incoming_stats(Packet = ?PACKET(Type)) ->
|
||||
_ = emqx_pd:inc_counter(recv_pkt, 1),
|
||||
if Type == ?PUBLISH ->
|
||||
case Type of
|
||||
?PUBLISH ->
|
||||
inc_counter(recv_msg, 1),
|
||||
inc_qos_stats(recv_msg, Packet),
|
||||
inc_counter(incoming_pubs, 1);
|
||||
true -> ok
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
emqx_metrics:inc_recv(Packet).
|
||||
|
||||
inc_outgoing_stats({error, message_too_large}) ->
|
||||
inc_counter('send_msg.dropped', 1),
|
||||
inc_counter('send_msg.dropped.too_large', 1);
|
||||
inc_outgoing_stats(Packet = ?PACKET(Type)) ->
|
||||
_ = emqx_pd:inc_counter(send_pkt, 1),
|
||||
if Type == ?PUBLISH ->
|
||||
case Type of
|
||||
?PUBLISH ->
|
||||
inc_counter(send_msg, 1),
|
||||
inc_qos_stats(send_msg, Packet),
|
||||
inc_counter(outgoing_pubs, 1);
|
||||
true -> ok
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
emqx_metrics:inc_sent(Packet).
|
||||
|
||||
|
@ -666,6 +687,20 @@ inc_sent_stats(Cnt, Oct) ->
|
|||
inc_counter(Name, Value) ->
|
||||
_ = emqx_pd:inc_counter(Name, Value),
|
||||
ok.
|
||||
|
||||
inc_qos_stats(Type, #mqtt_packet{header = #mqtt_packet_header{qos = QoS}}) when ?IS_QOS(QoS) ->
|
||||
inc_counter(inc_qos_stats_key(Type, QoS), 1);
|
||||
inc_qos_stats(_, _) ->
|
||||
ok.
|
||||
|
||||
inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0';
|
||||
inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1';
|
||||
inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2';
|
||||
|
||||
inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0';
|
||||
inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1';
|
||||
inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2'.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -169,8 +169,9 @@ t_stats(_) ->
|
|||
end
|
||||
end),
|
||||
Stats = ?ws_conn:call(WsPid, stats),
|
||||
[{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0},
|
||||
{recv_pkt, 0}, {recv_msg, 0}, {send_pkt, 0}, {send_msg, 0}|_] = Stats.
|
||||
[lists:member(V, Stats) ||
|
||||
V <- [{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0},
|
||||
{recv_pkt, 0}, {recv_msg, 0}, {send_pkt, 0}, {send_msg, 0}]].
|
||||
|
||||
t_call(_) ->
|
||||
Info = ?ws_conn:info(st()),
|
||||
|
|
Loading…
Reference in New Issue