From 7fd14fb404ea7c83385631c700128f07277904eb Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 5 Jan 2023 22:48:10 +0300 Subject: [PATCH] feat: add an ability to preserve and restore iterators This will allow to persist iteration state and to periodically recreate iterators during long replays. --- .../src/emqx_replay_message_storage.erl | 44 ++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index 3988e97dc..1c91066cf 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -97,6 +97,9 @@ -export([make_iterator/3]). -export([next/1]). +-export([preserve_iterator/1]). +-export([restore_iterator/2]). + %% Debug/troubleshooting: %% Keymappers -export([ @@ -168,12 +171,14 @@ -record(it, { handle :: rocksdb:itr_handle(), filter :: keyspace_filter(), + cursor :: binary() | undefined, next_action :: {seek, binary()} | next }). -record(filter, { keymapper :: keymapper(), topic_filter :: emqx_topic:words(), + start_time :: integer(), hash_bitfilter :: integer(), hash_bitmask :: integer(), time_bitfilter :: integer(), @@ -287,7 +292,8 @@ next(It = #it{filter = #filter{keymapper = Keymapper}}) -> Bitstring = extract(Key, Keymapper), case match_next(Bitstring, Value, It#it.filter) of {_Topic, Payload} -> - {value, Payload, It#it{next_action = next}}; + % Preserve last seen key in the iterator so it could be restored later. + {value, Payload, It#it{cursor = Key, next_action = next}}; next -> next(It#it{next_action = next}); NextBitstring when is_integer(NextBitstring) -> @@ -302,6 +308,37 @@ next(It = #it{filter = #filter{keymapper = Keymapper}}) -> {error, closed} end. +-spec preserve_iterator(iterator()) -> binary(). +preserve_iterator(#it{cursor = Cursor, filter = Filter}) -> + State = #{ + v => 1, + cursor => Cursor, + filter => Filter#filter.topic_filter, + stime => Filter#filter.start_time + }, + term_to_binary(State). + +-spec restore_iterator(db(), binary()) -> {ok, iterator()} | {error, _TODO}. +restore_iterator(DB, Serial) when is_binary(Serial) -> + State = binary_to_term(Serial), + restore_iterator(DB, State); +restore_iterator(DB, #{ + v := 1, + cursor := Cursor, + filter := TopicFilter, + stime := StartTime +}) -> + case make_iterator(DB, TopicFilter, StartTime) of + {ok, It} when Cursor == undefined -> + % Iterator was preserved right after it has been made. + {ok, It}; + {ok, It} -> + % Iterator was preserved mid-replay, seek right past the last seen key. + {ok, It#it{cursor = Cursor, next_action = {seek, successor(Cursor)}}}; + Err -> + Err + end. + %%================================================================================ %% Internal exports %%================================================================================ @@ -365,6 +402,7 @@ make_keyspace_filter(TopicFilter, StartTime, Keymapper) -> #filter{ keymapper = Keymapper, topic_filter = TopicFilter, + start_time = StartTime, hash_bitfilter = HashBitfilter, hash_bitmask = HashBitmask, time_bitfilter = TimeBitfilter, @@ -437,6 +475,10 @@ bitwise_concat(Acc, Item, ItemSize) -> ones(Bits) -> 1 bsl Bits - 1. +-spec successor(key()) -> key(). +successor(Key) -> + <>. + %% |123|345|678| %% foo bar baz