fix: run `message.dropped` hook, inc `messages.dropped` metrics
- when awaiting_rel full - packet identifier in use (QoS2 packet resend)
This commit is contained in:
parent
c021443f6e
commit
458101958b
|
@ -4,4 +4,6 @@
|
||||||
|
|
||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
|
|
||||||
- Fixed load bootstrap file when no bootstrap user in `mqtt_app` [#9474](https://github.com/emqx/emqx-enterprise/pull/9474).
|
- Fixed load bootstrap file when no bootstrap user in `mqtt_app` [#9474](https://github.com/emqx/emqx/pull/9474).
|
||||||
|
|
||||||
|
- Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9486](https://github.com/emqx/emqx/pull/9486).
|
||||||
|
|
|
@ -5,4 +5,6 @@
|
||||||
|
|
||||||
### 修复
|
### 修复
|
||||||
|
|
||||||
- 修复 mqtt_app 表内没有 boostrap user 里未导入用户的问题 [#9474](https://github.com/emqx/emqx-enterprise/pull/9474).
|
- 修复 mqtt_app 表内没有 boostrap user 里未导入用户的问题 [#9474](https://github.com/emqx/emqx/pull/9474).
|
||||||
|
|
||||||
|
- 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9486](https://github.com/emqx/emqx-enterprise/pull/9486)。
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
|
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.4.10",
|
{"4.4.10",
|
||||||
[{add_module,emqx_cover},
|
[{add_module,emqx_cover},
|
||||||
|
@ -19,6 +20,7 @@
|
||||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -380,6 +382,7 @@
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{delete_module,emqx_cover}]},
|
{delete_module,emqx_cover}]},
|
||||||
{"4.4.10",
|
{"4.4.10",
|
||||||
[{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||||
|
@ -389,6 +392,7 @@
|
||||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -634,8 +634,6 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
||||||
ok = emqx_metrics:inc('packets.publish.inuse'),
|
ok = emqx_metrics:inc('packets.publish.inuse'),
|
||||||
handle_out(pubrec, {PacketId, RC}, Channel);
|
handle_out(pubrec, {PacketId, RC}, Channel);
|
||||||
{error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
|
{error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
|
||||||
?LOG(warning, "Dropped the qos2 packet ~w "
|
|
||||||
"due to awaiting_rel is full.", [PacketId]),
|
|
||||||
ok = emqx_metrics:inc('packets.publish.dropped'),
|
ok = emqx_metrics:inc('packets.publish.dropped'),
|
||||||
handle_out(disconnect, RC, Channel)
|
handle_out(disconnect, RC, Channel)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -325,15 +325,25 @@ publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts},
|
||||||
AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel),
|
AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel),
|
||||||
{ok, Results, Session#session{awaiting_rel = AwaitingRel1}};
|
{ok, Results, Session#session{awaiting_rel = AwaitingRel1}};
|
||||||
true ->
|
true ->
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
|
?LOG(warning, "Dropped the qos2 packet ~w "
|
||||||
|
"due to packet ID in use.", [PacketId]),
|
||||||
|
drop_qos2_msg(Msg, ?RC_PACKET_IDENTIFIER_IN_USE)
|
||||||
end;
|
end;
|
||||||
true -> {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}
|
true ->
|
||||||
|
?LOG(warning, "Dropped the qos2 packet ~w "
|
||||||
|
"due to awaiting_rel is full.", [PacketId]),
|
||||||
|
drop_qos2_msg(Msg, ?RC_RECEIVE_MAXIMUM_EXCEEDED)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% Publish QoS0/1 directly
|
%% Publish QoS0/1 directly
|
||||||
publish(_PacketId, Msg, Session) ->
|
publish(_PacketId, Msg, Session) ->
|
||||||
{ok, emqx_broker:publish(Msg), Session}.
|
{ok, emqx_broker:publish(Msg), Session}.
|
||||||
|
|
||||||
|
drop_qos2_msg(Msg, ReasonCode) ->
|
||||||
|
ok = emqx_metrics:inc('messages.dropped'),
|
||||||
|
ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, emqx_reason_codes:name(ReasonCode)]),
|
||||||
|
{error, ReasonCode}.
|
||||||
|
|
||||||
-compile({inline, [is_awaiting_full/1]}).
|
-compile({inline, [is_awaiting_full/1]}).
|
||||||
is_awaiting_full(#session{max_awaiting_rel = 0}) ->
|
is_awaiting_full(#session{max_awaiting_rel = 0}) ->
|
||||||
false;
|
false;
|
||||||
|
|
Loading…
Reference in New Issue