fix(connection): revert metric names for 'recv_msg.dropped.await_pubrel_timeout'
This commit is contained in:
parent
e220810b90
commit
9118cfb4ad
|
@ -124,35 +124,39 @@
|
||||||
|
|
||||||
-define(ACTIVE_N, 100).
|
-define(ACTIVE_N, 100).
|
||||||
|
|
||||||
-define(INFO_KEYS, [ socktype
|
-define(INFO_KEYS,
|
||||||
, peername
|
[ socktype
|
||||||
, sockname
|
, peername
|
||||||
, sockstate
|
, sockname
|
||||||
]).
|
, sockstate
|
||||||
|
]).
|
||||||
|
|
||||||
-define(CONN_STATS, [ recv_pkt
|
-define(CONN_STATS,
|
||||||
, recv_msg
|
[ recv_pkt
|
||||||
, 'recv_msg.qos0'
|
, recv_msg
|
||||||
, 'recv_msg.qos1'
|
, 'recv_msg.qos0'
|
||||||
, 'recv_msg.qos2'
|
, 'recv_msg.qos1'
|
||||||
, send_pkt
|
, 'recv_msg.qos2'
|
||||||
, send_msg
|
, 'recv_msg.dropped'
|
||||||
, 'send_msg.qos0'
|
, 'recv_msg.dropped.await_pubrel_timeout'
|
||||||
, 'send_msg.qos1'
|
, send_pkt
|
||||||
, 'send_msg.qos2'
|
, send_msg
|
||||||
, 'send_msg.dropped'
|
, 'send_msg.qos0'
|
||||||
, 'send_msg.dropped.await_pubrel_timeout'
|
, 'send_msg.qos1'
|
||||||
, 'send_msg.dropped.expired'
|
, 'send_msg.qos2'
|
||||||
, 'send_msg.dropped.queue_full'
|
, 'send_msg.dropped'
|
||||||
, 'send_msg.dropped.too_large'
|
, 'send_msg.dropped.expired'
|
||||||
]).
|
, 'send_msg.dropped.queue_full'
|
||||||
|
, 'send_msg.dropped.too_large'
|
||||||
|
]).
|
||||||
|
|
||||||
-define(SOCK_STATS, [ recv_oct
|
-define(SOCK_STATS,
|
||||||
, recv_cnt
|
[ recv_oct
|
||||||
, send_oct
|
, recv_cnt
|
||||||
, send_cnt
|
, send_oct
|
||||||
, send_pend
|
, send_cnt
|
||||||
]).
|
, send_pend
|
||||||
|
]).
|
||||||
|
|
||||||
-define(ENABLED(X), (X =/= undefined)).
|
-define(ENABLED(X), (X =/= undefined)).
|
||||||
|
|
||||||
|
|
|
@ -147,27 +147,29 @@
|
||||||
|
|
||||||
-type(replies() :: list(publish() | pubrel())).
|
-type(replies() :: list(publish() | pubrel())).
|
||||||
|
|
||||||
-define(INFO_KEYS, [id,
|
-define(INFO_KEYS,
|
||||||
is_persistent,
|
[ id
|
||||||
subscriptions,
|
, is_persistent
|
||||||
upgrade_qos,
|
, subscriptions
|
||||||
retry_interval,
|
, upgrade_qos
|
||||||
await_rel_timeout,
|
, retry_interval
|
||||||
created_at
|
, await_rel_timeout
|
||||||
]).
|
, created_at
|
||||||
|
]).
|
||||||
|
|
||||||
-define(STATS_KEYS, [subscriptions_cnt,
|
-define(STATS_KEYS,
|
||||||
subscriptions_max,
|
[ subscriptions_cnt
|
||||||
inflight_cnt,
|
, subscriptions_max
|
||||||
inflight_max,
|
, inflight_cnt
|
||||||
mqueue_len,
|
, inflight_max
|
||||||
mqueue_max,
|
, mqueue_len
|
||||||
mqueue_dropped,
|
, mqueue_max
|
||||||
next_pkt_id,
|
, mqueue_dropped
|
||||||
awaiting_rel_cnt,
|
, next_pkt_id
|
||||||
awaiting_rel_max,
|
, awaiting_rel_cnt
|
||||||
latency_stats
|
, awaiting_rel_max
|
||||||
]).
|
, latency_stats
|
||||||
|
]).
|
||||||
|
|
||||||
-define(DEFAULT_BATCH_N, 1000).
|
-define(DEFAULT_BATCH_N, 1000).
|
||||||
|
|
||||||
|
@ -730,8 +732,8 @@ inc_delivery_expired_cnt(N) ->
|
||||||
emqx_metrics:inc('delivery.dropped.expired', N).
|
emqx_metrics:inc('delivery.dropped.expired', N).
|
||||||
|
|
||||||
inc_await_pubrel_timeout(N) ->
|
inc_await_pubrel_timeout(N) ->
|
||||||
ok = inc_pd('send_msg.dropped', N),
|
ok = inc_pd('recv_msg.dropped', N),
|
||||||
ok = inc_pd('send_msg.dropped.await_pubrel_timeout', N),
|
ok = inc_pd('recv_msg.dropped.await_pubrel_timeout', N),
|
||||||
ok = emqx_metrics:inc('messages.dropped', N),
|
ok = emqx_metrics:inc('messages.dropped', N),
|
||||||
emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N).
|
emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N).
|
||||||
|
|
||||||
|
|
|
@ -126,19 +126,19 @@ t_unsubscribe(_) ->
|
||||||
t_publish_qos0(_) ->
|
t_publish_qos0(_) ->
|
||||||
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
||||||
Msg = emqx_message:make(clientid, ?QOS_0, <<"t">>, <<"payload">>),
|
Msg = emqx_message:make(clientid, ?QOS_0, <<"t">>, <<"payload">>),
|
||||||
{ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, Session = session()),
|
{ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, Session = session()),
|
||||||
{ok, [], Session} = emqx_session:publish(clientinfo(), undefined, Msg, Session).
|
{ok, [], Session} = emqx_session:publish(clientinfo(), undefined, Msg, Session).
|
||||||
|
|
||||||
t_publish_qos1(_) ->
|
t_publish_qos1(_) ->
|
||||||
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
||||||
Msg = emqx_message:make(clientid, ?QOS_1, <<"t">>, <<"payload">>),
|
Msg = emqx_message:make(clientid, ?QOS_1, <<"t">>, <<"payload">>),
|
||||||
{ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, Session = session()),
|
{ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, Session = session()),
|
||||||
{ok, [], Session} = emqx_session:publish(clientinfo(), 2, Msg, Session).
|
{ok, [], Session} = emqx_session:publish(clientinfo(), 2, Msg, Session).
|
||||||
|
|
||||||
t_publish_qos2(_) ->
|
t_publish_qos2(_) ->
|
||||||
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
||||||
Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>),
|
Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>),
|
||||||
{ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, session()),
|
{ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, session()),
|
||||||
?assertEqual(1, emqx_session:info(awaiting_rel_cnt, Session)),
|
?assertEqual(1, emqx_session:info(awaiting_rel_cnt, Session)),
|
||||||
{ok, Session1} = emqx_session:pubrel(clientinfo(), 1, Session),
|
{ok, Session1} = emqx_session:pubrel(clientinfo(), 1, Session),
|
||||||
?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session1)),
|
?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session1)),
|
||||||
|
@ -150,10 +150,10 @@ t_publish_qos2_with_error_return(_) ->
|
||||||
awaiting_rel => #{1 => ts(millisecond)}
|
awaiting_rel => #{1 => ts(millisecond)}
|
||||||
}),
|
}),
|
||||||
Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>),
|
Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>),
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(clientinfo(), 1, Msg, Session),
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(clientinfo(), 1, Msg, Session),
|
||||||
{ok, [], Session1} = emqx_session:publish(clientinfo(), 2, Msg, Session),
|
{ok, [], Session1} = emqx_session:publish(clientinfo(), 2, Msg, Session),
|
||||||
?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)),
|
?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)),
|
||||||
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(clientinfo(), 3, Msg, Session1).
|
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(clientinfo(), 3, Msg, Session1).
|
||||||
|
|
||||||
t_is_awaiting_full_false(_) ->
|
t_is_awaiting_full_false(_) ->
|
||||||
Session = session(#{max_awaiting_rel => infinity}),
|
Session = session(#{max_awaiting_rel => infinity}),
|
||||||
|
|
|
@ -152,6 +152,10 @@ properties(client) ->
|
||||||
<<"Number of PUBLISH QoS1 packets received">>},
|
<<"Number of PUBLISH QoS1 packets received">>},
|
||||||
{'recv_msg.qos2', integer,
|
{'recv_msg.qos2', integer,
|
||||||
<<"Number of PUBLISH QoS2 packets received">>},
|
<<"Number of PUBLISH QoS2 packets received">>},
|
||||||
|
{'recv_msg.dropped', integer,
|
||||||
|
<<"Number of dropped PUBLISH messages">>},
|
||||||
|
{'recv_msg.dropped.await_pubrel_timeout', integer,
|
||||||
|
<<"Number of dropped PUBLISH messages due to waiting PUBREL timeout">>},
|
||||||
{recv_oct, integer,
|
{recv_oct, integer,
|
||||||
<<"Number of bytes received by EMQ X Broker (the same below)">>},
|
<<"Number of bytes received by EMQ X Broker (the same below)">>},
|
||||||
{recv_pkt, integer,
|
{recv_pkt, integer,
|
||||||
|
@ -170,8 +174,6 @@ properties(client) ->
|
||||||
<<"Number of PUBLISH QoS2 messages sent">>},
|
<<"Number of PUBLISH QoS2 messages sent">>},
|
||||||
{'send_msg.dropped', integer,
|
{'send_msg.dropped', integer,
|
||||||
<<"Number of dropped PUBLISH messages">>},
|
<<"Number of dropped PUBLISH messages">>},
|
||||||
{'send_msg.dropped.await_pubrel_timeout', integer,
|
|
||||||
<<"Number of dropped PUBLISH packets due to waiting PUBREL timeout">>},
|
|
||||||
{'send_msg.dropped.expired', integer,
|
{'send_msg.dropped.expired', integer,
|
||||||
<<"Number of dropped PUBLISH messages due to expired">>},
|
<<"Number of dropped PUBLISH messages due to expired">>},
|
||||||
{'send_msg.dropped.queue_full', integer,
|
{'send_msg.dropped.queue_full', integer,
|
||||||
|
|
Loading…
Reference in New Issue