feat(metrics): session metrics & api format

This commit is contained in:
DDDHuang 2022-01-17 15:12:08 +08:00
parent 1ce77de080
commit 5397d80680
4 changed files with 32 additions and 10 deletions

View File

@ -313,8 +313,14 @@ format_channel_info({_Key, Info, Stats0}) ->
inflight, max_inflight, awaiting_rel,
max_awaiting_rel, mqueue_len, mqueue_dropped,
max_mqueue, heap_size, reductions, mailbox_len,
recv_cnt, recv_msg, recv_oct, recv_pkt, send_cnt,
send_msg, send_oct, send_pkt], NStats),
recv_cnt,
recv_msg, 'recv_msg.qos0', 'recv_msg.qos1', 'recv_msg.qos2',
'recv_msg.dropped', 'recv_msg.dropped.expired',
recv_oct, recv_pkt, send_cnt,
send_msg, 'send_msg.qos0', 'send_msg.qos1', 'send_msg.qos2',
'send_msg.dropped', 'send_msg.dropped.expired',
'send_msg.dropped.queue_full', 'send_msg.dropped.too_large',
send_oct, send_pkt], NStats),
maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo),
maps:with([clean_start, keepalive, expiry_interval, proto_name,
proto_ver, peername, connected_at, disconnected_at], ConnInfo),
@ -373,7 +379,7 @@ match_fun(Ms, Fuzzy) ->
run_fuzzy_match(_, []) ->
true;
run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr}|Fuzzy]) ->
run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | Fuzzy]) ->
Val = case maps:get(Key, ClientInfo, undefined) of
undefined -> <<>>;
V -> V

View File

@ -2,6 +2,7 @@
{VSN,
[{"4.4.0",
[ {load_module,emqx_channel,brutal_purge,soft_purge,[]}
, {load_module,emqx_connection,brutal_purge,soft_purge,[]}
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}
@ -9,6 +10,7 @@
{<<".*">>,[]}],
[{"4.4.0",
[ {load_module,emqx_channel,brutal_purge,soft_purge,[]}
, {load_module,emqx_connection,brutal_purge,soft_purge,[]}
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}

View File

@ -176,7 +176,7 @@ start_link(Transport, Socket, Options) ->
%%--------------------------------------------------------------------
%% @doc Get infos of the connection/channel.
-spec(info(pid()|state()) -> emqx_types:infos()).
-spec(info(pid() | state()) -> emqx_types:infos()).
info(CPid) when is_pid(CPid) ->
call(CPid, info);
info(State = #state{channel = Channel}) ->
@ -205,7 +205,7 @@ info(limiter, #state{limiter = Limiter}) ->
maybe_apply(fun emqx_limiter:info/1, Limiter).
%% @doc Get stats of the connection/channel.
-spec(stats(pid()|state()) -> emqx_types:stats()).
-spec(stats(pid() | state()) -> emqx_types:stats()).
stats(CPid) when is_pid(CPid) ->
call(CPid, stats);
stats(#state{transport = Transport,
@ -389,7 +389,7 @@ cancel_stats_timer(State) -> State.
process_msg([], State) ->
{ok, State};
process_msg([Msg|More], State) ->
process_msg([Msg | More], State) ->
try
case handle_msg(Msg, State) of
ok ->
@ -483,7 +483,7 @@ handle_msg({Passive, _Sock}, State)
handle_msg(Deliver = {deliver, _Topic, _Msg},
#state{active_n = ActiveN} = State) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
@ -657,7 +657,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
{Packets, State#state{parse_state = NParseState}};
{ok, Packet, Rest, NParseState} ->
NState = State#state{parse_state = NParseState},
parse_incoming(Rest, [Packet|Packets], NState)
parse_incoming(Rest, [Packet | Packets], NState)
catch
error:proxy_protocol_config_disabled:_Stk ->
?LOG(error,

View File

@ -427,8 +427,8 @@ dequeue(ClientInfo, Cnt, Msgs, Q) ->
{{value, Msg}, Q1} ->
case emqx_message:is_expired(Msg) of
true ->
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
ok = inc_delivery_expired_cnt(),
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
dequeue(ClientInfo, Cnt, Msgs, Q1);
false -> dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg | Msgs], Q1)
end
@ -503,11 +503,14 @@ log_dropped(ClientInfo, Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]),
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
ok = inc_pd('send_msg.dropped'),
?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]);
false ->
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]),
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
ok = inc_pd('send_msg.dropped'),
ok = inc_pd('send_msg.dropped.queue_full'),
?LOG(warning, "Dropped msg due to mqueue is full: ~s",
[emqx_message:format(Msg)])
end.
@ -568,7 +571,8 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
%% Retry Delivery
%%--------------------------------------------------------------------
-spec(retry(emqx_types:clientinfo(), session()) -> {ok, session()} | {ok, replies(), timeout(), session()}).
-spec(retry(emqx_types:clientinfo(), session()) ->
{ok, session()} | {ok, replies(), timeout(), session()}).
retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) ->
case emqx_inflight:is_empty(Inflight) of
true -> {ok, Session};
@ -680,13 +684,23 @@ inc_delivery_expired_cnt() ->
inc_delivery_expired_cnt(1).
inc_delivery_expired_cnt(N) ->
ok = inc_pd('send_msg.dropped', N),
ok = inc_pd('send_msg.dropped.expired', N),
ok = emqx_metrics:inc('delivery.dropped', N),
emqx_metrics:inc('delivery.dropped.expired', N).
inc_await_pubrel_timeout(N) ->
ok = inc_pd('recv_msg.dropped', N),
ok = inc_pd('recv_msg.dropped.expired', N),
ok = emqx_metrics:inc('messages.dropped', N),
emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N).
inc_pd(Key) ->
inc_pd(Key, 1).
inc_pd(Key, Inc) ->
_ = emqx_pd:inc_counter(Key, Inc),
ok.
%%--------------------------------------------------------------------
%% Next Packet Id
%%--------------------------------------------------------------------