From e843d9fd919f0a1afb9388000b05be5d3b8e1a1c Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 31 Jan 2024 00:20:54 +0100 Subject: [PATCH] fix(sessds): Stream scheduler must ignore fully replayed streams --- ...x_persistent_session_ds_stream_scheduler.erl | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 03a6fbf80..475f9f9fb 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -89,13 +89,16 @@ find_new_streams(S) -> Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), shuffle( emqx_persistent_session_ds_state:fold_streams( - fun(Key, Stream, Acc) -> - case is_fully_acked(Comm1, Comm2, Stream) of - true -> - [{Key, Stream} | Acc]; - false -> - Acc - end + fun + (_Key, #srs{it_end = end_of_stream}, Acc) -> + Acc; + (Key, Stream, Acc) -> + case is_fully_acked(Comm1, Comm2, Stream) of + true -> + [{Key, Stream} | Acc]; + false -> + Acc + end end, [], S