From 9e5e7a23c5cfb20af003b1319753335acdec790f Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Tue, 9 Jul 2024 14:14:22 +0300 Subject: [PATCH] feat(queue): remove unnecessary acked flag --- ...emqx_persistent_session_ds_shared_subs.erl | 45 +++++++------------ .../src/emqx_ds_shared_sub_leader.erl | 6 +-- 2 files changed, 17 insertions(+), 34 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index a4cc97c87..6cf5cc40b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -68,14 +68,7 @@ -type progress() :: #{ - acked := true, iterator := emqx_ds:iterator() - } - | #{ - acked := false, - iterator := emqx_ds:iterator(), - qos1_acked := boolean(), - qos2_acked := boolean() }. -type scheduled_action() :: #{ @@ -626,28 +619,22 @@ stream_progress( ) -> Qos1Acked = n_acked(?QOS_1, CommQos1, StartQos1), Qos2Acked = n_acked(?QOS_2, CommQos2, StartQos2), - case is_stream_fully_acked(CommQos1, CommQos2, SRS) of - true -> - #{ - stream => Stream, - progress => #{ - acked => true, - iterator => EndIt - }, - use_finished => is_use_finished(SRS) - }; - false -> - #{ - stream => Stream, - progress => #{ - acked => true, - iterator => emqx_ds_skipping_iterator:update_or_new( - BeginIt, Qos1Acked, Qos2Acked - ) - }, - use_finished => is_use_finished(SRS) - } - end. + Iterator = + case is_stream_fully_acked(CommQos1, CommQos2, SRS) of + true -> + EndIt; + false -> + emqx_ds_skipping_iterator:update_or_new( + BeginIt, Qos1Acked, Qos2Acked + ) + end, + #{ + stream => Stream, + progress => #{ + iterator => Iterator + }, + use_finished => is_use_finished(SRS) + }. fold_shared_subs(Fun, Acc, S) -> emqx_persistent_session_ds_state:fold_subscriptions( diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 976ce2437..e98c74b27 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -314,8 +314,7 @@ update_progresses(StreamStates, NewStreamsWRanks, TopicFilter, StartTime) -> ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime ), Progress = #{ - iterator => It, - acked => true + iterator => It }, { NewStreamStatesAcc#{Stream => #{progress => Progress, rank => Rank}}, @@ -708,9 +707,6 @@ clean_revoked_streams( ( #{ stream := Stream, - progress := #{ - acked := true - }, use_finished := true } ) ->