From a9c55f7568b37577928be7fbd12422cce47ab6c4 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 6 Feb 2024 01:46:11 +0100 Subject: [PATCH] feat(sessds): Consider #srs with only QoS0 messages fully acked --- .../src/emqx_persistent_session_ds_stream_scheduler.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 45bf6ede1..6aa4ab005 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -168,7 +168,6 @@ del_subscription(SubId, S0) -> %%================================================================================ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> - %% TODO: hash collisions Key = {SubId, Stream}, case emqx_persistent_session_ds_state:get_stream(Key, S) of undefined -> @@ -303,6 +302,12 @@ compare_streams( is_fully_replayed(Comm1, Comm2, S = #srs{it_end = It}) -> It =:= end_of_stream andalso is_fully_acked(Comm1, Comm2, S). +is_fully_acked(_, _, #srs{ + first_seqno_qos1 = Q1, last_seqno_qos1 = Q1, first_seqno_qos2 = Q2, last_seqno_qos2 = Q2 +}) -> + %% Streams where the last chunk doesn't contain any QoS1 and 2 + %% messages are considered fully acked: + true; is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) -> (Comm1 >= S1) andalso (Comm2 >= S2).