From 83467e717414a665e8765ff39f03f2c4cddff390 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 28 Dec 2022 19:02:05 +0300 Subject: [PATCH] feat: allow to specify message store options * Keymapper * Column family name + DB options * DB write / read options --- .../src/emqx_replay_message_storage.erl | 190 ++++++++++++++---- .../test/emqx_replay_storage_SUITE.erl | 25 ++- 2 files changed, 167 insertions(+), 48 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index 30a9859b8..b591157f4 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -18,6 +18,7 @@ %% API: -export([open/2, close/1]). +-export([make_keymapper/1]). -export([store/5]). -export([make_iterator/3]). @@ -25,11 +26,11 @@ %% Debug/troubleshooting: -export([ - make_message_key/3, - compute_topic_hash/1, - compute_hash_bitmask/1, - hash/2, - combine/3 + make_message_key/4, + compute_topic_hash/2, + compute_hash_bitmask/2, + combine/4, + hash/2 ]). -export_type([db/0, iterator/0]). @@ -38,21 +39,69 @@ %% Type declarations %%================================================================================ -%% see rocksdb:db_options() --type options() :: proplists:proplist(). - %% parsed -type topic() :: list(binary()). %% TODO granularity? -type time() :: integer(). +%% Number of bits +-type bits() :: non_neg_integer(). + +%% Key of a RocksDB record. +-type key() :: binary(). + +%% Distribution of entropy among topic levels. +%% Example: [4, 8, 16] means that level 1 gets 4 bits, level 2 gets 8 bits, +%% and _rest of levels_ (if any) get 16 bits. +-type bits_per_level() :: [bits(), ...]. + +%% see rocksdb:db_options() +-type db_options() :: proplists:proplist(). + +%% see rocksdb:cf_options() +-type db_cf_options() :: proplists:proplist(). + +%% see rocksdb:write_options() +-type db_write_options() :: proplists:proplist(). + +%% see rocksdb:read_options() +-type db_read_options() :: proplists:proplist(). + +-type options() :: #{ + %% Keymapper. + keymapper := keymapper(), + %% Name and options to use to open specific column family. + column_family => {_Name :: string(), db_cf_options()}, + %% Options to use when opening the DB. + open_options => db_options(), + %% Options to use when writing a message to the DB. + write_options => db_write_options(), + %% Options to use when iterating over messages in the DB. + read_options => db_read_options() +}. + +-define(DEFAULT_COLUMN_FAMILY, {"default", []}). + +-define(DEFAULT_OPEN_OPTIONS, [ + {create_if_missing, true}, + {create_missing_column_families, true} +]). + +-define(DEFAULT_WRITE_OPTIONS, [{sync, true}]). +-define(DEFAULT_READ_OPTIONS, []). + -record(db, { - handle :: rocksdb:db_handle() + handle :: rocksdb:db_handle(), + cf :: rocksdb:cf_handle(), + keymapper :: keymapper(), + write_options = [{sync, true}] :: db_write_options(), + read_options = [] :: db_write_options() }). -record(it, { handle :: rocksdb:itr_handle(), + keymapper :: keymapper(), next_action :: {seek, binary()} | next, topic_filter :: emqx_topic:words(), hash_filter :: integer(), @@ -60,9 +109,17 @@ start_time :: time() }). --opaque db() :: #db{}. +% NOTE +% Keymapper decides how to map messages into RocksDB column family keyspace. +-record(keymapper, { + topic_bits :: bits(), + topic_bits_per_level :: bits_per_level(), + timestamp_bits :: bits() +}). +-opaque db() :: #db{}. -opaque iterator() :: #it{}. +-type keymapper() :: #keymapper{}. %%================================================================================ %% API funcions @@ -71,9 +128,30 @@ -spec open(file:filename_all(), options()) -> {ok, db()} | {error, _TODO}. open(Filename, Options) -> - case rocksdb:open(Filename, [{create_if_missing, true}, Options]) of - {ok, Handle} -> - {ok, #db{handle = Handle}}; + CFDescriptors = + case maps:get(column_family, Options, undefined) of + CF = {_Name, _} -> + % TODO + % > When opening a DB in a read-write mode, you need to specify all + % > Column Families that currently exist in a DB. If that's not the case, + % > DB::Open call will return Status::InvalidArgument(). + % This probably means that we need the _manager_ (the thing which knows + % about all the column families there is) to hold the responsibility to + % open the database and hold all the handles. + [CF, ?DEFAULT_COLUMN_FAMILY]; + undefined -> + [?DEFAULT_COLUMN_FAMILY] + end, + DBOptions = maps:get(open_options, Options, ?DEFAULT_OPEN_OPTIONS), + case rocksdb:open(Filename, DBOptions, CFDescriptors) of + {ok, Handle, [CFHandle | _]} -> + {ok, #db{ + handle = Handle, + cf = CFHandle, + keymapper = maps:get(keymapper, Options), + write_options = maps:get(write_options, Options, ?DEFAULT_WRITE_OPTIONS), + read_options = maps:get(read_options, Options, ?DEFAULT_READ_OPTIONS) + }}; Error -> Error end. @@ -82,26 +160,44 @@ open(Filename, Options) -> close(#db{handle = DB}) -> rocksdb:close(DB). +-spec make_keymapper(Options) -> keymapper() when + Options :: #{ + %% Number of bits in a key allocated to a message timestamp. + timestamp_bits := bits(), + %% Number of bits in a key allocated to each level in a message topic. + topic_bits_per_level := bits_per_level() + }. +make_keymapper(Options) -> + TimestampBits = maps:get(timestamp_bits, Options), + TopicBitsPerLevel = maps:get(topic_bits_per_level, Options), + #keymapper{ + timestamp_bits = TimestampBits, + topic_bits = lists:sum(TopicBitsPerLevel), + topic_bits_per_level = TopicBitsPerLevel + }. + -spec store(db(), emqx_guid:guid(), time(), topic(), binary()) -> ok | {error, _TODO}. -store(#db{handle = DB}, MessageID, PublishedAt, Topic, MessagePayload) -> - Key = make_message_key(MessageID, Topic, PublishedAt), +store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, MessagePayload) -> + Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper), Value = make_message_value(Topic, MessagePayload), - rocksdb:put(DB, Key, Value, [{sync, true}]). + rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). -spec make_iterator(db(), emqx_topic:words(), time() | earliest) -> % {error, invalid_start_time}? might just start from the beginning of time % and call it a day: client violated the contract anyway. {ok, iterator()} | {error, _TODO}. -make_iterator(#db{handle = DBHandle}, TopicFilter, StartTime) -> - case rocksdb:iterator(DBHandle, []) of +make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) -> + case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of {ok, ITHandle} -> - Hash = compute_topic_hash(TopicFilter), - HashBitmask = compute_hash_bitmask(TopicFilter), + Hash = compute_topic_hash(TopicFilter, DB#db.keymapper), + HashBitmask = compute_hash_bitmask(TopicFilter, DB#db.keymapper), HashFilter = Hash band HashBitmask, + InitialSeek = combine(HashFilter, StartTime, <<>>, DB#db.keymapper), {ok, #it{ handle = ITHandle, - next_action = {seek, combine(HashFilter, StartTime, <<>>)}, + keymapper = DB#db.keymapper, + next_action = {seek, InitialSeek}, topic_filter = TopicFilter, start_time = StartTime, hash_filter = HashFilter, @@ -116,7 +212,7 @@ next(It = #it{next_action = Action}) -> case rocksdb:iterator_move(It#it.handle, Action) of % spec says `{ok, Key}` is also possible but the implementation says it's not {ok, Key, Value} -> - {TopicHash, PublishedAt} = extract(Key), + {TopicHash, PublishedAt} = extract(Key, It#it.keymapper), match_next(It, TopicHash, PublishedAt, Value); {error, invalid_iterator} -> stop_iteration(It); @@ -128,10 +224,8 @@ next(It = #it{next_action = Action}) -> %% Internal exports %%================================================================================ --define(TOPIC_LEVELS_ENTROPY_BITS, [8, 8, 32, 16]). - -make_message_key(MessageID, Topic, PublishedAt) -> - combine(compute_topic_hash(Topic), PublishedAt, MessageID). +make_message_key(Topic, PublishedAt, MessageID, Keymapper) -> + combine(compute_topic_hash(Topic, Keymapper), PublishedAt, MessageID, Keymapper). make_message_value(Topic, MessagePayload) -> term_to_binary({Topic, MessagePayload}). @@ -139,22 +233,33 @@ make_message_value(Topic, MessagePayload) -> unwrap_message_value(Binary) -> binary_to_term(Binary). -combine(TopicHash, PublishedAt, MessageID) -> - <>. +-spec combine(_TopicHash :: integer(), time(), emqx_guid:guid(), keymapper()) -> + key(). +combine(TopicHash, PublishedAt, MessageID, #keymapper{ + timestamp_bits = TimestampBits, + topic_bits = TopicBits +}) -> + <>. -extract(<>) -> +-spec extract(key(), keymapper()) -> + {_TopicHash :: integer(), time()}. +extract(Key, #keymapper{ + timestamp_bits = TimestampBits, + topic_bits = TopicBits +}) -> + <> = Key, {TopicHash, PublishedAt}. -compute_topic_hash(Topic) -> - compute_topic_hash(Topic, ?TOPIC_LEVELS_ENTROPY_BITS, 0). +compute_topic_hash(Topic, Keymapper) -> + compute_topic_hash(Topic, Keymapper#keymapper.topic_bits_per_level, 0). hash(Input, Bits) -> % at most 32 bits erlang:phash2(Input, 1 bsl Bits). --spec compute_hash_bitmask(emqx_topic:words()) -> integer(). -compute_hash_bitmask(TopicFilter) -> - compute_hash_bitmask(TopicFilter, ?TOPIC_LEVELS_ENTROPY_BITS, 0). +-spec compute_hash_bitmask(emqx_topic:words(), keymapper()) -> integer(). +compute_hash_bitmask(TopicFilter, Keymapper) -> + compute_hash_bitmask(TopicFilter, Keymapper#keymapper.topic_bits_per_level, 0). %%================================================================================ %% Internal functions @@ -201,6 +306,7 @@ ones(Bits) -> match_next( It = #it{ + keymapper = Keymapper, topic_filter = TopicFilter, hash_filter = HashFilter, hash_bitmask = HashBitmask, @@ -222,13 +328,13 @@ match_next( next(It#it{next_action = next}) end; true -> - NextAction = {seek, combine(TopicHash, StartTime, <<>>)}, - next(It#it{next_action = NextAction}); + NextSeek = combine(TopicHash, StartTime, <<>>, Keymapper), + next(It#it{next_action = {seek, NextSeek}}); false -> - case compute_next_seek(TopicHash, HashFilter, HashBitmask) of + case compute_next_seek(TopicHash, HashFilter, HashBitmask, Keymapper) of NextHash when is_integer(NextHash) -> - NextAction = {seek, combine(NextHash, StartTime, <<>>)}, - next(It#it{next_action = NextAction}); + NextSeek = combine(NextHash, StartTime, <<>>, Keymapper), + next(It#it{next_action = {seek, NextSeek}}); none -> stop_iteration(It) end @@ -238,9 +344,9 @@ stop_iteration(It) -> ok = rocksdb:iterator_close(It#it.handle), none. -compute_next_seek(TopicHash, HashFilter, HashBitmask) -> - compute_next_seek(TopicHash, HashFilter, HashBitmask, ?TOPIC_LEVELS_ENTROPY_BITS). - +compute_next_seek(TopicHash, HashFilter, HashBitmask, Keymapper = #keymapper{}) -> + BitsPerLevel = Keymapper#keymapper.topic_bits_per_level, + compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel); compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) -> % NOTE % Ok, this convoluted mess implements a sort of _increment operation_ for some diff --git a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl index 761ac041a..b1a8a396b 100644 --- a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl @@ -136,20 +136,28 @@ parse_topic(Topic) -> %% t_prop_topic_hash_computes(_) -> + Keymapper = emqx_replay_message_storage:make_keymapper(#{ + topic_bits_per_level => [8, 12, 16, 24], + timestamp_bits => 0 + }), ?assert( proper:quickcheck( ?FORALL(Topic, topic(), begin - Hash = emqx_replay_message_storage:compute_topic_hash(Topic), + Hash = emqx_replay_message_storage:compute_topic_hash(Topic, Keymapper), is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8) end) ) ). t_prop_hash_bitmask_computes(_) -> + Keymapper = emqx_replay_message_storage:make_keymapper(#{ + topic_bits_per_level => [8, 12, 16, 24], + timestamp_bits => 0 + }), ?assert( proper:quickcheck( ?FORALL(TopicFilter, topic_filter(), begin - Hash = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter), + Hash = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter, Keymapper), is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8) end) ) @@ -165,8 +173,9 @@ t_prop_iterate_stored_messages(Config) -> messages(), begin Stream = payload_gen:interleave_streams(Streams), - ok = store_message_stream(DB, Stream) + ok = store_message_stream(DB, Stream), % TODO actually verify some property + true end ) ) @@ -194,8 +203,6 @@ topic(EntropyWeights) -> ?LET( L, list(1), - % ?SIZED(S, [topic(S * nth(I, EntropyWeights, 1)) || I <- lists:seq(1, Len)]) - % [topic(10 * nth(I, EntropyWeights, 1)) || I <- lists:seq(1, Len)] ?SIZED(S, [topic_level(S * EW) || EW <- lists:sublist(EntropyWeights ++ L, length(L))]) ). @@ -242,7 +249,13 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_testcase(TC, Config) -> Filename = filename:join(?MODULE_STRING, atom_to_list(TC)), ok = filelib:ensure_dir(Filename), - {ok, DB} = emqx_replay_message_storage:open(Filename, []), + {ok, DB} = emqx_replay_message_storage:open(Filename, #{ + column_family => {atom_to_list(TC), []}, + keymapper => emqx_replay_message_storage:make_keymapper(#{ + topic_bits_per_level => [8, 8, 32, 16], + timestamp_bits => 64 + }) + }), [{handle, DB} | Config]. end_per_testcase(_TC, Config) ->