From b88398c3c6ea5723d9d3cef54813c9c259229fb7 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 7 Dec 2022 13:49:15 +0800 Subject: [PATCH] fix: run `message.dropped` hook, inc `messages.dropped` metrics - when awaiting_rel full - packet identifier in use (QoS2 packet resend) --- apps/emqx/src/emqx_channel.erl | 9 --------- apps/emqx/src/emqx_session.erl | 18 ++++++++++++++++-- changes/v5.0.13-en.md | 2 ++ changes/v5.0.13-zh.md | 2 ++ 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 3b18c20cb..b2d44847e 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -752,15 +752,6 @@ do_publish( ok = emqx_metrics:inc('packets.publish.inuse'), handle_out(pubrec, {PacketId, RC}, Channel); {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> - ?SLOG( - warning, - #{ - msg => "dropped_qos2_packet", - reason => emqx_reason_codes:name(RC), - packet_id => PacketId - }, - #{topic => Msg#message.topic} - ), ok = emqx_metrics:inc('packets.publish.dropped'), handle_out(disconnect, RC, Channel) end. diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 5e98ebbeb..6e1060414 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -390,15 +390,29 @@ publish( AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel), {ok, Results, Session#session{awaiting_rel = AwaitingRel1}}; true -> - {error, ?RC_PACKET_IDENTIFIER_IN_USE} + drop_qos2_msg(PacketId, Msg, ?RC_PACKET_IDENTIFIER_IN_USE) end; true -> - {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} + drop_qos2_msg(PacketId, Msg, ?RC_RECEIVE_MAXIMUM_EXCEEDED) end; %% Publish QoS0/1 directly publish(_ClientInfo, _PacketId, Msg, Session) -> {ok, emqx_broker:publish(Msg), Session}. +drop_qos2_msg(PacketId, Msg, RC) -> + ?SLOG( + warning, + #{ + msg => "dropped_qos2_packet", + reason => emqx_reason_codes:name(RC), + packet_id => PacketId + }, + #{topic => Msg#message.topic} + ), + ok = emqx_metrics:inc('messages.dropped'), + ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, emqx_reason_codes:name(RC)]), + {error, RC}. + is_awaiting_full(#session{max_awaiting_rel = infinity}) -> false; is_awaiting_full(#session{ diff --git a/changes/v5.0.13-en.md b/changes/v5.0.13-en.md index 7f4ad90ae..23d3840b3 100644 --- a/changes/v5.0.13-en.md +++ b/changes/v5.0.13-en.md @@ -11,3 +11,5 @@ - Refactor: use `POST` not `PUT` for `/users/{name}/change_pwd` [#9533](https://github.com/emqx/emqx/pull/9533). ## Bug fixes + +- Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487). diff --git a/changes/v5.0.13-zh.md b/changes/v5.0.13-zh.md index e0e818b45..ce25bc35d 100644 --- a/changes/v5.0.13-zh.md +++ b/changes/v5.0.13-zh.md @@ -11,3 +11,5 @@ - 重构: `/users/{name}/change_pwd` 的请求方式从 `PUT` 改为了 `POST` [#9533](https://github.com/emqx/emqx/pull/9533)。 ## 修复 + +- 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9487](https://github.com/emqx/emqx/pull/9487)。