feat(ds): Allow to create new storage generations

This commit is contained in:
Andrew Mayorov 2023-01-17 13:31:01 +03:00 committed by ieQu1
parent f80e906553
commit cf6a5e1643
5 changed files with 130 additions and 54 deletions

View File

@ -5,7 +5,7 @@
{vsn, "0.1.0"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, rocksdb]},
{applications, [kernel, stdlib, rocksdb, gproc]},
{mod, {emqx_replay_app, []}},
{env, []}
]}.

View File

@ -27,8 +27,10 @@
%% parsed
-type topic() :: list(binary()).
%% Timestamp
%% Earliest possible timestamp is 0.
%% TODO granularity?
-type time() :: integer().
-type time() :: non_neg_integer().
%%================================================================================
%% API funcions

View File

@ -23,6 +23,12 @@
-export([zone_iteration_options/1]).
-export([default_iteration_options/0]).
-type backend_config() ::
{emqx_replay_message_storage, emqx_replay_message_storage:options()}
| {module(), _Options}.
-export_type([backend_config/0]).
%%================================================================================
%% API funcions
%%================================================================================
@ -30,11 +36,8 @@
-define(APP, emqx_replay).
-type zone() :: emqx_types:zone().
-type config() ::
{emqx_replay_message_storage, emqx_replay_message_storage:options()}
| {module(), _Options}.
-spec zone_config(zone()) -> config().
-spec zone_config(zone()) -> backend_config().
zone_config(Zone) ->
DefaultZoneConfig = application:get_env(?APP, default_zone_config, default_zone_config()),
Zones = application:get_env(?APP, zone_config, #{}),
@ -54,7 +57,7 @@ default_iteration_options() ->
{emqx_replay_message_storage, Config} = default_zone_config(),
maps:get(iteration, Config).
-spec default_zone_config() -> config().
-spec default_zone_config() -> backend_config().
default_zone_config() ->
{emqx_replay_message_storage, #{
timestamp_bits => 64,

View File

@ -19,8 +19,9 @@
%% API:
-export([start_link/1]).
-export([create_generation/3]).
-export([make_iterator/3, store/5, next/1]).
-export([store/5, make_iterator/3, next/1]).
%% behavior callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
@ -40,13 +41,18 @@
-type cf_refs() :: [{string(), rocksdb:cf_handle()}].
-record(generation, {
%% Message storage generation
%% Keep in mind that instances of this type are persisted in long-term storage.
-type generation() :: #{
%% Module that handles data for the generation
module :: module(),
%% Module-specific attributes
data :: term()
% time_range :: {emqx_replay:time(), emqx_replay:time()}
}).
module := module(),
%% Module-specific data defined at generation creation time
data := term(),
%% When should this generation become active?
%% This generation should only contain messages timestamped no earlier than that.
%% The very first generation will have `since` equal 0.
since := emqx_replay:time()
}.
-record(s, {
zone :: emqx_types:zone(),
@ -61,6 +67,7 @@
-type gen_id() :: 0..16#ffff.
-opaque state() :: #s{}.
-opaque iterator() :: #it{}.
%% Contents of the default column family:
@ -70,19 +77,32 @@
-define(DEFAULT_CF_OPTS, []).
-define(REF(Zone), {via, gproc, {n, l, {?MODULE, Zone}}}).
%%================================================================================
%% API funcions
%%================================================================================
-spec start_link(emqx_types:zone()) -> {ok, pid()}.
start_link(Zone) ->
gen_server:start_link(?MODULE, [Zone], []).
gen_server:start_link(?REF(Zone), ?MODULE, [Zone], []).
-spec create_generation(emqx_types:zone(), emqx_replay:time(), emqx_replay_conf:backend_config()) ->
{ok, gen_id()}.
create_generation(Zone, Since, Config = {_Module, _Options}) ->
gen_server:call(?REF(Zone), {create_generation, Since, Config}).
-spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) ->
ok | {error, _TODO}.
store(Zone, GUID, Time, Topic, Msg) ->
#{module := Mod, data := Data} = meta_lookup_gen(Zone, Time),
Mod:store(Data, GUID, Time, Topic, Msg).
-spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay_message_storage:time()) ->
{ok, _TODO} | {error, _TODO}.
make_iterator(Zone, TopicFilter, StartTime) ->
%% TODO: this is not supposed to work like this. Just a mock-up
#generation{module = Mod, data = Data} = meta_lookup(Zone, 0),
#{module := Mod, data := Data} = meta_lookup_gen(Zone, StartTime),
case Mod:make_iterator(Data, TopicFilter, StartTime) of
{ok, It} ->
{ok, #it{
@ -93,13 +113,6 @@ make_iterator(Zone, TopicFilter, StartTime) ->
Err
end.
-spec store(emqx_types:zone(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary()) ->
ok | {error, _TODO}.
store(Zone, GUID, Time, Topic, Msg) ->
%% TODO: this is not supposed to work like this. Just a mock-up
#generation{module = Mod, data = Data} = meta_lookup(Zone, 0),
Mod:store(Data, GUID, Time, Topic, Msg).
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
next(#it{module = Mod, data = It0}) ->
case Mod:next(It0) of
@ -125,6 +138,9 @@ init([Zone]) ->
read_metadata(S),
{ok, S}.
handle_call({create_generation, Since, Config}, _From, S) ->
{ok, GenId, NS} = create_new_gen(Since, Config, S),
{reply, {ok, GenId}, NS};
handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}.
@ -142,41 +158,50 @@ terminate(_Reason, #s{db = DB, zone = Zone}) ->
%% Internal functions
%%================================================================================
-spec read_metadata(#s{}) -> ok.
read_metadata(S) ->
%% TODO: just a mockup to make the existing tests pass
read_metadata(0, S).
-spec read_metadata(state()) -> ok.
read_metadata(S = #s{db = DBHandle}) ->
Current = schema_get_current(DBHandle),
lists:foreach(fun(GenId) -> read_metadata(GenId, S) end, lists:seq(0, Current)).
-spec read_metadata(gen_id(), #s{}) -> ok.
read_metadata(GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}) ->
Gen = #generation{module = Mod, data = Data} = schema_get_gen(DBHandle, GenId),
DB = Mod:open(Zone, DBHandle, GenId, CFs, Data),
meta_put(Zone, GenId, Gen#generation{data = DB}).
-spec read_metadata(gen_id(), state()) -> ok.
read_metadata(GenId, S = #s{zone = Zone, db = DBHandle}) ->
Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S),
meta_register_gen(Zone, GenId, Gen).
-spec ensure_current_generation(#s{}) -> #s{}.
ensure_current_generation(S = #s{zone = Zone, db = DBHandle, column_families = CFs}) ->
-spec ensure_current_generation(state()) -> state().
ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) ->
case schema_get_current(DBHandle) of
undefined ->
GenId = 0,
ok = schema_put_current(DBHandle, GenId),
create_new_generation_schema(GenId, S);
Config = emqx_replay_conf:zone_config(Zone),
{ok, _, NS} = create_new_gen(0, Config, S),
NS;
_GenId ->
S
end.
-spec create_new_generation_schema(gen_id(), #s{}) -> #s{}.
create_new_generation_schema(
GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}
) ->
{Module, Options} = emqx_replay_conf:zone_config(Zone),
{NewGenData, NewCFs} = Module:create_new(DBHandle, GenId, Options),
NewGen = #generation{
module = Module,
data = NewGenData
},
-spec create_new_gen(emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
{ok, gen_id(), state()}.
create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) ->
GenId = get_next_id(meta_get_current(Zone)),
GenId = get_next_id(schema_get_current(DBHandle)),
% TODO: Propagate errors to clients.
true = is_gen_valid(Zone, GenId, Since),
{ok, Gen, NS} = create_gen(GenId, Since, Config, S),
%% TODO: Transaction? Column family creation can't be transactional, anyway.
ok = schema_put_gen(DBHandle, GenId, NewGen),
S#s{column_families = NewCFs ++ CFs}.
ok = schema_put_gen(DBHandle, GenId, Gen),
ok = schema_put_current(DBHandle, GenId),
ok = meta_register_gen(Zone, GenId, Gen),
{ok, GenId, NS}.
% -spec
create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_families = CFs}) ->
{Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options),
Gen = #{
module => Module,
data => Schema,
since => Since
},
{ok, Gen, S#s{column_families = NewCFs ++ CFs}}.
-spec open_db(emqx_types:zone()) -> {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}.
open_db(Zone) ->
@ -198,18 +223,27 @@ open_db(Zone) ->
Error
end.
-spec open_gen(gen_id(), generation(), state()) -> generation().
open_gen(
GenId,
Gen = #{module := Mod, data := Data},
#s{zone = Zone, db = DBHandle, column_families = CFs}
) ->
DB = Mod:open(Zone, DBHandle, GenId, CFs, Data),
Gen#{data := DB}.
%% Functions for dealing with the metadata stored persistently in rocksdb
-define(CURRENT_GEN, <<"current">>).
-define(SCHEMA_WRITE_OPTS, []).
-define(SCHEMA_READ_OPTS, []).
-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> #generation{}.
-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation().
schema_get_gen(DBHandle, GenId) ->
{ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS),
binary_to_term(Bin).
-spec schema_put_gen(rocksdb:db_handle(), gen_id(), #generation{}) -> ok | {error, _}.
-spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}.
schema_put_gen(DBHandle, GenId, Gen) ->
rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS).
@ -238,11 +272,39 @@ schema_gen_key(N) ->
-define(PERSISTENT_TERM(ZONE, GEN), {?MODULE, ZONE, GEN}).
-spec meta_lookup(emqx_types:zone(), gen_id()) -> #generation{}.
-spec meta_register_gen(emqx_types:zone(), gen_id(), generation()) -> ok.
meta_register_gen(Zone, GenId, Gen) ->
Gs =
case GenId > 0 of
true -> meta_lookup(Zone, GenId - 1);
false -> []
end,
ok = meta_put(Zone, GenId, [Gen | Gs]),
ok = meta_put(Zone, current, GenId).
-spec meta_lookup_gen(emqx_types:zone(), emqx_replay:time()) -> generation().
meta_lookup_gen(Zone, Time) ->
% TODO
% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
% towards a "no".
GenId = meta_lookup(Zone, current),
Gens = meta_lookup(Zone, GenId),
[Gen | _Older] = lists:dropwhile(fun(#{since := Since}) -> Since > Time end, Gens),
Gen.
-spec meta_get_current(emqx_types:zone()) -> gen_id() | undefined.
meta_get_current(Zone) ->
meta_lookup(Zone, current, undefined).
-spec meta_lookup(emqx_types:zone(), _K) -> _V.
meta_lookup(Zone, GenId) ->
persistent_term:get(?PERSISTENT_TERM(Zone, GenId)).
-spec meta_put(emqx_types:zone(), gen_id(), #generation{}) -> ok.
-spec meta_lookup(emqx_types:zone(), _K, Default) -> _V | Default.
meta_lookup(Zone, GenId, Default) ->
persistent_term:get(?PERSISTENT_TERM(Zone, GenId), Default).
-spec meta_put(emqx_types:zone(), _K, _V) -> ok.
meta_put(Zone, GenId, Gen) ->
persistent_term:put(?PERSISTENT_TERM(Zone, GenId), Gen).
@ -256,6 +318,15 @@ meta_erase(Zone) ->
-undef(PERSISTENT_TERM).
get_next_id(undefined) -> 0;
get_next_id(GenId) -> GenId + 1.
is_gen_valid(Zone, GenId, Since) when GenId > 0 ->
[#{since := SincePrev} | _] = meta_lookup(Zone, GenId - 1),
Since > SincePrev;
is_gen_valid(_Zone, 0, 0) ->
true.
%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok.
%% store_cfs(DBHandle, CFRefs) ->
%% lists:foreach(

View File

@ -176,7 +176,7 @@ end_per_testcase(TC, _Config) ->
ok = emqx_replay_local_store_sup:stop_zone(zone(TC)).
zone(TC) ->
list_to_atom(?MODULE_STRING ++ atom_to_list(TC)).
list_to_atom(lists:concat([?MODULE, "_", TC])).
set_zone_config(Zone, Options) ->
ok = application:set_env(emqx_replay, zone_config, #{