feat(replay): allow to preserve / restore iterators in the db
So that we could guarantee replay consistency / availability under the assumption that nodes may be restarted or even lost occasionally.
This commit is contained in:
parent
c7aeb98466
commit
1f033f92b5
|
@ -19,6 +19,7 @@
|
||||||
-export([]).
|
-export([]).
|
||||||
|
|
||||||
-export_type([topic/0, time/0]).
|
-export_type([topic/0, time/0]).
|
||||||
|
-export_type([replay_id/0, replay/0]).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Type declarations
|
%% Type declarations
|
||||||
|
@ -32,6 +33,13 @@
|
||||||
%% TODO granularity?
|
%% TODO granularity?
|
||||||
-type time() :: non_neg_integer().
|
-type time() :: non_neg_integer().
|
||||||
|
|
||||||
|
-type replay_id() :: binary().
|
||||||
|
|
||||||
|
-type replay() :: {
|
||||||
|
_TopicFilter :: topic(),
|
||||||
|
_StartTime :: time()
|
||||||
|
}.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API funcions
|
%% API funcions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
|
@ -21,7 +21,11 @@
|
||||||
-export([start_link/1]).
|
-export([start_link/1]).
|
||||||
-export([create_generation/3]).
|
-export([create_generation/3]).
|
||||||
|
|
||||||
-export([store/5, make_iterator/3, next/1]).
|
-export([store/5]).
|
||||||
|
|
||||||
|
-export([make_iterator/2, next/1]).
|
||||||
|
|
||||||
|
-export([preserve_iterator/2, restore_iterator/2, discard_iterator/2]).
|
||||||
|
|
||||||
%% behavior callbacks:
|
%% behavior callbacks:
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
||||||
|
@ -57,14 +61,14 @@
|
||||||
-record(s, {
|
-record(s, {
|
||||||
zone :: emqx_types:zone(),
|
zone :: emqx_types:zone(),
|
||||||
db :: rocksdb:db_handle(),
|
db :: rocksdb:db_handle(),
|
||||||
column_families :: cf_refs()
|
cf_iterator :: rocksdb:cf_handle(),
|
||||||
|
cf_generations :: cf_refs()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-record(it, {
|
-record(it, {
|
||||||
zone :: emqx_types:zone(),
|
zone :: emqx_types:zone(),
|
||||||
gen :: gen_id(),
|
gen :: gen_id(),
|
||||||
filter :: emqx_topic:words(),
|
replay :: emqx_replay:replay(),
|
||||||
start_time :: emqx_replay:time(),
|
|
||||||
module :: module(),
|
module :: module(),
|
||||||
data :: term()
|
data :: term()
|
||||||
}).
|
}).
|
||||||
|
@ -79,8 +83,17 @@
|
||||||
%% [{<<"genNN">>, #generation{}}, ...,
|
%% [{<<"genNN">>, #generation{}}, ...,
|
||||||
%% {<<"current">>, GenID}]
|
%% {<<"current">>, GenID}]
|
||||||
|
|
||||||
|
-define(DEFAULT_CF, "default").
|
||||||
-define(DEFAULT_CF_OPTS, []).
|
-define(DEFAULT_CF_OPTS, []).
|
||||||
|
|
||||||
|
-define(ITERATOR_CF, "$iterators").
|
||||||
|
|
||||||
|
%% TODO
|
||||||
|
%% 1. CuckooTable might be of use here / `OptimizeForPointLookup(...)`.
|
||||||
|
%% 2. Supposedly might be compressed _very_ effectively.
|
||||||
|
%% 3. `inplace_update_support`?
|
||||||
|
-define(ITERATOR_CF_OPTS, []).
|
||||||
|
|
||||||
-define(REF(Zone), {via, gproc, {n, l, {?MODULE, Zone}}}).
|
-define(REF(Zone), {via, gproc, {n, l, {?MODULE, Zone}}}).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -102,15 +115,14 @@ store(Zone, GUID, Time, Topic, Msg) ->
|
||||||
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Zone, Time),
|
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Zone, Time),
|
||||||
Mod:store(Data, GUID, Time, Topic, Msg).
|
Mod:store(Data, GUID, Time, Topic, Msg).
|
||||||
|
|
||||||
-spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay:time()) ->
|
-spec make_iterator(emqx_types:zone(), emqx_replay:replay()) ->
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
make_iterator(Zone, TopicFilter, StartTime) ->
|
make_iterator(Zone, Replay = {_, StartTime}) ->
|
||||||
{GenId, Gen} = meta_lookup_gen(Zone, StartTime),
|
{GenId, Gen} = meta_lookup_gen(Zone, StartTime),
|
||||||
open_iterator(Gen, #it{
|
open_iterator(Gen, #it{
|
||||||
zone = Zone,
|
zone = Zone,
|
||||||
gen = GenId,
|
gen = GenId,
|
||||||
filter = TopicFilter,
|
replay = Replay
|
||||||
start_time = StartTime
|
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
|
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
|
||||||
|
@ -131,20 +143,37 @@ next(It = #it{module = Mod, data = ItData}) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec preserve_iterator(iterator(), emqx_replay:replay_id()) ->
|
||||||
|
ok | {error, _TODO}.
|
||||||
|
preserve_iterator(It = #it{}, ReplayID) ->
|
||||||
|
iterator_put_state(ReplayID, It).
|
||||||
|
|
||||||
|
-spec restore_iterator(emqx_types:zone(), emqx_replay:replay_id()) ->
|
||||||
|
{ok, iterator()} | {error, _TODO}.
|
||||||
|
restore_iterator(Zone, ReplayID) ->
|
||||||
|
case iterator_get_state(Zone, ReplayID) of
|
||||||
|
{ok, Serial} ->
|
||||||
|
restore_iterator_state(Zone, Serial);
|
||||||
|
not_found ->
|
||||||
|
{error, not_found};
|
||||||
|
{error, _Reason} = Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec discard_iterator(emqx_types:zone(), emqx_replay:replay_id()) ->
|
||||||
|
ok | {error, _TODO}.
|
||||||
|
discard_iterator(Zone, ReplayID) ->
|
||||||
|
iterator_delete(Zone, ReplayID).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% behavior callbacks
|
%% behavior callbacks
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
init([Zone]) ->
|
init([Zone]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
{ok, DBHandle, CFRefs} = open_db(Zone),
|
{ok, S0} = open_db(Zone),
|
||||||
S0 = #s{
|
|
||||||
zone = Zone,
|
|
||||||
db = DBHandle,
|
|
||||||
column_families = CFRefs
|
|
||||||
},
|
|
||||||
S = ensure_current_generation(S0),
|
S = ensure_current_generation(S0),
|
||||||
read_metadata(S),
|
ok = populate_metadata(S),
|
||||||
{ok, S}.
|
{ok, S}.
|
||||||
|
|
||||||
handle_call({create_generation, Since, Config}, _From, S) ->
|
handle_call({create_generation, Since, Config}, _From, S) ->
|
||||||
|
@ -171,13 +200,16 @@ terminate(_Reason, #s{db = DB, zone = Zone}) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
-spec read_metadata(state()) -> ok.
|
-record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}).
|
||||||
read_metadata(S = #s{db = DBHandle}) ->
|
|
||||||
Current = schema_get_current(DBHandle),
|
|
||||||
lists:foreach(fun(GenId) -> read_metadata(GenId, S) end, lists:seq(0, Current)).
|
|
||||||
|
|
||||||
-spec read_metadata(gen_id(), state()) -> ok.
|
-spec populate_metadata(state()) -> ok.
|
||||||
read_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) ->
|
populate_metadata(S = #s{zone = Zone, db = DBHandle, cf_iterator = CFIterator}) ->
|
||||||
|
ok = meta_put(Zone, db, #db{handle = DBHandle, cf_iterator = CFIterator}),
|
||||||
|
Current = schema_get_current(DBHandle),
|
||||||
|
lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)).
|
||||||
|
|
||||||
|
-spec populate_metadata(gen_id(), state()) -> ok.
|
||||||
|
populate_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) ->
|
||||||
Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S),
|
Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S),
|
||||||
meta_register_gen(Zone, GenId, Gen).
|
meta_register_gen(Zone, GenId, Gen).
|
||||||
|
|
||||||
|
@ -193,7 +225,7 @@ ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
|
-spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
|
||||||
{ok, gen_id(), state()}.
|
{ok, gen_id(), state()} | {error, nonmonotonic}.
|
||||||
create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) ->
|
create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) ->
|
||||||
GenId = get_next_id(meta_get_current(Zone)),
|
GenId = get_next_id(meta_get_current(Zone)),
|
||||||
GenId = get_next_id(schema_get_current(DBHandle)),
|
GenId = get_next_id(schema_get_current(DBHandle)),
|
||||||
|
@ -211,7 +243,7 @@ create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) ->
|
||||||
|
|
||||||
-spec create_gen(gen_id(), emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
|
-spec create_gen(gen_id(), emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
|
||||||
{ok, generation(), state()}.
|
{ok, generation(), state()}.
|
||||||
create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_families = CFs}) ->
|
create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) ->
|
||||||
% TODO: Backend implementation should ensure idempotency.
|
% TODO: Backend implementation should ensure idempotency.
|
||||||
{Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options),
|
{Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options),
|
||||||
Gen = #{
|
Gen = #{
|
||||||
|
@ -219,24 +251,38 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_familie
|
||||||
data => Schema,
|
data => Schema,
|
||||||
since => Since
|
since => Since
|
||||||
},
|
},
|
||||||
{ok, Gen, S#s{column_families = NewCFs ++ CFs}}.
|
{ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
|
||||||
|
|
||||||
-spec open_db(emqx_types:zone()) -> {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}.
|
-spec open_db(emqx_types:zone()) -> {ok, state()} | {error, _TODO}.
|
||||||
open_db(Zone) ->
|
open_db(Zone) ->
|
||||||
Filename = atom_to_list(Zone),
|
Filename = atom_to_list(Zone),
|
||||||
DBOptions = emqx_replay_conf:db_options(),
|
DBOptions = [
|
||||||
ColumnFamiles =
|
{create_if_missing, true},
|
||||||
|
{create_missing_column_families, true}
|
||||||
|
| emqx_replay_conf:db_options()
|
||||||
|
],
|
||||||
|
ExistingCFs =
|
||||||
case rocksdb:list_column_families(Filename, DBOptions) of
|
case rocksdb:list_column_families(Filename, DBOptions) of
|
||||||
{ok, ColumnFamiles0} ->
|
{ok, CFs} ->
|
||||||
[{I, []} || I <- ColumnFamiles0];
|
[{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF];
|
||||||
% DB is not present. First start
|
% DB is not present. First start
|
||||||
{error, {db_open, _}} ->
|
{error, {db_open, _}} ->
|
||||||
[{"default", ?DEFAULT_CF_OPTS}]
|
[]
|
||||||
end,
|
end,
|
||||||
case rocksdb:open(Filename, [{create_if_missing, true} | DBOptions], ColumnFamiles) of
|
ColumnFamilies = [
|
||||||
{ok, Handle, CFRefs} ->
|
{?DEFAULT_CF, ?DEFAULT_CF_OPTS},
|
||||||
{CFNames, _} = lists:unzip(ColumnFamiles),
|
{?ITERATOR_CF, ?ITERATOR_CF_OPTS}
|
||||||
{ok, Handle, lists:zip(CFNames, CFRefs)};
|
| ExistingCFs
|
||||||
|
],
|
||||||
|
case rocksdb:open(Filename, DBOptions, ColumnFamilies) of
|
||||||
|
{ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
|
||||||
|
{CFNames, _} = lists:unzip(ExistingCFs),
|
||||||
|
{ok, #s{
|
||||||
|
zone = Zone,
|
||||||
|
db = DBHandle,
|
||||||
|
cf_iterator = CFIterator,
|
||||||
|
cf_generations = lists:zip(CFNames, CFRefs)
|
||||||
|
}};
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
@ -245,7 +291,7 @@ open_db(Zone) ->
|
||||||
open_gen(
|
open_gen(
|
||||||
GenId,
|
GenId,
|
||||||
Gen = #{module := Mod, data := Data},
|
Gen = #{module := Mod, data := Data},
|
||||||
#s{zone = Zone, db = DBHandle, column_families = CFs}
|
#s{zone = Zone, db = DBHandle, cf_generations = CFs}
|
||||||
) ->
|
) ->
|
||||||
DB = Mod:open(Zone, DBHandle, GenId, CFs, Data),
|
DB = Mod:open(Zone, DBHandle, GenId, CFs, Data),
|
||||||
Gen#{data := DB}.
|
Gen#{data := DB}.
|
||||||
|
@ -261,13 +307,72 @@ open_next_iterator(Gen = #{}, It) ->
|
||||||
|
|
||||||
-spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}.
|
-spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}.
|
||||||
open_iterator(#{module := Mod, data := Data}, It = #it{}) ->
|
open_iterator(#{module := Mod, data := Data}, It = #it{}) ->
|
||||||
case Mod:make_iterator(Data, It#it.filter, It#it.start_time) of
|
case Mod:make_iterator(Data, It#it.replay) of
|
||||||
{ok, ItData} ->
|
{ok, ItData} ->
|
||||||
{ok, It#it{module = Mod, data = ItData}};
|
{ok, It#it{module = Mod, data = ItData}};
|
||||||
Err ->
|
Err ->
|
||||||
Err
|
Err
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec open_restore_iterator(generation(), iterator(), binary()) ->
|
||||||
|
{ok, iterator()} | {error, _Reason}.
|
||||||
|
open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay}, Serial) ->
|
||||||
|
case Mod:restore_iterator(Data, Replay, Serial) of
|
||||||
|
{ok, ItData} ->
|
||||||
|
{ok, It#it{module = Mod, data = ItData}};
|
||||||
|
Err ->
|
||||||
|
Err
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
|
-define(KEY_REPLAY_STATE(ReplayID), <<(ReplayID)/binary, "rs">>).
|
||||||
|
|
||||||
|
-define(ITERATION_WRITE_OPTS, []).
|
||||||
|
-define(ITERATION_READ_OPTS, []).
|
||||||
|
|
||||||
|
iterator_get_state(Zone, ReplayID) ->
|
||||||
|
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db),
|
||||||
|
rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS).
|
||||||
|
|
||||||
|
iterator_put_state(ID, It = #it{zone = Zone}) ->
|
||||||
|
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db),
|
||||||
|
Serial = preserve_iterator_state(It),
|
||||||
|
rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS).
|
||||||
|
|
||||||
|
iterator_delete(Zone, ID) ->
|
||||||
|
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Zone, db),
|
||||||
|
rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS).
|
||||||
|
|
||||||
|
preserve_iterator_state(#it{
|
||||||
|
gen = Gen,
|
||||||
|
replay = {TopicFilter, StartTime},
|
||||||
|
module = Mod,
|
||||||
|
data = ItData
|
||||||
|
}) ->
|
||||||
|
term_to_binary(#{
|
||||||
|
v => 1,
|
||||||
|
gen => Gen,
|
||||||
|
filter => TopicFilter,
|
||||||
|
start => StartTime,
|
||||||
|
st => Mod:preserve_iterator(ItData)
|
||||||
|
}).
|
||||||
|
|
||||||
|
restore_iterator_state(Zone, Serial) when is_binary(Serial) ->
|
||||||
|
restore_iterator_state(Zone, binary_to_term(Serial));
|
||||||
|
restore_iterator_state(
|
||||||
|
Zone,
|
||||||
|
#{
|
||||||
|
v := 1,
|
||||||
|
gen := Gen,
|
||||||
|
filter := TopicFilter,
|
||||||
|
start := StartTime,
|
||||||
|
st := State
|
||||||
|
}
|
||||||
|
) ->
|
||||||
|
It = #it{zone = Zone, gen = Gen, replay = {TopicFilter, StartTime}},
|
||||||
|
open_restore_iterator(meta_get_gen(Zone, Gen), It, State).
|
||||||
|
|
||||||
%% Functions for dealing with the metadata stored persistently in rocksdb
|
%% Functions for dealing with the metadata stored persistently in rocksdb
|
||||||
|
|
||||||
-define(CURRENT_GEN, <<"current">>).
|
-define(CURRENT_GEN, <<"current">>).
|
||||||
|
|
|
@ -94,12 +94,12 @@
|
||||||
-export([make_keymapper/1]).
|
-export([make_keymapper/1]).
|
||||||
|
|
||||||
-export([store/5]).
|
-export([store/5]).
|
||||||
|
-export([make_iterator/2]).
|
||||||
-export([make_iterator/3]).
|
-export([make_iterator/3]).
|
||||||
-export([make_iterator/4]).
|
|
||||||
-export([next/1]).
|
-export([next/1]).
|
||||||
|
|
||||||
-export([preserve_iterator/1]).
|
-export([preserve_iterator/1]).
|
||||||
-export([restore_iterator/2]).
|
-export([restore_iterator/3]).
|
||||||
-export([refresh_iterator/1]).
|
-export([refresh_iterator/1]).
|
||||||
|
|
||||||
%% Debug/troubleshooting:
|
%% Debug/troubleshooting:
|
||||||
|
@ -114,7 +114,7 @@
|
||||||
|
|
||||||
%% Keyspace filters
|
%% Keyspace filters
|
||||||
-export([
|
-export([
|
||||||
make_keyspace_filter/3,
|
make_keyspace_filter/2,
|
||||||
compute_initial_seek/1,
|
compute_initial_seek/1,
|
||||||
compute_next_seek/2,
|
compute_next_seek/2,
|
||||||
compute_time_seek/3,
|
compute_time_seek/3,
|
||||||
|
@ -289,20 +289,20 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
|
||||||
Value = make_message_value(Topic, MessagePayload),
|
Value = make_message_value(Topic, MessagePayload),
|
||||||
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
|
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
|
||||||
|
|
||||||
-spec make_iterator(db(), emqx_topic:words(), time()) ->
|
-spec make_iterator(db(), emqx_replay:replay()) ->
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
make_iterator(DB, TopicFilter, StartTime) ->
|
make_iterator(DB, Replay) ->
|
||||||
Options = emqx_replay_conf:zone_iteration_options(DB#db.zone),
|
Options = emqx_replay_conf:zone_iteration_options(DB#db.zone),
|
||||||
make_iterator(DB, TopicFilter, StartTime, Options).
|
make_iterator(DB, Replay, Options).
|
||||||
|
|
||||||
-spec make_iterator(db(), emqx_topic:words(), time(), iteration_options()) ->
|
-spec make_iterator(db(), emqx_replay:replay(), iteration_options()) ->
|
||||||
% {error, invalid_start_time}? might just start from the beginning of time
|
% {error, invalid_start_time}? might just start from the beginning of time
|
||||||
% and call it a day: client violated the contract anyway.
|
% and call it a day: client violated the contract anyway.
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime, Options) ->
|
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, Replay, Options) ->
|
||||||
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
|
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
|
||||||
{ok, ITHandle} ->
|
{ok, ITHandle} ->
|
||||||
Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper),
|
Filter = make_keyspace_filter(Replay, DB#db.keymapper),
|
||||||
InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper),
|
InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper),
|
||||||
RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)),
|
RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)),
|
||||||
{ok, #it{
|
{ok, #it{
|
||||||
|
@ -342,26 +342,23 @@ next(It0 = #it{filter = #filter{keymapper = Keymapper}}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec preserve_iterator(iterator()) -> binary().
|
-spec preserve_iterator(iterator()) -> binary().
|
||||||
preserve_iterator(#it{cursor = Cursor, filter = Filter}) ->
|
preserve_iterator(#it{cursor = Cursor}) ->
|
||||||
State = #{
|
State = #{
|
||||||
v => 1,
|
v => 1,
|
||||||
cursor => Cursor,
|
cursor => Cursor
|
||||||
filter => Filter#filter.topic_filter,
|
|
||||||
stime => Filter#filter.start_time
|
|
||||||
},
|
},
|
||||||
term_to_binary(State).
|
term_to_binary(State).
|
||||||
|
|
||||||
-spec restore_iterator(db(), binary()) -> {ok, iterator()} | {error, _TODO}.
|
-spec restore_iterator(db(), emqx_replay:replay(), binary()) ->
|
||||||
restore_iterator(DB, Serial) when is_binary(Serial) ->
|
{ok, iterator()} | {error, _TODO}.
|
||||||
|
restore_iterator(DB, Replay, Serial) when is_binary(Serial) ->
|
||||||
State = binary_to_term(Serial),
|
State = binary_to_term(Serial),
|
||||||
restore_iterator(DB, State);
|
restore_iterator(DB, Replay, State);
|
||||||
restore_iterator(DB, #{
|
restore_iterator(DB, Replay, #{
|
||||||
v := 1,
|
v := 1,
|
||||||
cursor := Cursor,
|
cursor := Cursor
|
||||||
filter := TopicFilter,
|
|
||||||
stime := StartTime
|
|
||||||
}) ->
|
}) ->
|
||||||
case make_iterator(DB, TopicFilter, StartTime) of
|
case make_iterator(DB, Replay) of
|
||||||
{ok, It} when Cursor == undefined ->
|
{ok, It} when Cursor == undefined ->
|
||||||
% Iterator was preserved right after it has been made.
|
% Iterator was preserved right after it has been made.
|
||||||
{ok, It};
|
{ok, It};
|
||||||
|
@ -434,8 +431,8 @@ hash(Input, Bits) ->
|
||||||
% at most 32 bits
|
% at most 32 bits
|
||||||
erlang:phash2(Input, 1 bsl Bits).
|
erlang:phash2(Input, 1 bsl Bits).
|
||||||
|
|
||||||
-spec make_keyspace_filter(emqx_topic:words(), time(), keymapper()) -> keyspace_filter().
|
-spec make_keyspace_filter(emqx_replay:replay(), keymapper()) -> keyspace_filter().
|
||||||
make_keyspace_filter(TopicFilter, StartTime, Keymapper) ->
|
make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ->
|
||||||
Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
|
Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
|
||||||
HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
|
HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
|
||||||
TimeBitmask = compute_time_bitmask(Keymapper),
|
TimeBitmask = compute_time_bitmask(Keymapper),
|
||||||
|
|
|
@ -73,7 +73,7 @@ t_iterate(_Config) ->
|
||||||
%% Iterate through individual topics:
|
%% Iterate through individual topics:
|
||||||
[
|
[
|
||||||
begin
|
begin
|
||||||
{ok, It} = emqx_replay_local_store:make_iterator(?ZONE, Topic, 0),
|
{ok, It} = emqx_replay_local_store:make_iterator(?ZONE, {Topic, 0}),
|
||||||
Values = iterate(It),
|
Values = iterate(It),
|
||||||
?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
|
?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
|
||||||
end
|
end
|
||||||
|
@ -195,7 +195,7 @@ store(Zone, PublishedAt, Topic, Payload) ->
|
||||||
emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload).
|
emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload).
|
||||||
|
|
||||||
iterate(DB, TopicFilter, StartTime) ->
|
iterate(DB, TopicFilter, StartTime) ->
|
||||||
{ok, It} = emqx_replay_local_store:make_iterator(DB, parse_topic(TopicFilter), StartTime),
|
{ok, It} = emqx_replay_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}),
|
||||||
iterate(It).
|
iterate(It).
|
||||||
|
|
||||||
iterate(It) ->
|
iterate(It) ->
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
-export([open/0]).
|
-export([open/0]).
|
||||||
-export([close/1]).
|
-export([close/1]).
|
||||||
-export([store/5]).
|
-export([store/5]).
|
||||||
-export([iterate/3]).
|
-export([iterate/2]).
|
||||||
|
|
||||||
-type topic() :: list(binary()).
|
-type topic() :: list(binary()).
|
||||||
-type time() :: integer().
|
-type time() :: integer().
|
||||||
|
@ -41,9 +41,9 @@ store(Tab, MessageID, PublishedAt, Topic, Payload) ->
|
||||||
true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}),
|
true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec iterate(t(), emqx_topic:words(), time()) ->
|
-spec iterate(t(), emqx_replay:replay()) ->
|
||||||
[binary()].
|
[binary()].
|
||||||
iterate(Tab, TopicFilter, StartTime) ->
|
iterate(Tab, {TopicFilter, StartTime}) ->
|
||||||
ets:foldr(
|
ets:foldr(
|
||||||
fun({{PublishedAt, _}, Topic, Payload}, Acc) ->
|
fun({{PublishedAt, _}, Topic, Payload}, Acc) ->
|
||||||
case emqx_topic:match(Topic, TopicFilter) of
|
case emqx_topic:match(Topic, TopicFilter) of
|
||||||
|
|
|
@ -21,6 +21,8 @@
|
||||||
|
|
||||||
-define(WORK_DIR, ["_build", "test"]).
|
-define(WORK_DIR, ["_build", "test"]).
|
||||||
-define(RUN_ID, {?MODULE, testrun_id}).
|
-define(RUN_ID, {?MODULE, testrun_id}).
|
||||||
|
|
||||||
|
-define(ZONE, ?MODULE).
|
||||||
-define(GEN_ID, 42).
|
-define(GEN_ID, 42).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -51,8 +53,7 @@ prop_next_seek_monotonic() ->
|
||||||
{topic_filter(), pos_integer(), keymapper()},
|
{topic_filter(), pos_integer(), keymapper()},
|
||||||
begin
|
begin
|
||||||
Filter = emqx_replay_message_storage:make_keyspace_filter(
|
Filter = emqx_replay_message_storage:make_keyspace_filter(
|
||||||
TopicFilter,
|
{TopicFilter, StartTime},
|
||||||
StartTime,
|
|
||||||
Keymapper
|
Keymapper
|
||||||
),
|
),
|
||||||
?FORALL(
|
?FORALL(
|
||||||
|
@ -99,8 +100,9 @@ prop_iterate_messages() ->
|
||||||
},
|
},
|
||||||
begin
|
begin
|
||||||
TopicFilter = make_topic_filter(Pattern, Topic),
|
TopicFilter = make_topic_filter(Pattern, Topic),
|
||||||
Messages = iterate_db(DB, TopicFilter, StartTime),
|
Iteration = {TopicFilter, StartTime},
|
||||||
Reference = iterate_shim(Shim, TopicFilter, StartTime),
|
Messages = iterate_db(DB, Iteration),
|
||||||
|
Reference = iterate_shim(Shim, Iteration),
|
||||||
ok = close_db(Handle),
|
ok = close_db(Handle),
|
||||||
ok = emqx_replay_message_storage_shim:close(Shim),
|
ok = emqx_replay_message_storage_shim:close(Shim),
|
||||||
?WHENFAIL(
|
?WHENFAIL(
|
||||||
|
@ -143,10 +145,11 @@ prop_iterate_eq_iterate_with_preserve_restore() ->
|
||||||
shuffled(flat([non_empty(list({preserve, restore})), list(iterate)]))
|
shuffled(flat([non_empty(list({preserve, restore})), list(iterate)]))
|
||||||
},
|
},
|
||||||
begin
|
begin
|
||||||
TopicFilter = make_topic_filter(Pat, Topic),
|
Replay = {make_topic_filter(Pat, Topic), StartTime},
|
||||||
Iterator = make_iterator(DB, TopicFilter, StartTime),
|
Iterator = make_iterator(DB, Replay),
|
||||||
Messages = run_iterator_commands(Commands, Iterator, DB),
|
Ctx = #{db => DB, replay => Replay},
|
||||||
equals(Messages, iterate_db(DB, TopicFilter, StartTime))
|
Messages = run_iterator_commands(Commands, Iterator, Ctx),
|
||||||
|
equals(Messages, iterate_db(DB, Replay))
|
||||||
end
|
end
|
||||||
)
|
)
|
||||||
end).
|
end).
|
||||||
|
@ -177,11 +180,11 @@ prop_iterate_eq_iterate_with_refresh() ->
|
||||||
pos_integer()
|
pos_integer()
|
||||||
},
|
},
|
||||||
?TIMEOUT(5000, begin
|
?TIMEOUT(5000, begin
|
||||||
TopicFilter = make_topic_filter(Pat, Topic),
|
Replay = {make_topic_filter(Pat, Topic), StartTime},
|
||||||
IterationOptions = #{iterator_refresh => {every, RefreshEvery}},
|
IterationOptions = #{iterator_refresh => {every, RefreshEvery}},
|
||||||
Iterator = make_iterator(DB, TopicFilter, StartTime, IterationOptions),
|
Iterator = make_iterator(DB, Replay, IterationOptions),
|
||||||
Messages = iterate_db(Iterator),
|
Messages = iterate_db(Iterator),
|
||||||
equals(Messages, iterate_db(DB, TopicFilter, StartTime))
|
equals(Messages, iterate_db(DB, Replay))
|
||||||
end)
|
end)
|
||||||
)
|
)
|
||||||
end).
|
end).
|
||||||
|
@ -205,8 +208,8 @@ store_db(DB, Messages) ->
|
||||||
Messages
|
Messages
|
||||||
).
|
).
|
||||||
|
|
||||||
iterate_db(DB, TopicFilter, StartTime) ->
|
iterate_db(DB, Iteration) ->
|
||||||
iterate_db(make_iterator(DB, TopicFilter, StartTime)).
|
iterate_db(make_iterator(DB, Iteration)).
|
||||||
|
|
||||||
iterate_db(It) ->
|
iterate_db(It) ->
|
||||||
case emqx_replay_message_storage:next(It) of
|
case emqx_replay_message_storage:next(It) of
|
||||||
|
@ -216,26 +219,30 @@ iterate_db(It) ->
|
||||||
[]
|
[]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
make_iterator(DB, TopicFilter, StartTime) ->
|
make_iterator(DB, Replay) ->
|
||||||
{ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime),
|
{ok, It} = emqx_replay_message_storage:make_iterator(DB, Replay),
|
||||||
It.
|
It.
|
||||||
|
|
||||||
make_iterator(DB, TopicFilter, StartTime, Options) ->
|
make_iterator(DB, Replay, Options) ->
|
||||||
{ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime, Options),
|
{ok, It} = emqx_replay_message_storage:make_iterator(DB, Replay, Options),
|
||||||
It.
|
It.
|
||||||
|
|
||||||
run_iterator_commands([iterate | Rest], It, DB) ->
|
run_iterator_commands([iterate | Rest], It, Ctx) ->
|
||||||
case emqx_replay_message_storage:next(It) of
|
case emqx_replay_message_storage:next(It) of
|
||||||
{value, Payload, ItNext} ->
|
{value, Payload, ItNext} ->
|
||||||
[binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, DB)];
|
[binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, Ctx)];
|
||||||
none ->
|
none ->
|
||||||
[]
|
[]
|
||||||
end;
|
end;
|
||||||
run_iterator_commands([{preserve, restore} | Rest], It, DB) ->
|
run_iterator_commands([{preserve, restore} | Rest], It, Ctx) ->
|
||||||
|
#{
|
||||||
|
db := DB,
|
||||||
|
replay := Replay
|
||||||
|
} = Ctx,
|
||||||
Serial = emqx_replay_message_storage:preserve_iterator(It),
|
Serial = emqx_replay_message_storage:preserve_iterator(It),
|
||||||
{ok, ItNext} = emqx_replay_message_storage:restore_iterator(DB, Serial),
|
{ok, ItNext} = emqx_replay_message_storage:restore_iterator(DB, Replay, Serial),
|
||||||
run_iterator_commands(Rest, ItNext, DB);
|
run_iterator_commands(Rest, ItNext, Ctx);
|
||||||
run_iterator_commands([], It, _DB) ->
|
run_iterator_commands([], It, _Ctx) ->
|
||||||
iterate_db(It).
|
iterate_db(It).
|
||||||
|
|
||||||
store_shim(Shim, Messages) ->
|
store_shim(Shim, Messages) ->
|
||||||
|
@ -247,10 +254,10 @@ store_shim(Shim, Messages) ->
|
||||||
Messages
|
Messages
|
||||||
).
|
).
|
||||||
|
|
||||||
iterate_shim(Shim, TopicFilter, StartTime) ->
|
iterate_shim(Shim, Iteration) ->
|
||||||
lists:map(
|
lists:map(
|
||||||
fun binary_to_term/1,
|
fun binary_to_term/1,
|
||||||
emqx_replay_message_storage_shim:iterate(Shim, TopicFilter, StartTime)
|
emqx_replay_message_storage_shim:iterate(Shim, Iteration)
|
||||||
).
|
).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -260,7 +267,7 @@ iterate_shim(Shim, TopicFilter, StartTime) ->
|
||||||
open_db(Filepath, Options) ->
|
open_db(Filepath, Options) ->
|
||||||
{ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]),
|
{ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]),
|
||||||
{Schema, CFRefs} = emqx_replay_message_storage:create_new(Handle, ?GEN_ID, Options),
|
{Schema, CFRefs} = emqx_replay_message_storage:create_new(Handle, ?GEN_ID, Options),
|
||||||
DB = emqx_replay_message_storage:open(Handle, ?GEN_ID, CFRefs, Schema),
|
DB = emqx_replay_message_storage:open(?ZONE, Handle, ?GEN_ID, CFRefs, Schema),
|
||||||
{DB, Handle}.
|
{DB, Handle}.
|
||||||
|
|
||||||
close_db(Handle) ->
|
close_db(Handle) ->
|
||||||
|
@ -384,7 +391,7 @@ keyspace_filter() ->
|
||||||
?LET(
|
?LET(
|
||||||
{TopicFilter, StartTime, Keymapper},
|
{TopicFilter, StartTime, Keymapper},
|
||||||
{topic_filter(), pos_integer(), keymapper()},
|
{topic_filter(), pos_integer(), keymapper()},
|
||||||
emqx_replay_message_storage:make_keyspace_filter(TopicFilter, StartTime, Keymapper)
|
emqx_replay_message_storage:make_keyspace_filter({TopicFilter, StartTime}, Keymapper)
|
||||||
).
|
).
|
||||||
|
|
||||||
messages(Topic) ->
|
messages(Topic) ->
|
||||||
|
|
Loading…
Reference in New Issue