diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 35a31e65c..77f0b2924 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -61,6 +61,10 @@ }). -record(it, { + zone :: emqx_types:zone(), + gen :: gen_id(), + filter :: emqx_topic:words(), + start_time :: emqx_replay:time(), module :: module(), data :: term() }). @@ -95,31 +99,36 @@ create_generation(Zone, Since, Config = {_Module, _Options}) -> -spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) -> ok | {error, _TODO}. store(Zone, GUID, Time, Topic, Msg) -> - #{module := Mod, data := Data} = meta_lookup_gen(Zone, Time), + {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Zone, Time), Mod:store(Data, GUID, Time, Topic, Msg). -spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay:time()) -> - {ok, _TODO} | {error, _TODO}. + {ok, iterator()} | {error, _TODO}. make_iterator(Zone, TopicFilter, StartTime) -> - %% TODO: this is not supposed to work like this. Just a mock-up - #{module := Mod, data := Data} = meta_lookup_gen(Zone, StartTime), - case Mod:make_iterator(Data, TopicFilter, StartTime) of - {ok, It} -> - {ok, #it{ - module = Mod, - data = It - }}; - Err -> - Err - end. + {GenId, Gen} = meta_lookup_gen(Zone, StartTime), + open_iterator(Gen, #it{ + zone = Zone, + gen = GenId, + filter = TopicFilter, + start_time = StartTime + }). -spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. -next(#it{module = Mod, data = It0}) -> - case Mod:next(It0) of - {value, Val, It} -> - {value, Val, #it{module = Mod, data = It}}; - Other -> - Other +next(It = #it{module = Mod, data = ItData}) -> + case Mod:next(ItData) of + {value, Val, ItDataNext} -> + {value, Val, It#it{data = ItDataNext}}; + {error, _} = Error -> + Error; + none -> + case open_next_iterator(It) of + {ok, ItNext} -> + next(ItNext); + {error, _} = Error -> + Error; + none -> + none + end end. %%================================================================================ @@ -232,6 +241,24 @@ open_gen( DB = Mod:open(Zone, DBHandle, GenId, CFs, Data), Gen#{data := DB}. +-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none. +open_next_iterator(It = #it{zone = Zone, gen = GenId}) -> + open_next_iterator(meta_get_gen(Zone, GenId + 1), It#it{gen = GenId + 1}). + +open_next_iterator(undefined, _It) -> + none; +open_next_iterator(Gen = #{}, It) -> + open_iterator(Gen, It). + +-spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}. +open_iterator(#{module := Mod, data := Data}, It = #it{}) -> + case Mod:make_iterator(Data, It#it.filter, It#it.start_time) of + {ok, ItData} -> + {ok, It#it{module = Mod, data = ItData}}; + Err -> + Err + end. + %% Functions for dealing with the metadata stored persistently in rocksdb -define(CURRENT_GEN, <<"current">>). @@ -282,31 +309,42 @@ meta_register_gen(Zone, GenId, Gen) -> ok = meta_put(Zone, GenId, [Gen | Gs]), ok = meta_put(Zone, current, GenId). --spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> generation(). +-spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> {gen_id(), generation()}. meta_lookup_gen(Zone, Time) -> % TODO % Is cheaper persistent term GC on update here worth extra lookup? I'm leaning % towards a "no". - GenId = meta_lookup(Zone, current), - Gens = meta_lookup(Zone, GenId), - [Gen | _Older] = lists:dropwhile(fun(#{since := Since}) -> Since > Time end, Gens), - Gen. + Current = meta_lookup(Zone, current), + Gens = meta_lookup(Zone, Current), + find_gen(Time, Current, Gens). + +find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since -> + {GenId, Gen}; +find_gen(Time, GenId, [_Gen | Rest]) -> + find_gen(Time, GenId - 1, Rest). + +-spec meta_get_gen(emqx_types:zone(), gen_id()) -> generation() | undefined. +meta_get_gen(Zone, GenId) -> + case meta_lookup(Zone, GenId, []) of + [Gen | _Older] -> Gen; + [] -> undefined + end. -spec meta_get_current(emqx_types:zone()) -> gen_id() | undefined. meta_get_current(Zone) -> meta_lookup(Zone, current, undefined). -spec meta_lookup(emqx_types:zone(), _K) -> _V. -meta_lookup(Zone, GenId) -> - persistent_term:get(?PERSISTENT_TERM(Zone, GenId)). +meta_lookup(Zone, K) -> + persistent_term:get(?PERSISTENT_TERM(Zone, K)). -spec meta_lookup(emqx_types:zone(), _K, Default) -> _V | Default. -meta_lookup(Zone, GenId, Default) -> - persistent_term:get(?PERSISTENT_TERM(Zone, GenId), Default). +meta_lookup(Zone, K, Default) -> + persistent_term:get(?PERSISTENT_TERM(Zone, K), Default). -spec meta_put(emqx_types:zone(), _K, _V) -> ok. -meta_put(Zone, GenId, Gen) -> - persistent_term:put(?PERSISTENT_TERM(Zone, GenId), Gen). +meta_put(Zone, K, V) -> + persistent_term:put(?PERSISTENT_TERM(Zone, K), V). -spec meta_erase(emqx_types:zone()) -> ok. meta_erase(Zone) -> diff --git a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl index d3518d780..edda0f7f6 100644 --- a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl @@ -23,6 +23,25 @@ -define(ZONE, zone(?FUNCTION_NAME)). +-define(DEFAULT_CONFIG, + {emqx_replay_message_storage, #{ + timestamp_bits => 64, + topic_bits_per_level => [8, 8, 32, 16], + epoch => 5, + iteration => #{ + iterator_refresh => {every, 5} + } + }} +). + +-define(COMPACT_CONFIG, + {emqx_replay_message_storage, #{ + timestamp_bits => 16, + topic_bits_per_level => [16, 16], + epoch => 10 + }} +). + %% Smoke test for opening and reopening the database t_open(_Config) -> ok = emqx_replay_local_store_sup:stop_zone(?ZONE), @@ -128,6 +147,49 @@ t_iterate_long_tail_wildcard(_Config) -> lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, TopicFilter, 50)]) ). +t_create_gen(_Config) -> + {ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG), + ?assertEqual( + {error, nonmonotonic}, + emqx_replay_local_store:create_generation(?ZONE, 1, ?DEFAULT_CONFIG) + ), + ?assertEqual( + {error, nonmonotonic}, + emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG) + ), + {ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG), + Topics = ["foo/bar", "foo/bar/baz"], + Timestamps = lists:seq(1, 100), + [ + ?assertEqual(ok, store(?ZONE, PublishedAt, Topic, <<>>)) + || Topic <- Topics, PublishedAt <- Timestamps + ]. + +t_iterate_multigen(_Config) -> + {ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 1000, ?DEFAULT_CONFIG), + Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], + Timestamps = lists:seq(1, 100), + _ = [ + store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + || Topic <- Topics, PublishedAt <- Timestamps + ], + ?assertEqual( + lists:sort([ + {Topic, PublishedAt} + || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps + ]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/#", 0)]) + ), + ?assertEqual( + lists:sort([ + {Topic, PublishedAt} + || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100) + ]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)]) + ). + store(Zone, PublishedAt, Topic, Payload) -> ID = emqx_guid:gen(), emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload). @@ -161,14 +223,7 @@ end_per_suite(_Config) -> ok = application:stop(emqx_replay). init_per_testcase(TC, Config) -> - ok = set_zone_config(zone(TC), #{ - timestamp_bits => 64, - topic_bits_per_level => [8, 8, 32, 16], - epoch => 5, - iteration => #{ - iterator_refresh => {every, 5} - } - }), + ok = set_zone_config(zone(TC), ?DEFAULT_CONFIG), {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)), Config. @@ -178,7 +233,5 @@ end_per_testcase(TC, _Config) -> zone(TC) -> list_to_atom(lists:concat([?MODULE, "_", TC])). -set_zone_config(Zone, Options) -> - ok = application:set_env(emqx_replay, zone_config, #{ - Zone => {emqx_replay_message_storage, Options} - }). +set_zone_config(Zone, Config) -> + ok = application:set_env(emqx_replay, zone_config, #{Zone => Config}).