diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 2f5348938..64b9cabb4 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -266,8 +266,9 @@ replay_range( _ -> lists:nthtail(range_size(First, FirstUnacked), Messages) end, + MessagesReplay = [emqx_message:set_flag(dup, true, Msg) || Msg <- MessagesUnacked], %% Asserting that range is consistent with the message storage state. - {Replies, Until} = publish(FirstUnacked, MessagesUnacked), + {Replies, Until} = publish(FirstUnacked, MessagesReplay), %% Again, we need to keep the iterator pointing past the end of the %% range, so that we can pick up where we left off. Range = Range0#ds_pubrange{iterator = ItNext}, diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 3f4cbcd28..77b625f05 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -675,6 +675,12 @@ t_publish_many_while_client_is_gone_qos1(Config) -> ?assert(NMsgs2 > NPubs2, Msgs2), ?assert(NMsgs2 >= NPubs - NAcked, Msgs2), NSame = NMsgs2 - NPubs2, + ?assert( + lists:all(fun(#{dup := Dup}) -> Dup end, lists:sublist(Msgs2, NSame)) + ), + ?assertNot( + lists:all(fun(#{dup := Dup}) -> Dup end, lists:nthtail(NSame, Msgs2)) + ), ?assertEqual( [maps:with([packet_id, topic, payload], M) || M <- lists:nthtail(NMsgs1 - NSame, Msgs1)], [maps:with([packet_id, topic, payload], M) || M <- lists:sublist(Msgs2, NSame)]