refactor(ds): Move end_of_stream detection logic for delete_next
This commit is contained in:
parent
b565976794
commit
0f2c19b656
|
@ -36,7 +36,7 @@
|
||||||
make_delete_iterator/5,
|
make_delete_iterator/5,
|
||||||
update_iterator/4,
|
update_iterator/4,
|
||||||
next/6,
|
next/6,
|
||||||
delete_next/6,
|
delete_next/7,
|
||||||
|
|
||||||
handle_event/4
|
handle_event/4
|
||||||
]).
|
]).
|
||||||
|
@ -495,14 +495,19 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime,
|
||||||
rocksdb:iterator_close(ITHandle)
|
rocksdb:iterator_close(ITHandle)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize, Now) ->
|
delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize, Now, IsCurrent) ->
|
||||||
%% Compute safe cutoff time.
|
%% Compute safe cutoff time.
|
||||||
%% It's the point in time where the last complete epoch ends, so we need to know
|
%% It's the point in time where the last complete epoch ends, so we need to know
|
||||||
%% the current time to compute it.
|
%% the current time to compute it.
|
||||||
init_counters(),
|
init_counters(),
|
||||||
SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
|
SafeCutoffTime = ?EPOCH(Schema, Now) bsl TSOffset,
|
||||||
try
|
try
|
||||||
delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize)
|
case delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize) of
|
||||||
|
{ok, _It, 0, 0} when not IsCurrent ->
|
||||||
|
{ok, end_of_stream};
|
||||||
|
Result ->
|
||||||
|
Result
|
||||||
|
end
|
||||||
after
|
after
|
||||||
report_counters(Shard)
|
report_counters(Shard)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -261,6 +261,11 @@
|
||||||
) ->
|
) ->
|
||||||
[_Stream].
|
[_Stream].
|
||||||
|
|
||||||
|
-callback get_delete_streams(
|
||||||
|
shard_id(), generation_data(), emqx_ds:topic_filter(), emqx_ds:time()
|
||||||
|
) ->
|
||||||
|
[_Stream].
|
||||||
|
|
||||||
-callback make_iterator(
|
-callback make_iterator(
|
||||||
shard_id(), generation_data(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()
|
shard_id(), generation_data(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()
|
||||||
) ->
|
) ->
|
||||||
|
@ -282,9 +287,10 @@
|
||||||
DeleteIterator,
|
DeleteIterator,
|
||||||
emqx_ds:delete_selector(),
|
emqx_ds:delete_selector(),
|
||||||
pos_integer(),
|
pos_integer(),
|
||||||
emqx_ds:time()
|
emqx_ds:time(),
|
||||||
|
_IsCurrentGeneration :: boolean()
|
||||||
) ->
|
) ->
|
||||||
{ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}.
|
{ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()} | emqx_ds:error(_).
|
||||||
|
|
||||||
-callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) ->
|
-callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) ->
|
||||||
[CustomEvent].
|
[CustomEvent].
|
||||||
|
@ -511,15 +517,12 @@ delete_next(
|
||||||
) ->
|
) ->
|
||||||
case generation_get(Shard, GenId) of
|
case generation_get(Shard, GenId) of
|
||||||
#{module := Mod, data := GenData} ->
|
#{module := Mod, data := GenData} ->
|
||||||
Current = generation_current(Shard),
|
IsCurrent = GenId =:= generation_current(Shard),
|
||||||
case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize, Now) of
|
case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize, Now, IsCurrent) of
|
||||||
{ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current ->
|
|
||||||
%% This is a past generation. Storage layer won't write
|
|
||||||
%% any more messages here. The iterator reached the end:
|
|
||||||
%% the stream has been fully replayed.
|
|
||||||
{ok, end_of_stream};
|
|
||||||
{ok, GenIter, NumDeleted, _IteratedOver} ->
|
{ok, GenIter, NumDeleted, _IteratedOver} ->
|
||||||
{ok, Iter#{?enc := GenIter}, NumDeleted};
|
{ok, Iter#{?enc := GenIter}, NumDeleted};
|
||||||
|
EOS = {ok, end_of_stream} ->
|
||||||
|
EOS;
|
||||||
Error = {error, _} ->
|
Error = {error, _} ->
|
||||||
Error
|
Error
|
||||||
end;
|
end;
|
||||||
|
|
|
@ -39,7 +39,7 @@
|
||||||
make_delete_iterator/5,
|
make_delete_iterator/5,
|
||||||
update_iterator/4,
|
update_iterator/4,
|
||||||
next/6,
|
next/6,
|
||||||
delete_next/6
|
delete_next/7
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% internal exports:
|
%% internal exports:
|
||||||
|
@ -169,7 +169,7 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now, IsCurrent) ->
|
||||||
{ok, It, lists:reverse(Messages)}
|
{ok, It, lists:reverse(Messages)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) ->
|
delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now, IsCurrent) ->
|
||||||
#delete_it{
|
#delete_it{
|
||||||
topic_filter = TopicFilter,
|
topic_filter = TopicFilter,
|
||||||
start_time = StartTime,
|
start_time = StartTime,
|
||||||
|
@ -198,7 +198,12 @@ delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) ->
|
||||||
),
|
),
|
||||||
rocksdb:iterator_close(ITHandle),
|
rocksdb:iterator_close(ITHandle),
|
||||||
It = It0#delete_it{last_seen_message_key = Key},
|
It = It0#delete_it{last_seen_message_key = Key},
|
||||||
{ok, It, NumDeleted, NumIterated}.
|
case IsCurrent of
|
||||||
|
false when NumDeleted =:= 0, NumIterated =:= 0 ->
|
||||||
|
{ok, end_of_stream};
|
||||||
|
_ ->
|
||||||
|
{ok, It, NumDeleted, NumIterated}
|
||||||
|
end.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
|
|
Loading…
Reference in New Issue