diff --git a/apps/emqx_replay/src/emqx_replay.app.src b/apps/emqx_replay/src/emqx_replay.app.src index 7769e82e9..9c00a78ca 100644 --- a/apps/emqx_replay/src/emqx_replay.app.src +++ b/apps/emqx_replay/src/emqx_replay.app.src @@ -5,7 +5,7 @@ {vsn, "0.1.0"}, {modules, []}, {registered, []}, - {applications, [kernel, stdlib, rocksdb]}, + {applications, [kernel, stdlib, rocksdb, gproc]}, {mod, {emqx_replay_app, []}}, {env, []} ]}. diff --git a/apps/emqx_replay/src/emqx_replay.erl b/apps/emqx_replay/src/emqx_replay.erl index ee83e35d9..fb1ec39c6 100644 --- a/apps/emqx_replay/src/emqx_replay.erl +++ b/apps/emqx_replay/src/emqx_replay.erl @@ -27,8 +27,10 @@ %% parsed -type topic() :: list(binary()). +%% Timestamp +%% Earliest possible timestamp is 0. %% TODO granularity? --type time() :: integer(). +-type time() :: non_neg_integer(). %%================================================================================ %% API funcions diff --git a/apps/emqx_replay/src/emqx_replay_conf.erl b/apps/emqx_replay/src/emqx_replay_conf.erl index 57ba87ddf..46fa53867 100644 --- a/apps/emqx_replay/src/emqx_replay_conf.erl +++ b/apps/emqx_replay/src/emqx_replay_conf.erl @@ -23,6 +23,12 @@ -export([zone_iteration_options/1]). -export([default_iteration_options/0]). +-type backend_config() :: + {emqx_replay_message_storage, emqx_replay_message_storage:options()} + | {module(), _Options}. + +-export_type([backend_config/0]). + %%================================================================================ %% API funcions %%================================================================================ @@ -30,11 +36,8 @@ -define(APP, emqx_replay). -type zone() :: emqx_types:zone(). --type config() :: - {emqx_replay_message_storage, emqx_replay_message_storage:options()} - | {module(), _Options}. --spec zone_config(zone()) -> config(). +-spec zone_config(zone()) -> backend_config(). zone_config(Zone) -> DefaultZoneConfig = application:get_env(?APP, default_zone_config, default_zone_config()), Zones = application:get_env(?APP, zone_config, #{}), @@ -54,7 +57,7 @@ default_iteration_options() -> {emqx_replay_message_storage, Config} = default_zone_config(), maps:get(iteration, Config). --spec default_zone_config() -> config(). +-spec default_zone_config() -> backend_config(). default_zone_config() -> {emqx_replay_message_storage, #{ timestamp_bits => 64, diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 15a400a92..7148308b2 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -19,8 +19,9 @@ %% API: -export([start_link/1]). +-export([create_generation/3]). --export([make_iterator/3, store/5, next/1]). +-export([store/5, make_iterator/3, next/1]). %% behavior callbacks: -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -40,13 +41,18 @@ -type cf_refs() :: [{string(), rocksdb:cf_handle()}]. --record(generation, { +%% Message storage generation +%% Keep in mind that instances of this type are persisted in long-term storage. +-type generation() :: #{ %% Module that handles data for the generation - module :: module(), - %% Module-specific attributes - data :: term() - % time_range :: {emqx_replay:time(), emqx_replay:time()} -}). + module := module(), + %% Module-specific data defined at generation creation time + data := term(), + %% When should this generation become active? + %% This generation should only contain messages timestamped no earlier than that. + %% The very first generation will have `since` equal 0. + since := emqx_replay:time() +}. -record(s, { zone :: emqx_types:zone(), @@ -55,12 +61,17 @@ }). -record(it, { + zone :: emqx_types:zone(), + gen :: gen_id(), + filter :: emqx_topic:words(), + start_time :: emqx_replay:time(), module :: module(), data :: term() }). -type gen_id() :: 0..16#ffff. +-opaque state() :: #s{}. -opaque iterator() :: #it{}. %% Contents of the default column family: @@ -70,43 +81,54 @@ -define(DEFAULT_CF_OPTS, []). +-define(REF(Zone), {via, gproc, {n, l, {?MODULE, Zone}}}). + %%================================================================================ %% API funcions %%================================================================================ -spec start_link(emqx_types:zone()) -> {ok, pid()}. start_link(Zone) -> - gen_server:start_link(?MODULE, [Zone], []). + gen_server:start_link(?REF(Zone), ?MODULE, [Zone], []). --spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay_message_storage:time()) -> - {ok, _TODO} | {error, _TODO}. -make_iterator(Zone, TopicFilter, StartTime) -> - %% TODO: this is not supposed to work like this. Just a mock-up - #generation{module = Mod, data = Data} = meta_lookup(Zone, 0), - case Mod:make_iterator(Data, TopicFilter, StartTime) of - {ok, It} -> - {ok, #it{ - module = Mod, - data = It - }}; - Err -> - Err - end. +-spec create_generation(emqx_types:zone(), emqx_replay:time(), emqx_replay_conf:backend_config()) -> + {ok, gen_id()} | {error, nonmonotonic}. +create_generation(Zone, Since, Config = {_Module, _Options}) -> + gen_server:call(?REF(Zone), {create_generation, Since, Config}). -spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) -> ok | {error, _TODO}. store(Zone, GUID, Time, Topic, Msg) -> - %% TODO: this is not supposed to work like this. Just a mock-up - #generation{module = Mod, data = Data} = meta_lookup(Zone, 0), + {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Zone, Time), Mod:store(Data, GUID, Time, Topic, Msg). +-spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay:time()) -> + {ok, iterator()} | {error, _TODO}. +make_iterator(Zone, TopicFilter, StartTime) -> + {GenId, Gen} = meta_lookup_gen(Zone, StartTime), + open_iterator(Gen, #it{ + zone = Zone, + gen = GenId, + filter = TopicFilter, + start_time = StartTime + }). + -spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. -next(#it{module = Mod, data = It0}) -> - case Mod:next(It0) of - {value, Val, It} -> - {value, Val, #it{module = Mod, data = It}}; - Other -> - Other +next(It = #it{module = Mod, data = ItData}) -> + case Mod:next(ItData) of + {value, Val, ItDataNext} -> + {value, Val, It#it{data = ItDataNext}}; + {error, _} = Error -> + Error; + none -> + case open_next_iterator(It) of + {ok, ItNext} -> + next(ItNext); + {error, _} = Error -> + Error; + none -> + none + end end. %%================================================================================ @@ -125,6 +147,13 @@ init([Zone]) -> read_metadata(S), {ok, S}. +handle_call({create_generation, Since, Config}, _From, S) -> + case create_new_gen(Since, Config, S) of + {ok, GenId, NS} -> + {reply, {ok, GenId}, NS}; + {error, _} = Error -> + {reply, Error, S} + end; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -142,41 +171,55 @@ terminate(_Reason, #s{db = DB, zone = Zone}) -> %% Internal functions %%================================================================================ --spec read_metadata(#s{}) -> ok. -read_metadata(S) -> - %% TODO: just a mockup to make the existing tests pass - read_metadata(0, S). +-spec read_metadata(state()) -> ok. +read_metadata(S = #s{db = DBHandle}) -> + Current = schema_get_current(DBHandle), + lists:foreach(fun(GenId) -> read_metadata(GenId, S) end, lists:seq(0, Current)). --spec read_metadata(gen_id(), #s{}) -> ok. -read_metadata(GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}) -> - Gen = #generation{module = Mod, data = Data} = schema_get_gen(DBHandle, GenId), - DB = Mod:open(Zone, DBHandle, GenId, CFs, Data), - meta_put(Zone, GenId, Gen#generation{data = DB}). +-spec read_metadata(gen_id(), state()) -> ok. +read_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) -> + Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S), + meta_register_gen(Zone, GenId, Gen). --spec ensure_current_generation(#s{}) -> #s{}. -ensure_current_generation(S = #s{zone = Zone, db = DBHandle, column_families = CFs}) -> +-spec ensure_current_generation(state()) -> state(). +ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) -> case schema_get_current(DBHandle) of undefined -> - GenId = 0, - ok = schema_put_current(DBHandle, GenId), - create_new_generation_schema(GenId, S); + Config = emqx_replay_conf:zone_config(Zone), + {ok, _, NS} = create_new_gen(0, Config, S), + NS; _GenId -> S end. --spec create_new_generation_schema(gen_id(), #s{}) -> #s{}. -create_new_generation_schema( - GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs} -) -> - {Module, Options} = emqx_replay_conf:zone_config(Zone), - {NewGenData, NewCFs} = Module:create_new(DBHandle, GenId, Options), - NewGen = #generation{ - module = Module, - data = NewGenData +-spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> + {ok, gen_id(), state()}. +create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) -> + GenId = get_next_id(meta_get_current(Zone)), + GenId = get_next_id(schema_get_current(DBHandle)), + case is_gen_valid(Zone, GenId, Since) of + ok -> + {ok, Gen, NS} = create_gen(GenId, Since, Config, S), + %% TODO: Transaction? Column family creation can't be transactional, anyway. + ok = schema_put_gen(DBHandle, GenId, Gen), + ok = schema_put_current(DBHandle, GenId), + ok = meta_register_gen(Zone, GenId, open_gen(GenId, Gen, NS)), + {ok, GenId, NS}; + {error, _} = Error -> + Error + end. + +-spec create_gen(gen_id(), emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> + {ok, generation(), state()}. +create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_families = CFs}) -> + % TODO: Backend implementation should ensure idempotency. + {Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options), + Gen = #{ + module => Module, + data => Schema, + since => Since }, - %% TODO: Transaction? Column family creation can't be transactional, anyway. - ok = schema_put_gen(DBHandle, GenId, NewGen), - S#s{column_families = NewCFs ++ CFs}. + {ok, Gen, S#s{column_families = NewCFs ++ CFs}}. -spec open_db(emqx_types:zone()) -> {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}. open_db(Zone) -> @@ -198,18 +241,45 @@ open_db(Zone) -> Error end. +-spec open_gen(gen_id(), generation(), state()) -> generation(). +open_gen( + GenId, + Gen = #{module := Mod, data := Data}, + #s{zone = Zone, db = DBHandle, column_families = CFs} +) -> + DB = Mod:open(Zone, DBHandle, GenId, CFs, Data), + Gen#{data := DB}. + +-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none. +open_next_iterator(It = #it{zone = Zone, gen = GenId}) -> + open_next_iterator(meta_get_gen(Zone, GenId + 1), It#it{gen = GenId + 1}). + +open_next_iterator(undefined, _It) -> + none; +open_next_iterator(Gen = #{}, It) -> + open_iterator(Gen, It). + +-spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}. +open_iterator(#{module := Mod, data := Data}, It = #it{}) -> + case Mod:make_iterator(Data, It#it.filter, It#it.start_time) of + {ok, ItData} -> + {ok, It#it{module = Mod, data = ItData}}; + Err -> + Err + end. + %% Functions for dealing with the metadata stored persistently in rocksdb -define(CURRENT_GEN, <<"current">>). -define(SCHEMA_WRITE_OPTS, []). -define(SCHEMA_READ_OPTS, []). --spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> #generation{}. +-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation(). schema_get_gen(DBHandle, GenId) -> {ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS), binary_to_term(Bin). --spec schema_put_gen(rocksdb:db_handle(), gen_id(), #generation{}) -> ok | {error, _}. +-spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}. schema_put_gen(DBHandle, GenId, Gen) -> rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS). @@ -238,13 +308,52 @@ schema_gen_key(N) -> -define(PERSISTENT_TERM(ZONE, GEN), {?MODULE, ZONE, GEN}). --spec meta_lookup(emqx_types:zone(), gen_id()) -> #generation{}. -meta_lookup(Zone, GenId) -> - persistent_term:get(?PERSISTENT_TERM(Zone, GenId)). +-spec meta_register_gen(emqx_types:zone(), gen_id(), generation()) -> ok. +meta_register_gen(Zone, GenId, Gen) -> + Gs = + case GenId > 0 of + true -> meta_lookup(Zone, GenId - 1); + false -> [] + end, + ok = meta_put(Zone, GenId, [Gen | Gs]), + ok = meta_put(Zone, current, GenId). --spec meta_put(emqx_types:zone(), gen_id(), #generation{}) -> ok. -meta_put(Zone, GenId, Gen) -> - persistent_term:put(?PERSISTENT_TERM(Zone, GenId), Gen). +-spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> {gen_id(), generation()}. +meta_lookup_gen(Zone, Time) -> + % TODO + % Is cheaper persistent term GC on update here worth extra lookup? I'm leaning + % towards a "no". + Current = meta_lookup(Zone, current), + Gens = meta_lookup(Zone, Current), + find_gen(Time, Current, Gens). + +find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since -> + {GenId, Gen}; +find_gen(Time, GenId, [_Gen | Rest]) -> + find_gen(Time, GenId - 1, Rest). + +-spec meta_get_gen(emqx_types:zone(), gen_id()) -> generation() | undefined. +meta_get_gen(Zone, GenId) -> + case meta_lookup(Zone, GenId, []) of + [Gen | _Older] -> Gen; + [] -> undefined + end. + +-spec meta_get_current(emqx_types:zone()) -> gen_id() | undefined. +meta_get_current(Zone) -> + meta_lookup(Zone, current, undefined). + +-spec meta_lookup(emqx_types:zone(), _K) -> _V. +meta_lookup(Zone, K) -> + persistent_term:get(?PERSISTENT_TERM(Zone, K)). + +-spec meta_lookup(emqx_types:zone(), _K, Default) -> _V | Default. +meta_lookup(Zone, K, Default) -> + persistent_term:get(?PERSISTENT_TERM(Zone, K), Default). + +-spec meta_put(emqx_types:zone(), _K, _V) -> ok. +meta_put(Zone, K, V) -> + persistent_term:put(?PERSISTENT_TERM(Zone, K), V). -spec meta_erase(emqx_types:zone()) -> ok. meta_erase(Zone) -> @@ -256,6 +365,20 @@ meta_erase(Zone) -> -undef(PERSISTENT_TERM). +get_next_id(undefined) -> 0; +get_next_id(GenId) -> GenId + 1. + +is_gen_valid(Zone, GenId, Since) when GenId > 0 -> + [GenPrev | _] = meta_lookup(Zone, GenId - 1), + case GenPrev of + #{since := SincePrev} when Since > SincePrev -> + ok; + #{} -> + {error, nonmonotonic} + end; +is_gen_valid(_Zone, 0, 0) -> + ok. + %% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok. %% store_cfs(DBHandle, CFRefs) -> %% lists:foreach( diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index f2a6afaa6..fb96863d1 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -140,11 +140,8 @@ %% Type declarations %%================================================================================ -%% parsed --type topic() :: list(binary()). - -%% TODO granularity? --type time() :: integer(). +-type topic() :: emqx_replay:topic(). +-type time() :: emqx_replay:time(). %% Number of bits -type bits() :: non_neg_integer(). @@ -292,20 +289,19 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, Value = make_message_value(Topic, MessagePayload), rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). --spec make_iterator(db(), emqx_topic:words(), time() | earliest) -> +-spec make_iterator(db(), emqx_topic:words(), time()) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, TopicFilter, StartTime) -> Options = emqx_replay_conf:zone_iteration_options(DB#db.zone), make_iterator(DB, TopicFilter, StartTime, Options). --spec make_iterator(db(), emqx_topic:words(), time() | earliest, iteration_options()) -> +-spec make_iterator(db(), emqx_topic:words(), time(), iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time % and call it a day: client violated the contract anyway. {ok, iterator()} | {error, _TODO}. make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime, Options) -> case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of {ok, ITHandle} -> - % TODO earliest Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper), InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper), RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)), diff --git a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl index eee802e69..edda0f7f6 100644 --- a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl @@ -23,6 +23,25 @@ -define(ZONE, zone(?FUNCTION_NAME)). +-define(DEFAULT_CONFIG, + {emqx_replay_message_storage, #{ + timestamp_bits => 64, + topic_bits_per_level => [8, 8, 32, 16], + epoch => 5, + iteration => #{ + iterator_refresh => {every, 5} + } + }} +). + +-define(COMPACT_CONFIG, + {emqx_replay_message_storage, #{ + timestamp_bits => 16, + topic_bits_per_level => [16, 16], + epoch => 10 + }} +). + %% Smoke test for opening and reopening the database t_open(_Config) -> ok = emqx_replay_local_store_sup:stop_zone(?ZONE), @@ -128,6 +147,49 @@ t_iterate_long_tail_wildcard(_Config) -> lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, TopicFilter, 50)]) ). +t_create_gen(_Config) -> + {ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG), + ?assertEqual( + {error, nonmonotonic}, + emqx_replay_local_store:create_generation(?ZONE, 1, ?DEFAULT_CONFIG) + ), + ?assertEqual( + {error, nonmonotonic}, + emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG) + ), + {ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG), + Topics = ["foo/bar", "foo/bar/baz"], + Timestamps = lists:seq(1, 100), + [ + ?assertEqual(ok, store(?ZONE, PublishedAt, Topic, <<>>)) + || Topic <- Topics, PublishedAt <- Timestamps + ]. + +t_iterate_multigen(_Config) -> + {ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 1000, ?DEFAULT_CONFIG), + Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], + Timestamps = lists:seq(1, 100), + _ = [ + store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + || Topic <- Topics, PublishedAt <- Timestamps + ], + ?assertEqual( + lists:sort([ + {Topic, PublishedAt} + || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps + ]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/#", 0)]) + ), + ?assertEqual( + lists:sort([ + {Topic, PublishedAt} + || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100) + ]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)]) + ). + store(Zone, PublishedAt, Topic, Payload) -> ID = emqx_guid:gen(), emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload). @@ -161,14 +223,7 @@ end_per_suite(_Config) -> ok = application:stop(emqx_replay). init_per_testcase(TC, Config) -> - ok = set_zone_config(zone(TC), #{ - timestamp_bits => 64, - topic_bits_per_level => [8, 8, 32, 16], - epoch => 5, - iteration => #{ - iterator_refresh => {every, 5} - } - }), + ok = set_zone_config(zone(TC), ?DEFAULT_CONFIG), {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)), Config. @@ -176,9 +231,7 @@ end_per_testcase(TC, _Config) -> ok = emqx_replay_local_store_sup:stop_zone(zone(TC)). zone(TC) -> - list_to_atom(?MODULE_STRING ++ atom_to_list(TC)). + list_to_atom(lists:concat([?MODULE, "_", TC])). -set_zone_config(Zone, Options) -> - ok = application:set_env(emqx_replay, zone_config, #{ - Zone => {emqx_replay_message_storage, Options} - }). +set_zone_config(Zone, Config) -> + ok = application:set_env(emqx_replay, zone_config, #{Zone => Config}).