fix(sessds): ensure dup flag is on for replayed messages
This commit is contained in:
parent
ef7cfd0202
commit
9684e79ee0
|
@ -266,8 +266,9 @@ replay_range(
|
||||||
_ ->
|
_ ->
|
||||||
lists:nthtail(range_size(First, FirstUnacked), Messages)
|
lists:nthtail(range_size(First, FirstUnacked), Messages)
|
||||||
end,
|
end,
|
||||||
|
MessagesReplay = [emqx_message:set_flag(dup, true, Msg) || Msg <- MessagesUnacked],
|
||||||
%% Asserting that range is consistent with the message storage state.
|
%% Asserting that range is consistent with the message storage state.
|
||||||
{Replies, Until} = publish(FirstUnacked, MessagesUnacked),
|
{Replies, Until} = publish(FirstUnacked, MessagesReplay),
|
||||||
%% Again, we need to keep the iterator pointing past the end of the
|
%% Again, we need to keep the iterator pointing past the end of the
|
||||||
%% range, so that we can pick up where we left off.
|
%% range, so that we can pick up where we left off.
|
||||||
Range = Range0#ds_pubrange{iterator = ItNext},
|
Range = Range0#ds_pubrange{iterator = ItNext},
|
||||||
|
|
|
@ -675,6 +675,12 @@ t_publish_many_while_client_is_gone_qos1(Config) ->
|
||||||
?assert(NMsgs2 > NPubs2, Msgs2),
|
?assert(NMsgs2 > NPubs2, Msgs2),
|
||||||
?assert(NMsgs2 >= NPubs - NAcked, Msgs2),
|
?assert(NMsgs2 >= NPubs - NAcked, Msgs2),
|
||||||
NSame = NMsgs2 - NPubs2,
|
NSame = NMsgs2 - NPubs2,
|
||||||
|
?assert(
|
||||||
|
lists:all(fun(#{dup := Dup}) -> Dup end, lists:sublist(Msgs2, NSame))
|
||||||
|
),
|
||||||
|
?assertNot(
|
||||||
|
lists:all(fun(#{dup := Dup}) -> Dup end, lists:nthtail(NSame, Msgs2))
|
||||||
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[maps:with([packet_id, topic, payload], M) || M <- lists:nthtail(NMsgs1 - NSame, Msgs1)],
|
[maps:with([packet_id, topic, payload], M) || M <- lists:nthtail(NMsgs1 - NSame, Msgs1)],
|
||||||
[maps:with([packet_id, topic, payload], M) || M <- lists:sublist(Msgs2, NSame)]
|
[maps:with([packet_id, topic, payload], M) || M <- lists:sublist(Msgs2, NSame)]
|
||||||
|
|
Loading…
Reference in New Issue