From d875fa49d315c34ee2fa9e0136cafc8a1ba8d163 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 10 Jan 2023 11:57:04 +0300 Subject: [PATCH] feat(ds): Enable periodic iterator refresh This might be helpful during replays taking multiple tens of seconds so that underlying iterators won't hold onto in-memory / on-disk data structures for too long, preventing rocksdb from recycling them. --- .../src/emqx_replay_message_storage.erl | 71 ++++++++++++++++--- .../props/prop_replay_message_storage.erl | 39 ++++++++++ 2 files changed, 100 insertions(+), 10 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index fe0a0e08a..f58c006cd 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -95,10 +95,12 @@ -export([store/5]). -export([make_iterator/3]). +-export([make_iterator/4]). -export([next/1]). -export([preserve_iterator/1]). -export([restore_iterator/2]). +-export([refresh_iterator/1]). %% Debug/troubleshooting: %% Keymappers @@ -159,9 +161,18 @@ topic_bits_per_level := bits_per_level(), %% Maximum granularity of iteration over time. epoch := time(), + cf_options => emqx_replay_local_store:db_cf_options() }. +-type iteration_options() :: #{ + %% Request periodic iterator refresh. + %% This might be helpful during replays taking a lot of time (e.g. tens of seconds). + %% Note that `{every, 1000}` means 1000 _operations_ with the iterator which is not + %% the same as 1000 replayed messages. + iterator_refresh => {every, _NumOperations :: pos_integer()} +}. + %% Persistent configuration of the generation, it is used to create db %% record when the database is reopened -record(schema, {keymapper :: keymapper()}). @@ -173,14 +184,16 @@ cf :: rocksdb:cf_handle(), keymapper :: keymapper(), write_options = [{sync, true}] :: emqx_replay_local_store:db_write_options(), - read_options = [] :: emqx_replay_local_store:db_write_options() + read_options = [] :: emqx_replay_local_store:db_write_options(), + iteration_options = #{} :: iteration_options() }). -record(it, { handle :: rocksdb:itr_handle(), filter :: keyspace_filter(), cursor :: binary() | undefined, - next_action :: {seek, binary()} | next + next_action :: {seek, binary()} | next, + refresh_counter :: {non_neg_integer(), pos_integer()} | undefined }). -record(filter, { @@ -274,41 +287,51 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). -spec make_iterator(db(), emqx_topic:words(), time() | earliest) -> + {ok, iterator()} | {error, _TODO}. +make_iterator(DB, TopicFilter, StartTime) -> + % TODO wire it up somehow to the upper level + make_iterator(DB, TopicFilter, StartTime, DB#db.iteration_options). + +-spec make_iterator(db(), emqx_topic:words(), time() | earliest, iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time % and call it a day: client violated the contract anyway. {ok, iterator()} | {error, _TODO}. -make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) -> +make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime, Options) -> case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of {ok, ITHandle} -> % TODO earliest Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper), InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper), + RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)), {ok, #it{ handle = ITHandle, filter = Filter, - next_action = {seek, InitialSeek} + next_action = {seek, InitialSeek}, + refresh_counter = RefreshCounter }}; Err -> Err end. -spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. -next(It = #it{filter = #filter{keymapper = Keymapper}}) -> +next(It0 = #it{filter = #filter{keymapper = Keymapper}}) -> + It = maybe_refresh_iterator(It0), case rocksdb:iterator_move(It#it.handle, It#it.next_action) of % spec says `{ok, Key}` is also possible but the implementation says it's not {ok, Key, Value} -> + % Preserve last seen key in the iterator so it could be restored / refreshed later. + ItNext = It#it{cursor = Key}, Bitstring = extract(Key, Keymapper), case match_next(Bitstring, Value, It#it.filter) of {_Topic, Payload} -> - % Preserve last seen key in the iterator so it could be restored later. - {value, Payload, It#it{cursor = Key, next_action = next}}; + {value, Payload, ItNext#it{next_action = next}}; next -> - next(It#it{next_action = next}); + next(ItNext#it{next_action = next}); NextBitstring when is_integer(NextBitstring) -> NextSeek = combine(NextBitstring, <<>>, Keymapper), - next(It#it{next_action = {seek, NextSeek}}); + next(ItNext#it{next_action = {seek, NextSeek}}); none -> - stop_iteration(It) + stop_iteration(ItNext) end; {error, invalid_iterator} -> stop_iteration(It); @@ -347,6 +370,22 @@ restore_iterator(DB, #{ Err end. +-spec refresh_iterator(iterator()) -> iterator(). +refresh_iterator(It = #it{handle = Handle, cursor = Cursor, next_action = Action}) -> + case rocksdb:iterator_refresh(Handle) of + ok when Action =:= next -> + % Now the underlying iterator is invalid, need to seek instead. + It#it{next_action = {seek, successor(Cursor)}}; + ok -> + % Now the underlying iterator is invalid, but will seek soon anyway. + It; + {error, _} -> + % Implementation could in theory return an {error, ...} tuple. + % Supposedly our best bet is to ignore it. + % TODO logging? + It + end. + %%================================================================================ %% Internal exports %%================================================================================ @@ -687,6 +726,18 @@ substring(I, Offset, Size) -> data_cf(GenId) -> ?MODULE_STRING ++ integer_to_list(GenId). +make_refresh_counter({every, N}) when is_integer(N), N > 0 -> + {0, N}; +make_refresh_counter(undefined) -> + undefined. + +maybe_refresh_iterator(It = #it{refresh_counter = {N, N}}) -> + refresh_iterator(It#it{refresh_counter = {0, N}}); +maybe_refresh_iterator(It = #it{refresh_counter = {M, N}}) -> + It#it{refresh_counter = {M + 1, N}}; +maybe_refresh_iterator(It = #it{refresh_counter = undefined}) -> + It. + stop_iteration(It) -> ok = rocksdb:iterator_close(It#it.handle), none. diff --git a/apps/emqx_replay/test/props/prop_replay_message_storage.erl b/apps/emqx_replay/test/props/prop_replay_message_storage.erl index 9619c4f05..20c897c2a 100644 --- a/apps/emqx_replay/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_replay/test/props/prop_replay_message_storage.erl @@ -150,6 +150,41 @@ prop_iterate_eq_iterate_with_preserve_restore() -> ) end). +prop_iterate_eq_iterate_with_refresh() -> + TBPL = [4, 8, 16, 12], + Options = #{ + timestamp_bits => 32, + topic_bits_per_level => TBPL, + epoch => 500 + }, + {DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options), + ?FORALL(Stream, non_empty(messages(topic(TBPL))), begin + % TODO + % This proptest is also impure, see above. + ok = store_db(DB, Stream), + ?FORALL( + { + {Topic, _}, + Pat, + StartTime, + RefreshEvery + }, + { + nth(Stream), + topic_filter_pattern(), + start_time(), + pos_integer() + }, + ?TIMEOUT(5000, begin + TopicFilter = make_topic_filter(Pat, Topic), + IterationOptions = #{iterator_refresh => {every, RefreshEvery}}, + Iterator = make_iterator(DB, TopicFilter, StartTime, IterationOptions), + Messages = iterate_db(Iterator), + equals(Messages, iterate_db(DB, TopicFilter, StartTime)) + end) + ) + end). + % store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) -> % MessageID = emqx_guid:gen(), % PublishedAt = ChunkNum, @@ -184,6 +219,10 @@ make_iterator(DB, TopicFilter, StartTime) -> {ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime), It. +make_iterator(DB, TopicFilter, StartTime, Options) -> + {ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime, Options), + It. + run_iterator_commands([iterate | Rest], It, DB) -> case emqx_replay_message_storage:next(It) of {value, Payload, ItNext} ->