From 1ce77de080e4f5875c99033e7cd6b8a036112ae8 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Mon, 17 Jan 2022 14:51:46 +0800 Subject: [PATCH 1/3] feat(metrics): client metrics --- src/emqx_connection.erl | 53 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 235d92783..5eeb323b0 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -108,9 +108,39 @@ -type(state() :: #state{}). -define(ACTIVE_N, 100). --define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). --define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). --define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). + +-define(INFO_KEYS, [ socktype + , peername + , sockname + , sockstate + , active_n + ]). + +-define(CONN_STATS, [ recv_pkt + , recv_msg + , 'recv_msg.qos0' + , 'recv_msg.qos1' + , 'recv_msg.qos2' + , 'recv_msg.dropped' + , 'recv_msg.dropped.expired' + + , send_pkt + , send_msg + , 'send_msg.qos0' + , 'send_msg.qos1' + , 'send_msg.qos2' + , 'send_msg.dropped' + , 'send_msg.dropped.expired' + , 'send_msg.dropped.queue_full' + , 'send_msg.dropped.too_large' + ]). + +-define(SOCK_STATS, [ recv_oct + , recv_cnt + , send_oct + , send_cnt + , send_pend + ]). -define(ENABLED(X), (X =/= undefined)). @@ -691,6 +721,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> [emqx_packet:format(Packet)]), ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), + ok = inc_outgoing_stats({error, message_too_large}), <<>>; Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), ok = inc_outgoing_stats(Packet), @@ -835,17 +866,31 @@ inc_incoming_stats(Packet = ?PACKET(Type)) -> emqx_metrics:inc_recv(Packet). -compile({inline, [inc_outgoing_stats/1]}). +inc_outgoing_stats({error, message_too_large}) -> + inc_counter('send_msg.dropped', 1), + inc_counter('send_msg.dropped.too_large', 1); inc_outgoing_stats(Packet = ?PACKET(Type)) -> inc_counter(send_pkt, 1), case Type =:= ?PUBLISH of true -> inc_counter(send_msg, 1), - inc_counter(outgoing_pubs, 1); + inc_counter(outgoing_pubs, 1), + inc_qos_stats(send_msg, Packet); false -> ok end, emqx_metrics:inc_sent(Packet). +inc_qos_stats(Type, #mqtt_packet{header = #mqtt_packet_header{qos = QoS}}) -> + inc_counter(inc_qos_stats_key(Type, QoS), 1). + +inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0'; +inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1'; +inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2'; + +inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0'; +inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1'; +inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2'. %%-------------------------------------------------------------------- %% Helper functions From 5397d80680d7e6c0902206bc59c64db2bf5a4c74 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Mon, 17 Jan 2022 15:12:08 +0800 Subject: [PATCH 2/3] feat(metrics): session metrics & api format --- .../src/emqx_mgmt_api_clients.erl | 12 +++++++++--- src/emqx.appup.src | 2 ++ src/emqx_connection.erl | 10 +++++----- src/emqx_session.erl | 18 ++++++++++++++++-- 4 files changed, 32 insertions(+), 10 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 71e94f841..de1fbabde 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -313,8 +313,14 @@ format_channel_info({_Key, Info, Stats0}) -> inflight, max_inflight, awaiting_rel, max_awaiting_rel, mqueue_len, mqueue_dropped, max_mqueue, heap_size, reductions, mailbox_len, - recv_cnt, recv_msg, recv_oct, recv_pkt, send_cnt, - send_msg, send_oct, send_pkt], NStats), + recv_cnt, + recv_msg, 'recv_msg.qos0', 'recv_msg.qos1', 'recv_msg.qos2', + 'recv_msg.dropped', 'recv_msg.dropped.expired', + recv_oct, recv_pkt, send_cnt, + send_msg, 'send_msg.qos0', 'send_msg.qos1', 'send_msg.qos2', + 'send_msg.dropped', 'send_msg.dropped.expired', + 'send_msg.dropped.queue_full', 'send_msg.dropped.too_large', + send_oct, send_pkt], NStats), maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo), maps:with([clean_start, keepalive, expiry_interval, proto_name, proto_ver, peername, connected_at, disconnected_at], ConnInfo), @@ -373,7 +379,7 @@ match_fun(Ms, Fuzzy) -> run_fuzzy_match(_, []) -> true; -run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr}|Fuzzy]) -> +run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | Fuzzy]) -> Val = case maps:get(Key, ClientInfo, undefined) of undefined -> <<>>; V -> V diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 8e1016cc4..2e300b85e 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,6 +2,7 @@ {VSN, [{"4.4.0", [ {load_module,emqx_channel,brutal_purge,soft_purge,[]} + , {load_module,emqx_connection,brutal_purge,soft_purge,[]} , {load_module,emqx_metrics,brutal_purge,soft_purge,[]} , {load_module,emqx_session,brutal_purge,soft_purge,[]} , {load_module,emqx_alarm,brutal_purge,soft_purge,[]} @@ -9,6 +10,7 @@ {<<".*">>,[]}], [{"4.4.0", [ {load_module,emqx_channel,brutal_purge,soft_purge,[]} + , {load_module,emqx_connection,brutal_purge,soft_purge,[]} , {load_module,emqx_metrics,brutal_purge,soft_purge,[]} , {load_module,emqx_session,brutal_purge,soft_purge,[]} , {load_module,emqx_alarm,brutal_purge,soft_purge,[]} diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 5eeb323b0..2c2cc472d 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -176,7 +176,7 @@ start_link(Transport, Socket, Options) -> %%-------------------------------------------------------------------- %% @doc Get infos of the connection/channel. --spec(info(pid()|state()) -> emqx_types:infos()). +-spec(info(pid() | state()) -> emqx_types:infos()). info(CPid) when is_pid(CPid) -> call(CPid, info); info(State = #state{channel = Channel}) -> @@ -205,7 +205,7 @@ info(limiter, #state{limiter = Limiter}) -> maybe_apply(fun emqx_limiter:info/1, Limiter). %% @doc Get stats of the connection/channel. --spec(stats(pid()|state()) -> emqx_types:stats()). +-spec(stats(pid() | state()) -> emqx_types:stats()). stats(CPid) when is_pid(CPid) -> call(CPid, stats); stats(#state{transport = Transport, @@ -389,7 +389,7 @@ cancel_stats_timer(State) -> State. process_msg([], State) -> {ok, State}; -process_msg([Msg|More], State) -> +process_msg([Msg | More], State) -> try case handle_msg(Msg, State) of ok -> @@ -483,7 +483,7 @@ handle_msg({Passive, _Sock}, State) handle_msg(Deliver = {deliver, _Topic, _Msg}, #state{active_n = ActiveN} = State) -> - Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], + Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent @@ -657,7 +657,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> {Packets, State#state{parse_state = NParseState}}; {ok, Packet, Rest, NParseState} -> NState = State#state{parse_state = NParseState}, - parse_incoming(Rest, [Packet|Packets], NState) + parse_incoming(Rest, [Packet | Packets], NState) catch error:proxy_protocol_config_disabled:_Stk -> ?LOG(error, diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 6d4b2af34..9648083d9 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -427,8 +427,8 @@ dequeue(ClientInfo, Cnt, Msgs, Q) -> {{value, Msg}, Q1} -> case emqx_message:is_expired(Msg) of true -> - ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), ok = inc_delivery_expired_cnt(), + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), dequeue(ClientInfo, Cnt, Msgs, Q1); false -> dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg | Msgs], Q1) end @@ -503,11 +503,14 @@ log_dropped(ClientInfo, Msg = #message{qos = QoS}, #session{mqueue = Q}) -> ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]), ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), + ok = inc_pd('send_msg.dropped'), ?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]); false -> ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]), ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.queue_full'), + ok = inc_pd('send_msg.dropped'), + ok = inc_pd('send_msg.dropped.queue_full'), ?LOG(warning, "Dropped msg due to mqueue is full: ~s", [emqx_message:format(Msg)]) end. @@ -568,7 +571,8 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) -> %% Retry Delivery %%-------------------------------------------------------------------- --spec(retry(emqx_types:clientinfo(), session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). +-spec(retry(emqx_types:clientinfo(), session()) -> + {ok, session()} | {ok, replies(), timeout(), session()}). retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) -> case emqx_inflight:is_empty(Inflight) of true -> {ok, Session}; @@ -680,13 +684,23 @@ inc_delivery_expired_cnt() -> inc_delivery_expired_cnt(1). inc_delivery_expired_cnt(N) -> + ok = inc_pd('send_msg.dropped', N), + ok = inc_pd('send_msg.dropped.expired', N), ok = emqx_metrics:inc('delivery.dropped', N), emqx_metrics:inc('delivery.dropped.expired', N). inc_await_pubrel_timeout(N) -> + ok = inc_pd('recv_msg.dropped', N), + ok = inc_pd('recv_msg.dropped.expired', N), ok = emqx_metrics:inc('messages.dropped', N), emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N). +inc_pd(Key) -> + inc_pd(Key, 1). +inc_pd(Key, Inc) -> + _ = emqx_pd:inc_counter(Key, Inc), + ok. + %%-------------------------------------------------------------------- %% Next Packet Id %%-------------------------------------------------------------------- From 6278951d578b3a1e15c8174052a58b4ff2d55bcd Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Mon, 17 Jan 2022 15:42:34 +0800 Subject: [PATCH 3/3] fix: code style --- src/emqx_connection.erl | 1 - src/emqx_session.erl | 5 +++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 2c2cc472d..bd9a9fbe4 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -123,7 +123,6 @@ , 'recv_msg.qos2' , 'recv_msg.dropped' , 'recv_msg.dropped.expired' - , send_pkt , send_msg , 'send_msg.qos0' diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 9648083d9..3ba286f54 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -571,8 +571,9 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) -> %% Retry Delivery %%-------------------------------------------------------------------- --spec(retry(emqx_types:clientinfo(), session()) -> - {ok, session()} | {ok, replies(), timeout(), session()}). +-spec(retry(emqx_types:clientinfo(), session()) + -> {ok, session()} + | {ok, replies(), timeout(), session()}). retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) -> case emqx_inflight:is_empty(Inflight) of true -> {ok, Session};