feat(ds): Make iteration fully generation-aware

This commit is contained in:
Andrew Mayorov 2023-01-17 17:20:17 +03:00 committed by ieQu1
parent bf9d57f6a9
commit 83d1606d00
2 changed files with 133 additions and 42 deletions

View File

@ -61,6 +61,10 @@
}).
-record(it, {
zone :: emqx_types:zone(),
gen :: gen_id(),
filter :: emqx_topic:words(),
start_time :: emqx_replay:time(),
module :: module(),
data :: term()
}).
@ -95,31 +99,36 @@ create_generation(Zone, Since, Config = {_Module, _Options}) ->
-spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) ->
ok | {error, _TODO}.
store(Zone, GUID, Time, Topic, Msg) ->
#{module := Mod, data := Data} = meta_lookup_gen(Zone, Time),
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Zone, Time),
Mod:store(Data, GUID, Time, Topic, Msg).
-spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay:time()) ->
{ok, _TODO} | {error, _TODO}.
{ok, iterator()} | {error, _TODO}.
make_iterator(Zone, TopicFilter, StartTime) ->
%% TODO: this is not supposed to work like this. Just a mock-up
#{module := Mod, data := Data} = meta_lookup_gen(Zone, StartTime),
case Mod:make_iterator(Data, TopicFilter, StartTime) of
{ok, It} ->
{ok, #it{
module = Mod,
data = It
}};
Err ->
Err
end.
{GenId, Gen} = meta_lookup_gen(Zone, StartTime),
open_iterator(Gen, #it{
zone = Zone,
gen = GenId,
filter = TopicFilter,
start_time = StartTime
}).
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
next(#it{module = Mod, data = It0}) ->
case Mod:next(It0) of
{value, Val, It} ->
{value, Val, #it{module = Mod, data = It}};
Other ->
Other
next(It = #it{module = Mod, data = ItData}) ->
case Mod:next(ItData) of
{value, Val, ItDataNext} ->
{value, Val, It#it{data = ItDataNext}};
{error, _} = Error ->
Error;
none ->
case open_next_iterator(It) of
{ok, ItNext} ->
next(ItNext);
{error, _} = Error ->
Error;
none ->
none
end
end.
%%================================================================================
@ -232,6 +241,24 @@ open_gen(
DB = Mod:open(Zone, DBHandle, GenId, CFs, Data),
Gen#{data := DB}.
-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none.
open_next_iterator(It = #it{zone = Zone, gen = GenId}) ->
open_next_iterator(meta_get_gen(Zone, GenId + 1), It#it{gen = GenId + 1}).
open_next_iterator(undefined, _It) ->
none;
open_next_iterator(Gen = #{}, It) ->
open_iterator(Gen, It).
-spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}.
open_iterator(#{module := Mod, data := Data}, It = #it{}) ->
case Mod:make_iterator(Data, It#it.filter, It#it.start_time) of
{ok, ItData} ->
{ok, It#it{module = Mod, data = ItData}};
Err ->
Err
end.
%% Functions for dealing with the metadata stored persistently in rocksdb
-define(CURRENT_GEN, <<"current">>).
@ -282,31 +309,42 @@ meta_register_gen(Zone, GenId, Gen) ->
ok = meta_put(Zone, GenId, [Gen | Gs]),
ok = meta_put(Zone, current, GenId).
-spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> generation().
-spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> {gen_id(), generation()}.
meta_lookup_gen(Zone, Time) ->
% TODO
% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
% towards a "no".
GenId = meta_lookup(Zone, current),
Gens = meta_lookup(Zone, GenId),
[Gen | _Older] = lists:dropwhile(fun(#{since := Since}) -> Since > Time end, Gens),
Gen.
Current = meta_lookup(Zone, current),
Gens = meta_lookup(Zone, Current),
find_gen(Time, Current, Gens).
find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since ->
{GenId, Gen};
find_gen(Time, GenId, [_Gen | Rest]) ->
find_gen(Time, GenId - 1, Rest).
-spec meta_get_gen(emqx_types:zone(), gen_id()) -> generation() | undefined.
meta_get_gen(Zone, GenId) ->
case meta_lookup(Zone, GenId, []) of
[Gen | _Older] -> Gen;
[] -> undefined
end.
-spec meta_get_current(emqx_types:zone()) -> gen_id() | undefined.
meta_get_current(Zone) ->
meta_lookup(Zone, current, undefined).
-spec meta_lookup(emqx_types:zone(), _K) -> _V.
meta_lookup(Zone, GenId) ->
persistent_term:get(?PERSISTENT_TERM(Zone, GenId)).
meta_lookup(Zone, K) ->
persistent_term:get(?PERSISTENT_TERM(Zone, K)).
-spec meta_lookup(emqx_types:zone(), _K, Default) -> _V | Default.
meta_lookup(Zone, GenId, Default) ->
persistent_term:get(?PERSISTENT_TERM(Zone, GenId), Default).
meta_lookup(Zone, K, Default) ->
persistent_term:get(?PERSISTENT_TERM(Zone, K), Default).
-spec meta_put(emqx_types:zone(), _K, _V) -> ok.
meta_put(Zone, GenId, Gen) ->
persistent_term:put(?PERSISTENT_TERM(Zone, GenId), Gen).
meta_put(Zone, K, V) ->
persistent_term:put(?PERSISTENT_TERM(Zone, K), V).
-spec meta_erase(emqx_types:zone()) -> ok.
meta_erase(Zone) ->

View File

@ -23,6 +23,25 @@
-define(ZONE, zone(?FUNCTION_NAME)).
-define(DEFAULT_CONFIG,
{emqx_replay_message_storage, #{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 32, 16],
epoch => 5,
iteration => #{
iterator_refresh => {every, 5}
}
}}
).
-define(COMPACT_CONFIG,
{emqx_replay_message_storage, #{
timestamp_bits => 16,
topic_bits_per_level => [16, 16],
epoch => 10
}}
).
%% Smoke test for opening and reopening the database
t_open(_Config) ->
ok = emqx_replay_local_store_sup:stop_zone(?ZONE),
@ -128,6 +147,49 @@ t_iterate_long_tail_wildcard(_Config) ->
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, TopicFilter, 50)])
).
t_create_gen(_Config) ->
{ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG),
?assertEqual(
{error, nonmonotonic},
emqx_replay_local_store:create_generation(?ZONE, 1, ?DEFAULT_CONFIG)
),
?assertEqual(
{error, nonmonotonic},
emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG)
),
{ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz"],
Timestamps = lists:seq(1, 100),
[
?assertEqual(ok, store(?ZONE, PublishedAt, Topic, <<>>))
|| Topic <- Topics, PublishedAt <- Timestamps
].
t_iterate_multigen(_Config) ->
{ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG),
{ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 1000, ?DEFAULT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
Timestamps = lists:seq(1, 100),
_ = [
store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| Topic <- Topics, PublishedAt <- Timestamps
],
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/#", 0)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100)
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)])
).
store(Zone, PublishedAt, Topic, Payload) ->
ID = emqx_guid:gen(),
emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload).
@ -161,14 +223,7 @@ end_per_suite(_Config) ->
ok = application:stop(emqx_replay).
init_per_testcase(TC, Config) ->
ok = set_zone_config(zone(TC), #{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 32, 16],
epoch => 5,
iteration => #{
iterator_refresh => {every, 5}
}
}),
ok = set_zone_config(zone(TC), ?DEFAULT_CONFIG),
{ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)),
Config.
@ -178,7 +233,5 @@ end_per_testcase(TC, _Config) ->
zone(TC) ->
list_to_atom(lists:concat([?MODULE, "_", TC])).
set_zone_config(Zone, Options) ->
ok = application:set_env(emqx_replay, zone_config, #{
Zone => {emqx_replay_message_storage, Options}
}).
set_zone_config(Zone, Config) ->
ok = application:set_env(emqx_replay, zone_config, #{Zone => Config}).