From cf6a5e1643a64c7e54362a88ee81eddebfce1aec Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 17 Jan 2023 13:31:01 +0300 Subject: [PATCH] feat(ds): Allow to create new storage generations --- apps/emqx_replay/src/emqx_replay.app.src | 2 +- apps/emqx_replay/src/emqx_replay.erl | 4 +- apps/emqx_replay/src/emqx_replay_conf.erl | 13 +- .../src/emqx_replay_local_store.erl | 163 +++++++++++++----- .../test/emqx_replay_local_store_SUITE.erl | 2 +- 5 files changed, 130 insertions(+), 54 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay.app.src b/apps/emqx_replay/src/emqx_replay.app.src index 7769e82e9..9c00a78ca 100644 --- a/apps/emqx_replay/src/emqx_replay.app.src +++ b/apps/emqx_replay/src/emqx_replay.app.src @@ -5,7 +5,7 @@ {vsn, "0.1.0"}, {modules, []}, {registered, []}, - {applications, [kernel, stdlib, rocksdb]}, + {applications, [kernel, stdlib, rocksdb, gproc]}, {mod, {emqx_replay_app, []}}, {env, []} ]}. diff --git a/apps/emqx_replay/src/emqx_replay.erl b/apps/emqx_replay/src/emqx_replay.erl index ee83e35d9..fb1ec39c6 100644 --- a/apps/emqx_replay/src/emqx_replay.erl +++ b/apps/emqx_replay/src/emqx_replay.erl @@ -27,8 +27,10 @@ %% parsed -type topic() :: list(binary()). +%% Timestamp +%% Earliest possible timestamp is 0. %% TODO granularity? --type time() :: integer(). +-type time() :: non_neg_integer(). %%================================================================================ %% API funcions diff --git a/apps/emqx_replay/src/emqx_replay_conf.erl b/apps/emqx_replay/src/emqx_replay_conf.erl index 57ba87ddf..46fa53867 100644 --- a/apps/emqx_replay/src/emqx_replay_conf.erl +++ b/apps/emqx_replay/src/emqx_replay_conf.erl @@ -23,6 +23,12 @@ -export([zone_iteration_options/1]). -export([default_iteration_options/0]). +-type backend_config() :: + {emqx_replay_message_storage, emqx_replay_message_storage:options()} + | {module(), _Options}. + +-export_type([backend_config/0]). + %%================================================================================ %% API funcions %%================================================================================ @@ -30,11 +36,8 @@ -define(APP, emqx_replay). -type zone() :: emqx_types:zone(). --type config() :: - {emqx_replay_message_storage, emqx_replay_message_storage:options()} - | {module(), _Options}. --spec zone_config(zone()) -> config(). +-spec zone_config(zone()) -> backend_config(). zone_config(Zone) -> DefaultZoneConfig = application:get_env(?APP, default_zone_config, default_zone_config()), Zones = application:get_env(?APP, zone_config, #{}), @@ -54,7 +57,7 @@ default_iteration_options() -> {emqx_replay_message_storage, Config} = default_zone_config(), maps:get(iteration, Config). --spec default_zone_config() -> config(). +-spec default_zone_config() -> backend_config(). default_zone_config() -> {emqx_replay_message_storage, #{ timestamp_bits => 64, diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 15a400a92..0138ffed5 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -19,8 +19,9 @@ %% API: -export([start_link/1]). +-export([create_generation/3]). --export([make_iterator/3, store/5, next/1]). +-export([store/5, make_iterator/3, next/1]). %% behavior callbacks: -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -40,13 +41,18 @@ -type cf_refs() :: [{string(), rocksdb:cf_handle()}]. --record(generation, { +%% Message storage generation +%% Keep in mind that instances of this type are persisted in long-term storage. +-type generation() :: #{ %% Module that handles data for the generation - module :: module(), - %% Module-specific attributes - data :: term() - % time_range :: {emqx_replay:time(), emqx_replay:time()} -}). + module := module(), + %% Module-specific data defined at generation creation time + data := term(), + %% When should this generation become active? + %% This generation should only contain messages timestamped no earlier than that. + %% The very first generation will have `since` equal 0. + since := emqx_replay:time() +}. -record(s, { zone :: emqx_types:zone(), @@ -61,6 +67,7 @@ -type gen_id() :: 0..16#ffff. +-opaque state() :: #s{}. -opaque iterator() :: #it{}. %% Contents of the default column family: @@ -70,19 +77,32 @@ -define(DEFAULT_CF_OPTS, []). +-define(REF(Zone), {via, gproc, {n, l, {?MODULE, Zone}}}). + %%================================================================================ %% API funcions %%================================================================================ -spec start_link(emqx_types:zone()) -> {ok, pid()}. start_link(Zone) -> - gen_server:start_link(?MODULE, [Zone], []). + gen_server:start_link(?REF(Zone), ?MODULE, [Zone], []). + +-spec create_generation(emqx_types:zone(), emqx_replay:time(), emqx_replay_conf:backend_config()) -> + {ok, gen_id()}. +create_generation(Zone, Since, Config = {_Module, _Options}) -> + gen_server:call(?REF(Zone), {create_generation, Since, Config}). + +-spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) -> + ok | {error, _TODO}. +store(Zone, GUID, Time, Topic, Msg) -> + #{module := Mod, data := Data} = meta_lookup_gen(Zone, Time), + Mod:store(Data, GUID, Time, Topic, Msg). -spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay_message_storage:time()) -> {ok, _TODO} | {error, _TODO}. make_iterator(Zone, TopicFilter, StartTime) -> %% TODO: this is not supposed to work like this. Just a mock-up - #generation{module = Mod, data = Data} = meta_lookup(Zone, 0), + #{module := Mod, data := Data} = meta_lookup_gen(Zone, StartTime), case Mod:make_iterator(Data, TopicFilter, StartTime) of {ok, It} -> {ok, #it{ @@ -93,13 +113,6 @@ make_iterator(Zone, TopicFilter, StartTime) -> Err end. --spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) -> - ok | {error, _TODO}. -store(Zone, GUID, Time, Topic, Msg) -> - %% TODO: this is not supposed to work like this. Just a mock-up - #generation{module = Mod, data = Data} = meta_lookup(Zone, 0), - Mod:store(Data, GUID, Time, Topic, Msg). - -spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. next(#it{module = Mod, data = It0}) -> case Mod:next(It0) of @@ -125,6 +138,9 @@ init([Zone]) -> read_metadata(S), {ok, S}. +handle_call({create_generation, Since, Config}, _From, S) -> + {ok, GenId, NS} = create_new_gen(Since, Config, S), + {reply, {ok, GenId}, NS}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -142,41 +158,50 @@ terminate(_Reason, #s{db = DB, zone = Zone}) -> %% Internal functions %%================================================================================ --spec read_metadata(#s{}) -> ok. -read_metadata(S) -> - %% TODO: just a mockup to make the existing tests pass - read_metadata(0, S). +-spec read_metadata(state()) -> ok. +read_metadata(S = #s{db = DBHandle}) -> + Current = schema_get_current(DBHandle), + lists:foreach(fun(GenId) -> read_metadata(GenId, S) end, lists:seq(0, Current)). --spec read_metadata(gen_id(), #s{}) -> ok. -read_metadata(GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}) -> - Gen = #generation{module = Mod, data = Data} = schema_get_gen(DBHandle, GenId), - DB = Mod:open(Zone, DBHandle, GenId, CFs, Data), - meta_put(Zone, GenId, Gen#generation{data = DB}). +-spec read_metadata(gen_id(), state()) -> ok. +read_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) -> + Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S), + meta_register_gen(Zone, GenId, Gen). --spec ensure_current_generation(#s{}) -> #s{}. -ensure_current_generation(S = #s{zone = Zone, db = DBHandle, column_families = CFs}) -> +-spec ensure_current_generation(state()) -> state(). +ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) -> case schema_get_current(DBHandle) of undefined -> - GenId = 0, - ok = schema_put_current(DBHandle, GenId), - create_new_generation_schema(GenId, S); + Config = emqx_replay_conf:zone_config(Zone), + {ok, _, NS} = create_new_gen(0, Config, S), + NS; _GenId -> S end. --spec create_new_generation_schema(gen_id(), #s{}) -> #s{}. -create_new_generation_schema( - GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs} -) -> - {Module, Options} = emqx_replay_conf:zone_config(Zone), - {NewGenData, NewCFs} = Module:create_new(DBHandle, GenId, Options), - NewGen = #generation{ - module = Module, - data = NewGenData - }, +-spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> + {ok, gen_id(), state()}. +create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) -> + GenId = get_next_id(meta_get_current(Zone)), + GenId = get_next_id(schema_get_current(DBHandle)), + % TODO: Propagate errors to clients. + true = is_gen_valid(Zone, GenId, Since), + {ok, Gen, NS} = create_gen(GenId, Since, Config, S), %% TODO: Transaction? Column family creation can't be transactional, anyway. - ok = schema_put_gen(DBHandle, GenId, NewGen), - S#s{column_families = NewCFs ++ CFs}. + ok = schema_put_gen(DBHandle, GenId, Gen), + ok = schema_put_current(DBHandle, GenId), + ok = meta_register_gen(Zone, GenId, Gen), + {ok, GenId, NS}. + +% -spec +create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_families = CFs}) -> + {Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options), + Gen = #{ + module => Module, + data => Schema, + since => Since + }, + {ok, Gen, S#s{column_families = NewCFs ++ CFs}}. -spec open_db(emqx_types:zone()) -> {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}. open_db(Zone) -> @@ -198,18 +223,27 @@ open_db(Zone) -> Error end. +-spec open_gen(gen_id(), generation(), state()) -> generation(). +open_gen( + GenId, + Gen = #{module := Mod, data := Data}, + #s{zone = Zone, db = DBHandle, column_families = CFs} +) -> + DB = Mod:open(Zone, DBHandle, GenId, CFs, Data), + Gen#{data := DB}. + %% Functions for dealing with the metadata stored persistently in rocksdb -define(CURRENT_GEN, <<"current">>). -define(SCHEMA_WRITE_OPTS, []). -define(SCHEMA_READ_OPTS, []). --spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> #generation{}. +-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation(). schema_get_gen(DBHandle, GenId) -> {ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS), binary_to_term(Bin). --spec schema_put_gen(rocksdb:db_handle(), gen_id(), #generation{}) -> ok | {error, _}. +-spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}. schema_put_gen(DBHandle, GenId, Gen) -> rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS). @@ -238,11 +272,39 @@ schema_gen_key(N) -> -define(PERSISTENT_TERM(ZONE, GEN), {?MODULE, ZONE, GEN}). --spec meta_lookup(emqx_types:zone(), gen_id()) -> #generation{}. +-spec meta_register_gen(emqx_types:zone(), gen_id(), generation()) -> ok. +meta_register_gen(Zone, GenId, Gen) -> + Gs = + case GenId > 0 of + true -> meta_lookup(Zone, GenId - 1); + false -> [] + end, + ok = meta_put(Zone, GenId, [Gen | Gs]), + ok = meta_put(Zone, current, GenId). + +-spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> generation(). +meta_lookup_gen(Zone, Time) -> + % TODO + % Is cheaper persistent term GC on update here worth extra lookup? I'm leaning + % towards a "no". + GenId = meta_lookup(Zone, current), + Gens = meta_lookup(Zone, GenId), + [Gen | _Older] = lists:dropwhile(fun(#{since := Since}) -> Since > Time end, Gens), + Gen. + +-spec meta_get_current(emqx_types:zone()) -> gen_id() | undefined. +meta_get_current(Zone) -> + meta_lookup(Zone, current, undefined). + +-spec meta_lookup(emqx_types:zone(), _K) -> _V. meta_lookup(Zone, GenId) -> persistent_term:get(?PERSISTENT_TERM(Zone, GenId)). --spec meta_put(emqx_types:zone(), gen_id(), #generation{}) -> ok. +-spec meta_lookup(emqx_types:zone(), _K, Default) -> _V | Default. +meta_lookup(Zone, GenId, Default) -> + persistent_term:get(?PERSISTENT_TERM(Zone, GenId), Default). + +-spec meta_put(emqx_types:zone(), _K, _V) -> ok. meta_put(Zone, GenId, Gen) -> persistent_term:put(?PERSISTENT_TERM(Zone, GenId), Gen). @@ -256,6 +318,15 @@ meta_erase(Zone) -> -undef(PERSISTENT_TERM). +get_next_id(undefined) -> 0; +get_next_id(GenId) -> GenId + 1. + +is_gen_valid(Zone, GenId, Since) when GenId > 0 -> + [#{since := SincePrev} | _] = meta_lookup(Zone, GenId - 1), + Since > SincePrev; +is_gen_valid(_Zone, 0, 0) -> + true. + %% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok. %% store_cfs(DBHandle, CFRefs) -> %% lists:foreach( diff --git a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl index eee802e69..d3518d780 100644 --- a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl @@ -176,7 +176,7 @@ end_per_testcase(TC, _Config) -> ok = emqx_replay_local_store_sup:stop_zone(zone(TC)). zone(TC) -> - list_to_atom(?MODULE_STRING ++ atom_to_list(TC)). + list_to_atom(lists:concat([?MODULE, "_", TC])). set_zone_config(Zone, Options) -> ok = application:set_env(emqx_replay, zone_config, #{