From 2c6a7763184355a679f25be73046bd24282f69c2 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 26 Jan 2024 17:39:14 +0100 Subject: [PATCH] fix(sessds): Stricter checks for PacketIds --- apps/emqx/src/emqx_persistent_session_ds.erl | 26 +++++++++---------- .../test/emqx_persistent_messages_SUITE.erl | 2 +- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 9cc3aea94..1c1e78058 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -909,30 +909,28 @@ update_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) -> SeqNo = packet_id_to_seqno(PacketId, S), case Track of puback -> - MinTrack = ?committed(?QOS_1), - MaxTrack = ?next(?QOS_1); + QoS = ?QOS_1, + SeqNoKey = ?committed(?QOS_1); pubrec -> - MinTrack = ?rec, - MaxTrack = ?next(?QOS_2); + QoS = ?QOS_2, + SeqNoKey = ?rec; pubcomp -> - MinTrack = ?committed(?QOS_2), - MaxTrack = ?next(?QOS_2) + QoS = ?QOS_2, + SeqNoKey = ?committed(?QOS_2) end, - Min = emqx_persistent_session_ds_state:get_seqno(MinTrack, S), - Max = emqx_persistent_session_ds_state:get_seqno(MaxTrack, S), - case Min =< SeqNo andalso SeqNo =< Max of - true -> + Current = emqx_persistent_session_ds_state:get_seqno(SeqNoKey, S), + case inc_seqno(QoS, Current) of + SeqNo -> %% TODO: we pass a bogus message into the hook: Msg = emqx_message:make(SessionId, <<>>, <<>>), - {ok, Msg, Session#{s => emqx_persistent_session_ds_state:put_seqno(MinTrack, SeqNo, S)}}; - false -> + {ok, Msg, Session#{s => emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S)}}; + Expected -> ?SLOG(warning, #{ msg => "out-of-order_commit", track => Track, packet_id => PacketId, seqno => SeqNo, - min => Min, - max => Max + expected => Expected }), {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end. diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 6da60b809..0ca1daa1c 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -495,7 +495,7 @@ consume(It) -> end. receive_messages(Count) -> - receive_messages(Count, 5_000). + receive_messages(Count, 10_000). receive_messages(Count, Timeout) -> lists:reverse(receive_messages(Count, [], Timeout)).