From 0f2c19b65661573d44ffc84baa546b97ff946f57 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 1 Jul 2024 00:18:40 +0200 Subject: [PATCH] refactor(ds): Move end_of_stream detection logic for delete_next --- .../src/emqx_ds_storage_bitfield_lts.erl | 13 ++++++++---- .../src/emqx_ds_storage_layer.erl | 21 +++++++++++-------- .../src/emqx_ds_storage_reference.erl | 11 +++++++--- 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index d5834546f..a161cb8d8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -36,7 +36,7 @@ make_delete_iterator/5, update_iterator/4, next/6, - delete_next/6, + delete_next/7, handle_event/4 ]). @@ -495,14 +495,19 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, rocksdb:iterator_close(ITHandle) end. -delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize, Now) -> +delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize, Now, IsCurrent) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know %% the current time to compute it. init_counters(), - SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, + SafeCutoffTime = ?EPOCH(Schema, Now) bsl TSOffset, try - delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize) + case delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize) of + {ok, _It, 0, 0} when not IsCurrent -> + {ok, end_of_stream}; + Result -> + Result + end after report_counters(Shard) end. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index fe1d36a35..db3b4e5c3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -261,6 +261,11 @@ ) -> [_Stream]. +-callback get_delete_streams( + shard_id(), generation_data(), emqx_ds:topic_filter(), emqx_ds:time() +) -> + [_Stream]. + -callback make_iterator( shard_id(), generation_data(), _Stream, emqx_ds:topic_filter(), emqx_ds:time() ) -> @@ -282,9 +287,10 @@ DeleteIterator, emqx_ds:delete_selector(), pos_integer(), - emqx_ds:time() + emqx_ds:time(), + _IsCurrentGeneration :: boolean() ) -> - {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}. + {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()} | emqx_ds:error(_). -callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. @@ -511,15 +517,12 @@ delete_next( ) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> - Current = generation_current(Shard), - case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize, Now) of - {ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current -> - %% This is a past generation. Storage layer won't write - %% any more messages here. The iterator reached the end: - %% the stream has been fully replayed. - {ok, end_of_stream}; + IsCurrent = GenId =:= generation_current(Shard), + case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize, Now, IsCurrent) of {ok, GenIter, NumDeleted, _IteratedOver} -> {ok, Iter#{?enc := GenIter}, NumDeleted}; + EOS = {ok, end_of_stream} -> + EOS; Error = {error, _} -> Error end; diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index ca29c11a8..cfd6f30ac 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -39,7 +39,7 @@ make_delete_iterator/5, update_iterator/4, next/6, - delete_next/6 + delete_next/7 ]). %% internal exports: @@ -169,7 +169,7 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now, IsCurrent) -> {ok, It, lists:reverse(Messages)} end. -delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) -> +delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now, IsCurrent) -> #delete_it{ topic_filter = TopicFilter, start_time = StartTime, @@ -198,7 +198,12 @@ delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) -> ), rocksdb:iterator_close(ITHandle), It = It0#delete_it{last_seen_message_key = Key}, - {ok, It, NumDeleted, NumIterated}. + case IsCurrent of + false when NumDeleted =:= 0, NumIterated =:= 0 -> + {ok, end_of_stream}; + _ -> + {ok, It, NumDeleted, NumIterated} + end. %%================================================================================ %% Internal functions