diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 78dd941be..23cedb04c 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -38,7 +38,7 @@ -type db_write_options() :: proplists:proplist(). --type cf_refs() :: [{_CFName :: string(), _CFRef :: reference()}]. +-type cf_refs() :: [{string(), rocksdb:cf_handle()}]. -record(generation, { %% Module that handles data for the generation @@ -142,12 +142,12 @@ terminate(_Reason, #s{db = DB, zone = Zone}) -> %% Internal functions %%================================================================================ --spec read_metadata(#s{}) -> #s{}. +-spec read_metadata(#s{}) -> ok. read_metadata(S) -> %% TODO: just a mockup to make the existing tests pass read_metadata(0, S). --spec read_metadata(gen_id(), #s{}) -> #s{}. +-spec read_metadata(gen_id(), #s{}) -> ok. read_metadata(GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}) -> Gen = #generation{module = Mod, data = Data} = schema_get_gen(DBHandle, GenId), DB = Mod:open(DBHandle, GenId, CFs, Data), @@ -206,12 +206,12 @@ open_db(Zone) -> -spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> #generation{}. schema_get_gen(DBHandle, GenId) -> - {ok, Bin} = rocksdb:get(DBHandle, gen_rocksdb_key(GenId), ?SCHEMA_READ_OPTS), + {ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS), binary_to_term(Bin). -spec schema_put_gen(rocksdb:db_handle(), gen_id(), #generation{}) -> ok | {error, _}. schema_put_gen(DBHandle, GenId, Gen) -> - rocksdb:put(DBHandle, gen_rocksdb_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS). + rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS). -spec schema_get_current(rocksdb:db_handle()) -> gen_id() | undefined. schema_get_current(DBHandle) -> @@ -226,8 +226,8 @@ schema_get_current(DBHandle) -> schema_put_current(DBHandle, GenId) -> rocksdb:put(DBHandle, ?CURRENT_GEN, integer_to_binary(GenId), ?SCHEMA_WRITE_OPTS). --spec gen_rocksdb_key(integer()) -> string(). -gen_rocksdb_key(N) -> +-spec schema_gen_key(integer()) -> binary(). +schema_gen_key(N) -> <<"gen", N:32>>. -undef(CURRENT_GEN). diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index 484cd7ae6..a622cdde0 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -105,7 +105,7 @@ hash/2 ]). --export_type([db/0, iterator/0]). +-export_type([db/0, iterator/0, schema/0]). -compile({inline, [ones/1, bitwise_concat/3]}). @@ -131,9 +131,12 @@ -type bits_per_level() :: [bits(), ...]. -type options() :: #{ - %% Keymapper. - keymapper := keymapper(), - %% Name and options to use to open specific column family. + %% Number of bits in a message timestamp. + timestamp_bits := bits(), + %% Number of bits in a key allocated to each level in a message topic. + topic_bits_per_level := bits_per_level(), + %% Maximum granularity of iteration over time. + epoch := time(), cf_options => emqx_replay_local_store:db_cf_options() }. @@ -151,7 +154,7 @@ %% record when the database is reopened -record(schema, {keymapper :: keymapper()}). --type schema() :: #schema{}. +-opaque schema() :: #schema{}. -record(db, { handle :: rocksdb:db_handle(), @@ -196,8 +199,9 @@ %%================================================================================ %% Create a new column family for the generation and a serializable representation of the schema --spec create_new(rocksdb:db_handle(), emqx_replay_local_store:generation_id(), options()) -> +-spec create_new(rocksdb:db_handle(), emqx_replay_local_store:gen_id(), options()) -> {schema(), emqx_replay_local_store:cf_refs()}. +%{schema(), emqx_replay_local_store:cf_refs()}. create_new(DBHandle, GenId, Options) -> CFName = data_cf(GenId), CFOptions = maps:get(cf_options, Options, []), @@ -208,30 +212,20 @@ create_new(DBHandle, GenId, Options) -> %% Reopen the database -spec open( rocksdb:db_handle(), - emqx_replay_local_store:generation_id(), - [{_CFName :: string(), _CFHandle :: reference()}], + emqx_replay_local_store:gen_id(), + emqx_replay_local_store:cf_refs(), schema() ) -> db(). open(DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> - CFHandle = proplists:get_value(data_cf(GenId), CFs), - % assert - true = is_reference(CFHandle), + {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs), #db{ handle = DBHandle, cf = CFHandle, keymapper = Keymapper }. --spec make_keymapper(Options) -> keymapper() when - Options :: #{ - %% Number of bits in a message timestamp. - timestamp_bits := bits(), - %% Number of bits in a key allocated to each level in a message topic. - topic_bits_per_level := bits_per_level(), - %% Maximum granularity of iteration over time. - epoch := time() - }. +-spec make_keymapper(options()) -> keymapper(). make_keymapper(#{ timestamp_bits := TimestampBits, topic_bits_per_level := BitsPerLevel, @@ -313,7 +307,7 @@ make_message_value(Topic, MessagePayload) -> unwrap_message_value(Binary) -> binary_to_term(Binary). --spec combine(_Bitstring :: integer(), emqx_guid:guid(), keymapper()) -> +-spec combine(_Bitstring :: integer(), emqx_guid:guid() | <<>>, keymapper()) -> key(). combine(Bitstring, MessageID, #keymapper{bitsize = Size}) -> <>. @@ -521,7 +515,7 @@ substring(I, Offset, Size) -> (I bsr Offset) band ones(Size). %% @doc Generate a column family ID for the MQTT messages --spec data_cf(emqx_replay_local_store:gen_id()) -> string(). +-spec data_cf(emqx_replay_local_store:gen_id()) -> [char()]. data_cf(GenId) -> ?MODULE_STRING ++ integer_to_list(GenId). diff --git a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl index f604f8d63..a0424541a 100644 --- a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl @@ -25,12 +25,12 @@ -define(ZONE, zone(?FUNCTION_NAME)). %% Smoke test for opening and reopening the database -t_open(Config) -> +t_open(_Config) -> ok = emqx_replay_local_store_sup:stop_zone(?ZONE), {ok, _} = emqx_replay_local_store_sup:start_zone(?ZONE). %% Smoke test of store function -t_store(Config) -> +t_store(_Config) -> MessageID = emqx_guid:gen(), PublishedAt = 1000, Topic = [<<"foo">>, <<"bar">>], @@ -38,7 +38,7 @@ t_store(Config) -> ?assertMatch(ok, emqx_replay_local_store:store(?ZONE, MessageID, PublishedAt, Topic, Payload)). %% Smoke test for iteration through a concrete topic -t_iterate(Config) -> +t_iterate(_Config) -> %% Prepare data: Topics = [[<<"foo">>, <<"bar">>], [<<"foo">>, <<"bar">>, <<"baz">>], [<<"a">>]], Timestamps = lists:seq(1, 10), @@ -64,7 +64,7 @@ t_iterate(Config) -> ok. %% Smoke test for iteration with wildcard topic filter -t_iterate_wildcard(Config) -> +t_iterate_wildcard(_Config) -> %% Prepare data: Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], Timestamps = lists:seq(1, 10),