Merge pull request #9486 from JimMoen/fix-message-dropped-event-v44

Fix message dropped event v44
This commit is contained in:
Xinyu Liu 2022-12-07 18:00:29 +08:00 committed by GitHub
commit a54f88a1ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 49 additions and 16 deletions

View File

@ -159,7 +159,7 @@ File format:
password-protected private key files used for dashboard and password-protected private key files used for dashboard and
management HTTPS listeners. [#8129] management HTTPS listeners. [#8129]
- Add message republish supports using placeholder variables to specify QoS and Retain values. Set `${qos}` and `${flags.retain}` use the original QoS & Retain flag. - Add message republish supports using placeholder variables to specify QoS and Retain values. Set `${qos}` and `${flags.retain}` use the original QoS & Retain flag.
- Add supports specifying the network interface address of the cluster listener & rcp call listener. Specify `0.0.0.0` use all network interfaces, or a particular network interface IP address. - Add supports specifying the network interface address of the cluster listener & rpc call listener. Specify `0.0.0.0` use all network interfaces, or a particular network interface IP address.
- ExHook supports to customize the socket parameters for gRPC client. [#8314] - ExHook supports to customize the socket parameters for gRPC client. [#8314]
### Bug fixes ### Bug fixes

View File

@ -4,4 +4,6 @@
### Bug Fixes ### Bug Fixes
- Fixed load bootstrap file when no bootstrap user in `mqtt_app` [#9474](https://github.com/emqx/emqx-enterprise/pull/9474). - Fixed load bootstrap file when no bootstrap user in `mqtt_app` [#9474](https://github.com/emqx/emqx/pull/9474).
- Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9486](https://github.com/emqx/emqx/pull/9486).

View File

@ -5,4 +5,6 @@
### 修复 ### 修复
- 修复 mqtt_app 表内没有 boostrap user 里未导入用户的问题 [#9474](https://github.com/emqx/emqx-enterprise/pull/9474). - 修复 mqtt_app 表内没有 boostrap user 里未导入用户的问题 [#9474](https://github.com/emqx/emqx/pull/9474).
- 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9486](https://github.com/emqx/emqx-enterprise/pull/9486)。

View File

@ -6,6 +6,7 @@
{load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.4.10", {"4.4.10",
[{add_module,emqx_cover}, [{add_module,emqx_cover},
@ -19,6 +20,7 @@
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
@ -380,6 +382,7 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{delete_module,emqx_cover}]}, {delete_module,emqx_cover}]},
{"4.4.10", {"4.4.10",
[{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, [{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -389,6 +392,7 @@
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},

View File

@ -634,8 +634,6 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
ok = emqx_metrics:inc('packets.publish.inuse'), ok = emqx_metrics:inc('packets.publish.inuse'),
handle_out(pubrec, {PacketId, RC}, Channel); handle_out(pubrec, {PacketId, RC}, Channel);
{error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
?LOG(warning, "Dropped the qos2 packet ~w "
"due to awaiting_rel is full.", [PacketId]),
ok = emqx_metrics:inc('packets.publish.dropped'), ok = emqx_metrics:inc('packets.publish.dropped'),
handle_out(disconnect, RC, Channel) handle_out(disconnect, RC, Channel)
end. end.

View File

@ -325,15 +325,25 @@ publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts},
AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel), AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel),
{ok, Results, Session#session{awaiting_rel = AwaitingRel1}}; {ok, Results, Session#session{awaiting_rel = AwaitingRel1}};
true -> true ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ?LOG(warning, "Dropped the qos2 packet ~w "
"due to packet ID in use.", [PacketId]),
drop_qos2_msg(Msg, ?RC_PACKET_IDENTIFIER_IN_USE)
end; end;
true -> {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} true ->
?LOG(warning, "Dropped the qos2 packet ~w "
"due to awaiting_rel is full.", [PacketId]),
drop_qos2_msg(Msg, ?RC_RECEIVE_MAXIMUM_EXCEEDED)
end; end;
%% Publish QoS0/1 directly %% Publish QoS0/1 directly
publish(_PacketId, Msg, Session) -> publish(_PacketId, Msg, Session) ->
{ok, emqx_broker:publish(Msg), Session}. {ok, emqx_broker:publish(Msg), Session}.
drop_qos2_msg(Msg, ReasonCode) ->
ok = emqx_metrics:inc('messages.dropped'),
ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, emqx_reason_codes:name(ReasonCode)]),
{error, ReasonCode}.
-compile({inline, [is_awaiting_full/1]}). -compile({inline, [is_awaiting_full/1]}).
is_awaiting_full(#session{max_awaiting_rel = 0}) -> is_awaiting_full(#session{max_awaiting_rel = 0}) ->
false; false;

View File

@ -106,7 +106,7 @@ setup_node(Node, #{} = Opts) ->
EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler), EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler),
%% apps need to be loaded before starting for ekka to find and create mnesia tables %% apps need to be loaded before starting for ekka to find and create mnesia tables
LoadApps = lists:usort([gen_rcp, emqx] ++ ?SLAVE_START_APPS), LoadApps = lists:usort([gen_rpc, emqx] ++ ?SLAVE_START_APPS),
lists:foreach(fun(App) -> lists:foreach(fun(App) ->
rpc:call(Node, application, load, [App]) rpc:call(Node, application, load, [App])
end, LoadApps), end, LoadApps),

View File

@ -142,14 +142,31 @@ t_publish_qos2(_) ->
t_publish_qos2_with_error_return(_) -> t_publish_qos2_with_error_return(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Session = session(#{max_awaiting_rel => 2, ok = meck:expect(emqx_hooks, run, fun('message.dropped', [Msg, _By, Reason]) ->
awaiting_rel => #{1 => ts(millisecond)} persistent_term:put({'message.dropped', Reason}, Msg);
}), (_Hook, _Arg) ->
Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>), ok
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(1, Msg, Session), end),
{ok, [], Session1} = emqx_session:publish(2, Msg, Session),
?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)), Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{PacketId1 = 1 => ts(millisecond)}}),
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(3, Msg, Session1). begin
Msg1 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload1">>),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(PacketId1, Msg1, Session),
KeyIdentifierInUse = {'message.dropped', emqx_reason_codes:name(?RC_PACKET_IDENTIFIER_IN_USE)},
?assertEqual(Msg1, persistent_term:get(KeyIdentifierInUse)),
persistent_term:erase(KeyIdentifierInUse)
end,
begin
Msg2 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload2">>),
{ok, [], Session1} = emqx_session:publish(PacketId2 = 2, Msg2, Session),
?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)),
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(PacketId2, Msg2, Session1),
KeyMaxExceeded = {'message.dropped', emqx_reason_codes:name(?RC_RECEIVE_MAXIMUM_EXCEEDED)},
?assertEqual(Msg2, persistent_term:get(KeyMaxExceeded)),
persistent_term:erase(KeyMaxExceeded)
end,
ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end).
t_is_awaiting_full_false(_) -> t_is_awaiting_full_false(_) ->
Session = session(#{max_awaiting_rel => 0}), Session = session(#{max_awaiting_rel => 0}),