diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 3b18c20cb..b2d44847e 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -752,15 +752,6 @@ do_publish( ok = emqx_metrics:inc('packets.publish.inuse'), handle_out(pubrec, {PacketId, RC}, Channel); {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> - ?SLOG( - warning, - #{ - msg => "dropped_qos2_packet", - reason => emqx_reason_codes:name(RC), - packet_id => PacketId - }, - #{topic => Msg#message.topic} - ), ok = emqx_metrics:inc('packets.publish.dropped'), handle_out(disconnect, RC, Channel) end. diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 5e98ebbeb..6e1060414 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -390,15 +390,29 @@ publish( AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel), {ok, Results, Session#session{awaiting_rel = AwaitingRel1}}; true -> - {error, ?RC_PACKET_IDENTIFIER_IN_USE} + drop_qos2_msg(PacketId, Msg, ?RC_PACKET_IDENTIFIER_IN_USE) end; true -> - {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} + drop_qos2_msg(PacketId, Msg, ?RC_RECEIVE_MAXIMUM_EXCEEDED) end; %% Publish QoS0/1 directly publish(_ClientInfo, _PacketId, Msg, Session) -> {ok, emqx_broker:publish(Msg), Session}. +drop_qos2_msg(PacketId, Msg, RC) -> + ?SLOG( + warning, + #{ + msg => "dropped_qos2_packet", + reason => emqx_reason_codes:name(RC), + packet_id => PacketId + }, + #{topic => Msg#message.topic} + ), + ok = emqx_metrics:inc('messages.dropped'), + ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, emqx_reason_codes:name(RC)]), + {error, RC}. + is_awaiting_full(#session{max_awaiting_rel = infinity}) -> false; is_awaiting_full(#session{ diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index 6547c520b..c171382c7 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -165,15 +165,45 @@ t_publish_qos2(_) -> t_publish_qos2_with_error_return(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), - Session = session(#{ - max_awaiting_rel => 2, - awaiting_rel => #{1 => ts(millisecond)} - }), - Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>), - {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(clientinfo(), 1, Msg, Session), - {ok, [], Session1} = emqx_session:publish(clientinfo(), 2, Msg, Session), - ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)), - {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(clientinfo(), 3, Msg, Session1). + ok = meck:expect(emqx_hooks, run, fun + ('message.dropped', [Msg, _By, ReasonName]) -> + self() ! {'message.dropped', ReasonName, Msg}, + ok; + (_Hook, _Arg) -> + ok + end), + + Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{PacketId1 = 1 => ts(millisecond)}}), + begin + Msg1 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload1">>), + {error, RC1 = ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish( + clientinfo(), PacketId1, Msg1, Session + ), + receive + {'message.dropped', Reason1, RecMsg1} -> + ?assertEqual(Reason1, emqx_reason_codes:name(RC1)), + ?assertEqual(RecMsg1, Msg1) + after 1000 -> + ct:fail(?FUNCTION_NAME) + end + end, + + begin + Msg2 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload2">>), + {ok, [], Session1} = emqx_session:publish(clientinfo(), PacketId2 = 2, Msg2, Session), + ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)), + {error, RC2 = ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish( + clientinfo(), _PacketId3 = 3, Msg2, Session1 + ), + receive + {'message.dropped', Reason2, RecMsg2} -> + ?assertEqual(Reason2, emqx_reason_codes:name(RC2)), + ?assertEqual(RecMsg2, Msg2) + after 1000 -> + ct:fail(?FUNCTION_NAME) + end + end, + ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end). t_is_awaiting_full_false(_) -> Session = session(#{max_awaiting_rel => infinity}), diff --git a/changes/v5.0.10-en.md b/changes/v5.0.10-en.md index 42c3f6610..b91ec30f7 100644 --- a/changes/v5.0.10-en.md +++ b/changes/v5.0.10-en.md @@ -24,6 +24,11 @@ - Binary packages for all platforms are now built on Erlang/OTP version 24.3.4.2 [#9293](https://github.com/emqx/emqx/pull/9293). +- Added more `client.disconnected` events (and counter bumps) [#9327](https://github.com/emqx/emqx/pull/9327). + Prior to this change, the `client.disconnected` event (and counter bump) is triggered when a client + performs a 'normal' disconnect, or is 'kicked' by system admin, but NOT triggered when a + stale connection had to be 'discarded' (for clean session) or 'takeovered' (for non-clean session) by new connection. + ## Bug fixes - Fix error log message when `mechanism` is missing in authentication config [#8924](https://github.com/emqx/emqx/pull/8924). diff --git a/changes/v5.0.10-zh.md b/changes/v5.0.10-zh.md index bbaa758b3..92cc1ccab 100644 --- a/changes/v5.0.10-zh.md +++ b/changes/v5.0.10-zh.md @@ -23,6 +23,10 @@ - 为所有平台的二进制包升级了 Erlang/OTP 到 24.3.4.2 [#9293](https://github.com/emqx/emqx/pull/9293)。 +- 增加更多类型的 `client.disconnected` 事件(及计数器触发) [#9327](https://github.com/emqx/emqx/pull/9327)。 + 此前,`client.disconnected` 事件及计数器仅会在客户端正常断开连接或客户端被系统管理员踢出时触发, + 但不会在旧 session 被新连接废弃时 (clean_session = true) ,或旧 session 被新连接接管时 (clean_session = false) 被触发。 + ## Bug fixes - 优化认认证配置中 `mechanism` 字段缺失情况下的错误日志 [#8924](https://github.com/emqx/emqx/pull/8924)。 diff --git a/changes/v5.0.13-en.md b/changes/v5.0.13-en.md index 7f4ad90ae..23d3840b3 100644 --- a/changes/v5.0.13-en.md +++ b/changes/v5.0.13-en.md @@ -11,3 +11,5 @@ - Refactor: use `POST` not `PUT` for `/users/{name}/change_pwd` [#9533](https://github.com/emqx/emqx/pull/9533). ## Bug fixes + +- Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487). diff --git a/changes/v5.0.13-zh.md b/changes/v5.0.13-zh.md index e0e818b45..ce25bc35d 100644 --- a/changes/v5.0.13-zh.md +++ b/changes/v5.0.13-zh.md @@ -11,3 +11,5 @@ - 重构: `/users/{name}/change_pwd` 的请求方式从 `PUT` 改为了 `POST` [#9533](https://github.com/emqx/emqx/pull/9533)。 ## 修复 + +- 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9487](https://github.com/emqx/emqx/pull/9487)。