From 9684e79ee0d0aea29cf480cb870d163097d1f24d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 20 Nov 2023 22:54:40 +0700 Subject: [PATCH] fix(sessds): ensure dup flag is on for replayed messages --- apps/emqx/src/emqx_persistent_message_ds_replayer.erl | 3 ++- apps/emqx/test/emqx_persistent_session_SUITE.erl | 6 ++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 2f5348938..64b9cabb4 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -266,8 +266,9 @@ replay_range( _ -> lists:nthtail(range_size(First, FirstUnacked), Messages) end, + MessagesReplay = [emqx_message:set_flag(dup, true, Msg) || Msg <- MessagesUnacked], %% Asserting that range is consistent with the message storage state. - {Replies, Until} = publish(FirstUnacked, MessagesUnacked), + {Replies, Until} = publish(FirstUnacked, MessagesReplay), %% Again, we need to keep the iterator pointing past the end of the %% range, so that we can pick up where we left off. Range = Range0#ds_pubrange{iterator = ItNext}, diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 3f4cbcd28..77b625f05 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -675,6 +675,12 @@ t_publish_many_while_client_is_gone_qos1(Config) -> ?assert(NMsgs2 > NPubs2, Msgs2), ?assert(NMsgs2 >= NPubs - NAcked, Msgs2), NSame = NMsgs2 - NPubs2, + ?assert( + lists:all(fun(#{dup := Dup}) -> Dup end, lists:sublist(Msgs2, NSame)) + ), + ?assertNot( + lists:all(fun(#{dup := Dup}) -> Dup end, lists:nthtail(NSame, Msgs2)) + ), ?assertEqual( [maps:with([packet_id, topic, payload], M) || M <- lists:nthtail(NMsgs1 - NSame, Msgs1)], [maps:with([packet_id, topic, payload], M) || M <- lists:sublist(Msgs2, NSame)]