diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 7535e1a61..a4cc97c87 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -341,7 +341,11 @@ accept_stream(#{topic_filter := TopicFilter} = Event, S, ScheduledActions) -> end. accept_stream( - #{topic_filter := TopicFilter, stream := Stream, progress := Progress} = _Event, + #{ + topic_filter := TopicFilter, + stream := Stream, + progress := #{iterator := Iterator} = _Progress + } = _Event, S0 ) -> case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of @@ -361,7 +365,6 @@ accept_stream( end, case NeedCreateStream of true -> - Iterator = rewind_iterator(Progress), NewSRS = #srs{ rank_x = ?rank_x, @@ -377,52 +380,6 @@ accept_stream( end end. -%% Skip acked messages. -%% This may be a bit inefficient, and it is unclear how to handle errors. -%% -%% A better variant would be to wrap the iterator on `emqx_ds` level in a new one, -%% that will skip acked messages internally in `emqx_ds:next` function. -%% Unluckily, emqx_ds does not have a wrapping structure around iterators of -%% the underlying levels, so we cannot wrap it without a risk of confusion. - -rewind_iterator(#{iterator := Iterator, acked := true}) -> - Iterator; -rewind_iterator(#{iterator := Iterator0, acked := false, qos1_acked := 0, qos2_acked := 0}) -> - Iterator0; -%% This should not happen, means the DS is consistent -rewind_iterator(#{iterator := Iterator0, acked := false, qos1_acked := Q1, qos2_acked := Q2}) when - Q1 < 0 orelse Q2 < 0 --> - Iterator0; -rewind_iterator( - #{iterator := Iterator0, acked := false, qos1_acked := Q1Old, qos2_acked := Q2Old} = Progress -) -> - case emqx_ds:next(?PERSISTENT_MESSAGE_DB, Iterator0, Q1Old + Q2Old) of - {ok, Iterator1, Messages} -> - {Q1New, Q2New} = update_qos_acked(Q1Old, Q2Old, Messages), - rewind_iterator(Progress#{ - iterator => Iterator1, qos1_acked => Q1New, qos2_acked => Q2New - }); - {ok, end_of_stream} -> - end_of_stream; - {error, _, _} -> - %% What to do here? - %% In the wrapping variant we do not have this problem. - Iterator0 - end. - -update_qos_acked(Q1, Q2, []) -> - {Q1, Q2}; -update_qos_acked(Q1, Q2, [{_Key, Message} | Messages]) -> - case emqx_message:qos(Message) of - ?QOS_1 -> - update_qos_acked(Q1 - 1, Q2, Messages); - ?QOS_2 -> - update_qos_acked(Q1, Q2 - 1, Messages); - _ -> - update_qos_acked(Q1, Q2, Messages) - end. - revoke_stream( #{topic_filter := TopicFilter, stream := Stream}, S0 ) -> @@ -667,8 +624,8 @@ stream_progress( first_seqno_qos2 = StartQos2 } = SRS ) -> - Qos1Acked = seqno_diff(?QOS_1, CommQos1, StartQos1), - Qos2Acked = seqno_diff(?QOS_2, CommQos2, StartQos2), + Qos1Acked = n_acked(?QOS_1, CommQos1, StartQos1), + Qos2Acked = n_acked(?QOS_2, CommQos2, StartQos2), case is_stream_fully_acked(CommQos1, CommQos2, SRS) of true -> #{ @@ -683,10 +640,10 @@ stream_progress( #{ stream => Stream, progress => #{ - acked => false, - iterator => BeginIt, - qos1_acked => Qos1Acked, - qos2_acked => Qos2Acked + acked => true, + iterator => emqx_ds_skipping_iterator:update_or_new( + BeginIt, Qos1Acked, Qos2Acked + ) }, use_finished => is_use_finished(SRS) } @@ -753,6 +710,9 @@ is_stream_fully_acked(_, _, #srs{ is_stream_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) -> (Comm1 >= S1) andalso (Comm2 >= S2). +n_acked(Qos, A, B) -> + max(seqno_diff(Qos, A, B), 0). + -dialyzer({nowarn_function, seqno_diff/3}). seqno_diff(?QOS_1, A, B) -> %% For QoS1 messages we skip a seqno every time the epoch changes, diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 69de92325..6aaba205d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -401,10 +401,14 @@ make_iterator(DB, Stream, TopicFilter, StartTime) -> -spec update_iterator(db(), iterator(), message_key()) -> make_iterator_result(). +update_iterator(DB, ?skipping_iterator_match = OldIter, DSKey) -> + emqx_ds_skipping_iterator:update_iterator(DB, OldIter, DSKey); update_iterator(DB, OldIter, DSKey) -> ?module(DB):update_iterator(DB, OldIter, DSKey). -spec next(db(), iterator(), pos_integer()) -> next_result(). +next(DB, ?skipping_iterator_match = Iter, BatchSize) -> + emqx_ds_skipping_iterator:next(DB, Iter, BatchSize); next(DB, Iter, BatchSize) -> ?module(DB):next(DB, Iter, BatchSize). diff --git a/apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.erl b/apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.erl new file mode 100644 index 000000000..67d871e8a --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.erl @@ -0,0 +1,87 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ds_skipping_iterator). + +-include("emqx_ds_skipping_iterator.hrl"). +-include("emqx/include/emqx_mqtt.hrl"). + +-type t() :: ?skipping_iterator(emqx_ds:iterator(), non_neg_integer(), non_neg_integer()). + +-export([ + update_or_new/3, + update_iterator/3, + next/3 +]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec update_or_new(t() | emqx_ds:iterator(), non_neg_integer(), non_neg_integer()) -> t(). +update_or_new(?skipping_iterator_match(Iterator, Q1Skip0, Q2Skip0), Q1Skip, Q2Skip) when + Q1Skip >= 0 andalso Q2Skip >= 0 +-> + ?skipping_iterator(Iterator, Q1Skip0 + Q1Skip, Q2Skip0 + Q2Skip); +update_or_new(Iterator, Q1Skip, Q2Skip) when Q1Skip >= 0 andalso Q2Skip >= 0 -> + ?skipping_iterator(Iterator, Q1Skip, Q2Skip). + +-spec next(emqx_ds:db(), t(), pos_integer()) -> emqx_ds:next_result(t()). +next(DB, ?skipping_iterator_match(Iterator0, Q1Skip0, Q2Skip0), Count) -> + case emqx_ds:next(DB, Iterator0, Count) of + {error, _, _} = Error -> + Error; + {ok, end_of_stream} -> + {ok, end_of_stream}; + {ok, Iterator1, Messages0} -> + {Messages1, Q1Skip1, Q2Skip1} = skip(Messages0, Q1Skip0, Q2Skip0), + case {Q1Skip1, Q2Skip1} of + {0, 0} -> {ok, Iterator1, Messages1}; + _ -> {ok, ?skipping_iterator(Iterator1, Q1Skip1, Q2Skip1)} + end + end. + +-spec update_iterator(emqx_ds:db(), emqx_ds:iterator(), emqx_ds:message_key()) -> + emqx_ds:make_iterator_result(). +update_iterator(DB, ?skipping_iterator_match(Iterator0, Q1Skip0, Q2Skip0), Key) -> + case emqx_ds:update_iterator(DB, Iterator0, Key) of + {error, _, _} = Error -> Error; + {ok, Iterator1} -> {ok, ?skipping_iterator(Iterator1, Q1Skip0, Q2Skip0)} + end. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +skip(Messages, Q1Skip, Q2Skip) -> + skip(Messages, Q1Skip, Q2Skip, []). + +skip([], Q1Skip, Q2Skip, Agg) -> + {lists:reverse(Agg), Q1Skip, Q2Skip}; +skip([{Key, Message} | Messages], Q1Skip, Q2Skip, Agg) -> + Qos = emqx_message:qos(Message), + skip({Key, Message}, Qos, Messages, Q1Skip, Q2Skip, Agg). + +skip(_KeyMessage, ?QOS_0, Messages, Q1Skip, Q2Skip, Agg) -> + skip(Messages, Q1Skip, Q2Skip, Agg); +skip(_KeyMessage, ?QOS_1, Messages, Q1Skip, Q2Skip, Agg) when Q1Skip > 0 -> + skip(Messages, Q1Skip - 1, Q2Skip, Agg); +skip(KeyMessage, ?QOS_1, Messages, 0, Q2Skip, Agg) -> + skip(Messages, 0, Q2Skip, [KeyMessage | Agg]); +skip(_KeyMessage, ?QOS_2, Messages, Q1Skip, Q2Skip, Agg) when Q2Skip > 0 -> + skip(Messages, Q1Skip, Q2Skip - 1, Agg); +skip(KeyMessage, ?QOS_2, Messages, Q1Skip, 0, Agg) -> + skip(Messages, Q1Skip, 0, [KeyMessage | Agg]). diff --git a/apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.hrl b/apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.hrl new file mode 100644 index 000000000..2c0999fcc --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.hrl @@ -0,0 +1,32 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-define(tag, 1). +-define(it, 2). +-define(qos1_skip, 3). +-define(qos2_skip, 4). + +-define(IT, -1000). + +-define(skipping_iterator_match, #{?tag := ?IT}). + +-define(skipping_iterator_match(Iterator, Q1Skip, Q2Skip), #{ + ?tag := ?IT, ?it := Iterator, ?qos1_skip := Q1Skip, ?qos2_skip := Q2Skip +}). + +-define(skipping_iterator(Iterator, Q1Skip, Q2Skip), #{ + ?tag => ?IT, ?it => Iterator, ?qos1_skip => Q1Skip, ?qos2_skip => Q2Skip +}).