diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 2c0a74fe9..5bb57dc9f 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -119,7 +119,33 @@ quota_timer => expire_quota_limit }). --define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). +-define(CHANNEL_METRICS, + [ recv_pkt + , recv_msg + , 'recv_msg.qos0' + , 'recv_msg.qos1' + , 'recv_msg.qos2' + , 'recv_msg.dropped' + , 'recv_msg.dropped.await_pubrel_timeout' + , send_pkt + , send_msg + , 'send_msg.qos0' + , 'send_msg.qos1' + , 'send_msg.qos2' + , 'send_msg.dropped' + , 'send_msg.dropped.expired' + , 'send_msg.dropped.queue_full' + , 'send_msg.dropped.too_large' + ]). + +-define(INFO_KEYS, + [ conninfo + , conn_state + , clientinfo + , session + , will_msg + ]). + -define(LIMITER_ROUTING, message_routing). -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}). @@ -184,10 +210,9 @@ set_session(Session, Channel = #channel{conninfo = ConnInfo, clientinfo = Client Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), Channel#channel{session = Session1}. -%% TODO: Add more stats. -spec(stats(channel()) -> emqx_types:stats()). stats(#channel{session = Session})-> - emqx_session:stats(Session). + lists:append(emqx_session:stats(Session), emqx_pd:get_counters(?CHANNEL_METRICS)). -spec(caps(channel()) -> emqx_types:caps()). caps(#channel{clientinfo = #{zone := Zone}}) -> diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 43566d1df..b554233a7 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -131,25 +131,6 @@ , sockstate ]). --define(CONN_STATS, - [ recv_pkt - , recv_msg - , 'recv_msg.qos0' - , 'recv_msg.qos1' - , 'recv_msg.qos2' - , 'recv_msg.dropped' - , 'recv_msg.dropped.await_pubrel_timeout' - , send_pkt - , send_msg - , 'send_msg.qos0' - , 'send_msg.qos1' - , 'send_msg.qos2' - , 'send_msg.dropped' - , 'send_msg.dropped.expired' - , 'send_msg.dropped.queue_full' - , 'send_msg.dropped.too_large' - ]). - -define(SOCK_STATS, [ recv_oct , recv_cnt @@ -236,10 +217,9 @@ stats(#state{transport = Transport, {ok, Ss} -> Ss; {error, _} -> [] end, - ConnStats = emqx_pd:get_counters(?CONN_STATS), ChanStats = emqx_channel:stats(Channel), ProcStats = emqx_misc:proc_stats(), - lists:append([SockStats, ConnStats, ChanStats, ProcStats]). + lists:append([SockStats, ChanStats, ProcStats]). %% @doc Set TCP keepalive socket options to override system defaults. %% Idle: The number of seconds a connection needs to be idle before @@ -1030,12 +1010,12 @@ inc_outgoing_stats({error, message_too_large}) -> inc_counter('send_msg.dropped.too_large', 1); inc_outgoing_stats(Packet = ?PACKET(Type)) -> inc_counter(send_pkt, 1), - case Type =:= ?PUBLISH of - true -> + case Type of + ?PUBLISH -> inc_counter(send_msg, 1), inc_counter(outgoing_pubs, 1), inc_qos_stats(send_msg, Packet); - false -> + _ -> ok end, emqx_metrics:inc_sent(Packet). diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 7a4dba368..cd5b7c515 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -112,7 +112,6 @@ -define(ACTIVE_N, 100). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). --define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(ENABLED(X), (X =/= undefined)). -define(LIMITER_BYTES_IN, bytes_in). @@ -163,10 +162,9 @@ stats(WsPid) when is_pid(WsPid) -> call(WsPid, stats); stats(#state{channel = Channel}) -> SockStats = emqx_pd:get_counters(?SOCK_STATS), - ConnStats = emqx_pd:get_counters(?CONN_STATS), ChanStats = emqx_channel:stats(Channel), ProcStats = emqx_misc:proc_stats(), - lists:append([SockStats, ConnStats, ChanStats, ProcStats]). + lists:append([SockStats, ChanStats, ProcStats]). %% kick|discard|takeover -spec(call(pid(), Req :: term()) -> Reply :: term()). @@ -725,6 +723,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> packet => emqx_packet:format(Packet)}), ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), + ok = inc_outgoing_stats({error, message_too_large}), <<>>; Data -> ?TRACE("WS-MQTT", "mqtt_packet_sent", #{packet => Packet}), ok = inc_outgoing_stats(Packet), @@ -762,19 +761,28 @@ inc_recv_stats(Cnt, Oct) -> inc_incoming_stats(Packet = ?PACKET(Type)) -> _ = emqx_pd:inc_counter(recv_pkt, 1), - if Type == ?PUBLISH -> - inc_counter(recv_msg, 1), - inc_counter(incoming_pubs, 1); - true -> ok + case Type of + ?PUBLISH -> + inc_counter(recv_msg, 1), + inc_qos_stats(recv_msg, Packet), + inc_counter(incoming_pubs, 1); + _ -> + ok end, emqx_metrics:inc_recv(Packet). +inc_outgoing_stats({error, message_too_large}) -> + inc_counter('send_msg.dropped', 1), + inc_counter('send_msg.dropped.too_large', 1); inc_outgoing_stats(Packet = ?PACKET(Type)) -> - _ = emqx_pd:inc_counter(send_pkt, 1), - if Type == ?PUBLISH -> - inc_counter(send_msg, 1), - inc_counter(outgoing_pubs, 1); - true -> ok + inc_counter(send_pkt, 1), + case Type of + ?PUBLISH -> + inc_counter(send_msg, 1), + inc_counter(outgoing_pubs, 1), + inc_qos_stats(send_msg, Packet); + _ -> + ok end, emqx_metrics:inc_sent(Packet). @@ -787,6 +795,25 @@ inc_sent_stats(Cnt, Oct) -> inc_counter(Name, Value) -> _ = emqx_pd:inc_counter(Name, Value), ok. + +inc_qos_stats(Type, Packet) -> + case inc_qos_stats_key(Type, emqx_packet:qos(Packet)) of + undefined -> + ignore; + Key -> + inc_counter(Key, 1) + end. + +inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0'; +inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1'; +inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2'; + +inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0'; +inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1'; +inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2'; +%% for bad qos +inc_qos_stats_key(_, _) -> undefined. + %%-------------------------------------------------------------------- %% Helper functions %%--------------------------------------------------------------------