Merge pull request #9797 from feat/rocksdb-replay-queue/cross-gen-iteration

feat: make iteration fully generation-aware
This commit is contained in:
Andrew Mayorov 2023-01-18 16:48:50 +04:00 committed by GitHub
commit c7aeb98466
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 269 additions and 92 deletions

View File

@ -5,7 +5,7 @@
{vsn, "0.1.0"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, rocksdb]},
{applications, [kernel, stdlib, rocksdb, gproc]},
{mod, {emqx_replay_app, []}},
{env, []}
]}.

View File

@ -27,8 +27,10 @@
%% parsed
-type topic() :: list(binary()).
%% Timestamp
%% Earliest possible timestamp is 0.
%% TODO granularity?
-type time() :: integer().
-type time() :: non_neg_integer().
%%================================================================================
%% API functions

View File

@ -23,6 +23,12 @@
-export([zone_iteration_options/1]).
-export([default_iteration_options/0]).
%% Storage backend selection for a zone: a `{Module, Options}` pair.
%% The first alternative spells out the options type of the
%% `emqx_replay_message_storage` backend; any other backend module carries
%% options of an unspecified type.
-type backend_config() ::
{emqx_replay_message_storage, emqx_replay_message_storage:options()}
| {module(), _Options}.
-export_type([backend_config/0]).
%%================================================================================
%% API functions
%%================================================================================
@ -30,11 +36,8 @@
-define(APP, emqx_replay).
-type zone() :: emqx_types:zone().
-type config() ::
{emqx_replay_message_storage, emqx_replay_message_storage:options()}
| {module(), _Options}.
-spec zone_config(zone()) -> config().
-spec zone_config(zone()) -> backend_config().
zone_config(Zone) ->
DefaultZoneConfig = application:get_env(?APP, default_zone_config, default_zone_config()),
Zones = application:get_env(?APP, zone_config, #{}),
@ -54,7 +57,7 @@ default_iteration_options() ->
{emqx_replay_message_storage, Config} = default_zone_config(),
maps:get(iteration, Config).
-spec default_zone_config() -> config().
-spec default_zone_config() -> backend_config().
default_zone_config() ->
{emqx_replay_message_storage, #{
timestamp_bits => 64,

View File

@ -19,8 +19,9 @@
%% API:
-export([start_link/1]).
-export([create_generation/3]).
-export([make_iterator/3, store/5, next/1]).
-export([store/5, make_iterator/3, next/1]).
%% behavior callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
@ -40,13 +41,18 @@
-type cf_refs() :: [{string(), rocksdb:cf_handle()}].
-record(generation, {
%% Message storage generation
%% Keep in mind that instances of this type are persisted in long-term storage.
-type generation() :: #{
%% Module that handles data for the generation
module :: module(),
%% Module-specific attributes
data :: term()
% time_range :: {emqx_replay:time(), emqx_replay:time()}
}).
module := module(),
%% Module-specific data defined at generation creation time
data := term(),
%% When should this generation become active?
%% This generation should only contain messages timestamped no earlier than that.
%% The very first generation will have `since` equal 0.
since := emqx_replay:time()
}.
-record(s, {
zone :: emqx_types:zone(),
@ -55,12 +61,17 @@
}).
%% Cross-generation iterator state.
-record(it, {
%% Zone whose generations are being iterated.
zone :: emqx_types:zone(),
%% Id of the generation the backend iterator below belongs to; the next
%% generation to visit is looked up as `gen + 1`.
gen :: gen_id(),
%% Topic filter and start time, kept so that an equivalent backend iterator
%% can be opened in each subsequent generation.
filter :: emqx_topic:words(),
start_time :: emqx_replay:time(),
%% Backend module of the current generation and its own iterator state,
%% which is threaded through `Mod:next/1`.
module :: module(),
data :: term()
}).
-type gen_id() :: 0..16#ffff.
-opaque state() :: #s{}.
-opaque iterator() :: #it{}.
%% Contents of the default column family:
@ -70,43 +81,54 @@
-define(DEFAULT_CF_OPTS, []).
-define(REF(Zone), {via, gproc, {n, l, {?MODULE, Zone}}}).
%%================================================================================
%% API functions
%%================================================================================
-spec start_link(emqx_types:zone()) -> {ok, pid()}.
start_link(Zone) ->
gen_server:start_link(?MODULE, [Zone], []).
gen_server:start_link(?REF(Zone), ?MODULE, [Zone], []).
-spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay_message_storage:time()) ->
{ok, _TODO} | {error, _TODO}.
make_iterator(Zone, TopicFilter, StartTime) ->
%% TODO: this is not supposed to work like this. Just a mock-up
#generation{module = Mod, data = Data} = meta_lookup(Zone, 0),
case Mod:make_iterator(Data, TopicFilter, StartTime) of
{ok, It} ->
{ok, #it{
module = Mod,
data = It
}};
Err ->
Err
end.
%% Ask the zone's store process to create a new generation that starts at
%% `Since` and uses the given backend config.
%% Returns `{ok, GenId}` with the id assigned to the new generation, or
%% `{error, nonmonotonic}` when the request is rejected.
-spec create_generation(emqx_types:zone(), emqx_replay:time(), emqx_replay_conf:backend_config()) ->
    {ok, gen_id()} | {error, nonmonotonic}.
create_generation(Zone, Since, {_Module, _Options} = Config) ->
    Request = {create_generation, Since, Config},
    gen_server:call(?REF(Zone), Request).
-spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) ->
ok | {error, _TODO}.
store(Zone, GUID, Time, Topic, Msg) ->
%% TODO: this is not supposed to work like this. Just a mock-up
#generation{module = Mod, data = Data} = meta_lookup(Zone, 0),
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Zone, Time),
Mod:store(Data, GUID, Time, Topic, Msg).
%% Create an iterator over messages matching `TopicFilter`, starting at
%% `StartTime`. Iteration begins in the generation that covers `StartTime`;
%% the zone, generation id, filter and start time are recorded in the iterator
%% so that `next/1` can move on to later generations.
-spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay:time()) ->
    {ok, iterator()} | {error, _TODO}.
make_iterator(Zone, TopicFilter, StartTime) ->
    {GenId, Gen} = meta_lookup_gen(Zone, StartTime),
    It0 = #it{zone = Zone, gen = GenId, filter = TopicFilter, start_time = StartTime},
    open_iterator(Gen, It0).
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
next(#it{module = Mod, data = It0}) ->
case Mod:next(It0) of
{value, Val, It} ->
{value, Val, #it{module = Mod, data = It}};
Other ->
Other
next(It = #it{module = Mod, data = ItData}) ->
case Mod:next(ItData) of
{value, Val, ItDataNext} ->
{value, Val, It#it{data = ItDataNext}};
{error, _} = Error ->
Error;
none ->
case open_next_iterator(It) of
{ok, ItNext} ->
next(ItNext);
{error, _} = Error ->
Error;
none ->
none
end
end.
%%================================================================================
@ -125,6 +147,13 @@ init([Zone]) ->
read_metadata(S),
{ok, S}.
%% gen_server call handler.
%% `{create_generation, Since, Config}`: try to create a new generation. On
%% success the updated state is kept; on error the reply carries the error and
%% the state is left untouched.
%% Any other call is answered with `{error, unknown_call}`.
handle_call({create_generation, Since, Config}, _From, S0) ->
    case create_new_gen(Since, Config, S0) of
        {ok, GenId, S1} -> {reply, {ok, GenId}, S1};
        {error, _} = Error -> {reply, Error, S0}
    end;
handle_call(_Call, _From, S) ->
    {reply, {error, unknown_call}, S}.
@ -142,41 +171,55 @@ terminate(_Reason, #s{db = DB, zone = Zone}) ->
%% Internal functions
%%================================================================================
-spec read_metadata(#s{}) -> ok.
read_metadata(S) ->
%% TODO: just a mockup to make the existing tests pass
read_metadata(0, S).
-spec read_metadata(state()) -> ok.
read_metadata(S = #s{db = DBHandle}) ->
Current = schema_get_current(DBHandle),
lists:foreach(fun(GenId) -> read_metadata(GenId, S) end, lists:seq(0, Current)).
-spec read_metadata(gen_id(), #s{}) -> ok.
read_metadata(GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}) ->
Gen = #generation{module = Mod, data = Data} = schema_get_gen(DBHandle, GenId),
DB = Mod:open(Zone, DBHandle, GenId, CFs, Data),
meta_put(Zone, GenId, Gen#generation{data = DB}).
-spec read_metadata(gen_id(), state()) -> ok.
read_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) ->
Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S),
meta_register_gen(Zone, GenId, Gen).
-spec ensure_current_generation(#s{}) -> #s{}.
ensure_current_generation(S = #s{zone = Zone, db = DBHandle, column_families = CFs}) ->
-spec ensure_current_generation(state()) -> state().
ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) ->
case schema_get_current(DBHandle) of
undefined ->
GenId = 0,
ok = schema_put_current(DBHandle, GenId),
create_new_generation_schema(GenId, S);
Config = emqx_replay_conf:zone_config(Zone),
{ok, _, NS} = create_new_gen(0, Config, S),
NS;
_GenId ->
S
end.
-spec create_new_generation_schema(gen_id(), #s{}) -> #s{}.
create_new_generation_schema(
GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}
) ->
{Module, Options} = emqx_replay_conf:zone_config(Zone),
{NewGenData, NewCFs} = Module:create_new(DBHandle, GenId, Options),
NewGen = #generation{
module = Module,
data = NewGenData
%% Allocate the next generation id, validate `Since` against the previous
%% generation, and on success create the generation, persist it in the schema,
%% mark it as current, and register the opened generation in the runtime
%% metadata.
%%
%% Returns `{ok, GenId, NewState}` on success, or `{error, nonmonotonic}` when
%% `is_gen_valid/3` rejects `Since`; nothing is created or persisted in that
%% case.
-spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
    {ok, gen_id(), state()} | {error, nonmonotonic}.
create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) ->
    %% `GenId` is bound from the runtime metadata and then matched against the
    %% persisted schema: the second line crashes with `badmatch` if they disagree.
    GenId = get_next_id(meta_get_current(Zone)),
    GenId = get_next_id(schema_get_current(DBHandle)),
    case is_gen_valid(Zone, GenId, Since) of
        ok ->
            {ok, Gen, NS} = create_gen(GenId, Since, Config, S),
            %% TODO: Transaction? Column family creation can't be transactional, anyway.
            ok = schema_put_gen(DBHandle, GenId, Gen),
            ok = schema_put_current(DBHandle, GenId),
            ok = meta_register_gen(Zone, GenId, open_gen(GenId, Gen, NS)),
            {ok, GenId, NS};
        {error, _} = Error ->
            Error
    end.
-spec create_gen(gen_id(), emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
{ok, generation(), state()}.
create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_families = CFs}) ->
% TODO: Backend implementation should ensure idempotency.
{Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options),
Gen = #{
module => Module,
data => Schema,
since => Since
},
%% TODO: Transaction? Column family creation can't be transactional, anyway.
ok = schema_put_gen(DBHandle, GenId, NewGen),
S#s{column_families = NewCFs ++ CFs}.
{ok, Gen, S#s{column_families = NewCFs ++ CFs}}.
-spec open_db(emqx_types:zone()) -> {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}.
open_db(Zone) ->
@ -198,18 +241,45 @@ open_db(Zone) ->
Error
end.
%% Open a generation through its backend module. The `data` stored in the
%% generation is handed to `Mod:open/5`, and the returned generation carries
%% whatever `Mod:open/5` produced in its place.
-spec open_gen(gen_id(), generation(), state()) -> generation().
open_gen(GenId, #{module := Mod, data := Schema} = Gen, #s{} = S) ->
    #s{zone = Zone, db = DBHandle, column_families = CFs} = S,
    Opened = Mod:open(Zone, DBHandle, GenId, CFs, Schema),
    Gen#{data := Opened}.
%% Advance an iterator to the generation following the one it currently points
%% at. Returns `none` when no such generation is registered; otherwise opens a
%% backend iterator in that generation with the same filter and start time.
-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none.
open_next_iterator(It = #it{zone = Zone, gen = GenId}) ->
    NextId = GenId + 1,
    open_next_iterator(meta_get_gen(Zone, NextId), It#it{gen = NextId}).

open_next_iterator(undefined, _It) -> none;
open_next_iterator(Gen = #{}, It) -> open_iterator(Gen, It).
%% Open a backend iterator in the given generation, using the filter and start
%% time recorded in `It`. On success the backend module and its iterator state
%% are stored in the returned iterator; any other result is passed through
%% unchanged.
-spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}.
open_iterator(
    #{module := Mod, data := Data},
    #it{filter = Filter, start_time = StartTime} = It
) ->
    case Mod:make_iterator(Data, Filter, StartTime) of
        {ok, ItData} -> {ok, It#it{module = Mod, data = ItData}};
        Err -> Err
    end.
%% Functions for dealing with the metadata stored persistently in rocksdb
-define(CURRENT_GEN, <<"current">>).
-define(SCHEMA_WRITE_OPTS, []).
-define(SCHEMA_READ_OPTS, []).
-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> #generation{}.
-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation().
schema_get_gen(DBHandle, GenId) ->
{ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS),
binary_to_term(Bin).
-spec schema_put_gen(rocksdb:db_handle(), gen_id(), #generation{}) -> ok | {error, _}.
-spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}.
schema_put_gen(DBHandle, GenId, Gen) ->
rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS).
@ -238,13 +308,52 @@ schema_gen_key(N) ->
-define(PERSISTENT_TERM(ZONE, GEN), {?MODULE, ZONE, GEN}).
-spec meta_lookup(emqx_types:zone(), gen_id()) -> #generation{}.
meta_lookup(Zone, GenId) ->
persistent_term:get(?PERSISTENT_TERM(Zone, GenId)).
%% Register an opened generation in the runtime metadata.
%% The entry stored under `GenId` is a list with this generation first,
%% followed by the list stored for the previous generation id (empty for
%% generation 0). `current` is then pointed at `GenId`.
-spec meta_register_gen(emqx_types:zone(), gen_id(), generation()) -> ok.
meta_register_gen(Zone, GenId, Gen) ->
    Older =
        if
            GenId > 0 -> meta_lookup(Zone, GenId - 1);
            true -> []
        end,
    ok = meta_put(Zone, GenId, [Gen | Older]),
    ok = meta_put(Zone, current, GenId).
-spec meta_put(emqx_types:zone(), gen_id(), #generation{}) -> ok.
meta_put(Zone, GenId, Gen) ->
persistent_term:put(?PERSISTENT_TERM(Zone, GenId), Gen).
%% Find the generation responsible for `Time` in the given zone, searching from
%% the current generation towards older ones.
-spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> {gen_id(), generation()}.
meta_lookup_gen(Zone, Time) ->
    % TODO
    % Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
    % towards a "no".
    CurrentId = meta_lookup(Zone, current),
    find_gen(Time, CurrentId, meta_lookup(Zone, CurrentId)).
%% Walk a newest-first list of generations, where `GenId` is the id of the list
%% head, and return the first generation whose `since` is not later than `Time`
%% together with its id. Crashes with `function_clause` if the list is
%% exhausted without a match.
find_gen(Time, GenId, [Gen | Older]) ->
    case Gen of
        #{since := Since} when Time >= Since -> {GenId, Gen};
        _ -> find_gen(Time, GenId - 1, Older)
    end.
%% Fetch the generation registered under `GenId`, i.e. the head of the list
%% stored for that id, or `undefined` when nothing is registered.
-spec meta_get_gen(emqx_types:zone(), gen_id()) -> generation() | undefined.
meta_get_gen(Zone, GenId) ->
    case meta_lookup(Zone, GenId, []) of
        [] -> undefined;
        [Gen | _] -> Gen
    end.
%% Id of the zone's current generation as recorded in the runtime metadata, or
%% `undefined` when none has been registered yet.
-spec meta_get_current(emqx_types:zone()) -> gen_id() | undefined.
meta_get_current(Zone) ->
    persistent_term:get(?PERSISTENT_TERM(Zone, current), undefined).
%% Thin accessors over `persistent_term`, keyed by zone and an arbitrary
%% per-zone key (a generation id or the atom `current`).

%% Read a value; raises if the key is absent.
-spec meta_lookup(emqx_types:zone(), _K) -> _V.
meta_lookup(Zone, K) ->
    Key = ?PERSISTENT_TERM(Zone, K),
    persistent_term:get(Key).

%% Read a value, falling back to `Default` if the key is absent.
-spec meta_lookup(emqx_types:zone(), _K, Default) -> _V | Default.
meta_lookup(Zone, K, Default) ->
    Key = ?PERSISTENT_TERM(Zone, K),
    persistent_term:get(Key, Default).

%% Store a value.
-spec meta_put(emqx_types:zone(), _K, _V) -> ok.
meta_put(Zone, K, V) ->
    Key = ?PERSISTENT_TERM(Zone, K),
    persistent_term:put(Key, V).
-spec meta_erase(emqx_types:zone()) -> ok.
meta_erase(Zone) ->
@ -256,6 +365,20 @@ meta_erase(Zone) ->
-undef(PERSISTENT_TERM).
%% Id to assign to the next generation: 0 when there is no current generation
%% yet (`undefined`), otherwise the current id plus one.
get_next_id(GenId) ->
    case GenId of
        undefined -> 0;
        _ -> GenId + 1
    end.
%% Check that a generation about to be created has an acceptable `Since`.
%% Generation 0 is only accepted with `Since` equal to 0. Any later generation
%% must start strictly after the previous one, otherwise the result is
%% `{error, nonmonotonic}`.
is_gen_valid(_Zone, 0, 0) ->
    ok;
is_gen_valid(Zone, GenId, Since) when GenId > 0 ->
    [Previous | _Older] = meta_lookup(Zone, GenId - 1),
    case Previous of
        #{since := PrevSince} when PrevSince < Since -> ok;
        #{} -> {error, nonmonotonic}
    end.
%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok.
%% store_cfs(DBHandle, CFRefs) ->
%% lists:foreach(

View File

@ -140,11 +140,8 @@
%% Type declarations
%%================================================================================
%% parsed
-type topic() :: list(binary()).
%% TODO granularity?
-type time() :: integer().
-type topic() :: emqx_replay:topic().
-type time() :: emqx_replay:time().
%% Number of bits
-type bits() :: non_neg_integer().
@ -292,20 +289,19 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
Value = make_message_value(Topic, MessagePayload),
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
-spec make_iterator(db(), emqx_topic:words(), time() | earliest) ->
-spec make_iterator(db(), emqx_topic:words(), time()) ->
{ok, iterator()} | {error, _TODO}.
make_iterator(DB, TopicFilter, StartTime) ->
Options = emqx_replay_conf:zone_iteration_options(DB#db.zone),
make_iterator(DB, TopicFilter, StartTime, Options).
-spec make_iterator(db(), emqx_topic:words(), time() | earliest, iteration_options()) ->
-spec make_iterator(db(), emqx_topic:words(), time(), iteration_options()) ->
% {error, invalid_start_time}? might just start from the beginning of time
% and call it a day: client violated the contract anyway.
{ok, iterator()} | {error, _TODO}.
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime, Options) ->
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
{ok, ITHandle} ->
% TODO earliest
Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper),
InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper),
RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)),

View File

@ -23,6 +23,25 @@
-define(ZONE, zone(?FUNCTION_NAME)).
%% Backend config for `emqx_replay_message_storage` that `init_per_testcase/2`
%% installs for each test zone; also passed to `create_generation/3` in the
%% generation tests.
-define(DEFAULT_CONFIG,
{emqx_replay_message_storage, #{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 32, 16],
epoch => 5,
iteration => #{
iterator_refresh => {every, 5}
}
}}
).
%% Alternative backend config with different key layout parameters (16
%% timestamp bits, two 16-bit topic levels) and no `iteration` section. The
%% generation tests use it to create generations whose options differ from
%% ?DEFAULT_CONFIG.
-define(COMPACT_CONFIG,
{emqx_replay_message_storage, #{
timestamp_bits => 16,
topic_bits_per_level => [16, 16],
epoch => 10
}}
).
%% Smoke test for opening and reopening the database
t_open(_Config) ->
ok = emqx_replay_local_store_sup:stop_zone(?ZONE),
@ -128,6 +147,49 @@ t_iterate_long_tail_wildcard(_Config) ->
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, TopicFilter, 50)])
).
%% Generation creation: `since` must strictly increase from one generation to
%% the next, and storing keeps working once several generations exist.
t_create_gen(_Config) ->
%% The first explicitly created generation is expected to get id 1
%% (presumably generation 0 already exists once the zone is started -- confirm
%% against the store's init).
{ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG),
%% A `since` earlier than the previous generation's (1 < 5) is rejected.
?assertEqual(
{error, nonmonotonic},
emqx_replay_local_store:create_generation(?ZONE, 1, ?DEFAULT_CONFIG)
),
%% A `since` equal to the previous generation's (5) is rejected as well.
?assertEqual(
{error, nonmonotonic},
emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG)
),
%% The rejected attempts did not consume an id: the next success gets id 2.
{ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz"],
%% Timestamps 1..100 fall on both sides of the `since` boundaries at 5 and 10.
Timestamps = lists:seq(1, 100),
[
?assertEqual(ok, store(?ZONE, PublishedAt, Topic, <<>>))
|| Topic <- Topics, PublishedAt <- Timestamps
].
%% Iteration across generations: messages stored under several generations
%% with different backend configs must all be returned by a single iteration.
t_iterate_multigen(_Config) ->
{ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG),
%% Starts at 1000, later than any timestamp stored below.
{ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 1000, ?DEFAULT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
Timestamps = lists:seq(1, 100),
%% Each payload encodes its own topic and timestamp so results can be compared
%% against the expected set directly.
_ = [
store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| Topic <- Topics, PublishedAt <- Timestamps
],
%% From time 0: every message under "foo/#" is expected, whatever generation
%% it was stored in.
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/#", 0)])
),
%% From time 60: only messages timestamped 60..100 under "a/#" are expected.
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100)
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)])
).
%% Test helper: store `Payload` in `Zone` under a freshly generated GUID, with
%% the textual topic parsed via `parse_topic/1`.
store(Zone, PublishedAt, Topic, Payload) ->
    emqx_replay_local_store:store(
        Zone, emqx_guid:gen(), PublishedAt, parse_topic(Topic), Payload
    ).
@ -161,14 +223,7 @@ end_per_suite(_Config) ->
ok = application:stop(emqx_replay).
init_per_testcase(TC, Config) ->
ok = set_zone_config(zone(TC), #{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 32, 16],
epoch => 5,
iteration => #{
iterator_refresh => {every, 5}
}
}),
ok = set_zone_config(zone(TC), ?DEFAULT_CONFIG),
{ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)),
Config.
@ -176,9 +231,7 @@ end_per_testcase(TC, _Config) ->
ok = emqx_replay_local_store_sup:stop_zone(zone(TC)).
zone(TC) ->
list_to_atom(?MODULE_STRING ++ atom_to_list(TC)).
list_to_atom(lists:concat([?MODULE, "_", TC])).
set_zone_config(Zone, Options) ->
ok = application:set_env(emqx_replay, zone_config, #{
Zone => {emqx_replay_message_storage, Options}
}).
set_zone_config(Zone, Config) ->
ok = application:set_env(emqx_replay, zone_config, #{Zone => Config}).