feat(queue): remove unnecessary acked flag

This commit is contained in:
Ilya Averyanov 2024-07-09 14:14:22 +03:00
parent 143086b0ef
commit 9e5e7a23c5
2 changed files with 17 additions and 34 deletions

View File

@ -68,14 +68,7 @@
-type progress() :: -type progress() ::
#{ #{
acked := true,
iterator := emqx_ds:iterator() iterator := emqx_ds:iterator()
}
| #{
acked := false,
iterator := emqx_ds:iterator(),
qos1_acked := boolean(),
qos2_acked := boolean()
}. }.
-type scheduled_action() :: #{ -type scheduled_action() :: #{
@ -626,28 +619,22 @@ stream_progress(
) -> ) ->
Qos1Acked = n_acked(?QOS_1, CommQos1, StartQos1), Qos1Acked = n_acked(?QOS_1, CommQos1, StartQos1),
Qos2Acked = n_acked(?QOS_2, CommQos2, StartQos2), Qos2Acked = n_acked(?QOS_2, CommQos2, StartQos2),
Iterator =
case is_stream_fully_acked(CommQos1, CommQos2, SRS) of case is_stream_fully_acked(CommQos1, CommQos2, SRS) of
true -> true ->
#{ EndIt;
stream => Stream,
progress => #{
acked => true,
iterator => EndIt
},
use_finished => is_use_finished(SRS)
};
false -> false ->
#{ emqx_ds_skipping_iterator:update_or_new(
stream => Stream,
progress => #{
acked => true,
iterator => emqx_ds_skipping_iterator:update_or_new(
BeginIt, Qos1Acked, Qos2Acked BeginIt, Qos1Acked, Qos2Acked
) )
end,
#{
stream => Stream,
progress => #{
iterator => Iterator
}, },
use_finished => is_use_finished(SRS) use_finished => is_use_finished(SRS)
} }.
end.
fold_shared_subs(Fun, Acc, S) -> fold_shared_subs(Fun, Acc, S) ->
emqx_persistent_session_ds_state:fold_subscriptions( emqx_persistent_session_ds_state:fold_subscriptions(

View File

@ -314,8 +314,7 @@ update_progresses(StreamStates, NewStreamsWRanks, TopicFilter, StartTime) ->
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
), ),
Progress = #{ Progress = #{
iterator => It, iterator => It
acked => true
}, },
{ {
NewStreamStatesAcc#{Stream => #{progress => Progress, rank => Rank}}, NewStreamStatesAcc#{Stream => #{progress => Progress, rank => Rank}},
@ -708,9 +707,6 @@ clean_revoked_streams(
( (
#{ #{
stream := Stream, stream := Stream,
progress := #{
acked := true
},
use_finished := true use_finished := true
} }
) -> ) ->