From 458101958b3d5e28e7dd92b9c05c022904412b31 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 7 Dec 2022 12:45:40 +0800 Subject: [PATCH 1/3] fix: run `message.dropped` hook, inc `messages.dropped` metrics - when awaiting_rel full - packet identifier in use (QoS2 packet resend) --- changes/v4.4.12-en.md | 4 +++- changes/v4.4.12-zh.md | 4 +++- src/emqx.appup.src | 4 ++++ src/emqx_channel.erl | 2 -- src/emqx_session.erl | 14 ++++++++++++-- 5 files changed, 22 insertions(+), 6 deletions(-) diff --git a/changes/v4.4.12-en.md b/changes/v4.4.12-en.md index 7f3b5cfeb..d7c50c34a 100644 --- a/changes/v4.4.12-en.md +++ b/changes/v4.4.12-en.md @@ -4,4 +4,6 @@ ### Bug Fixes -- Fixed load bootstrap file when no bootstrap user in `mqtt_app` [#9474](https://github.com/emqx/emqx-enterprise/pull/9474). +- Fixed load bootstrap file when no bootstrap user in `mqtt_app` [#9474](https://github.com/emqx/emqx/pull/9474). + +- Trigger the `message.dropped` hook when a QoS2 message is resent by the client with the same packet ID, or when the 'awaiting_rel' queue is full [#9486](https://github.com/emqx/emqx/pull/9486). diff --git a/changes/v4.4.12-zh.md b/changes/v4.4.12-zh.md index 66d8e5a8c..4dd86e913 100644 --- a/changes/v4.4.12-zh.md +++ b/changes/v4.4.12-zh.md @@ -5,4 +5,6 @@ ### 修复 -- 修复 mqtt_app 表内没有 boostrap user 里未导入用户的问题 [#9474](https://github.com/emqx/emqx-enterprise/pull/9474). +- 修复 mqtt_app 表内没有 bootstrap user 里未导入用户的问题 [#9474](https://github.com/emqx/emqx/pull/9474). 
+ +- 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9486](https://github.com/emqx/emqx/pull/9486)。 diff --git a/src/emqx.appup.src b/src/emqx.appup.src index c43bc3ec9..afdf74062 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -6,6 +6,7 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.4.10", [{add_module,emqx_cover}, @@ -19,6 +20,7 @@ {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -380,6 +382,7 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {delete_module,emqx_cover}]}, {"4.4.10", [{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -389,6 +392,7 @@ {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 5b5fa26e5..791433b37 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -634,8 +634,6 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, ok = 
emqx_metrics:inc('packets.publish.inuse'), handle_out(pubrec, {PacketId, RC}, Channel); {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> - ?LOG(warning, "Dropped the qos2 packet ~w " - "due to awaiting_rel is full.", [PacketId]), ok = emqx_metrics:inc('packets.publish.dropped'), handle_out(disconnect, RC, Channel) end. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index b4e15cefb..6a6f168dc 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -325,15 +325,25 @@ publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}, AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel), {ok, Results, Session#session{awaiting_rel = AwaitingRel1}}; true -> - {error, ?RC_PACKET_IDENTIFIER_IN_USE} + ?LOG(warning, "Dropped the qos2 packet ~w " + "due to packet ID in use.", [PacketId]), + drop_qos2_msg(Msg, ?RC_PACKET_IDENTIFIER_IN_USE) end; - true -> {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} + true -> + ?LOG(warning, "Dropped the qos2 packet ~w " + "due to awaiting_rel is full.", [PacketId]), + drop_qos2_msg(Msg, ?RC_RECEIVE_MAXIMUM_EXCEEDED) end; %% Publish QoS0/1 directly publish(_PacketId, Msg, Session) -> {ok, emqx_broker:publish(Msg), Session}. +drop_qos2_msg(Msg, ReasonCode) -> + ok = emqx_metrics:inc('messages.dropped'), + ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, emqx_reason_codes:name(ReasonCode)]), + {error, ReasonCode}. + -compile({inline, [is_awaiting_full/1]}). 
is_awaiting_full(#session{max_awaiting_rel = 0}) -> false; From fe0f2bc4e704731b0a3189b5e03e93a660297b51 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 7 Dec 2022 12:48:14 +0800 Subject: [PATCH 2/3] test(session): ensure 'message.dropped' hook ran with named reason --- test/emqx_session_SUITE.erl | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index f15fadb82..d481bc2f2 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -142,14 +142,31 @@ t_publish_qos2(_) -> t_publish_qos2_with_error_return(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), - Session = session(#{max_awaiting_rel => 2, - awaiting_rel => #{1 => ts(millisecond)} - }), - Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>), - {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(1, Msg, Session), - {ok, [], Session1} = emqx_session:publish(2, Msg, Session), - ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)), - {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(3, Msg, Session1). 
+ ok = meck:expect(emqx_hooks, run, fun('message.dropped', [Msg, _By, Reason]) -> + persistent_term:put({'message.dropped', Reason}, Msg); + (_Hook, _Arg) -> + ok + end), + + Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{PacketId1 = 1 => ts(millisecond)}}), + begin + Msg1 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload1">>), + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(PacketId1, Msg1, Session), + KeyIdentifierInUse = {'message.dropped', emqx_reason_codes:name(?RC_PACKET_IDENTIFIER_IN_USE)}, + ?assertEqual(Msg1, persistent_term:get(KeyIdentifierInUse)), + persistent_term:erase(KeyIdentifierInUse) + end, + + begin + Msg2 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload2">>), + {ok, [], Session1} = emqx_session:publish(PacketId2 = 2, Msg2, Session), + ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)), + {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(PacketId2, Msg2, Session1), + KeyMaxExceeded = {'message.dropped', emqx_reason_codes:name(?RC_RECEIVE_MAXIMUM_EXCEEDED)}, + ?assertEqual(Msg2, persistent_term:get(KeyMaxExceeded)), + persistent_term:erase(KeyMaxExceeded) + end, + ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end). t_is_awaiting_full_false(_) -> Session = session(#{max_awaiting_rel => 0}), From 97bfe359d11956ba3f864be255ecd99eb97c5097 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 7 Dec 2022 15:56:42 +0800 Subject: [PATCH 3/3] chore: fix typo --- CHANGES-4.3.md | 2 +- test/emqx_node_helpers.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index aa958ad12..dcb858545 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -159,7 +159,7 @@ File format: password-protected private key files used for dashboard and management HTTPS listeners. [#8129] - Add message republish supports using placeholder variables to specify QoS and Retain values. Set `${qos}` and `${flags.retain}` use the original QoS & Retain flag. 
-- Add supports specifying the network interface address of the cluster listener & rcp call listener. Specify `0.0.0.0` use all network interfaces, or a particular network interface IP address. +- Add supports specifying the network interface address of the cluster listener & rpc call listener. Specify `0.0.0.0` use all network interfaces, or a particular network interface IP address. - ExHook supports to customize the socket parameters for gRPC client. [#8314] ### Bug fixes diff --git a/test/emqx_node_helpers.erl b/test/emqx_node_helpers.erl index c2d16275e..42ed71b1a 100644 --- a/test/emqx_node_helpers.erl +++ b/test/emqx_node_helpers.erl @@ -106,7 +106,7 @@ setup_node(Node, #{} = Opts) -> EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler), %% apps need to be loaded before starting for ekka to find and create mnesia tables - LoadApps = lists:usort([gen_rcp, emqx] ++ ?SLAVE_START_APPS), + LoadApps = lists:usort([gen_rpc, emqx] ++ ?SLAVE_START_APPS), lists:foreach(fun(App) -> rpc:call(Node, application, load, [App]) end, LoadApps),