feat(ds): Enable periodic iterator refresh

This might be helpful during replays taking multiple tens of seconds so
that underlying iterators won't hold onto in-memory / on-disk data
structures for too long, preventing rocksdb from recycling them.
This commit is contained in:
Andrew Mayorov 2023-01-10 11:57:04 +03:00 committed by ieQu1
parent 7f408da251
commit d875fa49d3
2 changed files with 100 additions and 10 deletions

View File

@ -95,10 +95,12 @@
-export([store/5]).
-export([make_iterator/3]).
-export([make_iterator/4]).
-export([next/1]).
-export([preserve_iterator/1]).
-export([restore_iterator/2]).
-export([refresh_iterator/1]).
%% Debug/troubleshooting:
%% Keymappers
@ -159,9 +161,18 @@
topic_bits_per_level := bits_per_level(),
%% Maximum granularity of iteration over time.
epoch := time(),
cf_options => emqx_replay_local_store:db_cf_options()
}.
-type iteration_options() :: #{
%% Request periodic iterator refresh.
%% This might be helpful during replays taking a lot of time (e.g. tens of seconds).
%% Note that `{every, 1000}` means 1000 _operations_ with the iterator which is not
%% the same as 1000 replayed messages.
iterator_refresh => {every, _NumOperations :: pos_integer()}
}.
%% Persistent configuration of the generation, it is used to create db
%% record when the database is reopened
-record(schema, {keymapper :: keymapper()}).
@ -173,14 +184,16 @@
cf :: rocksdb:cf_handle(),
keymapper :: keymapper(),
write_options = [{sync, true}] :: emqx_replay_local_store:db_write_options(),
read_options = [] :: emqx_replay_local_store:db_write_options()
read_options = [] :: emqx_replay_local_store:db_write_options(),
iteration_options = #{} :: iteration_options()
}).
-record(it, {
handle :: rocksdb:itr_handle(),
filter :: keyspace_filter(),
cursor :: binary() | undefined,
next_action :: {seek, binary()} | next
next_action :: {seek, binary()} | next,
refresh_counter :: {non_neg_integer(), pos_integer()} | undefined
}).
-record(filter, {
@ -274,41 +287,51 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
-spec make_iterator(db(), emqx_topic:words(), time() | earliest) ->
{ok, iterator()} | {error, _TODO}.
make_iterator(DB, TopicFilter, StartTime) ->
% TODO wire it up somehow to the upper level
make_iterator(DB, TopicFilter, StartTime, DB#db.iteration_options).
-spec make_iterator(db(), emqx_topic:words(), time() | earliest, iteration_options()) ->
% {error, invalid_start_time}? might just start from the beginning of time
% and call it a day: client violated the contract anyway.
{ok, iterator()} | {error, _TODO}.
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) ->
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime, Options) ->
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
{ok, ITHandle} ->
% TODO earliest
Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper),
InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper),
RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)),
{ok, #it{
handle = ITHandle,
filter = Filter,
next_action = {seek, InitialSeek}
next_action = {seek, InitialSeek},
refresh_counter = RefreshCounter
}};
Err ->
Err
end.
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
next(It = #it{filter = #filter{keymapper = Keymapper}}) ->
next(It0 = #it{filter = #filter{keymapper = Keymapper}}) ->
It = maybe_refresh_iterator(It0),
case rocksdb:iterator_move(It#it.handle, It#it.next_action) of
% spec says `{ok, Key}` is also possible but the implementation says it's not
{ok, Key, Value} ->
% Preserve last seen key in the iterator so it could be restored / refreshed later.
ItNext = It#it{cursor = Key},
Bitstring = extract(Key, Keymapper),
case match_next(Bitstring, Value, It#it.filter) of
{_Topic, Payload} ->
% Preserve last seen key in the iterator so it could be restored later.
{value, Payload, It#it{cursor = Key, next_action = next}};
{value, Payload, ItNext#it{next_action = next}};
next ->
next(It#it{next_action = next});
next(ItNext#it{next_action = next});
NextBitstring when is_integer(NextBitstring) ->
NextSeek = combine(NextBitstring, <<>>, Keymapper),
next(It#it{next_action = {seek, NextSeek}});
next(ItNext#it{next_action = {seek, NextSeek}});
none ->
stop_iteration(It)
stop_iteration(ItNext)
end;
{error, invalid_iterator} ->
stop_iteration(It);
@ -347,6 +370,22 @@ restore_iterator(DB, #{
Err
end.
-spec refresh_iterator(iterator()) -> iterator().
refresh_iterator(It = #it{handle = Handle, cursor = Cursor, next_action = Action}) ->
case rocksdb:iterator_refresh(Handle) of
ok when Action =:= next ->
% Now the underlying iterator is invalid, need to seek instead.
It#it{next_action = {seek, successor(Cursor)}};
ok ->
% Now the underlying iterator is invalid, but will seek soon anyway.
It;
{error, _} ->
% Implementation could in theory return an {error, ...} tuple.
% Supposedly our best bet is to ignore it.
% TODO logging?
It
end.
%%================================================================================
%% Internal exports
%%================================================================================
@ -687,6 +726,18 @@ substring(I, Offset, Size) ->
data_cf(GenId) ->
?MODULE_STRING ++ integer_to_list(GenId).
make_refresh_counter({every, N}) when is_integer(N), N > 0 ->
{0, N};
make_refresh_counter(undefined) ->
undefined.
maybe_refresh_iterator(It = #it{refresh_counter = {N, N}}) ->
refresh_iterator(It#it{refresh_counter = {0, N}});
maybe_refresh_iterator(It = #it{refresh_counter = {M, N}}) ->
It#it{refresh_counter = {M + 1, N}};
maybe_refresh_iterator(It = #it{refresh_counter = undefined}) ->
It.
stop_iteration(It) ->
ok = rocksdb:iterator_close(It#it.handle),
none.

View File

@ -150,6 +150,41 @@ prop_iterate_eq_iterate_with_preserve_restore() ->
)
end).
prop_iterate_eq_iterate_with_refresh() ->
TBPL = [4, 8, 16, 12],
Options = #{
timestamp_bits => 32,
topic_bits_per_level => TBPL,
epoch => 500
},
{DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options),
?FORALL(Stream, non_empty(messages(topic(TBPL))), begin
% TODO
% This proptest is also impure, see above.
ok = store_db(DB, Stream),
?FORALL(
{
{Topic, _},
Pat,
StartTime,
RefreshEvery
},
{
nth(Stream),
topic_filter_pattern(),
start_time(),
pos_integer()
},
?TIMEOUT(5000, begin
TopicFilter = make_topic_filter(Pat, Topic),
IterationOptions = #{iterator_refresh => {every, RefreshEvery}},
Iterator = make_iterator(DB, TopicFilter, StartTime, IterationOptions),
Messages = iterate_db(Iterator),
equals(Messages, iterate_db(DB, TopicFilter, StartTime))
end)
)
end).
% store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) ->
% MessageID = emqx_guid:gen(),
% PublishedAt = ChunkNum,
@ -184,6 +219,10 @@ make_iterator(DB, TopicFilter, StartTime) ->
{ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime),
It.
make_iterator(DB, TopicFilter, StartTime, Options) ->
{ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime, Options),
It.
run_iterator_commands([iterate | Rest], It, DB) ->
case emqx_replay_message_storage:next(It) of
{value, Payload, ItNext} ->