From 0bb01210ae79e344b3608f44490822b9e3ccfba2 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 11 Feb 2022 10:50:46 +0800 Subject: [PATCH] feat(ws): more client metrics --- src/emqx_channel.erl | 30 +++++++++-- src/emqx_connection.erl | 83 ++++++++++++++++--------------- src/emqx_ws_connection.erl | 61 ++++++++++++++++++----- test/emqx_ws_connection_SUITE.erl | 5 +- 4 files changed, 121 insertions(+), 58 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 0f988eed4..a1771fd14 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -118,7 +118,32 @@ quota_timer => expire_quota_limit }). --define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). +-define(INFO_KEYS, + [ conninfo + , conn_state + , clientinfo + , session + , will_msg + ]). + +-define(CHANNEL_METRICS, + [ recv_pkt + , recv_msg + , 'recv_msg.qos0' + , 'recv_msg.qos1' + , 'recv_msg.qos2' + , 'recv_msg.dropped' + , 'recv_msg.dropped.await_pubrel_timeout' + , send_pkt + , send_msg + , 'send_msg.qos0' + , 'send_msg.qos1' + , 'send_msg.qos2' + , 'send_msg.dropped' + , 'send_msg.dropped.expired' + , 'send_msg.dropped.queue_full' + , 'send_msg.dropped.too_large' + ]). -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}). @@ -181,10 +206,9 @@ get_session(#channel{session = Session}) -> set_session(Session, Channel) -> Channel#channel{session = Session}. -%% TODO: Add more stats. -spec(stats(channel()) -> emqx_types:stats()). stats(#channel{session = Session})-> - emqx_session:stats(Session). + lists:append(emqx_session:stats(Session), emqx_pd:get_counters(?CHANNEL_METRICS)). -spec(caps(channel()) -> emqx_types:caps()). caps(#channel{clientinfo = #{zone := Zone}}) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 7938ada74..d419a5e6a 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -109,38 +109,6 @@ -define(ACTIVE_N, 100). --define(INFO_KEYS, [ socktype - , peername - , sockname - , sockstate - , active_n - ]). - --define(CONN_STATS, [ recv_pkt - , recv_msg - , 'recv_msg.qos0' - , 'recv_msg.qos1' - , 'recv_msg.qos2' - , 'recv_msg.dropped' - , 'recv_msg.dropped.await_pubrel_timeout' - , send_pkt - , send_msg - , 'send_msg.qos0' - , 'send_msg.qos1' - , 'send_msg.qos2' - , 'send_msg.dropped' - , 'send_msg.dropped.expired' - , 'send_msg.dropped.queue_full' - , 'send_msg.dropped.too_large' - ]). - --define(SOCK_STATS, [ recv_oct - , recv_cnt - , send_oct - , send_cnt - , send_pend - ]). - -define(ENABLED(X), (X =/= undefined)). -define(ALARM_TCP_CONGEST(Channel), @@ -148,12 +116,48 @@ [emqx_channel:info(clientid, Channel), emqx_channel:info(username, Channel)]))). --define(ALARM_CONN_INFO_KEYS, [ - socktype, sockname, peername, - clientid, username, proto_name, proto_ver, connected_at -]). --define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]). --define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). +-define(INFO_KEYS, + [ socktype + , peername + , sockname + , sockstate + , active_n + ]). + +-define(SOCK_STATS, + [ recv_oct + , recv_cnt + , send_oct + , send_cnt + , send_pend + ]). + +-define(ALARM_CONN_INFO_KEYS, + [ socktype + , sockname + , peername + , clientid + , username + , proto_name + , proto_ver + , connected_at + ]). + +-define(ALARM_SOCK_STATS_KEYS, + [ send_pend + , recv_cnt + , recv_oct + , send_cnt + , send_oct + ]). + +-define(ALARM_SOCK_OPTS_KEYS, + [ high_watermark + , high_msgq_watermark + , sndbuf + , recbuf + , buffer + ]). -dialyzer({no_match, [info/2]}). -dialyzer({nowarn_function, [ init/4 @@ -214,10 +218,9 @@ stats(#state{transport = Transport, {ok, Ss} -> Ss; {error, _} -> [] end, - ConnStats = emqx_pd:get_counters(?CONN_STATS), ChanStats = emqx_channel:stats(Channel), ProcStats = emqx_misc:proc_stats(), - lists:append([SockStats, ConnStats, ChanStats, ProcStats]). + lists:append([SockStats, ChanStats, ProcStats]). %% @doc Set TCP keepalive socket options to override system defaults. %% Idle: The number of seconds a connection needs to be idle before diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 07d437b4b..d683ebe00 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -93,9 +93,21 @@ -type(ws_cmd() :: {active, boolean()}|close). -define(ACTIVE_N, 100). --define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). --define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). --define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). + +-define(INFO_KEYS, + [ socktype + , peername + , sockname + , sockstate + , active_n + ]). + +-define(SOCK_STATS, + [ recv_oct + , recv_cnt + , send_oct + , send_cnt + ]). -define(ENABLED(X), (X =/= undefined)). @@ -146,10 +158,9 @@ stats(WsPid) when is_pid(WsPid) -> call(WsPid, stats); stats(#state{channel = Channel}) -> SockStats = emqx_pd:get_counters(?SOCK_STATS), - ConnStats = emqx_pd:get_counters(?CONN_STATS), ChanStats = emqx_channel:stats(Channel), ProcStats = emqx_misc:proc_stats(), - lists:append([SockStats, ConnStats, ChanStats, ProcStats]). + lists:append([SockStats, ChanStats, ProcStats]). %% kick|discard|takeover -spec(call(pid(), Req :: term()) -> Reply :: term()). @@ -615,6 +626,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> [emqx_packet:format(Packet)]), ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), + ok = inc_outgoing_stats({error, message_too_large}), <<>>; Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), ok = inc_outgoing_stats(Packet), @@ -641,19 +653,28 @@ inc_recv_stats(Cnt, Oct) -> inc_incoming_stats(Packet = ?PACKET(Type)) -> _ = emqx_pd:inc_counter(recv_pkt, 1), - if Type == ?PUBLISH -> - inc_counter(recv_msg, 1), - inc_counter(incoming_pubs, 1); - true -> ok + case Type of + ?PUBLISH -> + inc_counter(recv_msg, 1), + inc_qos_stats(recv_msg, Packet), + inc_counter(incoming_pubs, 1); + _ -> + ok end, emqx_metrics:inc_recv(Packet). +inc_outgoing_stats({error, message_too_large}) -> + inc_counter('send_msg.dropped', 1), + inc_counter('send_msg.dropped.too_large', 1); inc_outgoing_stats(Packet = ?PACKET(Type)) -> _ = emqx_pd:inc_counter(send_pkt, 1), - if Type == ?PUBLISH -> - inc_counter(send_msg, 1), - inc_counter(outgoing_pubs, 1); - true -> ok + case Type of + ?PUBLISH -> + inc_counter(send_msg, 1), + inc_qos_stats(send_msg, Packet), + inc_counter(outgoing_pubs, 1); + _ -> + ok end, emqx_metrics:inc_sent(Packet). @@ -666,6 +687,20 @@ inc_sent_stats(Cnt, Oct) -> inc_counter(Name, Value) -> _ = emqx_pd:inc_counter(Name, Value), ok. + +inc_qos_stats(Type, #mqtt_packet{header = #mqtt_packet_header{qos = QoS}}) when ?IS_QOS(QoS) -> + inc_counter(inc_qos_stats_key(Type, QoS), 1); +inc_qos_stats(_, _) -> + ok. + +inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0'; +inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1'; +inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2'; + +inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0'; +inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1'; +inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2'. + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index 3c1aad3de..f282d2eb6 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -169,8 +169,9 @@ t_stats(_) -> end end), Stats = ?ws_conn:call(WsPid, stats), - [{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0}, - {recv_pkt, 0}, {recv_msg, 0}, {send_pkt, 0}, {send_msg, 0}|_] = Stats. + [lists:member(V, Stats) || + V <- [{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0}, + {recv_pkt, 0}, {recv_msg, 0}, {send_pkt, 0}, {send_msg, 0}]]. t_call(_) -> Info = ?ws_conn:info(st()),