diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index ebbcde17c..8acb6e529 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -35,7 +35,7 @@ make_iterator/5, make_delete_iterator/5, update_iterator/4, - next/5, + next/6, delete_next/6, post_creation_actions/1, @@ -424,23 +424,21 @@ next( Schema = #s{ts_offset = TSOffset, ts_bits = TSBits}, It = #{?storage_key := Stream}, BatchSize, - Now + Now, + IsCurrent ) -> init_counters(), %% Compute safe cutoff time. It's the point in time where the last %% complete epoch ends, so we need to know the current time to %% compute it. This is needed because new keys can be added before %% the iterator. - IsWildcard = + %% + %% This is needed to avoid situations when the iterator advances + %% to position k1, and then a new message with k2, such that k2 < + %% k1 is inserted. k2 would be missed. + HasCutoff = case Stream of - {_StaticKey, []} -> false; - _ -> true - end, - SafeCutoffTime = - case IsWildcard of - true -> - (Now bsr TSOffset) bsl TSOffset; - false -> + {_StaticKey, []} -> %% Iterators scanning streams without varying topic %% levels can operate on incomplete epochs, since new %% matching keys for the single topic are added in @@ -450,10 +448,27 @@ next( %% filters operating on streams with varying parts: %% iterator can jump to the next topic and then it %% won't backtrack. + false; + _ -> + %% New batches are only added to the current + %% generation. We can ignore cutoff time for old + %% generations: + IsCurrent + end, + SafeCutoffTime = + case HasCutoff of + true -> + (Now bsr TSOffset) bsl TSOffset; + false -> 1 bsl TSBits - 1 end, try - next_until(Schema, It, SafeCutoffTime, BatchSize) + case next_until(Schema, It, SafeCutoffTime, BatchSize) of + {ok, _, []} when not IsCurrent -> + {ok, end_of_stream}; + Result -> + Result + end after report_counters(Shard) end. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index e93780ba2..71bf6fa6e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -248,8 +248,8 @@ ) -> emqx_ds:make_delete_iterator_result(_Iterator). --callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time()) -> - {ok, Iter, [emqx_types:message()]} | {error, _}. +-callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()) -> + {ok, Iter, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}. -callback delete_next( shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() @@ -449,15 +449,12 @@ update_iterator( next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize, Now) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> - Current = generation_current(Shard), - case Mod:next(Shard, GenData, GenIter0, BatchSize, Now) of - {ok, _GenIter, []} when GenId < Current -> - %% This is a past generation. Storage layer won't write - %% any more messages here. The iterator reached the end: - %% the stream has been fully replayed. - {ok, end_of_stream}; + IsCurrent = GenId =:= generation_current(Shard), + case Mod:next(Shard, GenData, GenIter0, BatchSize, Now, IsCurrent) of {ok, GenIter, Batch} -> {ok, Iter#{?enc := GenIter}, Batch}; + {ok, end_of_stream} -> + {ok, end_of_stream}; Error = {error, _, _} -> Error end; diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 10007488c..1c506390e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -38,7 +38,7 @@ make_iterator/5, make_delete_iterator/5, update_iterator/4, - next/5, + next/6, delete_next/6 ]). @@ -148,7 +148,7 @@ update_iterator(_Shard, _Data, OldIter, DSKey) -> last_seen_message_key = DSKey }}. -next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) -> +next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now, IsCurrent) -> #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0, {ok, ITHandle} = rocksdb:iterator(DB, CF, []), Action = @@ -162,7 +162,12 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) -> {Key, Messages} = do_next(TopicFilter, StartTime, ITHandle, Action, BatchSize, Key0, []), rocksdb:iterator_close(ITHandle), It = It0#it{last_seen_message_key = Key}, - {ok, It, lists:reverse(Messages)}. + case Messages of + [] when not IsCurrent -> + {ok, end_of_stream}; + _ -> + {ok, It, lists:reverse(Messages)} + end. delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) -> #delete_it{