diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index a4cc97c87..6cf5cc40b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -68,14 +68,7 @@ -type progress() :: #{ - acked := true, iterator := emqx_ds:iterator() - } - | #{ - acked := false, - iterator := emqx_ds:iterator(), - qos1_acked := boolean(), - qos2_acked := boolean() }. -type scheduled_action() :: #{ @@ -626,28 +619,22 @@ stream_progress( ) -> Qos1Acked = n_acked(?QOS_1, CommQos1, StartQos1), Qos2Acked = n_acked(?QOS_2, CommQos2, StartQos2), - case is_stream_fully_acked(CommQos1, CommQos2, SRS) of - true -> - #{ - stream => Stream, - progress => #{ - acked => true, - iterator => EndIt - }, - use_finished => is_use_finished(SRS) - }; - false -> - #{ - stream => Stream, - progress => #{ - acked => true, - iterator => emqx_ds_skipping_iterator:update_or_new( - BeginIt, Qos1Acked, Qos2Acked - ) - }, - use_finished => is_use_finished(SRS) - } - end. + Iterator = + case is_stream_fully_acked(CommQos1, CommQos2, SRS) of + true -> + EndIt; + false -> + emqx_ds_skipping_iterator:update_or_new( + BeginIt, Qos1Acked, Qos2Acked + ) + end, + #{ + stream => Stream, + progress => #{ + iterator => Iterator + }, + use_finished => is_use_finished(SRS) + }. fold_shared_subs(Fun, Acc, S) -> emqx_persistent_session_ds_state:fold_subscriptions( diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 976ce2437..e98c74b27 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -314,8 +314,7 @@ update_progresses(StreamStates, NewStreamsWRanks, TopicFilter, StartTime) -> ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime ), Progress = #{ - iterator => It, - acked => true + iterator => It }, { NewStreamStatesAcc#{Stream => #{progress => Progress, rank => Rank}}, @@ -708,9 +707,6 @@ clean_revoked_streams( ( #{ stream := Stream, - progress := #{ - acked := true - }, use_finished := true } ) ->