feat: allow to specify message store options

* Keymapper
* Column family name + DB options
* DB write / read options
This commit is contained in:
Andrew Mayorov 2022-12-28 19:02:05 +03:00
parent 7e13753ea5
commit 83467e7174
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 167 additions and 48 deletions

View File

@ -18,6 +18,7 @@
%% API:
-export([open/2, close/1]).
-export([make_keymapper/1]).
-export([store/5]).
-export([make_iterator/3]).
@ -25,11 +26,11 @@
%% Debug/troubleshooting:
-export([
make_message_key/3,
compute_topic_hash/1,
compute_hash_bitmask/1,
hash/2,
combine/3
make_message_key/4,
compute_topic_hash/2,
compute_hash_bitmask/2,
combine/4,
hash/2
]).
-export_type([db/0, iterator/0]).
@ -38,21 +39,69 @@
%% Type declarations
%%================================================================================
%% see rocksdb:db_options()
-type options() :: proplists:proplist().
%% parsed
-type topic() :: list(binary()).
%% TODO granularity?
-type time() :: integer().
%% Number of bits
-type bits() :: non_neg_integer().
%% Key of a RocksDB record.
-type key() :: binary().
%% Distribution of entropy among topic levels.
%% Example: [4, 8, 16] means that level 1 gets 4 bits, level 2 gets 8 bits,
%% and _rest of levels_ (if any) get 16 bits.
-type bits_per_level() :: [bits(), ...].
%% see rocksdb:db_options()
-type db_options() :: proplists:proplist().
%% see rocksdb:cf_options()
-type db_cf_options() :: proplists:proplist().
%% see rocksdb:write_options()
-type db_write_options() :: proplists:proplist().
%% see rocksdb:read_options()
-type db_read_options() :: proplists:proplist().
-type options() :: #{
%% Keymapper.
keymapper := keymapper(),
%% Name and options to use to open specific column family.
column_family => {_Name :: string(), db_cf_options()},
%% Options to use when opening the DB.
open_options => db_options(),
%% Options to use when writing a message to the DB.
write_options => db_write_options(),
%% Options to use when iterating over messages in the DB.
read_options => db_read_options()
}.
-define(DEFAULT_COLUMN_FAMILY, {"default", []}).
-define(DEFAULT_OPEN_OPTIONS, [
{create_if_missing, true},
{create_missing_column_families, true}
]).
-define(DEFAULT_WRITE_OPTIONS, [{sync, true}]).
-define(DEFAULT_READ_OPTIONS, []).
-record(db, {
handle :: rocksdb:db_handle()
handle :: rocksdb:db_handle(),
cf :: rocksdb:cf_handle(),
keymapper :: keymapper(),
write_options = [{sync, true}] :: db_write_options(),
read_options = [] :: db_write_options()
}).
-record(it, {
handle :: rocksdb:itr_handle(),
keymapper :: keymapper(),
next_action :: {seek, binary()} | next,
topic_filter :: emqx_topic:words(),
hash_filter :: integer(),
@ -60,9 +109,17 @@
start_time :: time()
}).
-opaque db() :: #db{}.
% NOTE
% Keymapper decides how to map messages into RocksDB column family keyspace.
-record(keymapper, {
topic_bits :: bits(),
topic_bits_per_level :: bits_per_level(),
timestamp_bits :: bits()
}).
-opaque db() :: #db{}.
-opaque iterator() :: #it{}.
-type keymapper() :: #keymapper{}.
%%================================================================================
%% API funcions
@ -71,9 +128,30 @@
-spec open(file:filename_all(), options()) ->
{ok, db()} | {error, _TODO}.
open(Filename, Options) ->
case rocksdb:open(Filename, [{create_if_missing, true}, Options]) of
{ok, Handle} ->
{ok, #db{handle = Handle}};
CFDescriptors =
case maps:get(column_family, Options, undefined) of
CF = {_Name, _} ->
% TODO
% > When opening a DB in a read-write mode, you need to specify all
% > Column Families that currently exist in a DB. If that's not the case,
% > DB::Open call will return Status::InvalidArgument().
% This probably means that we need the _manager_ (the thing which knows
% about all the column families there is) to hold the responsibility to
% open the database and hold all the handles.
[CF, ?DEFAULT_COLUMN_FAMILY];
undefined ->
[?DEFAULT_COLUMN_FAMILY]
end,
DBOptions = maps:get(open_options, Options, ?DEFAULT_OPEN_OPTIONS),
case rocksdb:open(Filename, DBOptions, CFDescriptors) of
{ok, Handle, [CFHandle | _]} ->
{ok, #db{
handle = Handle,
cf = CFHandle,
keymapper = maps:get(keymapper, Options),
write_options = maps:get(write_options, Options, ?DEFAULT_WRITE_OPTIONS),
read_options = maps:get(read_options, Options, ?DEFAULT_READ_OPTIONS)
}};
Error ->
Error
end.
@ -82,26 +160,44 @@ open(Filename, Options) ->
close(#db{handle = DB}) ->
rocksdb:close(DB).
-spec make_keymapper(Options) -> keymapper() when
Options :: #{
%% Number of bits in a key allocated to a message timestamp.
timestamp_bits := bits(),
%% Number of bits in a key allocated to each level in a message topic.
topic_bits_per_level := bits_per_level()
}.
make_keymapper(Options) ->
TimestampBits = maps:get(timestamp_bits, Options),
TopicBitsPerLevel = maps:get(topic_bits_per_level, Options),
#keymapper{
timestamp_bits = TimestampBits,
topic_bits = lists:sum(TopicBitsPerLevel),
topic_bits_per_level = TopicBitsPerLevel
}.
-spec store(db(), emqx_guid:guid(), time(), topic(), binary()) ->
ok | {error, _TODO}.
store(#db{handle = DB}, MessageID, PublishedAt, Topic, MessagePayload) ->
Key = make_message_key(MessageID, Topic, PublishedAt),
store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, MessagePayload) ->
Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper),
Value = make_message_value(Topic, MessagePayload),
rocksdb:put(DB, Key, Value, [{sync, true}]).
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
-spec make_iterator(db(), emqx_topic:words(), time() | earliest) ->
% {error, invalid_start_time}? might just start from the beginning of time
% and call it a day: client violated the contract anyway.
{ok, iterator()} | {error, _TODO}.
make_iterator(#db{handle = DBHandle}, TopicFilter, StartTime) ->
case rocksdb:iterator(DBHandle, []) of
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) ->
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
{ok, ITHandle} ->
Hash = compute_topic_hash(TopicFilter),
HashBitmask = compute_hash_bitmask(TopicFilter),
Hash = compute_topic_hash(TopicFilter, DB#db.keymapper),
HashBitmask = compute_hash_bitmask(TopicFilter, DB#db.keymapper),
HashFilter = Hash band HashBitmask,
InitialSeek = combine(HashFilter, StartTime, <<>>, DB#db.keymapper),
{ok, #it{
handle = ITHandle,
next_action = {seek, combine(HashFilter, StartTime, <<>>)},
keymapper = DB#db.keymapper,
next_action = {seek, InitialSeek},
topic_filter = TopicFilter,
start_time = StartTime,
hash_filter = HashFilter,
@ -116,7 +212,7 @@ next(It = #it{next_action = Action}) ->
case rocksdb:iterator_move(It#it.handle, Action) of
% spec says `{ok, Key}` is also possible but the implementation says it's not
{ok, Key, Value} ->
{TopicHash, PublishedAt} = extract(Key),
{TopicHash, PublishedAt} = extract(Key, It#it.keymapper),
match_next(It, TopicHash, PublishedAt, Value);
{error, invalid_iterator} ->
stop_iteration(It);
@ -128,10 +224,8 @@ next(It = #it{next_action = Action}) ->
%% Internal exports
%%================================================================================
-define(TOPIC_LEVELS_ENTROPY_BITS, [8, 8, 32, 16]).
make_message_key(MessageID, Topic, PublishedAt) ->
combine(compute_topic_hash(Topic), PublishedAt, MessageID).
make_message_key(Topic, PublishedAt, MessageID, Keymapper) ->
combine(compute_topic_hash(Topic, Keymapper), PublishedAt, MessageID, Keymapper).
make_message_value(Topic, MessagePayload) ->
term_to_binary({Topic, MessagePayload}).
@ -139,22 +233,33 @@ make_message_value(Topic, MessagePayload) ->
unwrap_message_value(Binary) ->
binary_to_term(Binary).
combine(TopicHash, PublishedAt, MessageID) ->
<<TopicHash:64/integer, PublishedAt:64/integer, MessageID/binary>>.
-spec combine(_TopicHash :: integer(), time(), emqx_guid:guid(), keymapper()) ->
key().
combine(TopicHash, PublishedAt, MessageID, #keymapper{
timestamp_bits = TimestampBits,
topic_bits = TopicBits
}) ->
<<TopicHash:TopicBits/integer, PublishedAt:TimestampBits/integer, MessageID/binary>>.
extract(<<TopicHash:64/integer, PublishedAt:64/integer, _MessageID/binary>>) ->
-spec extract(key(), keymapper()) ->
{_TopicHash :: integer(), time()}.
extract(Key, #keymapper{
timestamp_bits = TimestampBits,
topic_bits = TopicBits
}) ->
<<TopicHash:TopicBits/integer, PublishedAt:TimestampBits/integer, _MessageID/binary>> = Key,
{TopicHash, PublishedAt}.
compute_topic_hash(Topic) ->
compute_topic_hash(Topic, ?TOPIC_LEVELS_ENTROPY_BITS, 0).
compute_topic_hash(Topic, Keymapper) ->
compute_topic_hash(Topic, Keymapper#keymapper.topic_bits_per_level, 0).
hash(Input, Bits) ->
% at most 32 bits
erlang:phash2(Input, 1 bsl Bits).
-spec compute_hash_bitmask(emqx_topic:words()) -> integer().
compute_hash_bitmask(TopicFilter) ->
compute_hash_bitmask(TopicFilter, ?TOPIC_LEVELS_ENTROPY_BITS, 0).
-spec compute_hash_bitmask(emqx_topic:words(), keymapper()) -> integer().
compute_hash_bitmask(TopicFilter, Keymapper) ->
compute_hash_bitmask(TopicFilter, Keymapper#keymapper.topic_bits_per_level, 0).
%%================================================================================
%% Internal functions
@ -201,6 +306,7 @@ ones(Bits) ->
match_next(
It = #it{
keymapper = Keymapper,
topic_filter = TopicFilter,
hash_filter = HashFilter,
hash_bitmask = HashBitmask,
@ -222,13 +328,13 @@ match_next(
next(It#it{next_action = next})
end;
true ->
NextAction = {seek, combine(TopicHash, StartTime, <<>>)},
next(It#it{next_action = NextAction});
NextSeek = combine(TopicHash, StartTime, <<>>, Keymapper),
next(It#it{next_action = {seek, NextSeek}});
false ->
case compute_next_seek(TopicHash, HashFilter, HashBitmask) of
case compute_next_seek(TopicHash, HashFilter, HashBitmask, Keymapper) of
NextHash when is_integer(NextHash) ->
NextAction = {seek, combine(NextHash, StartTime, <<>>)},
next(It#it{next_action = NextAction});
NextSeek = combine(NextHash, StartTime, <<>>, Keymapper),
next(It#it{next_action = {seek, NextSeek}});
none ->
stop_iteration(It)
end
@ -238,9 +344,9 @@ stop_iteration(It) ->
ok = rocksdb:iterator_close(It#it.handle),
none.
compute_next_seek(TopicHash, HashFilter, HashBitmask) ->
compute_next_seek(TopicHash, HashFilter, HashBitmask, ?TOPIC_LEVELS_ENTROPY_BITS).
compute_next_seek(TopicHash, HashFilter, HashBitmask, Keymapper = #keymapper{}) ->
BitsPerLevel = Keymapper#keymapper.topic_bits_per_level,
compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel);
compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) ->
% NOTE
% Ok, this convoluted mess implements a sort of _increment operation_ for some

View File

@ -136,20 +136,28 @@ parse_topic(Topic) ->
%%
t_prop_topic_hash_computes(_) ->
Keymapper = emqx_replay_message_storage:make_keymapper(#{
topic_bits_per_level => [8, 12, 16, 24],
timestamp_bits => 0
}),
?assert(
proper:quickcheck(
?FORALL(Topic, topic(), begin
Hash = emqx_replay_message_storage:compute_topic_hash(Topic),
Hash = emqx_replay_message_storage:compute_topic_hash(Topic, Keymapper),
is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8)
end)
)
).
t_prop_hash_bitmask_computes(_) ->
Keymapper = emqx_replay_message_storage:make_keymapper(#{
topic_bits_per_level => [8, 12, 16, 24],
timestamp_bits => 0
}),
?assert(
proper:quickcheck(
?FORALL(TopicFilter, topic_filter(), begin
Hash = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter),
Hash = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter, Keymapper),
is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8)
end)
)
@ -165,8 +173,9 @@ t_prop_iterate_stored_messages(Config) ->
messages(),
begin
Stream = payload_gen:interleave_streams(Streams),
ok = store_message_stream(DB, Stream)
ok = store_message_stream(DB, Stream),
% TODO actually verify some property
true
end
)
)
@ -194,8 +203,6 @@ topic(EntropyWeights) ->
?LET(
L,
list(1),
% ?SIZED(S, [topic(S * nth(I, EntropyWeights, 1)) || I <- lists:seq(1, Len)])
% [topic(10 * nth(I, EntropyWeights, 1)) || I <- lists:seq(1, Len)]
?SIZED(S, [topic_level(S * EW) || EW <- lists:sublist(EntropyWeights ++ L, length(L))])
).
@ -242,7 +249,13 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_testcase(TC, Config) ->
Filename = filename:join(?MODULE_STRING, atom_to_list(TC)),
ok = filelib:ensure_dir(Filename),
{ok, DB} = emqx_replay_message_storage:open(Filename, []),
{ok, DB} = emqx_replay_message_storage:open(Filename, #{
column_family => {atom_to_list(TC), []},
keymapper => emqx_replay_message_storage:make_keymapper(#{
topic_bits_per_level => [8, 8, 32, 16],
timestamp_bits => 64
})
}),
[{handle, DB} | Config].
end_per_testcase(_TC, Config) ->