feat: add an ability to preserve and restore iterators

This will allow to persist iteration state and to periodically recreate
iterators during long replays.
This commit is contained in:
Andrew Mayorov 2023-01-05 22:48:10 +03:00
parent d6ee23e5b3
commit 7fd14fb404
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 43 additions and 1 deletions

View File

@ -97,6 +97,9 @@
-export([make_iterator/3]). -export([make_iterator/3]).
-export([next/1]). -export([next/1]).
-export([preserve_iterator/1]).
-export([restore_iterator/2]).
%% Debug/troubleshooting: %% Debug/troubleshooting:
%% Keymappers %% Keymappers
-export([ -export([
@ -168,12 +171,14 @@
-record(it, { -record(it, {
handle :: rocksdb:itr_handle(), handle :: rocksdb:itr_handle(),
filter :: keyspace_filter(), filter :: keyspace_filter(),
cursor :: binary() | undefined,
next_action :: {seek, binary()} | next next_action :: {seek, binary()} | next
}). }).
-record(filter, { -record(filter, {
keymapper :: keymapper(), keymapper :: keymapper(),
topic_filter :: emqx_topic:words(), topic_filter :: emqx_topic:words(),
start_time :: integer(),
hash_bitfilter :: integer(), hash_bitfilter :: integer(),
hash_bitmask :: integer(), hash_bitmask :: integer(),
time_bitfilter :: integer(), time_bitfilter :: integer(),
@ -287,7 +292,8 @@ next(It = #it{filter = #filter{keymapper = Keymapper}}) ->
Bitstring = extract(Key, Keymapper), Bitstring = extract(Key, Keymapper),
case match_next(Bitstring, Value, It#it.filter) of case match_next(Bitstring, Value, It#it.filter) of
{_Topic, Payload} -> {_Topic, Payload} ->
{value, Payload, It#it{next_action = next}}; % Preserve last seen key in the iterator so it could be restored later.
{value, Payload, It#it{cursor = Key, next_action = next}};
next -> next ->
next(It#it{next_action = next}); next(It#it{next_action = next});
NextBitstring when is_integer(NextBitstring) -> NextBitstring when is_integer(NextBitstring) ->
@ -302,6 +308,37 @@ next(It = #it{filter = #filter{keymapper = Keymapper}}) ->
{error, closed} {error, closed}
end. end.
-spec preserve_iterator(iterator()) -> binary().
preserve_iterator(#it{cursor = Cursor, filter = Filter}) ->
State = #{
v => 1,
cursor => Cursor,
filter => Filter#filter.topic_filter,
stime => Filter#filter.start_time
},
term_to_binary(State).
-spec restore_iterator(db(), binary()) -> {ok, iterator()} | {error, _TODO}.
restore_iterator(DB, Serial) when is_binary(Serial) ->
State = binary_to_term(Serial),
restore_iterator(DB, State);
restore_iterator(DB, #{
v := 1,
cursor := Cursor,
filter := TopicFilter,
stime := StartTime
}) ->
case make_iterator(DB, TopicFilter, StartTime) of
{ok, It} when Cursor == undefined ->
% Iterator was preserved right after it has been made.
{ok, It};
{ok, It} ->
% Iterator was preserved mid-replay, seek right past the last seen key.
{ok, It#it{cursor = Cursor, next_action = {seek, successor(Cursor)}}};
Err ->
Err
end.
%%================================================================================ %%================================================================================
%% Internal exports %% Internal exports
%%================================================================================ %%================================================================================
@ -365,6 +402,7 @@ make_keyspace_filter(TopicFilter, StartTime, Keymapper) ->
#filter{ #filter{
keymapper = Keymapper, keymapper = Keymapper,
topic_filter = TopicFilter, topic_filter = TopicFilter,
start_time = StartTime,
hash_bitfilter = HashBitfilter, hash_bitfilter = HashBitfilter,
hash_bitmask = HashBitmask, hash_bitmask = HashBitmask,
time_bitfilter = TimeBitfilter, time_bitfilter = TimeBitfilter,
@ -437,6 +475,10 @@ bitwise_concat(Acc, Item, ItemSize) ->
ones(Bits) -> ones(Bits) ->
1 bsl Bits - 1. 1 bsl Bits - 1.
-spec successor(key()) -> key().
successor(Key) ->
<<Key/binary, 0:8>>.
%% |123|345|678| %% |123|345|678|
%% foo bar baz %% foo bar baz