From fe46434687ddf3508ab687f8d14c4cde13d466ff Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 31 Jan 2024 18:02:29 +0300 Subject: [PATCH] fix(sessds): fix renew stream logic --- ...persistent_session_ds_stream_scheduler.erl | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 475f9f9fb..315fcbc78 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -238,13 +238,18 @@ remove_fully_replayed_streams(S0) -> Groups = emqx_persistent_session_ds_state:fold_streams( fun({SubId, _Stream}, StreamState = #srs{rank_x = RankX, rank_y = RankY}, Acc) -> Key = {SubId, RankX}, - case is_fully_replayed(CommQos1, CommQos2, StreamState) of - true when is_map_key(Key, Acc) -> + case {is_fully_replayed(CommQos1, CommQos2, StreamState), Acc} of + {_, #{Key := false}} -> + Acc; + {true, #{Key := {true, RankY}}} -> + Acc; + {true, #{Key := {true, _RankYOther}}} -> + %% assert, should never happen + error(multiple_rank_y_for_rank_x); + {true, #{}} -> Acc#{Key => {true, RankY}}; - true -> - Acc#{Key => false}; - _ -> - Acc + {false, #{}} -> + Acc#{Key => false} end end, #{}, @@ -267,7 +272,7 @@ remove_fully_replayed_streams(S0) -> case emqx_persistent_session_ds_state:get_rank({SubId, RankX}, Acc) of undefined -> Acc; - MinRankY when RankY < MinRankY -> + MinRankY when RankY =< MinRankY -> ?SLOG(debug, #{ msg => del_fully_preplayed_stream, key => Key,