diff --git a/apps/emqx_replay/src/emqx_replay_conf.erl b/apps/emqx_replay/src/emqx_replay_conf.erl index 8f7105312..57ba87ddf 100644 --- a/apps/emqx_replay/src/emqx_replay_conf.erl +++ b/apps/emqx_replay/src/emqx_replay_conf.erl @@ -20,27 +20,51 @@ %% API: -export([zone_config/1, db_options/0]). +-export([zone_iteration_options/1]). +-export([default_iteration_options/0]). + %%================================================================================ %% API funcions %%================================================================================ -define(APP, emqx_replay). --spec zone_config(emqx_types:zone()) -> - {module(), term()}. +-type zone() :: emqx_types:zone(). +-type config() :: + {emqx_replay_message_storage, emqx_replay_message_storage:options()} + | {module(), _Options}. + +-spec zone_config(zone()) -> config(). zone_config(Zone) -> - DefaultConf = - #{ - timestamp_bits => 64, - topic_bits_per_level => [8, 8, 8, 32, 16], - epoch => 5 - }, - DefaultZoneConfig = application:get_env( - ?APP, default_zone_config, {emqx_replay_message_storage, DefaultConf} - ), + DefaultZoneConfig = application:get_env(?APP, default_zone_config, default_zone_config()), Zones = application:get_env(?APP, zone_config, #{}), maps:get(Zone, Zones, DefaultZoneConfig). +-spec zone_iteration_options(zone()) -> emqx_replay_message_storage:iteration_options(). +zone_iteration_options(Zone) -> + case zone_config(Zone) of + {emqx_replay_message_storage, Config} -> + maps:get(iteration, Config, default_iteration_options()); + {_Module, _} -> + default_iteration_options() + end. + +-spec default_iteration_options() -> emqx_replay_message_storage:iteration_options(). +default_iteration_options() -> + {emqx_replay_message_storage, Config} = default_zone_config(), + maps:get(iteration, Config). + +-spec default_zone_config() -> config(). +default_zone_config() -> + {emqx_replay_message_storage, #{ + timestamp_bits => 64, + topic_bits_per_level => [8, 8, 8, 32, 16], + epoch => 5, + iteration => #{ + iterator_refresh => {every, 100} + } + }}. + -spec db_options() -> emqx_replay_local_store:db_options(). db_options() -> application:get_env(?APP, db_options, []). diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 23cedb04c..15a400a92 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -150,7 +150,7 @@ read_metadata(S) -> -spec read_metadata(gen_id(), #s{}) -> ok. read_metadata(GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}) -> Gen = #generation{module = Mod, data = Data} = schema_get_gen(DBHandle, GenId), - DB = Mod:open(DBHandle, GenId, CFs, Data), + DB = Mod:open(Zone, DBHandle, GenId, CFs, Data), meta_put(Zone, GenId, Gen#generation{data = DB}). -spec ensure_current_generation(#s{}) -> #s{}. diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index b1c5f1806..dd6c41598 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -90,7 +90,7 @@ %%================================================================================ %% API: --export([create_new/3, open/4]). +-export([create_new/3, open/5]). -export([make_keymapper/1]). -export([store/5]). @@ -123,6 +123,9 @@ -export_type([db/0, iterator/0, schema/0]). +-export_type([options/0]). +-export_type([iteration_options/0]). + -compile( {inline, [ bitwise_concat/3, @@ -162,6 +165,8 @@ %% Maximum granularity of iteration over time. epoch := time(), + iteration => iteration_options(), + cf_options => emqx_replay_local_store:db_cf_options() }. @@ -180,12 +185,12 @@ -opaque schema() :: #schema{}. -record(db, { + zone :: emqx_types:zone(), handle :: rocksdb:db_handle(), cf :: rocksdb:cf_handle(), keymapper :: keymapper(), write_options = [{sync, true}] :: emqx_replay_local_store:db_write_options(), - read_options = [] :: emqx_replay_local_store:db_write_options(), - iteration_options = #{} :: iteration_options() + read_options = [] :: emqx_replay_local_store:db_write_options() }). -record(it, { @@ -233,7 +238,6 @@ %% Create a new column family for the generation and a serializable representation of the schema -spec create_new(rocksdb:db_handle(), emqx_replay_local_store:gen_id(), options()) -> {schema(), emqx_replay_local_store:cf_refs()}. -%{schema(), emqx_replay_local_store:cf_refs()}. create_new(DBHandle, GenId, Options) -> CFName = data_cf(GenId), CFOptions = maps:get(cf_options, Options, []), @@ -243,15 +247,17 @@ create_new(DBHandle, GenId, Options) -> %% Reopen the database -spec open( + emqx_types:zone(), rocksdb:db_handle(), emqx_replay_local_store:gen_id(), emqx_replay_local_store:cf_refs(), schema() ) -> db(). -open(DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> +open(Zone, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs), #db{ + zone = Zone, handle = DBHandle, cf = CFHandle, keymapper = Keymapper @@ -289,8 +295,8 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, -spec make_iterator(db(), emqx_topic:words(), time() | earliest) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, TopicFilter, StartTime) -> - % TODO wire it up somehow to the upper level - make_iterator(DB, TopicFilter, StartTime, DB#db.iteration_options). + Options = emqx_replay_conf:zone_iteration_options(DB#db.zone), + make_iterator(DB, TopicFilter, StartTime, Options). -spec make_iterator(db(), emqx_topic:words(), time() | earliest, iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time diff --git a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl index 5a1bb59f4..eee802e69 100644 --- a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl @@ -164,7 +164,10 @@ init_per_testcase(TC, Config) -> ok = set_zone_config(zone(TC), #{ timestamp_bits => 64, topic_bits_per_level => [8, 8, 32, 16], - epoch => 5 + epoch => 5, + iteration => #{ + iterator_refresh => {every, 5} + } }), {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)), Config.