Merge pull request #6969 from DDDHuang/ws_client_metrics

feat(metrics): ws client support more send&recv metrics
This commit is contained in:
DDDHuang 2022-02-11 11:14:56 +08:00 committed by GitHub
commit fbefc92178
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 74 additions and 41 deletions

View File

@ -119,7 +119,33 @@
quota_timer => expire_quota_limit quota_timer => expire_quota_limit
}). }).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). -define(CHANNEL_METRICS,
[ recv_pkt
, recv_msg
, 'recv_msg.qos0'
, 'recv_msg.qos1'
, 'recv_msg.qos2'
, 'recv_msg.dropped'
, 'recv_msg.dropped.await_pubrel_timeout'
, send_pkt
, send_msg
, 'send_msg.qos0'
, 'send_msg.qos1'
, 'send_msg.qos2'
, 'send_msg.dropped'
, 'send_msg.dropped.expired'
, 'send_msg.dropped.queue_full'
, 'send_msg.dropped.too_large'
]).
-define(INFO_KEYS,
[ conninfo
, conn_state
, clientinfo
, session
, will_msg
]).
-define(LIMITER_ROUTING, message_routing). -define(LIMITER_ROUTING, message_routing).
-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}). -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
@ -184,10 +210,9 @@ set_session(Session, Channel = #channel{conninfo = ConnInfo, clientinfo = Client
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
Channel#channel{session = Session1}. Channel#channel{session = Session1}.
%% TODO: Add more stats.
-spec(stats(channel()) -> emqx_types:stats()). -spec(stats(channel()) -> emqx_types:stats()).
stats(#channel{session = Session})-> stats(#channel{session = Session})->
emqx_session:stats(Session). lists:append(emqx_session:stats(Session), emqx_pd:get_counters(?CHANNEL_METRICS)).
-spec(caps(channel()) -> emqx_types:caps()). -spec(caps(channel()) -> emqx_types:caps()).
caps(#channel{clientinfo = #{zone := Zone}}) -> caps(#channel{clientinfo = #{zone := Zone}}) ->

View File

@ -131,25 +131,6 @@
, sockstate , sockstate
]). ]).
-define(CONN_STATS,
[ recv_pkt
, recv_msg
, 'recv_msg.qos0'
, 'recv_msg.qos1'
, 'recv_msg.qos2'
, 'recv_msg.dropped'
, 'recv_msg.dropped.await_pubrel_timeout'
, send_pkt
, send_msg
, 'send_msg.qos0'
, 'send_msg.qos1'
, 'send_msg.qos2'
, 'send_msg.dropped'
, 'send_msg.dropped.expired'
, 'send_msg.dropped.queue_full'
, 'send_msg.dropped.too_large'
]).
-define(SOCK_STATS, -define(SOCK_STATS,
[ recv_oct [ recv_oct
, recv_cnt , recv_cnt
@ -236,10 +217,9 @@ stats(#state{transport = Transport,
{ok, Ss} -> Ss; {ok, Ss} -> Ss;
{error, _} -> [] {error, _} -> []
end, end,
ConnStats = emqx_pd:get_counters(?CONN_STATS),
ChanStats = emqx_channel:stats(Channel), ChanStats = emqx_channel:stats(Channel),
ProcStats = emqx_misc:proc_stats(), ProcStats = emqx_misc:proc_stats(),
lists:append([SockStats, ConnStats, ChanStats, ProcStats]). lists:append([SockStats, ChanStats, ProcStats]).
%% @doc Set TCP keepalive socket options to override system defaults. %% @doc Set TCP keepalive socket options to override system defaults.
%% Idle: The number of seconds a connection needs to be idle before %% Idle: The number of seconds a connection needs to be idle before
@ -1030,12 +1010,12 @@ inc_outgoing_stats({error, message_too_large}) ->
inc_counter('send_msg.dropped.too_large', 1); inc_counter('send_msg.dropped.too_large', 1);
inc_outgoing_stats(Packet = ?PACKET(Type)) -> inc_outgoing_stats(Packet = ?PACKET(Type)) ->
inc_counter(send_pkt, 1), inc_counter(send_pkt, 1),
case Type =:= ?PUBLISH of case Type of
true -> ?PUBLISH ->
inc_counter(send_msg, 1), inc_counter(send_msg, 1),
inc_counter(outgoing_pubs, 1), inc_counter(outgoing_pubs, 1),
inc_qos_stats(send_msg, Packet); inc_qos_stats(send_msg, Packet);
false -> _ ->
ok ok
end, end,
emqx_metrics:inc_sent(Packet). emqx_metrics:inc_sent(Packet).

View File

@ -112,7 +112,6 @@
-define(ACTIVE_N, 100). -define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(ENABLED(X), (X =/= undefined)). -define(ENABLED(X), (X =/= undefined)).
-define(LIMITER_BYTES_IN, bytes_in). -define(LIMITER_BYTES_IN, bytes_in).
@ -163,10 +162,9 @@ stats(WsPid) when is_pid(WsPid) ->
call(WsPid, stats); call(WsPid, stats);
stats(#state{channel = Channel}) -> stats(#state{channel = Channel}) ->
SockStats = emqx_pd:get_counters(?SOCK_STATS), SockStats = emqx_pd:get_counters(?SOCK_STATS),
ConnStats = emqx_pd:get_counters(?CONN_STATS),
ChanStats = emqx_channel:stats(Channel), ChanStats = emqx_channel:stats(Channel),
ProcStats = emqx_misc:proc_stats(), ProcStats = emqx_misc:proc_stats(),
lists:append([SockStats, ConnStats, ChanStats, ProcStats]). lists:append([SockStats, ChanStats, ProcStats]).
%% kick|discard|takeover %% kick|discard|takeover
-spec(call(pid(), Req :: term()) -> Reply :: term()). -spec(call(pid(), Req :: term()) -> Reply :: term()).
@ -725,6 +723,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
packet => emqx_packet:format(Packet)}), packet => emqx_packet:format(Packet)}),
ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped.too_large'),
ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped'),
ok = inc_outgoing_stats({error, message_too_large}),
<<>>; <<>>;
Data -> ?TRACE("WS-MQTT", "mqtt_packet_sent", #{packet => Packet}), Data -> ?TRACE("WS-MQTT", "mqtt_packet_sent", #{packet => Packet}),
ok = inc_outgoing_stats(Packet), ok = inc_outgoing_stats(Packet),
@ -762,19 +761,28 @@ inc_recv_stats(Cnt, Oct) ->
inc_incoming_stats(Packet = ?PACKET(Type)) -> inc_incoming_stats(Packet = ?PACKET(Type)) ->
_ = emqx_pd:inc_counter(recv_pkt, 1), _ = emqx_pd:inc_counter(recv_pkt, 1),
if Type == ?PUBLISH -> case Type of
?PUBLISH ->
inc_counter(recv_msg, 1), inc_counter(recv_msg, 1),
inc_qos_stats(recv_msg, Packet),
inc_counter(incoming_pubs, 1); inc_counter(incoming_pubs, 1);
true -> ok _ ->
ok
end, end,
emqx_metrics:inc_recv(Packet). emqx_metrics:inc_recv(Packet).
inc_outgoing_stats({error, message_too_large}) ->
inc_counter('send_msg.dropped', 1),
inc_counter('send_msg.dropped.too_large', 1);
inc_outgoing_stats(Packet = ?PACKET(Type)) -> inc_outgoing_stats(Packet = ?PACKET(Type)) ->
_ = emqx_pd:inc_counter(send_pkt, 1), inc_counter(send_pkt, 1),
if Type == ?PUBLISH -> case Type of
?PUBLISH ->
inc_counter(send_msg, 1), inc_counter(send_msg, 1),
inc_counter(outgoing_pubs, 1); inc_counter(outgoing_pubs, 1),
true -> ok inc_qos_stats(send_msg, Packet);
_ ->
ok
end, end,
emqx_metrics:inc_sent(Packet). emqx_metrics:inc_sent(Packet).
@ -787,6 +795,25 @@ inc_sent_stats(Cnt, Oct) ->
inc_counter(Name, Value) -> inc_counter(Name, Value) ->
_ = emqx_pd:inc_counter(Name, Value), _ = emqx_pd:inc_counter(Name, Value),
ok. ok.
inc_qos_stats(Type, Packet) ->
case inc_qos_stats_key(Type, emqx_packet:qos(Packet)) of
undefined ->
ignore;
Key ->
inc_counter(Key, 1)
end.
inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0';
inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1';
inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2';
inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0';
inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1';
inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2';
%% for bad qos
inc_qos_stats_key(_, _) -> undefined.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -178,8 +178,9 @@ t_stats(_) ->
end end
end), end),
Stats = ?ws_conn:call(WsPid, stats), Stats = ?ws_conn:call(WsPid, stats),
[?assert(lists:member(V, Stats)) || V <-
[{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0}, [{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0},
{recv_pkt, 0}, {recv_msg, 0}, {send_pkt, 0}, {send_msg, 0}|_] = Stats. {recv_pkt, 0}, {recv_msg, 0}, {send_pkt, 0}, {send_msg, 0}]].
t_call(_) -> t_call(_) ->
Info = ?ws_conn:info(st()), Info = ?ws_conn:info(st()),