From 4e364568435260ed402038314e069ecbea5fb7a1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 17 Jan 2023 13:31:01 +0300 Subject: [PATCH 1/4] feat: allow to create new storage generations --- apps/emqx_replay/src/emqx_replay.app.src | 2 +- apps/emqx_replay/src/emqx_replay.erl | 4 +- apps/emqx_replay/src/emqx_replay_conf.erl | 13 +- .../src/emqx_replay_local_store.erl | 163 +++++++++++++----- .../test/emqx_replay_local_store_SUITE.erl | 2 +- 5 files changed, 130 insertions(+), 54 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay.app.src b/apps/emqx_replay/src/emqx_replay.app.src index 7769e82e9..9c00a78ca 100644 --- a/apps/emqx_replay/src/emqx_replay.app.src +++ b/apps/emqx_replay/src/emqx_replay.app.src @@ -5,7 +5,7 @@ {vsn, "0.1.0"}, {modules, []}, {registered, []}, - {applications, [kernel, stdlib, rocksdb]}, + {applications, [kernel, stdlib, rocksdb, gproc]}, {mod, {emqx_replay_app, []}}, {env, []} ]}. diff --git a/apps/emqx_replay/src/emqx_replay.erl b/apps/emqx_replay/src/emqx_replay.erl index ee83e35d9..fb1ec39c6 100644 --- a/apps/emqx_replay/src/emqx_replay.erl +++ b/apps/emqx_replay/src/emqx_replay.erl @@ -27,8 +27,10 @@ %% parsed -type topic() :: list(binary()). +%% Timestamp +%% Earliest possible timestamp is 0. %% TODO granularity? --type time() :: integer(). +-type time() :: non_neg_integer(). %%================================================================================ %% API funcions diff --git a/apps/emqx_replay/src/emqx_replay_conf.erl b/apps/emqx_replay/src/emqx_replay_conf.erl index 57ba87ddf..46fa53867 100644 --- a/apps/emqx_replay/src/emqx_replay_conf.erl +++ b/apps/emqx_replay/src/emqx_replay_conf.erl @@ -23,6 +23,12 @@ -export([zone_iteration_options/1]). -export([default_iteration_options/0]). +-type backend_config() :: + {emqx_replay_message_storage, emqx_replay_message_storage:options()} + | {module(), _Options}. + +-export_type([backend_config/0]). + %%================================================================================ %% API funcions %%================================================================================ @@ -30,11 +36,8 @@ -define(APP, emqx_replay). -type zone() :: emqx_types:zone(). --type config() :: - {emqx_replay_message_storage, emqx_replay_message_storage:options()} - | {module(), _Options}. --spec zone_config(zone()) -> config(). +-spec zone_config(zone()) -> backend_config(). zone_config(Zone) -> DefaultZoneConfig = application:get_env(?APP, default_zone_config, default_zone_config()), Zones = application:get_env(?APP, zone_config, #{}), @@ -54,7 +57,7 @@ default_iteration_options() -> {emqx_replay_message_storage, Config} = default_zone_config(), maps:get(iteration, Config). --spec default_zone_config() -> config(). +-spec default_zone_config() -> backend_config(). default_zone_config() -> {emqx_replay_message_storage, #{ timestamp_bits => 64, diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 15a400a92..0138ffed5 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -19,8 +19,9 @@ %% API: -export([start_link/1]). +-export([create_generation/3]). --export([make_iterator/3, store/5, next/1]). +-export([store/5, make_iterator/3, next/1]). %% behavior callbacks: -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -40,13 +41,18 @@ -type cf_refs() :: [{string(), rocksdb:cf_handle()}]. --record(generation, { +%% Message storage generation +%% Keep in mind that instances of this type are persisted in long-term storage. +-type generation() :: #{ %% Module that handles data for the generation - module :: module(), - %% Module-specific attributes - data :: term() - % time_range :: {emqx_replay:time(), emqx_replay:time()} -}). + module := module(), + %% Module-specific data defined at generation creation time + data := term(), + %% When should this generation become active? + %% This generation should only contain messages timestamped no earlier than that. + %% The very first generation will have `since` equal 0. + since := emqx_replay:time() +}. -record(s, { zone :: emqx_types:zone(), @@ -61,6 +67,7 @@ -type gen_id() :: 0..16#ffff. +-opaque state() :: #s{}. -opaque iterator() :: #it{}. %% Contents of the default column family: @@ -70,19 +77,32 @@ -define(DEFAULT_CF_OPTS, []). +-define(REF(Zone), {via, gproc, {n, l, {?MODULE, Zone}}}). + %%================================================================================ %% API funcions %%================================================================================ -spec start_link(emqx_types:zone()) -> {ok, pid()}. start_link(Zone) -> - gen_server:start_link(?MODULE, [Zone], []). + gen_server:start_link(?REF(Zone), ?MODULE, [Zone], []). + +-spec create_generation(emqx_types:zone(), emqx_replay:time(), emqx_replay_conf:backend_config()) -> + {ok, gen_id()}. +create_generation(Zone, Since, Config = {_Module, _Options}) -> + gen_server:call(?REF(Zone), {create_generation, Since, Config}). + +-spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) -> + ok | {error, _TODO}. +store(Zone, GUID, Time, Topic, Msg) -> + #{module := Mod, data := Data} = meta_lookup_gen(Zone, Time), + Mod:store(Data, GUID, Time, Topic, Msg). -spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay_message_storage:time()) -> {ok, _TODO} | {error, _TODO}. make_iterator(Zone, TopicFilter, StartTime) -> %% TODO: this is not supposed to work like this. Just a mock-up - #generation{module = Mod, data = Data} = meta_lookup(Zone, 0), + #{module := Mod, data := Data} = meta_lookup_gen(Zone, StartTime), case Mod:make_iterator(Data, TopicFilter, StartTime) of {ok, It} -> {ok, #it{ @@ -93,13 +113,6 @@ make_iterator(Zone, TopicFilter, StartTime) -> Err end. --spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) -> - ok | {error, _TODO}. -store(Zone, GUID, Time, Topic, Msg) -> - %% TODO: this is not supposed to work like this. Just a mock-up - #generation{module = Mod, data = Data} = meta_lookup(Zone, 0), - Mod:store(Data, GUID, Time, Topic, Msg). - -spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. next(#it{module = Mod, data = It0}) -> case Mod:next(It0) of @@ -125,6 +138,9 @@ init([Zone]) -> read_metadata(S), {ok, S}. +handle_call({create_generation, Since, Config}, _From, S) -> + {ok, GenId, NS} = create_new_gen(Since, Config, S), + {reply, {ok, GenId}, NS}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -142,41 +158,50 @@ terminate(_Reason, #s{db = DB, zone = Zone}) -> %% Internal functions %%================================================================================ --spec read_metadata(#s{}) -> ok. -read_metadata(S) -> - %% TODO: just a mockup to make the existing tests pass - read_metadata(0, S). +-spec read_metadata(state()) -> ok. +read_metadata(S = #s{db = DBHandle}) -> + Current = schema_get_current(DBHandle), + lists:foreach(fun(GenId) -> read_metadata(GenId, S) end, lists:seq(0, Current)). --spec read_metadata(gen_id(), #s{}) -> ok. -read_metadata(GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}) -> - Gen = #generation{module = Mod, data = Data} = schema_get_gen(DBHandle, GenId), - DB = Mod:open(Zone, DBHandle, GenId, CFs, Data), - meta_put(Zone, GenId, Gen#generation{data = DB}). +-spec read_metadata(gen_id(), state()) -> ok. +read_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) -> + Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S), + meta_register_gen(Zone, GenId, Gen). --spec ensure_current_generation(#s{}) -> #s{}. -ensure_current_generation(S = #s{zone = Zone, db = DBHandle, column_families = CFs}) -> +-spec ensure_current_generation(state()) -> state(). +ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) -> case schema_get_current(DBHandle) of undefined -> - GenId = 0, - ok = schema_put_current(DBHandle, GenId), - create_new_generation_schema(GenId, S); + Config = emqx_replay_conf:zone_config(Zone), + {ok, _, NS} = create_new_gen(0, Config, S), + NS; _GenId -> S end. --spec create_new_generation_schema(gen_id(), #s{}) -> #s{}. -create_new_generation_schema( - GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs} -) -> - {Module, Options} = emqx_replay_conf:zone_config(Zone), - {NewGenData, NewCFs} = Module:create_new(DBHandle, GenId, Options), - NewGen = #generation{ - module = Module, - data = NewGenData - }, +-spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> + {ok, gen_id(), state()}. +create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) -> + GenId = get_next_id(meta_get_current(Zone)), + GenId = get_next_id(schema_get_current(DBHandle)), + % TODO: Propagate errors to clients. + true = is_gen_valid(Zone, GenId, Since), + {ok, Gen, NS} = create_gen(GenId, Since, Config, S), %% TODO: Transaction? Column family creation can't be transactional, anyway. - ok = schema_put_gen(DBHandle, GenId, NewGen), - S#s{column_families = NewCFs ++ CFs}. + ok = schema_put_gen(DBHandle, GenId, Gen), + ok = schema_put_current(DBHandle, GenId), + ok = meta_register_gen(Zone, GenId, Gen), + {ok, GenId, NS}. + +% -spec +create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_families = CFs}) -> + {Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options), + Gen = #{ + module => Module, + data => Schema, + since => Since + }, + {ok, Gen, S#s{column_families = NewCFs ++ CFs}}. -spec open_db(emqx_types:zone()) -> {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}. open_db(Zone) -> @@ -198,18 +223,27 @@ open_db(Zone) -> Error end. +-spec open_gen(gen_id(), generation(), state()) -> generation(). +open_gen( + GenId, + Gen = #{module := Mod, data := Data}, + #s{zone = Zone, db = DBHandle, column_families = CFs} +) -> + DB = Mod:open(Zone, DBHandle, GenId, CFs, Data), + Gen#{data := DB}. + %% Functions for dealing with the metadata stored persistently in rocksdb -define(CURRENT_GEN, <<"current">>). -define(SCHEMA_WRITE_OPTS, []). -define(SCHEMA_READ_OPTS, []). --spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> #generation{}. +-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation(). schema_get_gen(DBHandle, GenId) -> {ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS), binary_to_term(Bin). --spec schema_put_gen(rocksdb:db_handle(), gen_id(), #generation{}) -> ok | {error, _}. +-spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}. schema_put_gen(DBHandle, GenId, Gen) -> rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS). @@ -238,11 +272,39 @@ schema_gen_key(N) -> -define(PERSISTENT_TERM(ZONE, GEN), {?MODULE, ZONE, GEN}). --spec meta_lookup(emqx_types:zone(), gen_id()) -> #generation{}. +-spec meta_register_gen(emqx_types:zone(), gen_id(), generation()) -> ok. +meta_register_gen(Zone, GenId, Gen) -> + Gs = + case GenId > 0 of + true -> meta_lookup(Zone, GenId - 1); + false -> [] + end, + ok = meta_put(Zone, GenId, [Gen | Gs]), + ok = meta_put(Zone, current, GenId). + +-spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> generation(). +meta_lookup_gen(Zone, Time) -> + % TODO + % Is cheaper persistent term GC on update here worth extra lookup? I'm leaning + % towards a "no". + GenId = meta_lookup(Zone, current), + Gens = meta_lookup(Zone, GenId), + [Gen | _Older] = lists:dropwhile(fun(#{since := Since}) -> Since > Time end, Gens), + Gen. + +-spec meta_get_current(emqx_types:zone()) -> gen_id() | undefined. +meta_get_current(Zone) -> + meta_lookup(Zone, current, undefined). + +-spec meta_lookup(emqx_types:zone(), _K) -> _V. meta_lookup(Zone, GenId) -> persistent_term:get(?PERSISTENT_TERM(Zone, GenId)). --spec meta_put(emqx_types:zone(), gen_id(), #generation{}) -> ok. +-spec meta_lookup(emqx_types:zone(), _K, Default) -> _V | Default. +meta_lookup(Zone, GenId, Default) -> + persistent_term:get(?PERSISTENT_TERM(Zone, GenId), Default). + +-spec meta_put(emqx_types:zone(), _K, _V) -> ok. meta_put(Zone, GenId, Gen) -> persistent_term:put(?PERSISTENT_TERM(Zone, GenId), Gen). @@ -256,6 +318,15 @@ meta_erase(Zone) -> -undef(PERSISTENT_TERM). +get_next_id(undefined) -> 0; +get_next_id(GenId) -> GenId + 1. + +is_gen_valid(Zone, GenId, Since) when GenId > 0 -> + [#{since := SincePrev} | _] = meta_lookup(Zone, GenId - 1), + Since > SincePrev; +is_gen_valid(_Zone, 0, 0) -> + true. + %% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok. %% store_cfs(DBHandle, CFRefs) -> %% lists:foreach( diff --git a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl index eee802e69..d3518d780 100644 --- a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl @@ -176,7 +176,7 @@ end_per_testcase(TC, _Config) -> ok = emqx_replay_local_store_sup:stop_zone(zone(TC)). zone(TC) -> - list_to_atom(?MODULE_STRING ++ atom_to_list(TC)). + list_to_atom(lists:concat([?MODULE, "_", TC])). set_zone_config(Zone, Options) -> ok = application:set_env(emqx_replay, zone_config, #{ From e42f009d3f8fe2214a1cf08344802ddf0c970d43 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 17 Jan 2023 15:58:29 +0300 Subject: [PATCH 2/4] chore: simplify iteration-related typespecs --- apps/emqx_replay/src/emqx_replay_local_store.erl | 2 +- apps/emqx_replay/src/emqx_replay_message_storage.erl | 12 ++++-------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 0138ffed5..35a31e65c 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -98,7 +98,7 @@ store(Zone, GUID, Time, Topic, Msg) -> #{module := Mod, data := Data} = meta_lookup_gen(Zone, Time), Mod:store(Data, GUID, Time, Topic, Msg). --spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay_message_storage:time()) -> +-spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay:time()) -> {ok, _TODO} | {error, _TODO}. make_iterator(Zone, TopicFilter, StartTime) -> %% TODO: this is not supposed to work like this. Just a mock-up diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index f2a6afaa6..fb96863d1 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -140,11 +140,8 @@ %% Type declarations %%================================================================================ -%% parsed --type topic() :: list(binary()). - -%% TODO granularity? --type time() :: integer(). +-type topic() :: emqx_replay:topic(). +-type time() :: emqx_replay:time(). %% Number of bits -type bits() :: non_neg_integer(). @@ -292,20 +289,19 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, Value = make_message_value(Topic, MessagePayload), rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). --spec make_iterator(db(), emqx_topic:words(), time() | earliest) -> +-spec make_iterator(db(), emqx_topic:words(), time()) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, TopicFilter, StartTime) -> Options = emqx_replay_conf:zone_iteration_options(DB#db.zone), make_iterator(DB, TopicFilter, StartTime, Options). --spec make_iterator(db(), emqx_topic:words(), time() | earliest, iteration_options()) -> +-spec make_iterator(db(), emqx_topic:words(), time(), iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time % and call it a day: client violated the contract anyway. {ok, iterator()} | {error, _TODO}. make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime, Options) -> case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of {ok, ITHandle} -> - % TODO earliest Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper), InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper), RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)), From 418ecbcbbcde99c9b1eb2c4355fb4a7d8aa6748b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 17 Jan 2023 17:20:17 +0300 Subject: [PATCH 3/4] feat: make iteration fully generation-aware --- .../src/emqx_replay_local_store.erl | 98 +++++++++++++------ .../test/emqx_replay_local_store_SUITE.erl | 77 ++++++++++++--- 2 files changed, 133 insertions(+), 42 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 35a31e65c..77f0b2924 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -61,6 +61,10 @@ }). -record(it, { + zone :: emqx_types:zone(), + gen :: gen_id(), + filter :: emqx_topic:words(), + start_time :: emqx_replay:time(), module :: module(), data :: term() }). @@ -95,31 +99,36 @@ create_generation(Zone, Since, Config = {_Module, _Options}) -> -spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) -> ok | {error, _TODO}. store(Zone, GUID, Time, Topic, Msg) -> - #{module := Mod, data := Data} = meta_lookup_gen(Zone, Time), + {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Zone, Time), Mod:store(Data, GUID, Time, Topic, Msg). -spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay:time()) -> - {ok, _TODO} | {error, _TODO}. + {ok, iterator()} | {error, _TODO}. make_iterator(Zone, TopicFilter, StartTime) -> - %% TODO: this is not supposed to work like this. Just a mock-up - #{module := Mod, data := Data} = meta_lookup_gen(Zone, StartTime), - case Mod:make_iterator(Data, TopicFilter, StartTime) of - {ok, It} -> - {ok, #it{ - module = Mod, - data = It - }}; - Err -> - Err - end. + {GenId, Gen} = meta_lookup_gen(Zone, StartTime), + open_iterator(Gen, #it{ + zone = Zone, + gen = GenId, + filter = TopicFilter, + start_time = StartTime + }). -spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. -next(#it{module = Mod, data = It0}) -> - case Mod:next(It0) of - {value, Val, It} -> - {value, Val, #it{module = Mod, data = It}}; - Other -> - Other +next(It = #it{module = Mod, data = ItData}) -> + case Mod:next(ItData) of + {value, Val, ItDataNext} -> + {value, Val, It#it{data = ItDataNext}}; + {error, _} = Error -> + Error; + none -> + case open_next_iterator(It) of + {ok, ItNext} -> + next(ItNext); + {error, _} = Error -> + Error; + none -> + none + end end. %%================================================================================ @@ -232,6 +241,24 @@ open_gen( DB = Mod:open(Zone, DBHandle, GenId, CFs, Data), Gen#{data := DB}. +-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none. +open_next_iterator(It = #it{zone = Zone, gen = GenId}) -> + open_next_iterator(meta_get_gen(Zone, GenId + 1), It#it{gen = GenId + 1}). + +open_next_iterator(undefined, _It) -> + none; +open_next_iterator(Gen = #{}, It) -> + open_iterator(Gen, It). + +-spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}. +open_iterator(#{module := Mod, data := Data}, It = #it{}) -> + case Mod:make_iterator(Data, It#it.filter, It#it.start_time) of + {ok, ItData} -> + {ok, It#it{module = Mod, data = ItData}}; + Err -> + Err + end. + %% Functions for dealing with the metadata stored persistently in rocksdb -define(CURRENT_GEN, <<"current">>). @@ -282,31 +309,42 @@ meta_register_gen(Zone, GenId, Gen) -> ok = meta_put(Zone, GenId, [Gen | Gs]), ok = meta_put(Zone, current, GenId). --spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> generation(). +-spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> {gen_id(), generation()}. meta_lookup_gen(Zone, Time) -> % TODO % Is cheaper persistent term GC on update here worth extra lookup? I'm leaning % towards a "no". - GenId = meta_lookup(Zone, current), - Gens = meta_lookup(Zone, GenId), - [Gen | _Older] = lists:dropwhile(fun(#{since := Since}) -> Since > Time end, Gens), - Gen. + Current = meta_lookup(Zone, current), + Gens = meta_lookup(Zone, Current), + find_gen(Time, Current, Gens). + +find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since -> + {GenId, Gen}; +find_gen(Time, GenId, [_Gen | Rest]) -> + find_gen(Time, GenId - 1, Rest). + +-spec meta_get_gen(emqx_types:zone(), gen_id()) -> generation() | undefined. +meta_get_gen(Zone, GenId) -> + case meta_lookup(Zone, GenId, []) of + [Gen | _Older] -> Gen; + [] -> undefined + end. -spec meta_get_current(emqx_types:zone()) -> gen_id() | undefined. meta_get_current(Zone) -> meta_lookup(Zone, current, undefined). -spec meta_lookup(emqx_types:zone(), _K) -> _V. -meta_lookup(Zone, GenId) -> - persistent_term:get(?PERSISTENT_TERM(Zone, GenId)). +meta_lookup(Zone, K) -> + persistent_term:get(?PERSISTENT_TERM(Zone, K)). -spec meta_lookup(emqx_types:zone(), _K, Default) -> _V | Default. -meta_lookup(Zone, GenId, Default) -> - persistent_term:get(?PERSISTENT_TERM(Zone, GenId), Default). +meta_lookup(Zone, K, Default) -> + persistent_term:get(?PERSISTENT_TERM(Zone, K), Default). -spec meta_put(emqx_types:zone(), _K, _V) -> ok. -meta_put(Zone, GenId, Gen) -> - persistent_term:put(?PERSISTENT_TERM(Zone, GenId), Gen). +meta_put(Zone, K, V) -> + persistent_term:put(?PERSISTENT_TERM(Zone, K), V). -spec meta_erase(emqx_types:zone()) -> ok. meta_erase(Zone) -> diff --git a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl index d3518d780..edda0f7f6 100644 --- a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl @@ -23,6 +23,25 @@ -define(ZONE, zone(?FUNCTION_NAME)). +-define(DEFAULT_CONFIG, + {emqx_replay_message_storage, #{ + timestamp_bits => 64, + topic_bits_per_level => [8, 8, 32, 16], + epoch => 5, + iteration => #{ + iterator_refresh => {every, 5} + } + }} +). + +-define(COMPACT_CONFIG, + {emqx_replay_message_storage, #{ + timestamp_bits => 16, + topic_bits_per_level => [16, 16], + epoch => 10 + }} +). + %% Smoke test for opening and reopening the database t_open(_Config) -> ok = emqx_replay_local_store_sup:stop_zone(?ZONE), @@ -128,6 +147,49 @@ t_iterate_long_tail_wildcard(_Config) -> lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, TopicFilter, 50)]) ). +t_create_gen(_Config) -> + {ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG), + ?assertEqual( + {error, nonmonotonic}, + emqx_replay_local_store:create_generation(?ZONE, 1, ?DEFAULT_CONFIG) + ), + ?assertEqual( + {error, nonmonotonic}, + emqx_replay_local_store:create_generation(?ZONE, 5, ?DEFAULT_CONFIG) + ), + {ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG), + Topics = ["foo/bar", "foo/bar/baz"], + Timestamps = lists:seq(1, 100), + [ + ?assertEqual(ok, store(?ZONE, PublishedAt, Topic, <<>>)) + || Topic <- Topics, PublishedAt <- Timestamps + ]. + +t_iterate_multigen(_Config) -> + {ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 1000, ?DEFAULT_CONFIG), + Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], + Timestamps = lists:seq(1, 100), + _ = [ + store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + || Topic <- Topics, PublishedAt <- Timestamps + ], + ?assertEqual( + lists:sort([ + {Topic, PublishedAt} + || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps + ]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "foo/#", 0)]) + ), + ?assertEqual( + lists:sort([ + {Topic, PublishedAt} + || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100) + ]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)]) + ). + store(Zone, PublishedAt, Topic, Payload) -> ID = emqx_guid:gen(), emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload). @@ -161,14 +223,7 @@ end_per_suite(_Config) -> ok = application:stop(emqx_replay). init_per_testcase(TC, Config) -> - ok = set_zone_config(zone(TC), #{ - timestamp_bits => 64, - topic_bits_per_level => [8, 8, 32, 16], - epoch => 5, - iteration => #{ - iterator_refresh => {every, 5} - } - }), + ok = set_zone_config(zone(TC), ?DEFAULT_CONFIG), {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)), Config. @@ -178,7 +233,5 @@ end_per_testcase(TC, _Config) -> zone(TC) -> list_to_atom(lists:concat([?MODULE, "_", TC])). -set_zone_config(Zone, Options) -> - ok = application:set_env(emqx_replay, zone_config, #{ - Zone => {emqx_replay_message_storage, Options} - }). +set_zone_config(Zone, Config) -> + ok = application:set_env(emqx_replay, zone_config, #{Zone => Config}). From 3c13dd38f68dd745e5a7ab0d0b3dc52033841cd1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 17 Jan 2023 17:21:59 +0300 Subject: [PATCH 4/4] feat: make `create_generation` safer against bad input --- .../src/emqx_replay_local_store.erl | 44 ++++++++++++------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 77f0b2924..7148308b2 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -92,7 +92,7 @@ start_link(Zone) -> gen_server:start_link(?REF(Zone), ?MODULE, [Zone], []). -spec create_generation(emqx_types:zone(), emqx_replay:time(), emqx_replay_conf:backend_config()) -> - {ok, gen_id()}. + {ok, gen_id()} | {error, nonmonotonic}. create_generation(Zone, Since, Config = {_Module, _Options}) -> gen_server:call(?REF(Zone), {create_generation, Since, Config}). @@ -148,8 +148,12 @@ init([Zone]) -> {ok, S}. handle_call({create_generation, Since, Config}, _From, S) -> - {ok, GenId, NS} = create_new_gen(Since, Config, S), - {reply, {ok, GenId}, NS}; + case create_new_gen(Since, Config, S) of + {ok, GenId, NS} -> + {reply, {ok, GenId}, NS}; + {error, _} = Error -> + {reply, Error, S} + end; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -193,17 +197,22 @@ ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) -> create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) -> GenId = get_next_id(meta_get_current(Zone)), GenId = get_next_id(schema_get_current(DBHandle)), - % TODO: Propagate errors to clients. - true = is_gen_valid(Zone, GenId, Since), - {ok, Gen, NS} = create_gen(GenId, Since, Config, S), - %% TODO: Transaction? Column family creation can't be transactional, anyway. - ok = schema_put_gen(DBHandle, GenId, Gen), - ok = schema_put_current(DBHandle, GenId), - ok = meta_register_gen(Zone, GenId, Gen), - {ok, GenId, NS}. + case is_gen_valid(Zone, GenId, Since) of + ok -> + {ok, Gen, NS} = create_gen(GenId, Since, Config, S), + %% TODO: Transaction? Column family creation can't be transactional, anyway. + ok = schema_put_gen(DBHandle, GenId, Gen), + ok = schema_put_current(DBHandle, GenId), + ok = meta_register_gen(Zone, GenId, open_gen(GenId, Gen, NS)), + {ok, GenId, NS}; + {error, _} = Error -> + Error + end. -% -spec +-spec create_gen(gen_id(), emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> + {ok, generation(), state()}. create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_families = CFs}) -> + % TODO: Backend implementation should ensure idempotency. {Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options), Gen = #{ module => Module, @@ -360,10 +369,15 @@ get_next_id(undefined) -> 0; get_next_id(GenId) -> GenId + 1. is_gen_valid(Zone, GenId, Since) when GenId > 0 -> - [#{since := SincePrev} | _] = meta_lookup(Zone, GenId - 1), - Since > SincePrev; + [GenPrev | _] = meta_lookup(Zone, GenId - 1), + case GenPrev of + #{since := SincePrev} when Since > SincePrev -> + ok; + #{} -> + {error, nonmonotonic} + end; is_gen_valid(_Zone, 0, 0) -> - true. + ok. %% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok. %% store_cfs(DBHandle, CFRefs) ->