From 458101958b3d5e28e7dd92b9c05c022904412b31 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 7 Dec 2022 12:45:40 +0800 Subject: [PATCH] fix: run `message.dropped` hook, inc `messages.dropped` metrics - when awaiting_rel full - packet identifier in use (QoS2 packet resend) --- changes/v4.4.12-en.md | 4 +++- changes/v4.4.12-zh.md | 4 +++- src/emqx.appup.src | 4 ++++ src/emqx_channel.erl | 2 -- src/emqx_session.erl | 14 ++++++++++++-- 5 files changed, 22 insertions(+), 6 deletions(-) diff --git a/changes/v4.4.12-en.md b/changes/v4.4.12-en.md index 7f3b5cfeb..d7c50c34a 100644 --- a/changes/v4.4.12-en.md +++ b/changes/v4.4.12-en.md @@ -4,4 +4,6 @@ ### Bug Fixes -- Fixed load bootstrap file when no bootstrap user in `mqtt_app` [#9474](https://github.com/emqx/emqx-enterprise/pull/9474). +- Fixed load bootstrap file when no bootstrap user in `mqtt_app` [#9474](https://github.com/emqx/emqx/pull/9474). + +- Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9486](https://github.com/emqx/emqx/pull/9486). diff --git a/changes/v4.4.12-zh.md b/changes/v4.4.12-zh.md index 66d8e5a8c..4dd86e913 100644 --- a/changes/v4.4.12-zh.md +++ b/changes/v4.4.12-zh.md @@ -5,4 +5,6 @@ ### 修复 -- 修复 mqtt_app 表内没有 boostrap user 里未导入用户的问题 [#9474](https://github.com/emqx/emqx-enterprise/pull/9474). +- 修复 mqtt_app 表内没有 boostrap user 里未导入用户的问题 [#9474](https://github.com/emqx/emqx/pull/9474). + +- 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9486](https://github.com/emqx/emqx-enterprise/pull/9486)。 diff --git a/src/emqx.appup.src b/src/emqx.appup.src index c43bc3ec9..afdf74062 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -6,6 +6,7 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.4.10", [{add_module,emqx_cover}, @@ -19,6 +20,7 @@ {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -380,6 +382,7 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {delete_module,emqx_cover}]}, {"4.4.10", [{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -389,6 +392,7 @@ {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 5b5fa26e5..791433b37 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -634,8 +634,6 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, ok = emqx_metrics:inc('packets.publish.inuse'), handle_out(pubrec, {PacketId, RC}, Channel); {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> - ?LOG(warning, "Dropped the qos2 packet ~w " - "due to awaiting_rel is full.", [PacketId]), ok = emqx_metrics:inc('packets.publish.dropped'), handle_out(disconnect, RC, Channel) end. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index b4e15cefb..6a6f168dc 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -325,15 +325,25 @@ publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}, AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel), {ok, Results, Session#session{awaiting_rel = AwaitingRel1}}; true -> - {error, ?RC_PACKET_IDENTIFIER_IN_USE} + ?LOG(warning, "Dropped the qos2 packet ~w " + "due to packet ID in use.", [PacketId]), + drop_qos2_msg(Msg, ?RC_PACKET_IDENTIFIER_IN_USE) end; - true -> {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} + true -> + ?LOG(warning, "Dropped the qos2 packet ~w " + "due to awaiting_rel is full.", [PacketId]), + drop_qos2_msg(Msg, ?RC_RECEIVE_MAXIMUM_EXCEEDED) end; %% Publish QoS0/1 directly publish(_PacketId, Msg, Session) -> {ok, emqx_broker:publish(Msg), Session}. +drop_qos2_msg(Msg, ReasonCode) -> + ok = emqx_metrics:inc('messages.dropped'), + ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, emqx_reason_codes:name(ReasonCode)]), + {error, ReasonCode}. + -compile({inline, [is_awaiting_full/1]}). is_awaiting_full(#session{max_awaiting_rel = 0}) -> false;