From f87f80f6e4c4bf6f791962dc6fdec227d991791a Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 18 Jan 2022 11:28:41 +0800 Subject: [PATCH 1/3] feat(metrics): client metrics with more detail --- apps/emqx/src/emqx_connection.erl | 53 +++++++++++++++++-- apps/emqx/src/emqx_session.erl | 16 +++++- .../src/emqx_mgmt_api_clients.erl | 24 +++++++++ 3 files changed, 88 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 9a92f2d1d..5d0cb4b2a 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -123,9 +123,37 @@ -type cache() :: #cache{}. -define(ACTIVE_N, 100). --define(INFO_KEYS, [socktype, peername, sockname, sockstate]). --define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). --define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). + +-define(INFO_KEYS, [ socktype + , peername + , sockname + , sockstate + ]). + +-define(CONN_STATS, [ recv_pkt + , recv_msg + , 'recv_msg.qos0' + , 'recv_msg.qos1' + , 'recv_msg.qos2' + , 'recv_msg.dropped' + , 'recv_msg.dropped.expired' + , send_pkt + , send_msg + , 'send_msg.qos0' + , 'send_msg.qos1' + , 'send_msg.qos2' + , 'send_msg.dropped' + , 'send_msg.dropped.expired' + , 'send_msg.dropped.queue_full' + , 'send_msg.dropped.too_large' + ]). + +-define(SOCK_STATS, [ recv_oct + , recv_cnt + , send_oct + , send_cnt + , send_pend + ]). -define(ENABLED(X), (X =/= undefined)). @@ -758,6 +786,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> }), ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), + ok = inc_outgoing_stats({error, message_too_large}), <<>>; Data -> ?TRACE("MQTT", "mqtt_packet_sent", #{packet => Packet}), @@ -985,6 +1014,7 @@ inc_incoming_stats(Packet = ?PACKET(Type)) -> case Type =:= ?PUBLISH of true -> inc_counter(recv_msg, 1), + inc_qos_stats(recv_msg, Packet), inc_counter(incoming_pubs, 1); false -> ok @@ -992,17 +1022,32 @@ inc_incoming_stats(Packet = ?PACKET(Type)) -> emqx_metrics:inc_recv(Packet). -compile({inline, [inc_outgoing_stats/1]}). +inc_outgoing_stats({error, message_too_large}) -> + inc_counter('send_msg.dropped', 1), + inc_counter('send_msg.dropped.too_large', 1); inc_outgoing_stats(Packet = ?PACKET(Type)) -> inc_counter(send_pkt, 1), case Type =:= ?PUBLISH of true -> inc_counter(send_msg, 1), - inc_counter(outgoing_pubs, 1); + inc_counter(outgoing_pubs, 1), + inc_qos_stats(send_msg, Packet); false -> ok end, emqx_metrics:inc_sent(Packet). +inc_qos_stats(Type, Packet) -> + inc_counter(inc_qos_stats_key(Type, emqx_packet:qos(Packet)), 1). + +inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0'; +inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1'; +inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2'; + +inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0'; +inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1'; +inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2'. + %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index b34315900..f5c9e0e1e 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -329,7 +329,8 @@ unsubscribe(ClientInfo, TopicFilter, UnSubOpts, {ok, SubOpts} -> ok = emqx_broker:unsubscribe(TopicFilter), ok = emqx_persistent_session:remove_subscription(TopicFilter, SessionID, IsPS), - ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]), + ok = emqx_hooks:run('session.unsubscribed', + [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]), {ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}}; error -> {error, ?RC_NO_SUBSCRIPTION_EXISTED} @@ -541,11 +542,14 @@ log_dropped(Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) -> case (QoS == ?QOS_0) andalso (not StoreQos0) of true -> ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), + ok = inc_pd('send_msg.dropped'), ?SLOG(warning, #{msg => "dropped_qos0_msg", queue => QueueInfo, payload => Payload}, #{topic => Topic}); false -> ok = emqx_metrics:inc('delivery.dropped.queue_full'), + ok = inc_pd('send_msg.dropped'), + ok = inc_pd('send_msg.dropped.queue_full'), ?SLOG(warning, #{msg => "dropped_msg_due_to_mqueue_is_full", queue => QueueInfo, payload => Payload}, #{topic => Topic}) @@ -723,13 +727,23 @@ run_hook(Name, Args) -> inc_expired_cnt(K) -> inc_expired_cnt(K, 1). inc_expired_cnt(delivery, N) -> + ok = inc_pd('send_msg.dropped', N), + ok = inc_pd('send_msg.dropped.expired', N), ok = emqx_metrics:inc('delivery.dropped', N), emqx_metrics:inc('delivery.dropped.expired', N); inc_expired_cnt(message, N) -> + ok = inc_pd('recv_msg.dropped', N), + ok = inc_pd('recv_msg.dropped.expired', N), ok = emqx_metrics:inc('messages.dropped', N), emqx_metrics:inc('messages.dropped.expired', N). +inc_pd(Key) -> + inc_pd(Key, 1). +inc_pd(Key, Inc) -> + _ = emqx_pd:inc_counter(Key, Inc), + ok. + %%-------------------------------------------------------------------- %% Next Packet Id %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index be38e401d..7ea987ef3 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -146,6 +146,16 @@ properties(client) -> <<"Number of TCP packets received">>}, {recv_msg, integer, <<"Number of PUBLISH packets received">>}, + {'recv_msg.qos0', integer, + <<"Number of PUBLISH QoS0 packets received">>}, + {'recv_msg.qos1', integer, + <<"Number of PUBLISH QoS1 packets received">>}, + {'recv_msg.qos2', integer, + <<"Number of PUBLISH QoS2 packets received">>}, + {'recv_msg.dropped', integer, + <<"Number of Dropped PUBLISH packets received">>}, + {'recv_msg.dropped.expired', integer, + <<"Number of Dropped Expired PUBLISH packets received">>}, {recv_oct, integer, <<"Number of bytes received by EMQ X Broker (the same below)">>}, {recv_pkt, integer, @@ -156,6 +166,20 @@ properties(client) -> <<"Number of TCP packets sent">>}, {send_msg, integer, <<"Number of PUBLISH packets sent">>}, + {'send_msg.qos0', integer, + <<"Number of PUBLISH QoS0 packets sent">>}, + {'send_msg.qos1', integer, + <<"Number of PUBLISH QoS1 packets sent">>}, + {'send_msg.qos2', integer, + <<"Number of PUBLISH QoS2 packets sent">>}, + {'send_msg.dropped', integer, + <<"Number of Dropped PUBLISH packets sent">>}, + {'send_msg.dropped.expired', integer, + <<"Number of Dropped Expired PUBLISH packets sent">>}, + {'send_msg.dropped.queue_full', integer, + <<"Number of Dropped Queue Full PUBLISH packets sent">>}, + {'send_msg.dropped.too_large', integer, + <<"Number of Dropped Too Large PUBLISH packets sent">>}, {send_oct, integer, <<"Number of bytes sent">>}, {send_pkt, integer, From 98e7ce7b7cc1c271c526c2b0906d79f844426802 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 18 Jan 2022 18:06:47 +0800 Subject: [PATCH 2/3] fix(test): for paho qos3 --- apps/emqx/src/emqx_connection.erl | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 5d0cb4b2a..46ce25d10 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -1038,7 +1038,12 @@ inc_outgoing_stats(Packet = ?PACKET(Type)) -> emqx_metrics:inc_sent(Packet). inc_qos_stats(Type, Packet) -> - inc_counter(inc_qos_stats_key(Type, emqx_packet:qos(Packet)), 1). + case inc_qos_stats_key(Type, emqx_packet:qos(Packet)) of + undefined -> + ignore; + Key -> + inc_counter(Key, 1) + end. inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0'; inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1'; @@ -1046,7 +1051,9 @@ inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2'; inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0'; inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1'; -inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2'. +inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2'; +%% for bad qos +inc_qos_stats_key(_, _) -> undefined. %%-------------------------------------------------------------------- %% Helper functions From 3a80baa087d2efbb27df041b9204dd203233fe7e Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 18 Jan 2022 18:53:32 +0800 Subject: [PATCH 3/3] fix(api): clients api docs --- apps/emqx_management/src/emqx_mgmt_api_clients.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 7ea987ef3..f4483523d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -153,9 +153,9 @@ properties(client) -> {'recv_msg.qos2', integer, <<"Number of PUBLISH QoS2 packets received">>}, {'recv_msg.dropped', integer, - <<"Number of Dropped PUBLISH packets received">>}, + <<"Number of dropped PUBLISH packets">>}, {'recv_msg.dropped.expired', integer, - <<"Number of Dropped Expired PUBLISH packets received">>}, + <<"Number of dropped PUBLISH packets due to expired">>}, {recv_oct, integer, <<"Number of bytes received by EMQ X Broker (the same below)">>}, {recv_pkt, integer, @@ -173,13 +173,13 @@ properties(client) -> {'send_msg.qos2', integer, <<"Number of PUBLISH QoS2 packets sent">>}, {'send_msg.dropped', integer, - <<"Number of Dropped PUBLISH packets sent">>}, + <<"Number of dropped PUBLISH packets">>}, {'send_msg.dropped.expired', integer, - <<"Number of Dropped Expired PUBLISH packets sent">>}, + <<"Number of dropped PUBLISH packets due to expired">>}, {'send_msg.dropped.queue_full', integer, - <<"Number of Dropped Queue Full PUBLISH packets sent">>}, + <<"Number of dropped PUBLISH packets due to queue full">>}, {'send_msg.dropped.too_large', integer, - <<"Number of Dropped Too Large PUBLISH packets sent">>}, + <<"Number of dropped PUBLISH packets due to packet length too large">>}, {send_oct, integer, <<"Number of bytes sent">>}, {send_pkt, integer,