feat(ds): Wire iteration options up to the app config

This commit is contained in:
Andrew Mayorov 2023-01-16 17:25:55 +03:00 committed by ieQu1
parent 2bf8a07b05
commit f5a7b49f57
4 changed files with 53 additions and 20 deletions

View File

@ -20,27 +20,51 @@
%% API: %% API:
-export([zone_config/1, db_options/0]). -export([zone_config/1, db_options/0]).
-export([zone_iteration_options/1]).
-export([default_iteration_options/0]).
%%================================================================================ %%================================================================================
%% API funcions %% API funcions
%%================================================================================ %%================================================================================
-define(APP, emqx_replay). -define(APP, emqx_replay).
-spec zone_config(emqx_types:zone()) -> -type zone() :: emqx_types:zone().
{module(), term()}. -type config() ::
{emqx_replay_message_storage, emqx_replay_message_storage:options()}
| {module(), _Options}.
-spec zone_config(zone()) -> config().
zone_config(Zone) -> zone_config(Zone) ->
DefaultConf = DefaultZoneConfig = application:get_env(?APP, default_zone_config, default_zone_config()),
#{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 8, 32, 16],
epoch => 5
},
DefaultZoneConfig = application:get_env(
?APP, default_zone_config, {emqx_replay_message_storage, DefaultConf}
),
Zones = application:get_env(?APP, zone_config, #{}), Zones = application:get_env(?APP, zone_config, #{}),
maps:get(Zone, Zones, DefaultZoneConfig). maps:get(Zone, Zones, DefaultZoneConfig).
-spec zone_iteration_options(zone()) -> emqx_replay_message_storage:iteration_options().
zone_iteration_options(Zone) ->
case zone_config(Zone) of
{emqx_replay_message_storage, Config} ->
maps:get(iteration, Config, default_iteration_options());
{_Module, _} ->
default_iteration_options()
end.
-spec default_iteration_options() -> emqx_replay_message_storage:iteration_options().
default_iteration_options() ->
{emqx_replay_message_storage, Config} = default_zone_config(),
maps:get(iteration, Config).
-spec default_zone_config() -> config().
default_zone_config() ->
{emqx_replay_message_storage, #{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 8, 32, 16],
epoch => 5,
iteration => #{
iterator_refresh => {every, 100}
}
}}.
-spec db_options() -> emqx_replay_local_store:db_options(). -spec db_options() -> emqx_replay_local_store:db_options().
db_options() -> db_options() ->
application:get_env(?APP, db_options, []). application:get_env(?APP, db_options, []).

View File

@ -150,7 +150,7 @@ read_metadata(S) ->
-spec read_metadata(gen_id(), #s{}) -> ok. -spec read_metadata(gen_id(), #s{}) -> ok.
read_metadata(GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}) -> read_metadata(GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}) ->
Gen = #generation{module = Mod, data = Data} = schema_get_gen(DBHandle, GenId), Gen = #generation{module = Mod, data = Data} = schema_get_gen(DBHandle, GenId),
DB = Mod:open(DBHandle, GenId, CFs, Data), DB = Mod:open(Zone, DBHandle, GenId, CFs, Data),
meta_put(Zone, GenId, Gen#generation{data = DB}). meta_put(Zone, GenId, Gen#generation{data = DB}).
-spec ensure_current_generation(#s{}) -> #s{}. -spec ensure_current_generation(#s{}) -> #s{}.

View File

@ -90,7 +90,7 @@
%%================================================================================ %%================================================================================
%% API: %% API:
-export([create_new/3, open/4]). -export([create_new/3, open/5]).
-export([make_keymapper/1]). -export([make_keymapper/1]).
-export([store/5]). -export([store/5]).
@ -123,6 +123,9 @@
-export_type([db/0, iterator/0, schema/0]). -export_type([db/0, iterator/0, schema/0]).
-export_type([options/0]).
-export_type([iteration_options/0]).
-compile( -compile(
{inline, [ {inline, [
bitwise_concat/3, bitwise_concat/3,
@ -162,6 +165,8 @@
%% Maximum granularity of iteration over time. %% Maximum granularity of iteration over time.
epoch := time(), epoch := time(),
iteration => iteration_options(),
cf_options => emqx_replay_local_store:db_cf_options() cf_options => emqx_replay_local_store:db_cf_options()
}. }.
@ -180,12 +185,12 @@
-opaque schema() :: #schema{}. -opaque schema() :: #schema{}.
-record(db, { -record(db, {
zone :: emqx_types:zone(),
handle :: rocksdb:db_handle(), handle :: rocksdb:db_handle(),
cf :: rocksdb:cf_handle(), cf :: rocksdb:cf_handle(),
keymapper :: keymapper(), keymapper :: keymapper(),
write_options = [{sync, true}] :: emqx_replay_local_store:db_write_options(), write_options = [{sync, true}] :: emqx_replay_local_store:db_write_options(),
read_options = [] :: emqx_replay_local_store:db_write_options(), read_options = [] :: emqx_replay_local_store:db_write_options()
iteration_options = #{} :: iteration_options()
}). }).
-record(it, { -record(it, {
@ -233,7 +238,6 @@
%% Create a new column family for the generation and a serializable representation of the schema %% Create a new column family for the generation and a serializable representation of the schema
-spec create_new(rocksdb:db_handle(), emqx_replay_local_store:gen_id(), options()) -> -spec create_new(rocksdb:db_handle(), emqx_replay_local_store:gen_id(), options()) ->
{schema(), emqx_replay_local_store:cf_refs()}. {schema(), emqx_replay_local_store:cf_refs()}.
%{schema(), emqx_replay_local_store:cf_refs()}.
create_new(DBHandle, GenId, Options) -> create_new(DBHandle, GenId, Options) ->
CFName = data_cf(GenId), CFName = data_cf(GenId),
CFOptions = maps:get(cf_options, Options, []), CFOptions = maps:get(cf_options, Options, []),
@ -243,15 +247,17 @@ create_new(DBHandle, GenId, Options) ->
%% Reopen the database %% Reopen the database
-spec open( -spec open(
emqx_types:zone(),
rocksdb:db_handle(), rocksdb:db_handle(),
emqx_replay_local_store:gen_id(), emqx_replay_local_store:gen_id(),
emqx_replay_local_store:cf_refs(), emqx_replay_local_store:cf_refs(),
schema() schema()
) -> ) ->
db(). db().
open(DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> open(Zone, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
{value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs), {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs),
#db{ #db{
zone = Zone,
handle = DBHandle, handle = DBHandle,
cf = CFHandle, cf = CFHandle,
keymapper = Keymapper keymapper = Keymapper
@ -289,8 +295,8 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
-spec make_iterator(db(), emqx_topic:words(), time() | earliest) -> -spec make_iterator(db(), emqx_topic:words(), time() | earliest) ->
{ok, iterator()} | {error, _TODO}. {ok, iterator()} | {error, _TODO}.
make_iterator(DB, TopicFilter, StartTime) -> make_iterator(DB, TopicFilter, StartTime) ->
% TODO wire it up somehow to the upper level Options = emqx_replay_conf:zone_iteration_options(DB#db.zone),
make_iterator(DB, TopicFilter, StartTime, DB#db.iteration_options). make_iterator(DB, TopicFilter, StartTime, Options).
-spec make_iterator(db(), emqx_topic:words(), time() | earliest, iteration_options()) -> -spec make_iterator(db(), emqx_topic:words(), time() | earliest, iteration_options()) ->
% {error, invalid_start_time}? might just start from the beginning of time % {error, invalid_start_time}? might just start from the beginning of time

View File

@ -164,7 +164,10 @@ init_per_testcase(TC, Config) ->
ok = set_zone_config(zone(TC), #{ ok = set_zone_config(zone(TC), #{
timestamp_bits => 64, timestamp_bits => 64,
topic_bits_per_level => [8, 8, 32, 16], topic_bits_per_level => [8, 8, 32, 16],
epoch => 5 epoch => 5,
iteration => #{
iterator_refresh => {every, 5}
}
}), }),
{ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)), {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)),
Config. Config.