diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 9f9f28676..c79d60d07 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -491,7 +491,7 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> ShardId = {DB, Shard}, ?IF_STORAGE_RUNNING( ShardId, - emqx_ds_storage_layer:next(ShardId, Iter, BatchSize) + emqx_ds_storage_layer:next(ShardId, Iter, BatchSize, emqx_ds:timestamp_us()) ). -spec do_delete_next_v4( @@ -503,7 +503,9 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> ) -> emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()). do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) -> - emqx_ds_storage_layer:delete_next({DB, Shard}, Iter, Selector, BatchSize). + emqx_ds_storage_layer:delete_next( + {DB, Shard}, Iter, Selector, BatchSize, emqx_ds:timestamp_us() + ). -spec do_add_generation_v2(emqx_ds:db()) -> no_return(). do_add_generation_v2(_DB) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 80264da79..5947b2300 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -34,8 +34,8 @@ make_iterator/5, make_delete_iterator/5, update_iterator/4, - next/4, - delete_next/5, + next/5, + delete_next/6, post_creation_actions/1 ]). @@ -354,8 +354,9 @@ update_iterator( next( Shard, Schema = #s{ts_offset = TSOffset, ts_bits = TSBits}, - It = #{?topic_filter := TF, ?storage_key := Stream}, - BatchSize + It = #{?storage_key := Stream}, + BatchSize, + Now ) -> init_counters(), %% Compute safe cutoff time. It's the point in time where the last @@ -370,7 +371,6 @@ next( SafeCutoffTime = case IsWildcard of true -> - Now = emqx_ds:timestamp_us(), (Now bsr TSOffset) bsl TSOffset; false -> %% Iterators scanning streams without varying topic @@ -415,12 +415,11 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, rocksdb:iterator_close(ITHandle) end. -delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) -> +delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize, Now) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know %% the current time to compute it. init_counters(), - Now = emqx_message:timestamp_now(), SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, try delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 4981c3fc1..36dc813e5 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -31,8 +31,8 @@ make_iterator/4, make_delete_iterator/4, update_iterator/3, - next/3, - delete_next/4, + next/4, + delete_next/5, %% Generations update_config/3, @@ -223,9 +223,14 @@ ) -> emqx_ds:make_delete_iterator_result(_Iterator). --callback next(shard_id(), _Data, Iter, pos_integer()) -> +-callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time()) -> {ok, Iter, [emqx_types:message()]} | {error, _}. +-callback delete_next( + shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() +) -> + {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}. + -callback post_creation_actions(post_creation_context()) -> _Data. -optional_callbacks([post_creation_actions/1]). @@ -377,13 +382,13 @@ update_iterator( {error, unrecoverable, generation_not_found} end. --spec next(shard_id(), iterator(), pos_integer()) -> +-spec next(shard_id(), iterator(), pos_integer(), emqx_ds:time()) -> emqx_ds:next_result(iterator()). -next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> +next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize, Now) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> Current = generation_current(Shard), - case Mod:next(Shard, GenData, GenIter0, BatchSize) of + case Mod:next(Shard, GenData, GenIter0, BatchSize, Now) of {ok, _GenIter, []} when GenId < Current -> %% This is a past generation. Storage layer won't write %% any more messages here. The iterator reached the end: @@ -399,18 +404,21 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch {error, unrecoverable, generation_not_found} end. --spec delete_next(shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) -> +-spec delete_next( + shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() +) -> emqx_ds:delete_next_result(delete_iterator()). delete_next( Shard, Iter = #{?tag := ?DELETE_IT, ?generation := GenId, ?enc := GenIter0}, Selector, - BatchSize + BatchSize, + Now ) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> Current = generation_current(Shard), - case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of + case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize, Now) of {ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current -> %% This is a past generation. Storage layer won't write %% any more messages here. The iterator reached the end: diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 7aa54b9f3..3caf2c732 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -37,8 +37,8 @@ make_iterator/5, make_delete_iterator/5, update_iterator/4, - next/4, - delete_next/5 + next/5, + delete_next/6 ]). %% internal exports: @@ -154,7 +154,7 @@ update_iterator(_Shard, _Data, OldIter, DSKey) -> last_seen_message_key = DSKey }}. -next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> +next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) -> #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0, {ok, ITHandle} = rocksdb:iterator(DB, CF, []), Action = @@ -170,7 +170,7 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> It = It0#it{last_seen_message_key = Key}, {ok, It, lists:reverse(Messages)}. -delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize) -> +delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) -> #delete_it{ topic_filter = TopicFilter, start_time = StartTime, diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 78838e675..bb6d0f917 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -73,13 +73,15 @@ t_iterate(_Config) -> begin [{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0), {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0), - {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(?SHARD, It, 100), + {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next( + ?SHARD, It, 100, emqx_ds:timestamp_us() + ), Messages = [Msg || {_DSKey, Msg} <- MessagesAndKeys], ?assertEqual( lists:map(fun integer_to_binary/1, Timestamps), payloads(Messages) ), - {ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100) + {ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100, emqx_ds:timestamp_us()) end || Topic <- Topics ], @@ -370,7 +372,7 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) -> F(It, 0) -> error({too_many_iterations, It}); F(It, N) -> - case emqx_ds_storage_layer:next(Shard, It, BatchSize) of + case emqx_ds_storage_layer:next(Shard, It, BatchSize, emqx_ds:timestamp_us()) of end_of_stream -> []; {ok, _NextIt, []} -> @@ -542,7 +544,11 @@ delete(_Shard, [], _Selector) -> delete(Shard, Iterators, Selector) -> {NewIterators0, N} = lists:foldl( fun(Iterator0, {AccIterators, NAcc}) -> - case emqx_ds_storage_layer:delete_next(Shard, Iterator0, Selector, 10) of + case + emqx_ds_storage_layer:delete_next( + Shard, Iterator0, Selector, 10, emqx_ds:timestamp_us() + ) + of {ok, end_of_stream} -> {AccIterators, NAcc}; {ok, _Iterator1, 0} -> @@ -573,7 +579,7 @@ replay(_Shard, []) -> replay(Shard, Iterators) -> {NewIterators0, Messages0} = lists:foldl( fun(Iterator0, {AccIterators, AccMessages}) -> - case emqx_ds_storage_layer:next(Shard, Iterator0, 10) of + case emqx_ds_storage_layer:next(Shard, Iterator0, 10, emqx_ds:timestamp_us()) of {ok, end_of_stream} -> {AccIterators, AccMessages}; {ok, _Iterator1, []} -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index be4f7bcdf..f54752230 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -85,8 +85,14 @@ consume_stream(DB, Stream, TopicFilter, StartTime) -> consume_iter(DB, It) -> consume_iter(DB, It, #{}). -consume_iter(DB, It, Opts) -> - consume_iter_with(fun emqx_ds:next/3, [DB], It, Opts). +consume_iter(DB, It0, Opts) -> + consume_iter_with( + fun(It, BatchSize) -> + emqx_ds:next(DB, It, BatchSize) + end, + It0, + Opts + ). storage_consume(ShardId, TopicFilter) -> storage_consume(ShardId, TopicFilter, 0). @@ -108,16 +114,22 @@ storage_consume_stream(ShardId, Stream, TopicFilter, StartTime) -> storage_consume_iter(ShardId, It) -> storage_consume_iter(ShardId, It, #{}). -storage_consume_iter(ShardId, It, Opts) -> - consume_iter_with(fun emqx_ds_storage_layer:next/3, [ShardId], It, Opts). +storage_consume_iter(ShardId, It0, Opts) -> + consume_iter_with( + fun(It, BatchSize) -> + emqx_ds_storage_layer:next(ShardId, It, BatchSize, emqx_ds:timestamp_us()) + end, + It0, + Opts + ). -consume_iter_with(NextFun, Args, It0, Opts) -> +consume_iter_with(NextFun, It0, Opts) -> BatchSize = maps:get(batch_size, Opts, 5), - case erlang:apply(NextFun, Args ++ [It0, BatchSize]) of + case NextFun(It0, BatchSize) of {ok, It, _Msgs = []} -> {ok, It, []}; {ok, It1, Batch} -> - {ok, It, Msgs} = consume_iter_with(NextFun, Args, It1, Opts), + {ok, It, Msgs} = consume_iter_with(NextFun, It1, Opts), {ok, It, [Msg || {_DSKey, Msg} <- Batch] ++ Msgs}; {ok, Eos = end_of_stream} -> {ok, Eos, []};