Merge pull request #6764 from DDDHuang/client_metrics
feat: client metrics with message dropped & timeout
This commit is contained in:
commit
9385d9473d
|
@ -313,8 +313,14 @@ format_channel_info({_Key, Info, Stats0}) ->
|
||||||
inflight, max_inflight, awaiting_rel,
|
inflight, max_inflight, awaiting_rel,
|
||||||
max_awaiting_rel, mqueue_len, mqueue_dropped,
|
max_awaiting_rel, mqueue_len, mqueue_dropped,
|
||||||
max_mqueue, heap_size, reductions, mailbox_len,
|
max_mqueue, heap_size, reductions, mailbox_len,
|
||||||
recv_cnt, recv_msg, recv_oct, recv_pkt, send_cnt,
|
recv_cnt,
|
||||||
send_msg, send_oct, send_pkt], NStats),
|
recv_msg, 'recv_msg.qos0', 'recv_msg.qos1', 'recv_msg.qos2',
|
||||||
|
'recv_msg.dropped', 'recv_msg.dropped.expired',
|
||||||
|
recv_oct, recv_pkt, send_cnt,
|
||||||
|
send_msg, 'send_msg.qos0', 'send_msg.qos1', 'send_msg.qos2',
|
||||||
|
'send_msg.dropped', 'send_msg.dropped.expired',
|
||||||
|
'send_msg.dropped.queue_full', 'send_msg.dropped.too_large',
|
||||||
|
send_oct, send_pkt], NStats),
|
||||||
maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo),
|
maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo),
|
||||||
maps:with([clean_start, keepalive, expiry_interval, proto_name,
|
maps:with([clean_start, keepalive, expiry_interval, proto_name,
|
||||||
proto_ver, peername, connected_at, disconnected_at], ConnInfo),
|
proto_ver, peername, connected_at, disconnected_at], ConnInfo),
|
||||||
|
@ -373,7 +379,7 @@ match_fun(Ms, Fuzzy) ->
|
||||||
|
|
||||||
run_fuzzy_match(_, []) ->
|
run_fuzzy_match(_, []) ->
|
||||||
true;
|
true;
|
||||||
run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr}|Fuzzy]) ->
|
run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | Fuzzy]) ->
|
||||||
Val = case maps:get(Key, ClientInfo, undefined) of
|
Val = case maps:get(Key, ClientInfo, undefined) of
|
||||||
undefined -> <<>>;
|
undefined -> <<>>;
|
||||||
V -> V
|
V -> V
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.4.0",
|
[{"4.4.0",
|
||||||
[ {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
[ {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
||||||
|
, {load_module,emqx_connection,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}
|
||||||
|
@ -9,6 +10,7 @@
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.4.0",
|
[{"4.4.0",
|
||||||
[ {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
[ {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
||||||
|
, {load_module,emqx_connection,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}
|
||||||
|
|
|
@ -108,9 +108,38 @@
|
||||||
-type(state() :: #state{}).
|
-type(state() :: #state{}).
|
||||||
|
|
||||||
-define(ACTIVE_N, 100).
|
-define(ACTIVE_N, 100).
|
||||||
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
|
|
||||||
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
|
-define(INFO_KEYS, [ socktype
|
||||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
|
, peername
|
||||||
|
, sockname
|
||||||
|
, sockstate
|
||||||
|
, active_n
|
||||||
|
]).
|
||||||
|
|
||||||
|
-define(CONN_STATS, [ recv_pkt
|
||||||
|
, recv_msg
|
||||||
|
, 'recv_msg.qos0'
|
||||||
|
, 'recv_msg.qos1'
|
||||||
|
, 'recv_msg.qos2'
|
||||||
|
, 'recv_msg.dropped'
|
||||||
|
, 'recv_msg.dropped.expired'
|
||||||
|
, send_pkt
|
||||||
|
, send_msg
|
||||||
|
, 'send_msg.qos0'
|
||||||
|
, 'send_msg.qos1'
|
||||||
|
, 'send_msg.qos2'
|
||||||
|
, 'send_msg.dropped'
|
||||||
|
, 'send_msg.dropped.expired'
|
||||||
|
, 'send_msg.dropped.queue_full'
|
||||||
|
, 'send_msg.dropped.too_large'
|
||||||
|
]).
|
||||||
|
|
||||||
|
-define(SOCK_STATS, [ recv_oct
|
||||||
|
, recv_cnt
|
||||||
|
, send_oct
|
||||||
|
, send_cnt
|
||||||
|
, send_pend
|
||||||
|
]).
|
||||||
|
|
||||||
-define(ENABLED(X), (X =/= undefined)).
|
-define(ENABLED(X), (X =/= undefined)).
|
||||||
|
|
||||||
|
@ -146,7 +175,7 @@ start_link(Transport, Socket, Options) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Get infos of the connection/channel.
|
%% @doc Get infos of the connection/channel.
|
||||||
-spec(info(pid()|state()) -> emqx_types:infos()).
|
-spec(info(pid() | state()) -> emqx_types:infos()).
|
||||||
info(CPid) when is_pid(CPid) ->
|
info(CPid) when is_pid(CPid) ->
|
||||||
call(CPid, info);
|
call(CPid, info);
|
||||||
info(State = #state{channel = Channel}) ->
|
info(State = #state{channel = Channel}) ->
|
||||||
|
@ -175,7 +204,7 @@ info(limiter, #state{limiter = Limiter}) ->
|
||||||
maybe_apply(fun emqx_limiter:info/1, Limiter).
|
maybe_apply(fun emqx_limiter:info/1, Limiter).
|
||||||
|
|
||||||
%% @doc Get stats of the connection/channel.
|
%% @doc Get stats of the connection/channel.
|
||||||
-spec(stats(pid()|state()) -> emqx_types:stats()).
|
-spec(stats(pid() | state()) -> emqx_types:stats()).
|
||||||
stats(CPid) when is_pid(CPid) ->
|
stats(CPid) when is_pid(CPid) ->
|
||||||
call(CPid, stats);
|
call(CPid, stats);
|
||||||
stats(#state{transport = Transport,
|
stats(#state{transport = Transport,
|
||||||
|
@ -359,7 +388,7 @@ cancel_stats_timer(State) -> State.
|
||||||
|
|
||||||
process_msg([], State) ->
|
process_msg([], State) ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
process_msg([Msg|More], State) ->
|
process_msg([Msg | More], State) ->
|
||||||
try
|
try
|
||||||
case handle_msg(Msg, State) of
|
case handle_msg(Msg, State) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -453,7 +482,7 @@ handle_msg({Passive, _Sock}, State)
|
||||||
|
|
||||||
handle_msg(Deliver = {deliver, _Topic, _Msg},
|
handle_msg(Deliver = {deliver, _Topic, _Msg},
|
||||||
#state{active_n = ActiveN} = State) ->
|
#state{active_n = ActiveN} = State) ->
|
||||||
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
|
Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)],
|
||||||
with_channel(handle_deliver, [Delivers], State);
|
with_channel(handle_deliver, [Delivers], State);
|
||||||
|
|
||||||
%% Something sent
|
%% Something sent
|
||||||
|
@ -627,7 +656,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
||||||
{Packets, State#state{parse_state = NParseState}};
|
{Packets, State#state{parse_state = NParseState}};
|
||||||
{ok, Packet, Rest, NParseState} ->
|
{ok, Packet, Rest, NParseState} ->
|
||||||
NState = State#state{parse_state = NParseState},
|
NState = State#state{parse_state = NParseState},
|
||||||
parse_incoming(Rest, [Packet|Packets], NState)
|
parse_incoming(Rest, [Packet | Packets], NState)
|
||||||
catch
|
catch
|
||||||
error:proxy_protocol_config_disabled:_Stk ->
|
error:proxy_protocol_config_disabled:_Stk ->
|
||||||
?LOG(error,
|
?LOG(error,
|
||||||
|
@ -691,6 +720,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
||||||
[emqx_packet:format(Packet)]),
|
[emqx_packet:format(Packet)]),
|
||||||
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
||||||
ok = emqx_metrics:inc('delivery.dropped'),
|
ok = emqx_metrics:inc('delivery.dropped'),
|
||||||
|
ok = inc_outgoing_stats({error, message_too_large}),
|
||||||
<<>>;
|
<<>>;
|
||||||
Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
|
Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
|
||||||
ok = inc_outgoing_stats(Packet),
|
ok = inc_outgoing_stats(Packet),
|
||||||
|
@ -835,17 +865,31 @@ inc_incoming_stats(Packet = ?PACKET(Type)) ->
|
||||||
emqx_metrics:inc_recv(Packet).
|
emqx_metrics:inc_recv(Packet).
|
||||||
|
|
||||||
-compile({inline, [inc_outgoing_stats/1]}).
|
-compile({inline, [inc_outgoing_stats/1]}).
|
||||||
|
inc_outgoing_stats({error, message_too_large}) ->
|
||||||
|
inc_counter('send_msg.dropped', 1),
|
||||||
|
inc_counter('send_msg.dropped.too_large', 1);
|
||||||
inc_outgoing_stats(Packet = ?PACKET(Type)) ->
|
inc_outgoing_stats(Packet = ?PACKET(Type)) ->
|
||||||
inc_counter(send_pkt, 1),
|
inc_counter(send_pkt, 1),
|
||||||
case Type =:= ?PUBLISH of
|
case Type =:= ?PUBLISH of
|
||||||
true ->
|
true ->
|
||||||
inc_counter(send_msg, 1),
|
inc_counter(send_msg, 1),
|
||||||
inc_counter(outgoing_pubs, 1);
|
inc_counter(outgoing_pubs, 1),
|
||||||
|
inc_qos_stats(send_msg, Packet);
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
emqx_metrics:inc_sent(Packet).
|
emqx_metrics:inc_sent(Packet).
|
||||||
|
|
||||||
|
inc_qos_stats(Type, #mqtt_packet{header = #mqtt_packet_header{qos = QoS}}) ->
|
||||||
|
inc_counter(inc_qos_stats_key(Type, QoS), 1).
|
||||||
|
|
||||||
|
inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0';
|
||||||
|
inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1';
|
||||||
|
inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2';
|
||||||
|
|
||||||
|
inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0';
|
||||||
|
inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1';
|
||||||
|
inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2'.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
|
|
||||||
|
|
|
@ -427,8 +427,8 @@ dequeue(ClientInfo, Cnt, Msgs, Q) ->
|
||||||
{{value, Msg}, Q1} ->
|
{{value, Msg}, Q1} ->
|
||||||
case emqx_message:is_expired(Msg) of
|
case emqx_message:is_expired(Msg) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
|
|
||||||
ok = inc_delivery_expired_cnt(),
|
ok = inc_delivery_expired_cnt(),
|
||||||
|
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
|
||||||
dequeue(ClientInfo, Cnt, Msgs, Q1);
|
dequeue(ClientInfo, Cnt, Msgs, Q1);
|
||||||
false -> dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg | Msgs], Q1)
|
false -> dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg | Msgs], Q1)
|
||||||
end
|
end
|
||||||
|
@ -503,11 +503,14 @@ log_dropped(ClientInfo, Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
|
||||||
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]),
|
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]),
|
||||||
ok = emqx_metrics:inc('delivery.dropped'),
|
ok = emqx_metrics:inc('delivery.dropped'),
|
||||||
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
|
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
|
||||||
|
ok = inc_pd('send_msg.dropped'),
|
||||||
?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]);
|
?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]);
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]),
|
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]),
|
||||||
ok = emqx_metrics:inc('delivery.dropped'),
|
ok = emqx_metrics:inc('delivery.dropped'),
|
||||||
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
|
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
|
||||||
|
ok = inc_pd('send_msg.dropped'),
|
||||||
|
ok = inc_pd('send_msg.dropped.queue_full'),
|
||||||
?LOG(warning, "Dropped msg due to mqueue is full: ~s",
|
?LOG(warning, "Dropped msg due to mqueue is full: ~s",
|
||||||
[emqx_message:format(Msg)])
|
[emqx_message:format(Msg)])
|
||||||
end.
|
end.
|
||||||
|
@ -568,7 +571,9 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
|
||||||
%% Retry Delivery
|
%% Retry Delivery
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(retry(emqx_types:clientinfo(), session()) -> {ok, session()} | {ok, replies(), timeout(), session()}).
|
-spec(retry(emqx_types:clientinfo(), session())
|
||||||
|
-> {ok, session()}
|
||||||
|
| {ok, replies(), timeout(), session()}).
|
||||||
retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) ->
|
retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) ->
|
||||||
case emqx_inflight:is_empty(Inflight) of
|
case emqx_inflight:is_empty(Inflight) of
|
||||||
true -> {ok, Session};
|
true -> {ok, Session};
|
||||||
|
@ -680,13 +685,23 @@ inc_delivery_expired_cnt() ->
|
||||||
inc_delivery_expired_cnt(1).
|
inc_delivery_expired_cnt(1).
|
||||||
|
|
||||||
inc_delivery_expired_cnt(N) ->
|
inc_delivery_expired_cnt(N) ->
|
||||||
|
ok = inc_pd('send_msg.dropped', N),
|
||||||
|
ok = inc_pd('send_msg.dropped.expired', N),
|
||||||
ok = emqx_metrics:inc('delivery.dropped', N),
|
ok = emqx_metrics:inc('delivery.dropped', N),
|
||||||
emqx_metrics:inc('delivery.dropped.expired', N).
|
emqx_metrics:inc('delivery.dropped.expired', N).
|
||||||
|
|
||||||
inc_await_pubrel_timeout(N) ->
|
inc_await_pubrel_timeout(N) ->
|
||||||
|
ok = inc_pd('recv_msg.dropped', N),
|
||||||
|
ok = inc_pd('recv_msg.dropped.expired', N),
|
||||||
ok = emqx_metrics:inc('messages.dropped', N),
|
ok = emqx_metrics:inc('messages.dropped', N),
|
||||||
emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N).
|
emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N).
|
||||||
|
|
||||||
|
inc_pd(Key) ->
|
||||||
|
inc_pd(Key, 1).
|
||||||
|
inc_pd(Key, Inc) ->
|
||||||
|
_ = emqx_pd:inc_counter(Key, Inc),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Next Packet Id
|
%% Next Packet Id
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue