fix(sessds): fix renew stream logic

This commit is contained in:
Ilya Averyanov 2024-01-31 18:02:29 +03:00
parent 2680fae9b6
commit fe46434687
1 changed files with 12 additions and 7 deletions

View File

@ -238,13 +238,18 @@ remove_fully_replayed_streams(S0) ->
Groups = emqx_persistent_session_ds_state:fold_streams( Groups = emqx_persistent_session_ds_state:fold_streams(
fun({SubId, _Stream}, StreamState = #srs{rank_x = RankX, rank_y = RankY}, Acc) -> fun({SubId, _Stream}, StreamState = #srs{rank_x = RankX, rank_y = RankY}, Acc) ->
Key = {SubId, RankX}, Key = {SubId, RankX},
case is_fully_replayed(CommQos1, CommQos2, StreamState) of case {is_fully_replayed(CommQos1, CommQos2, StreamState), Acc} of
true when is_map_key(Key, Acc) -> {_, #{Key := false}} ->
Acc;
{true, #{Key := {true, RankY}}} ->
Acc;
{true, #{Key := {true, _RankYOther}}} ->
%% assert, should never happen
error(multiple_rank_y_for_rank_x);
{true, #{}} ->
Acc#{Key => {true, RankY}}; Acc#{Key => {true, RankY}};
true -> {false, #{}} ->
Acc#{Key => false}; Acc#{Key => false}
_ ->
Acc
end end
end, end,
#{}, #{},
@ -267,7 +272,7 @@ remove_fully_replayed_streams(S0) ->
case emqx_persistent_session_ds_state:get_rank({SubId, RankX}, Acc) of case emqx_persistent_session_ds_state:get_rank({SubId, RankX}, Acc) of
undefined -> undefined ->
Acc; Acc;
MinRankY when RankY < MinRankY -> MinRankY when RankY =< MinRankY ->
?SLOG(debug, #{ ?SLOG(debug, #{
msg => del_fully_preplayed_stream, msg => del_fully_preplayed_stream,
key => Key, key => Key,