From 1f033f92b553ac6afff95154bb50a1352410a7d5 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 9 Feb 2023 19:22:18 +0300 Subject: [PATCH] feat(replay): allow to preserve / restore iterators in the db So that we could guarantee replay consistency / availability under the assumption that nodes may be restarted or even lost occasionally. --- apps/emqx_replay/src/emqx_replay.erl | 8 + .../src/emqx_replay_local_store.erl | 177 ++++++++++++++---- .../src/emqx_replay_message_storage.erl | 43 ++--- .../test/emqx_replay_local_store_SUITE.erl | 4 +- .../emqx_replay_message_storage_shim.erl | 6 +- .../props/prop_replay_message_storage.erl | 61 +++--- 6 files changed, 208 insertions(+), 91 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay.erl b/apps/emqx_replay/src/emqx_replay.erl index fb1ec39c6..ed790697f 100644 --- a/apps/emqx_replay/src/emqx_replay.erl +++ b/apps/emqx_replay/src/emqx_replay.erl @@ -19,6 +19,7 @@ -export([]). -export_type([topic/0, time/0]). +-export_type([replay_id/0, replay/0]). %%================================================================================ %% Type declarations @@ -32,6 +33,13 @@ %% TODO granularity? -type time() :: non_neg_integer(). +-type replay_id() :: binary(). + +-type replay() :: { + _TopicFilter :: topic(), + _StartTime :: time() +}. + %%================================================================================ %% API funcions %%================================================================================ diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 7148308b2..8a74f248e 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -21,7 +21,11 @@ -export([start_link/1]). -export([create_generation/3]). --export([store/5, make_iterator/3, next/1]). +-export([store/5]). + +-export([make_iterator/2, next/1]). + +-export([preserve_iterator/2, restore_iterator/2, discard_iterator/2]). %% behavior callbacks: -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -57,14 +61,14 @@ -record(s, { zone :: emqx_types:zone(), db :: rocksdb:db_handle(), - column_families :: cf_refs() + cf_iterator :: rocksdb:cf_handle(), + cf_generations :: cf_refs() }). -record(it, { zone :: emqx_types:zone(), gen :: gen_id(), - filter :: emqx_topic:words(), - start_time :: emqx_replay:time(), + replay :: emqx_replay:replay(), module :: module(), data :: term() }). @@ -79,8 +83,17 @@ %% [{<<"genNN">>, #generation{}}, ..., %% {<<"current">>, GenID}] +-define(DEFAULT_CF, "default"). -define(DEFAULT_CF_OPTS, []). +-define(ITERATOR_CF, "$iterators"). + +%% TODO +%% 1. CuckooTable might be of use here / `OptimizeForPointLookup(...)`. +%% 2. Supposedly might be compressed _very_ effectively. +%% 3. `inplace_update_support`? +-define(ITERATOR_CF_OPTS, []). + -define(REF(Zone), {via, gproc, {n, l, {?MODULE, Zone}}}). %%================================================================================ @@ -102,15 +115,14 @@ store(Zone, GUID, Time, Topic, Msg) -> {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Zone, Time), Mod:store(Data, GUID, Time, Topic, Msg). --spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay:time()) -> +-spec make_iterator(emqx_types:zone(), emqx_replay:replay()) -> {ok, iterator()} | {error, _TODO}. -make_iterator(Zone, TopicFilter, StartTime) -> +make_iterator(Zone, Replay = {_, StartTime}) -> {GenId, Gen} = meta_lookup_gen(Zone, StartTime), open_iterator(Gen, #it{ zone = Zone, gen = GenId, - filter = TopicFilter, - start_time = StartTime + replay = Replay }). -spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. @@ -131,20 +143,37 @@ next(It = #it{module = Mod, data = ItData}) -> end end. +-spec preserve_iterator(iterator(), emqx_replay:replay_id()) -> + ok | {error, _TODO}. +preserve_iterator(It = #it{}, ReplayID) -> + iterator_put_state(ReplayID, It). + +-spec restore_iterator(emqx_types:zone(), emqx_replay:replay_id()) -> + {ok, iterator()} | {error, _TODO}. +restore_iterator(Zone, ReplayID) -> + case iterator_get_state(Zone, ReplayID) of + {ok, Serial} -> + restore_iterator_state(Zone, Serial); + not_found -> + {error, not_found}; + {error, _Reason} = Error -> + Error + end. + +-spec discard_iterator(emqx_types:zone(), emqx_replay:replay_id()) -> + ok | {error, _TODO}. +discard_iterator(Zone, ReplayID) -> + iterator_delete(Zone, ReplayID). + %%================================================================================ %% behavior callbacks %%================================================================================ init([Zone]) -> process_flag(trap_exit, true), - {ok, DBHandle, CFRefs} = open_db(Zone), - S0 = #s{ - zone = Zone, - db = DBHandle, - column_families = CFRefs - }, + {ok, S0} = open_db(Zone), S = ensure_current_generation(S0), - read_metadata(S), + ok = populate_metadata(S), {ok, S}. handle_call({create_generation, Since, Config}, _From, S) -> @@ -171,13 +200,16 @@ terminate(_Reason, #s{db = DB, zone = Zone}) -> %% Internal functions %%================================================================================ --spec read_metadata(state()) -> ok. -read_metadata(S = #s{db = DBHandle}) -> - Current = schema_get_current(DBHandle), - lists:foreach(fun(GenId) -> read_metadata(GenId, S) end, lists:seq(0, Current)). +-record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}). --spec read_metadata(gen_id(), state()) -> ok. -read_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) -> +-spec populate_metadata(state()) -> ok. +populate_metadata(S = #s{zone = Zone, db = DBHandle, cf_iterator = CFIterator}) -> + ok = meta_put(Zone, db, #db{handle = DBHandle, cf_iterator = CFIterator}), + Current = schema_get_current(DBHandle), + lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)). + +-spec populate_metadata(gen_id(), state()) -> ok. +populate_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) -> Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S), meta_register_gen(Zone, GenId, Gen). @@ -193,7 +225,7 @@ ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) -> end. -spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> - {ok, gen_id(), state()}. + {ok, gen_id(), state()} | {error, nonmonotonic}. create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) -> GenId = get_next_id(meta_get_current(Zone)), GenId = get_next_id(schema_get_current(DBHandle)), @@ -211,7 +243,7 @@ create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) -> -spec create_gen(gen_id(), emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> {ok, generation(), state()}. -create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_families = CFs}) -> +create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) -> % TODO: Backend implementation should ensure idempotency. {Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options), Gen = #{ @@ -219,24 +251,38 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_familie data => Schema, since => Since }, - {ok, Gen, S#s{column_families = NewCFs ++ CFs}}. + {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. --spec open_db(emqx_types:zone()) -> {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}. +-spec open_db(emqx_types:zone()) -> {ok, state()} | {error, _TODO}. open_db(Zone) -> Filename = atom_to_list(Zone), - DBOptions = emqx_replay_conf:db_options(), - ColumnFamiles = + DBOptions = [ + {create_if_missing, true}, + {create_missing_column_families, true} + | emqx_replay_conf:db_options() + ], + ExistingCFs = case rocksdb:list_column_families(Filename, DBOptions) of - {ok, ColumnFamiles0} -> - [{I, []} || I <- ColumnFamiles0]; + {ok, CFs} -> + [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF]; % DB is not present. First start {error, {db_open, _}} -> - [{"default", ?DEFAULT_CF_OPTS}] + [] end, - case rocksdb:open(Filename, [{create_if_missing, true} | DBOptions], ColumnFamiles) of - {ok, Handle, CFRefs} -> - {CFNames, _} = lists:unzip(ColumnFamiles), - {ok, Handle, lists:zip(CFNames, CFRefs)}; + ColumnFamilies = [ + {?DEFAULT_CF, ?DEFAULT_CF_OPTS}, + {?ITERATOR_CF, ?ITERATOR_CF_OPTS} + | ExistingCFs + ], + case rocksdb:open(Filename, DBOptions, ColumnFamilies) of + {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} -> + {CFNames, _} = lists:unzip(ExistingCFs), + {ok, #s{ + zone = Zone, + db = DBHandle, + cf_iterator = CFIterator, + cf_generations = lists:zip(CFNames, CFRefs) + }}; Error -> Error end. @@ -245,7 +291,7 @@ open_db(Zone) -> open_gen( GenId, Gen = #{module := Mod, data := Data}, - #s{zone = Zone, db = DBHandle, column_families = CFs} + #s{zone = Zone, db = DBHandle, cf_generations = CFs} ) -> DB = Mod:open(Zone, DBHandle, GenId, CFs, Data), Gen#{data := DB}. @@ -261,13 +307,72 @@ open_next_iterator(Gen = #{}, It) -> -spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}. open_iterator(#{module := Mod, data := Data}, It = #it{}) -> - case Mod:make_iterator(Data, It#it.filter, It#it.start_time) of + case Mod:make_iterator(Data, It#it.replay) of {ok, ItData} -> {ok, It#it{module = Mod, data = ItData}}; Err -> Err end. +-spec open_restore_iterator(generation(), iterator(), binary()) -> + {ok, iterator()} | {error, _Reason}. +open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay}, Serial) -> + case Mod:restore_iterator(Data, Replay, Serial) of + {ok, ItData} -> + {ok, It#it{module = Mod, data = ItData}}; + Err -> + Err + end. + +%% + +-define(KEY_REPLAY_STATE(ReplayID), <<(ReplayID)/binary, "rs">>). + +-define(ITERATION_WRITE_OPTS, []). +-define(ITERATION_READ_OPTS, []). + +iterator_get_state(Zone, ReplayID) -> + #db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db), + rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS). + +iterator_put_state(ID, It = #it{zone = Zone}) -> + #db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db), + Serial = preserve_iterator_state(It), + rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS). + +iterator_delete(Zone, ID) -> + #db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db), + rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS). + +preserve_iterator_state(#it{ + gen = Gen, + replay = {TopicFilter, StartTime}, + module = Mod, + data = ItData +}) -> + term_to_binary(#{ + v => 1, + gen => Gen, + filter => TopicFilter, + start => StartTime, + st => Mod:preserve_iterator(ItData) + }). + +restore_iterator_state(Zone, Serial) when is_binary(Serial) -> + restore_iterator_state(Zone, binary_to_term(Serial)); +restore_iterator_state( + Zone, + #{ + v := 1, + gen := Gen, + filter := TopicFilter, + start := StartTime, + st := State + } +) -> + It = #it{zone = Zone, gen = Gen, replay = {TopicFilter, StartTime}}, + open_restore_iterator(meta_get_gen(Zone, Gen), It, State). + %% Functions for dealing with the metadata stored persistently in rocksdb -define(CURRENT_GEN, <<"current">>). diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index fb96863d1..fbeb452c9 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -94,12 +94,12 @@ -export([make_keymapper/1]). -export([store/5]). +-export([make_iterator/2]). -export([make_iterator/3]). --export([make_iterator/4]). -export([next/1]). -export([preserve_iterator/1]). --export([restore_iterator/2]). +-export([restore_iterator/3]). -export([refresh_iterator/1]). %% Debug/troubleshooting: @@ -114,7 +114,7 @@ %% Keyspace filters -export([ - make_keyspace_filter/3, + make_keyspace_filter/2, compute_initial_seek/1, compute_next_seek/2, compute_time_seek/3, @@ -289,20 +289,20 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, Value = make_message_value(Topic, MessagePayload), rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). --spec make_iterator(db(), emqx_topic:words(), time()) -> +-spec make_iterator(db(), emqx_replay:replay()) -> {ok, iterator()} | {error, _TODO}. -make_iterator(DB, TopicFilter, StartTime) -> +make_iterator(DB, Replay) -> Options = emqx_replay_conf:zone_iteration_options(DB#db.zone), - make_iterator(DB, TopicFilter, StartTime, Options). + make_iterator(DB, Replay, Options). --spec make_iterator(db(), emqx_topic:words(), time(), iteration_options()) -> +-spec make_iterator(db(), emqx_replay:replay(), iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time % and call it a day: client violated the contract anyway. {ok, iterator()} | {error, _TODO}. -make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime, Options) -> +make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, Replay, Options) -> case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of {ok, ITHandle} -> - Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper), + Filter = make_keyspace_filter(Replay, DB#db.keymapper), InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper), RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)), {ok, #it{ @@ -342,26 +342,23 @@ next(It0 = #it{filter = #filter{keymapper = Keymapper}}) -> end. -spec preserve_iterator(iterator()) -> binary(). -preserve_iterator(#it{cursor = Cursor, filter = Filter}) -> +preserve_iterator(#it{cursor = Cursor}) -> State = #{ v => 1, - cursor => Cursor, - filter => Filter#filter.topic_filter, - stime => Filter#filter.start_time + cursor => Cursor }, term_to_binary(State). --spec restore_iterator(db(), binary()) -> {ok, iterator()} | {error, _TODO}. -restore_iterator(DB, Serial) when is_binary(Serial) -> +-spec restore_iterator(db(), emqx_replay:replay(), binary()) -> + {ok, iterator()} | {error, _TODO}. +restore_iterator(DB, Replay, Serial) when is_binary(Serial) -> State = binary_to_term(Serial), - restore_iterator(DB, State); -restore_iterator(DB, #{ + restore_iterator(DB, Replay, State); +restore_iterator(DB, Replay, #{ v := 1, - cursor := Cursor, - filter := TopicFilter, - stime := StartTime + cursor := Cursor }) -> - case make_iterator(DB, TopicFilter, StartTime) of + case make_iterator(DB, Replay) of {ok, It} when Cursor == undefined -> % Iterator was preserved right after it has been made. {ok, It}; @@ -434,8 +431,8 @@ hash(Input, Bits) -> % at most 32 bits erlang:phash2(Input, 1 bsl Bits). --spec make_keyspace_filter(emqx_topic:words(), time(), keymapper()) -> keyspace_filter(). -make_keyspace_filter(TopicFilter, StartTime, Keymapper) -> +-spec make_keyspace_filter(emqx_replay:replay(), keymapper()) -> keyspace_filter(). +make_keyspace_filter({TopicFilter, StartTime}, Keymapper) -> Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper), HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper), TimeBitmask = compute_time_bitmask(Keymapper), diff --git a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl index edda0f7f6..f36dda267 100644 --- a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl @@ -73,7 +73,7 @@ t_iterate(_Config) -> %% Iterate through individual topics: [ begin - {ok, It} = emqx_replay_local_store:make_iterator(?ZONE, Topic, 0), + {ok, It} = emqx_replay_local_store:make_iterator(?ZONE, {Topic, 0}), Values = iterate(It), ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values) end @@ -195,7 +195,7 @@ store(Zone, PublishedAt, Topic, Payload) -> emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload). iterate(DB, TopicFilter, StartTime) -> - {ok, It} = emqx_replay_local_store:make_iterator(DB, parse_topic(TopicFilter), StartTime), + {ok, It} = emqx_replay_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}), iterate(It). iterate(It) -> diff --git a/apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl b/apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl index 125c9a9fc..160451cb9 100644 --- a/apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl +++ b/apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl @@ -19,7 +19,7 @@ -export([open/0]). -export([close/1]). -export([store/5]). --export([iterate/3]). +-export([iterate/2]). -type topic() :: list(binary()). -type time() :: integer(). @@ -41,9 +41,9 @@ store(Tab, MessageID, PublishedAt, Topic, Payload) -> true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}), ok. --spec iterate(t(), emqx_topic:words(), time()) -> +-spec iterate(t(), emqx_replay:replay()) -> [binary()]. -iterate(Tab, TopicFilter, StartTime) -> +iterate(Tab, {TopicFilter, StartTime}) -> ets:foldr( fun({{PublishedAt, _}, Topic, Payload}, Acc) -> case emqx_topic:match(Topic, TopicFilter) of diff --git a/apps/emqx_replay/test/props/prop_replay_message_storage.erl b/apps/emqx_replay/test/props/prop_replay_message_storage.erl index c468097c7..ede6dc336 100644 --- a/apps/emqx_replay/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_replay/test/props/prop_replay_message_storage.erl @@ -21,6 +21,8 @@ -define(WORK_DIR, ["_build", "test"]). -define(RUN_ID, {?MODULE, testrun_id}). + +-define(ZONE, ?MODULE). -define(GEN_ID, 42). %%-------------------------------------------------------------------- @@ -51,8 +53,7 @@ prop_next_seek_monotonic() -> {topic_filter(), pos_integer(), keymapper()}, begin Filter = emqx_replay_message_storage:make_keyspace_filter( - TopicFilter, - StartTime, + {TopicFilter, StartTime}, Keymapper ), ?FORALL( @@ -99,8 +100,9 @@ prop_iterate_messages() -> }, begin TopicFilter = make_topic_filter(Pattern, Topic), - Messages = iterate_db(DB, TopicFilter, StartTime), - Reference = iterate_shim(Shim, TopicFilter, StartTime), + Iteration = {TopicFilter, StartTime}, + Messages = iterate_db(DB, Iteration), + Reference = iterate_shim(Shim, Iteration), ok = close_db(Handle), ok = emqx_replay_message_storage_shim:close(Shim), ?WHENFAIL( @@ -143,10 +145,11 @@ prop_iterate_eq_iterate_with_preserve_restore() -> shuffled(flat([non_empty(list({preserve, restore})), list(iterate)])) }, begin - TopicFilter = make_topic_filter(Pat, Topic), - Iterator = make_iterator(DB, TopicFilter, StartTime), - Messages = run_iterator_commands(Commands, Iterator, DB), - equals(Messages, iterate_db(DB, TopicFilter, StartTime)) + Replay = {make_topic_filter(Pat, Topic), StartTime}, + Iterator = make_iterator(DB, Replay), + Ctx = #{db => DB, replay => Replay}, + Messages = run_iterator_commands(Commands, Iterator, Ctx), + equals(Messages, iterate_db(DB, Replay)) end ) end). @@ -177,11 +180,11 @@ prop_iterate_eq_iterate_with_refresh() -> pos_integer() }, ?TIMEOUT(5000, begin - TopicFilter = make_topic_filter(Pat, Topic), + Replay = {make_topic_filter(Pat, Topic), StartTime}, IterationOptions = #{iterator_refresh => {every, RefreshEvery}}, - Iterator = make_iterator(DB, TopicFilter, StartTime, IterationOptions), + Iterator = make_iterator(DB, Replay, IterationOptions), Messages = iterate_db(Iterator), - equals(Messages, iterate_db(DB, TopicFilter, StartTime)) + equals(Messages, iterate_db(DB, Replay)) end) ) end). @@ -205,8 +208,8 @@ store_db(DB, Messages) -> Messages ). -iterate_db(DB, TopicFilter, StartTime) -> - iterate_db(make_iterator(DB, TopicFilter, StartTime)). +iterate_db(DB, Iteration) -> + iterate_db(make_iterator(DB, Iteration)). iterate_db(It) -> case emqx_replay_message_storage:next(It) of @@ -216,26 +219,30 @@ iterate_db(It) -> [] end. -make_iterator(DB, TopicFilter, StartTime) -> - {ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime), +make_iterator(DB, Replay) -> + {ok, It} = emqx_replay_message_storage:make_iterator(DB, Replay), It. -make_iterator(DB, TopicFilter, StartTime, Options) -> - {ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime, Options), +make_iterator(DB, Replay, Options) -> + {ok, It} = emqx_replay_message_storage:make_iterator(DB, Replay, Options), It. -run_iterator_commands([iterate | Rest], It, DB) -> +run_iterator_commands([iterate | Rest], It, Ctx) -> case emqx_replay_message_storage:next(It) of {value, Payload, ItNext} -> - [binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, DB)]; + [binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, Ctx)]; none -> [] end; -run_iterator_commands([{preserve, restore} | Rest], It, DB) -> +run_iterator_commands([{preserve, restore} | Rest], It, Ctx) -> + #{ + db := DB, + replay := Replay + } = Ctx, Serial = emqx_replay_message_storage:preserve_iterator(It), - {ok, ItNext} = emqx_replay_message_storage:restore_iterator(DB, Serial), - run_iterator_commands(Rest, ItNext, DB); -run_iterator_commands([], It, _DB) -> + {ok, ItNext} = emqx_replay_message_storage:restore_iterator(DB, Replay, Serial), + run_iterator_commands(Rest, ItNext, Ctx); +run_iterator_commands([], It, _Ctx) -> iterate_db(It). store_shim(Shim, Messages) -> @@ -247,10 +254,10 @@ store_shim(Shim, Messages) -> Messages ). -iterate_shim(Shim, TopicFilter, StartTime) -> +iterate_shim(Shim, Iteration) -> lists:map( fun binary_to_term/1, - emqx_replay_message_storage_shim:iterate(Shim, TopicFilter, StartTime) + emqx_replay_message_storage_shim:iterate(Shim, Iteration) ). %%-------------------------------------------------------------------- @@ -260,7 +267,7 @@ iterate_shim(Shim, TopicFilter, StartTime) -> open_db(Filepath, Options) -> {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]), {Schema, CFRefs} = emqx_replay_message_storage:create_new(Handle, ?GEN_ID, Options), - DB = emqx_replay_message_storage:open(Handle, ?GEN_ID, CFRefs, Schema), + DB = emqx_replay_message_storage:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema), {DB, Handle}. close_db(Handle) -> @@ -384,7 +391,7 @@ keyspace_filter() -> ?LET( {TopicFilter, StartTime, Keymapper}, {topic_filter(), pos_integer(), keymapper()}, - emqx_replay_message_storage:make_keyspace_filter(TopicFilter, StartTime, Keymapper) + emqx_replay_message_storage:make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ). messages(Topic) ->