Merge pull request #9947 from feat/EMQX-8744/iterators-cf

feat(replay): allow to preserve / restore iterators in the db
This commit is contained in:
Andrew Mayorov 2023-04-11 15:20:59 +03:00 committed by GitHub
commit 6423083895
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 260 additions and 92 deletions

View File

@ -19,6 +19,7 @@
-export([]). -export([]).
-export_type([topic/0, time/0]). -export_type([topic/0, time/0]).
-export_type([replay_id/0, replay/0]).
%%================================================================================ %%================================================================================
%% Type declarations %% Type declarations
@ -32,6 +33,13 @@
%% TODO granularity? %% TODO granularity?
-type time() :: non_neg_integer(). -type time() :: non_neg_integer().
-type replay_id() :: binary().
-type replay() :: {
_TopicFilter :: topic(),
_StartTime :: time()
}.
%%================================================================================ %%================================================================================
%% API funcions %% API funcions
%%================================================================================ %%================================================================================

View File

@ -21,7 +21,11 @@
-export([start_link/1]). -export([start_link/1]).
-export([create_generation/3]). -export([create_generation/3]).
-export([store/5, make_iterator/3, next/1]). -export([store/5]).
-export([make_iterator/2, next/1]).
-export([preserve_iterator/2, restore_iterator/2, discard_iterator/2]).
%% behavior callbacks: %% behavior callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
@ -57,14 +61,14 @@
-record(s, { -record(s, {
zone :: emqx_types:zone(), zone :: emqx_types:zone(),
db :: rocksdb:db_handle(), db :: rocksdb:db_handle(),
column_families :: cf_refs() cf_iterator :: rocksdb:cf_handle(),
cf_generations :: cf_refs()
}). }).
-record(it, { -record(it, {
zone :: emqx_types:zone(), zone :: emqx_types:zone(),
gen :: gen_id(), gen :: gen_id(),
filter :: emqx_topic:words(), replay :: emqx_replay:replay(),
start_time :: emqx_replay:time(),
module :: module(), module :: module(),
data :: term() data :: term()
}). }).
@ -79,8 +83,17 @@
%% [{<<"genNN">>, #generation{}}, ..., %% [{<<"genNN">>, #generation{}}, ...,
%% {<<"current">>, GenID}] %% {<<"current">>, GenID}]
-define(DEFAULT_CF, "default").
-define(DEFAULT_CF_OPTS, []). -define(DEFAULT_CF_OPTS, []).
-define(ITERATOR_CF, "$iterators").
%% TODO
%% 1. CuckooTable might be of use here / `OptimizeForPointLookup(...)`.
%% 2. Supposedly might be compressed _very_ effectively.
%% 3. `inplace_update_support`?
-define(ITERATOR_CF_OPTS, []).
-define(REF(Zone), {via, gproc, {n, l, {?MODULE, Zone}}}). -define(REF(Zone), {via, gproc, {n, l, {?MODULE, Zone}}}).
%%================================================================================ %%================================================================================
@ -102,15 +115,14 @@ store(Zone, GUID, Time, Topic, Msg) ->
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Zone, Time), {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Zone, Time),
Mod:store(Data, GUID, Time, Topic, Msg). Mod:store(Data, GUID, Time, Topic, Msg).
-spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay:time()) -> -spec make_iterator(emqx_types:zone(), emqx_replay:replay()) ->
{ok, iterator()} | {error, _TODO}. {ok, iterator()} | {error, _TODO}.
make_iterator(Zone, TopicFilter, StartTime) -> make_iterator(Zone, Replay = {_, StartTime}) ->
{GenId, Gen} = meta_lookup_gen(Zone, StartTime), {GenId, Gen} = meta_lookup_gen(Zone, StartTime),
open_iterator(Gen, #it{ open_iterator(Gen, #it{
zone = Zone, zone = Zone,
gen = GenId, gen = GenId,
filter = TopicFilter, replay = Replay
start_time = StartTime
}). }).
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. -spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
@ -131,20 +143,37 @@ next(It = #it{module = Mod, data = ItData}) ->
end end
end. end.
-spec preserve_iterator(iterator(), emqx_replay:replay_id()) ->
ok | {error, _TODO}.
preserve_iterator(It = #it{}, ReplayID) ->
iterator_put_state(ReplayID, It).
-spec restore_iterator(emqx_types:zone(), emqx_replay:replay_id()) ->
{ok, iterator()} | {error, _TODO}.
restore_iterator(Zone, ReplayID) ->
case iterator_get_state(Zone, ReplayID) of
{ok, Serial} ->
restore_iterator_state(Zone, Serial);
not_found ->
{error, not_found};
{error, _Reason} = Error ->
Error
end.
-spec discard_iterator(emqx_types:zone(), emqx_replay:replay_id()) ->
ok | {error, _TODO}.
discard_iterator(Zone, ReplayID) ->
iterator_delete(Zone, ReplayID).
%%================================================================================ %%================================================================================
%% behavior callbacks %% behavior callbacks
%%================================================================================ %%================================================================================
init([Zone]) -> init([Zone]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
{ok, DBHandle, CFRefs} = open_db(Zone), {ok, S0} = open_db(Zone),
S0 = #s{
zone = Zone,
db = DBHandle,
column_families = CFRefs
},
S = ensure_current_generation(S0), S = ensure_current_generation(S0),
read_metadata(S), ok = populate_metadata(S),
{ok, S}. {ok, S}.
handle_call({create_generation, Since, Config}, _From, S) -> handle_call({create_generation, Since, Config}, _From, S) ->
@ -171,13 +200,16 @@ terminate(_Reason, #s{db = DB, zone = Zone}) ->
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
-spec read_metadata(state()) -> ok. -record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}).
read_metadata(S = #s{db = DBHandle}) ->
Current = schema_get_current(DBHandle),
lists:foreach(fun(GenId) -> read_metadata(GenId, S) end, lists:seq(0, Current)).
-spec read_metadata(gen_id(), state()) -> ok. -spec populate_metadata(state()) -> ok.
read_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) -> populate_metadata(S = #s{zone = Zone, db = DBHandle, cf_iterator = CFIterator}) ->
ok = meta_put(Zone, db, #db{handle = DBHandle, cf_iterator = CFIterator}),
Current = schema_get_current(DBHandle),
lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)).
-spec populate_metadata(gen_id(), state()) -> ok.
populate_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) ->
Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S), Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S),
meta_register_gen(Zone, GenId, Gen). meta_register_gen(Zone, GenId, Gen).
@ -193,7 +225,7 @@ ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) ->
end. end.
-spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> -spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
{ok, gen_id(), state()}. {ok, gen_id(), state()} | {error, nonmonotonic}.
create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) -> create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) ->
GenId = get_next_id(meta_get_current(Zone)), GenId = get_next_id(meta_get_current(Zone)),
GenId = get_next_id(schema_get_current(DBHandle)), GenId = get_next_id(schema_get_current(DBHandle)),
@ -211,7 +243,7 @@ create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) ->
-spec create_gen(gen_id(), emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> -spec create_gen(gen_id(), emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
{ok, generation(), state()}. {ok, generation(), state()}.
create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_families = CFs}) -> create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) ->
% TODO: Backend implementation should ensure idempotency. % TODO: Backend implementation should ensure idempotency.
{Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options), {Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options),
Gen = #{ Gen = #{
@ -219,24 +251,38 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_familie
data => Schema, data => Schema,
since => Since since => Since
}, },
{ok, Gen, S#s{column_families = NewCFs ++ CFs}}. {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
-spec open_db(emqx_types:zone()) -> {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}. -spec open_db(emqx_types:zone()) -> {ok, state()} | {error, _TODO}.
open_db(Zone) -> open_db(Zone) ->
Filename = atom_to_list(Zone), Filename = atom_to_list(Zone),
DBOptions = emqx_replay_conf:db_options(), DBOptions = [
ColumnFamiles = {create_if_missing, true},
{create_missing_column_families, true}
| emqx_replay_conf:db_options()
],
ExistingCFs =
case rocksdb:list_column_families(Filename, DBOptions) of case rocksdb:list_column_families(Filename, DBOptions) of
{ok, ColumnFamiles0} -> {ok, CFs} ->
[{I, []} || I <- ColumnFamiles0]; [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF];
% DB is not present. First start % DB is not present. First start
{error, {db_open, _}} -> {error, {db_open, _}} ->
[{"default", ?DEFAULT_CF_OPTS}] []
end, end,
case rocksdb:open(Filename, [{create_if_missing, true} | DBOptions], ColumnFamiles) of ColumnFamilies = [
{ok, Handle, CFRefs} -> {?DEFAULT_CF, ?DEFAULT_CF_OPTS},
{CFNames, _} = lists:unzip(ColumnFamiles), {?ITERATOR_CF, ?ITERATOR_CF_OPTS}
{ok, Handle, lists:zip(CFNames, CFRefs)}; | ExistingCFs
],
case rocksdb:open(Filename, DBOptions, ColumnFamilies) of
{ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
{CFNames, _} = lists:unzip(ExistingCFs),
{ok, #s{
zone = Zone,
db = DBHandle,
cf_iterator = CFIterator,
cf_generations = lists:zip(CFNames, CFRefs)
}};
Error -> Error ->
Error Error
end. end.
@ -245,7 +291,7 @@ open_db(Zone) ->
open_gen( open_gen(
GenId, GenId,
Gen = #{module := Mod, data := Data}, Gen = #{module := Mod, data := Data},
#s{zone = Zone, db = DBHandle, column_families = CFs} #s{zone = Zone, db = DBHandle, cf_generations = CFs}
) -> ) ->
DB = Mod:open(Zone, DBHandle, GenId, CFs, Data), DB = Mod:open(Zone, DBHandle, GenId, CFs, Data),
Gen#{data := DB}. Gen#{data := DB}.
@ -261,13 +307,72 @@ open_next_iterator(Gen = #{}, It) ->
-spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}. -spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}.
open_iterator(#{module := Mod, data := Data}, It = #it{}) -> open_iterator(#{module := Mod, data := Data}, It = #it{}) ->
case Mod:make_iterator(Data, It#it.filter, It#it.start_time) of case Mod:make_iterator(Data, It#it.replay) of
{ok, ItData} -> {ok, ItData} ->
{ok, It#it{module = Mod, data = ItData}}; {ok, It#it{module = Mod, data = ItData}};
Err -> Err ->
Err Err
end. end.
-spec open_restore_iterator(generation(), iterator(), binary()) ->
{ok, iterator()} | {error, _Reason}.
open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay}, Serial) ->
case Mod:restore_iterator(Data, Replay, Serial) of
{ok, ItData} ->
{ok, It#it{module = Mod, data = ItData}};
Err ->
Err
end.
%%
-define(KEY_REPLAY_STATE(ReplayID), <<(ReplayID)/binary, "rs">>).
-define(ITERATION_WRITE_OPTS, []).
-define(ITERATION_READ_OPTS, []).
iterator_get_state(Zone, ReplayID) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db),
rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS).
iterator_put_state(ID, It = #it{zone = Zone}) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db),
Serial = preserve_iterator_state(It),
rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS).
iterator_delete(Zone, ID) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db),
rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS).
preserve_iterator_state(#it{
gen = Gen,
replay = {TopicFilter, StartTime},
module = Mod,
data = ItData
}) ->
term_to_binary(#{
v => 1,
gen => Gen,
filter => TopicFilter,
start => StartTime,
st => Mod:preserve_iterator(ItData)
}).
restore_iterator_state(Zone, Serial) when is_binary(Serial) ->
restore_iterator_state(Zone, binary_to_term(Serial));
restore_iterator_state(
Zone,
#{
v := 1,
gen := Gen,
filter := TopicFilter,
start := StartTime,
st := State
}
) ->
It = #it{zone = Zone, gen = Gen, replay = {TopicFilter, StartTime}},
open_restore_iterator(meta_get_gen(Zone, Gen), It, State).
%% Functions for dealing with the metadata stored persistently in rocksdb %% Functions for dealing with the metadata stored persistently in rocksdb
-define(CURRENT_GEN, <<"current">>). -define(CURRENT_GEN, <<"current">>).

View File

@ -94,12 +94,12 @@
-export([make_keymapper/1]). -export([make_keymapper/1]).
-export([store/5]). -export([store/5]).
-export([make_iterator/2]).
-export([make_iterator/3]). -export([make_iterator/3]).
-export([make_iterator/4]).
-export([next/1]). -export([next/1]).
-export([preserve_iterator/1]). -export([preserve_iterator/1]).
-export([restore_iterator/2]). -export([restore_iterator/3]).
-export([refresh_iterator/1]). -export([refresh_iterator/1]).
%% Debug/troubleshooting: %% Debug/troubleshooting:
@ -114,7 +114,7 @@
%% Keyspace filters %% Keyspace filters
-export([ -export([
make_keyspace_filter/3, make_keyspace_filter/2,
compute_initial_seek/1, compute_initial_seek/1,
compute_next_seek/2, compute_next_seek/2,
compute_time_seek/3, compute_time_seek/3,
@ -289,20 +289,20 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
Value = make_message_value(Topic, MessagePayload), Value = make_message_value(Topic, MessagePayload),
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
-spec make_iterator(db(), emqx_topic:words(), time()) -> -spec make_iterator(db(), emqx_replay:replay()) ->
{ok, iterator()} | {error, _TODO}. {ok, iterator()} | {error, _TODO}.
make_iterator(DB, TopicFilter, StartTime) -> make_iterator(DB, Replay) ->
Options = emqx_replay_conf:zone_iteration_options(DB#db.zone), Options = emqx_replay_conf:zone_iteration_options(DB#db.zone),
make_iterator(DB, TopicFilter, StartTime, Options). make_iterator(DB, Replay, Options).
-spec make_iterator(db(), emqx_topic:words(), time(), iteration_options()) -> -spec make_iterator(db(), emqx_replay:replay(), iteration_options()) ->
% {error, invalid_start_time}? might just start from the beginning of time % {error, invalid_start_time}? might just start from the beginning of time
% and call it a day: client violated the contract anyway. % and call it a day: client violated the contract anyway.
{ok, iterator()} | {error, _TODO}. {ok, iterator()} | {error, _TODO}.
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime, Options) -> make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, Replay, Options) ->
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
{ok, ITHandle} -> {ok, ITHandle} ->
Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper), Filter = make_keyspace_filter(Replay, DB#db.keymapper),
InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper), InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper),
RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)), RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)),
{ok, #it{ {ok, #it{
@ -342,26 +342,23 @@ next(It0 = #it{filter = #filter{keymapper = Keymapper}}) ->
end. end.
-spec preserve_iterator(iterator()) -> binary(). -spec preserve_iterator(iterator()) -> binary().
preserve_iterator(#it{cursor = Cursor, filter = Filter}) -> preserve_iterator(#it{cursor = Cursor}) ->
State = #{ State = #{
v => 1, v => 1,
cursor => Cursor, cursor => Cursor
filter => Filter#filter.topic_filter,
stime => Filter#filter.start_time
}, },
term_to_binary(State). term_to_binary(State).
-spec restore_iterator(db(), binary()) -> {ok, iterator()} | {error, _TODO}. -spec restore_iterator(db(), emqx_replay:replay(), binary()) ->
restore_iterator(DB, Serial) when is_binary(Serial) -> {ok, iterator()} | {error, _TODO}.
restore_iterator(DB, Replay, Serial) when is_binary(Serial) ->
State = binary_to_term(Serial), State = binary_to_term(Serial),
restore_iterator(DB, State); restore_iterator(DB, Replay, State);
restore_iterator(DB, #{ restore_iterator(DB, Replay, #{
v := 1, v := 1,
cursor := Cursor, cursor := Cursor
filter := TopicFilter,
stime := StartTime
}) -> }) ->
case make_iterator(DB, TopicFilter, StartTime) of case make_iterator(DB, Replay) of
{ok, It} when Cursor == undefined -> {ok, It} when Cursor == undefined ->
% Iterator was preserved right after it has been made. % Iterator was preserved right after it has been made.
{ok, It}; {ok, It};
@ -434,8 +431,8 @@ hash(Input, Bits) ->
% at most 32 bits % at most 32 bits
erlang:phash2(Input, 1 bsl Bits). erlang:phash2(Input, 1 bsl Bits).
-spec make_keyspace_filter(emqx_topic:words(), time(), keymapper()) -> keyspace_filter(). -spec make_keyspace_filter(emqx_replay:replay(), keymapper()) -> keyspace_filter().
make_keyspace_filter(TopicFilter, StartTime, Keymapper) -> make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ->
Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper), Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper), HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
TimeBitmask = compute_time_bitmask(Keymapper), TimeBitmask = compute_time_bitmask(Keymapper),

View File

@ -73,7 +73,7 @@ t_iterate(_Config) ->
%% Iterate through individual topics: %% Iterate through individual topics:
[ [
begin begin
{ok, It} = emqx_replay_local_store:make_iterator(?ZONE, Topic, 0), {ok, It} = emqx_replay_local_store:make_iterator(?ZONE, {Topic, 0}),
Values = iterate(It), Values = iterate(It),
?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values) ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
end end
@ -190,13 +190,49 @@ t_iterate_multigen(_Config) ->
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)]) lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)])
). ).
t_iterate_multigen_preserve_restore(_Config) ->
ReplayID = atom_to_binary(?FUNCTION_NAME),
{ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG),
{ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 100, ?DEFAULT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
Timestamps = lists:seq(1, 100),
TopicFilter = "foo/#",
TopicsMatching = ["foo/bar", "foo/bar/baz"],
_ = [
store(?ZONE, TS, Topic, term_to_binary({Topic, TS}))
|| Topic <- Topics, TS <- Timestamps
],
It0 = iterator(?ZONE, TopicFilter, 0),
{It1, Res10} = iterate(It0, 10),
% preserve mid-generation
ok = emqx_replay_local_store:preserve_iterator(It1, ReplayID),
{ok, It2} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID),
{It3, Res100} = iterate(It2, 88),
% preserve on the generation boundary
ok = emqx_replay_local_store:preserve_iterator(It3, ReplayID),
{ok, It4} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID),
{It5, Res200} = iterate(It4, 1000),
?assertEqual(none, It5),
?assertEqual(
lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200])
),
?assertEqual(
ok,
emqx_replay_local_store:discard_iterator(?ZONE, ReplayID)
),
?assertEqual(
{error, not_found},
emqx_replay_local_store:restore_iterator(?ZONE, ReplayID)
).
store(Zone, PublishedAt, Topic, Payload) -> store(Zone, PublishedAt, Topic, Payload) ->
ID = emqx_guid:gen(), ID = emqx_guid:gen(),
emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload). emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload).
iterate(DB, TopicFilter, StartTime) -> iterate(DB, TopicFilter, StartTime) ->
{ok, It} = emqx_replay_local_store:make_iterator(DB, parse_topic(TopicFilter), StartTime), iterate(iterator(DB, TopicFilter, StartTime)).
iterate(It).
iterate(It) -> iterate(It) ->
case emqx_replay_local_store:next(It) of case emqx_replay_local_store:next(It) of
@ -206,6 +242,21 @@ iterate(It) ->
[] []
end. end.
iterate(It, 0) ->
{It, []};
iterate(It, N) ->
case emqx_replay_local_store:next(It) of
{value, Payload, ItNext} ->
{ItFinal, Ps} = iterate(ItNext, N - 1),
{ItFinal, [Payload | Ps]};
none ->
{none, []}
end.
iterator(DB, TopicFilter, StartTime) ->
{ok, It} = emqx_replay_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}),
It.
parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
Topic; Topic;
parse_topic(Topic) -> parse_topic(Topic) ->

View File

@ -19,7 +19,7 @@
-export([open/0]). -export([open/0]).
-export([close/1]). -export([close/1]).
-export([store/5]). -export([store/5]).
-export([iterate/3]). -export([iterate/2]).
-type topic() :: list(binary()). -type topic() :: list(binary()).
-type time() :: integer(). -type time() :: integer().
@ -41,9 +41,9 @@ store(Tab, MessageID, PublishedAt, Topic, Payload) ->
true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}), true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}),
ok. ok.
-spec iterate(t(), emqx_topic:words(), time()) -> -spec iterate(t(), emqx_replay:replay()) ->
[binary()]. [binary()].
iterate(Tab, TopicFilter, StartTime) -> iterate(Tab, {TopicFilter, StartTime}) ->
ets:foldr( ets:foldr(
fun({{PublishedAt, _}, Topic, Payload}, Acc) -> fun({{PublishedAt, _}, Topic, Payload}, Acc) ->
case emqx_topic:match(Topic, TopicFilter) of case emqx_topic:match(Topic, TopicFilter) of

View File

@ -21,6 +21,8 @@
-define(WORK_DIR, ["_build", "test"]). -define(WORK_DIR, ["_build", "test"]).
-define(RUN_ID, {?MODULE, testrun_id}). -define(RUN_ID, {?MODULE, testrun_id}).
-define(ZONE, ?MODULE).
-define(GEN_ID, 42). -define(GEN_ID, 42).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -51,8 +53,7 @@ prop_next_seek_monotonic() ->
{topic_filter(), pos_integer(), keymapper()}, {topic_filter(), pos_integer(), keymapper()},
begin begin
Filter = emqx_replay_message_storage:make_keyspace_filter( Filter = emqx_replay_message_storage:make_keyspace_filter(
TopicFilter, {TopicFilter, StartTime},
StartTime,
Keymapper Keymapper
), ),
?FORALL( ?FORALL(
@ -99,8 +100,9 @@ prop_iterate_messages() ->
}, },
begin begin
TopicFilter = make_topic_filter(Pattern, Topic), TopicFilter = make_topic_filter(Pattern, Topic),
Messages = iterate_db(DB, TopicFilter, StartTime), Iteration = {TopicFilter, StartTime},
Reference = iterate_shim(Shim, TopicFilter, StartTime), Messages = iterate_db(DB, Iteration),
Reference = iterate_shim(Shim, Iteration),
ok = close_db(Handle), ok = close_db(Handle),
ok = emqx_replay_message_storage_shim:close(Shim), ok = emqx_replay_message_storage_shim:close(Shim),
?WHENFAIL( ?WHENFAIL(
@ -143,10 +145,11 @@ prop_iterate_eq_iterate_with_preserve_restore() ->
shuffled(flat([non_empty(list({preserve, restore})), list(iterate)])) shuffled(flat([non_empty(list({preserve, restore})), list(iterate)]))
}, },
begin begin
TopicFilter = make_topic_filter(Pat, Topic), Replay = {make_topic_filter(Pat, Topic), StartTime},
Iterator = make_iterator(DB, TopicFilter, StartTime), Iterator = make_iterator(DB, Replay),
Messages = run_iterator_commands(Commands, Iterator, DB), Ctx = #{db => DB, replay => Replay},
equals(Messages, iterate_db(DB, TopicFilter, StartTime)) Messages = run_iterator_commands(Commands, Iterator, Ctx),
equals(Messages, iterate_db(DB, Replay))
end end
) )
end). end).
@ -177,11 +180,11 @@ prop_iterate_eq_iterate_with_refresh() ->
pos_integer() pos_integer()
}, },
?TIMEOUT(5000, begin ?TIMEOUT(5000, begin
TopicFilter = make_topic_filter(Pat, Topic), Replay = {make_topic_filter(Pat, Topic), StartTime},
IterationOptions = #{iterator_refresh => {every, RefreshEvery}}, IterationOptions = #{iterator_refresh => {every, RefreshEvery}},
Iterator = make_iterator(DB, TopicFilter, StartTime, IterationOptions), Iterator = make_iterator(DB, Replay, IterationOptions),
Messages = iterate_db(Iterator), Messages = iterate_db(Iterator),
equals(Messages, iterate_db(DB, TopicFilter, StartTime)) equals(Messages, iterate_db(DB, Replay))
end) end)
) )
end). end).
@ -205,8 +208,8 @@ store_db(DB, Messages) ->
Messages Messages
). ).
iterate_db(DB, TopicFilter, StartTime) -> iterate_db(DB, Iteration) ->
iterate_db(make_iterator(DB, TopicFilter, StartTime)). iterate_db(make_iterator(DB, Iteration)).
iterate_db(It) -> iterate_db(It) ->
case emqx_replay_message_storage:next(It) of case emqx_replay_message_storage:next(It) of
@ -216,26 +219,30 @@ iterate_db(It) ->
[] []
end. end.
make_iterator(DB, TopicFilter, StartTime) -> make_iterator(DB, Replay) ->
{ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime), {ok, It} = emqx_replay_message_storage:make_iterator(DB, Replay),
It. It.
make_iterator(DB, TopicFilter, StartTime, Options) -> make_iterator(DB, Replay, Options) ->
{ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime, Options), {ok, It} = emqx_replay_message_storage:make_iterator(DB, Replay, Options),
It. It.
run_iterator_commands([iterate | Rest], It, DB) -> run_iterator_commands([iterate | Rest], It, Ctx) ->
case emqx_replay_message_storage:next(It) of case emqx_replay_message_storage:next(It) of
{value, Payload, ItNext} -> {value, Payload, ItNext} ->
[binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, DB)]; [binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, Ctx)];
none -> none ->
[] []
end; end;
run_iterator_commands([{preserve, restore} | Rest], It, DB) -> run_iterator_commands([{preserve, restore} | Rest], It, Ctx) ->
#{
db := DB,
replay := Replay
} = Ctx,
Serial = emqx_replay_message_storage:preserve_iterator(It), Serial = emqx_replay_message_storage:preserve_iterator(It),
{ok, ItNext} = emqx_replay_message_storage:restore_iterator(DB, Serial), {ok, ItNext} = emqx_replay_message_storage:restore_iterator(DB, Replay, Serial),
run_iterator_commands(Rest, ItNext, DB); run_iterator_commands(Rest, ItNext, Ctx);
run_iterator_commands([], It, _DB) -> run_iterator_commands([], It, _Ctx) ->
iterate_db(It). iterate_db(It).
store_shim(Shim, Messages) -> store_shim(Shim, Messages) ->
@ -247,10 +254,10 @@ store_shim(Shim, Messages) ->
Messages Messages
). ).
iterate_shim(Shim, TopicFilter, StartTime) -> iterate_shim(Shim, Iteration) ->
lists:map( lists:map(
fun binary_to_term/1, fun binary_to_term/1,
emqx_replay_message_storage_shim:iterate(Shim, TopicFilter, StartTime) emqx_replay_message_storage_shim:iterate(Shim, Iteration)
). ).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -260,7 +267,7 @@ iterate_shim(Shim, TopicFilter, StartTime) ->
open_db(Filepath, Options) -> open_db(Filepath, Options) ->
{ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]), {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]),
{Schema, CFRefs} = emqx_replay_message_storage:create_new(Handle, ?GEN_ID, Options), {Schema, CFRefs} = emqx_replay_message_storage:create_new(Handle, ?GEN_ID, Options),
DB = emqx_replay_message_storage:open(Handle, ?GEN_ID, CFRefs, Schema), DB = emqx_replay_message_storage:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema),
{DB, Handle}. {DB, Handle}.
close_db(Handle) -> close_db(Handle) ->
@ -384,7 +391,7 @@ keyspace_filter() ->
?LET( ?LET(
{TopicFilter, StartTime, Keymapper}, {TopicFilter, StartTime, Keymapper},
{topic_filter(), pos_integer(), keymapper()}, {topic_filter(), pos_integer(), keymapper()},
emqx_replay_message_storage:make_keyspace_filter(TopicFilter, StartTime, Keymapper) emqx_replay_message_storage:make_keyspace_filter({TopicFilter, StartTime}, Keymapper)
). ).
messages(Topic) -> messages(Topic) ->