feat: implement keyspace partitioning across time

This commit is contained in:
Andrew Mayorov 2022-12-30 13:35:04 +03:00
parent 83467e7174
commit 0cfeee0df7
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 244 additions and 133 deletions

View File

@ -27,9 +27,8 @@
%% Debug/troubleshooting: %% Debug/troubleshooting:
-export([ -export([
make_message_key/4, make_message_key/4,
compute_topic_hash/2, compute_bitstring/3,
compute_hash_bitmask/2, compute_hash_bitmask/2,
combine/4,
hash/2 hash/2
]). ]).
@ -104,19 +103,27 @@
keymapper :: keymapper(), keymapper :: keymapper(),
next_action :: {seek, binary()} | next, next_action :: {seek, binary()} | next,
topic_filter :: emqx_topic:words(), topic_filter :: emqx_topic:words(),
hash_filter :: integer(), hash_bitfilter :: integer(),
hash_bitmask :: integer(), hash_bitmask :: integer(),
start_time :: time() time_bitfilter :: integer(),
time_bitmask :: integer()
}). }).
% NOTE % NOTE
% Keymapper decides how to map messages into RocksDB column family keyspace. % Keymapper decides how to map messages into RocksDB column family keyspace.
-record(keymapper, { -record(keymapper, {
topic_bits :: bits(), source :: [bitsource(), ...],
topic_bits_per_level :: bits_per_level(), bitsize :: bits(),
timestamp_bits :: bits() tau :: non_neg_integer()
}). }).
-type bitsource() ::
%% Consume `_Size` bits from timestamp starting at `_Offset`th bit.
%% TODO consistency
{timestamp, _Offset :: bits(), _Size :: bits()}
%% Consume next topic level (either one or all of them) and compute `_Size` bits-wide hash.
| {hash, level | levels, _Size :: bits()}.
-opaque db() :: #db{}. -opaque db() :: #db{}.
-opaque iterator() :: #it{}. -opaque iterator() :: #it{}.
-type keymapper() :: #keymapper{}. -type keymapper() :: #keymapper{}.
@ -162,18 +169,32 @@ close(#db{handle = DB}) ->
-spec make_keymapper(Options) -> keymapper() when -spec make_keymapper(Options) -> keymapper() when
Options :: #{ Options :: #{
%% Number of bits in a key allocated to a message timestamp. %% Number of bits in a message timestamp.
timestamp_bits := bits(), timestamp_bits := bits(),
%% Number of bits in a key allocated to each level in a message topic. %% Number of bits in a key allocated to each level in a message topic.
topic_bits_per_level := bits_per_level() topic_bits_per_level := bits_per_level(),
%% Maximum granularity of iteration over time.
max_tau := time()
}. }.
make_keymapper(Options) -> make_keymapper(#{
TimestampBits = maps:get(timestamp_bits, Options), timestamp_bits := TimestampBits,
TopicBitsPerLevel = maps:get(topic_bits_per_level, Options), topic_bits_per_level := BitsPerLevel,
max_tau := MaxTau
}) ->
TimestampLSBs = floor(math:log2(MaxTau)),
TimestampMSBs = TimestampBits - TimestampLSBs,
NLevels = length(BitsPerLevel),
{LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel),
Source = lists:flatten([
{timestamp, TimestampLSBs, TimestampMSBs},
[{hash, level, Bits} || Bits <- LevelBits],
{hash, levels, TailLevelsBits},
[{timestamp, 0, TimestampLSBs} || TimestampLSBs > 0]
]),
#keymapper{ #keymapper{
timestamp_bits = TimestampBits, source = Source,
topic_bits = lists:sum(TopicBitsPerLevel), bitsize = lists:sum([S || {_, _, S} <- Source]),
topic_bits_per_level = TopicBitsPerLevel tau = 1 bsl TimestampLSBs
}. }.
-spec store(db(), emqx_guid:guid(), time(), topic(), binary()) -> -spec store(db(), emqx_guid:guid(), time(), topic(), binary()) ->
@ -190,18 +211,21 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) -> make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) ->
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
{ok, ITHandle} -> {ok, ITHandle} ->
Hash = compute_topic_hash(TopicFilter, DB#db.keymapper), Bitstring = compute_bitstring(TopicFilter, StartTime, DB#db.keymapper),
HashBitmask = compute_hash_bitmask(TopicFilter, DB#db.keymapper), HashBitmask = compute_hash_bitmask(TopicFilter, DB#db.keymapper),
HashFilter = Hash band HashBitmask, TimeBitmask = compute_time_bitmask(DB#db.keymapper),
InitialSeek = combine(HashFilter, StartTime, <<>>, DB#db.keymapper), HashBitfilter = Bitstring band HashBitmask,
TimeBitfilter = Bitstring band TimeBitmask,
InitialSeek = combine(HashBitfilter bor TimeBitfilter, <<>>, DB#db.keymapper),
{ok, #it{ {ok, #it{
handle = ITHandle, handle = ITHandle,
keymapper = DB#db.keymapper, keymapper = DB#db.keymapper,
next_action = {seek, InitialSeek}, next_action = {seek, InitialSeek},
topic_filter = TopicFilter, topic_filter = TopicFilter,
start_time = StartTime, hash_bitfilter = HashBitfilter,
hash_filter = HashFilter, hash_bitmask = HashBitmask,
hash_bitmask = HashBitmask time_bitfilter = TimeBitfilter,
time_bitmask = TimeBitmask
}}; }};
Err -> Err ->
Err Err
@ -212,8 +236,8 @@ next(It = #it{next_action = Action}) ->
case rocksdb:iterator_move(It#it.handle, Action) of case rocksdb:iterator_move(It#it.handle, Action) of
% spec says `{ok, Key}` is also possible but the implementation says it's not % spec says `{ok, Key}` is also possible but the implementation says it's not
{ok, Key, Value} -> {ok, Key, Value} ->
{TopicHash, PublishedAt} = extract(Key, It#it.keymapper), Bitstring = extract(Key, It#it.keymapper),
match_next(It, TopicHash, PublishedAt, Value); match_next(It, Bitstring, Value);
{error, invalid_iterator} -> {error, invalid_iterator} ->
stop_iteration(It); stop_iteration(It);
{error, iterator_closed} -> {error, iterator_closed} ->
@ -225,7 +249,7 @@ next(It = #it{next_action = Action}) ->
%%================================================================================ %%================================================================================
make_message_key(Topic, PublishedAt, MessageID, Keymapper) -> make_message_key(Topic, PublishedAt, MessageID, Keymapper) ->
combine(compute_topic_hash(Topic, Keymapper), PublishedAt, MessageID, Keymapper). combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper).
make_message_value(Topic, MessagePayload) -> make_message_value(Topic, MessagePayload) ->
term_to_binary({Topic, MessagePayload}). term_to_binary({Topic, MessagePayload}).
@ -233,61 +257,74 @@ make_message_value(Topic, MessagePayload) ->
unwrap_message_value(Binary) -> unwrap_message_value(Binary) ->
binary_to_term(Binary). binary_to_term(Binary).
-spec combine(_TopicHash :: integer(), time(), emqx_guid:guid(), keymapper()) -> -spec combine(_Bitstring :: integer(), emqx_guid:guid(), keymapper()) ->
key(). key().
combine(TopicHash, PublishedAt, MessageID, #keymapper{ combine(Bitstring, MessageID, #keymapper{bitsize = Size}) ->
timestamp_bits = TimestampBits, <<Bitstring:Size/integer, MessageID/binary>>.
topic_bits = TopicBits
}) ->
<<TopicHash:TopicBits/integer, PublishedAt:TimestampBits/integer, MessageID/binary>>.
-spec extract(key(), keymapper()) -> -spec extract(key(), keymapper()) ->
{_TopicHash :: integer(), time()}. _Bitstring :: integer().
extract(Key, #keymapper{ extract(Key, #keymapper{bitsize = Size}) ->
timestamp_bits = TimestampBits, <<Bitstring:Size/integer, _MessageID/binary>> = Key,
topic_bits = TopicBits Bitstring.
}) ->
<<TopicHash:TopicBits/integer, PublishedAt:TimestampBits/integer, _MessageID/binary>> = Key,
{TopicHash, PublishedAt}.
compute_topic_hash(Topic, Keymapper) -> -spec compute_bitstring(topic(), time(), keymapper()) -> integer().
compute_topic_hash(Topic, Keymapper#keymapper.topic_bits_per_level, 0). compute_bitstring(Topic, Timestamp, #keymapper{source = Source}) ->
compute_bitstring(Topic, Timestamp, Source, 0).
-spec compute_hash_bitmask(emqx_topic:words(), keymapper()) -> integer().
compute_hash_bitmask(TopicFilter, #keymapper{source = Source}) ->
compute_hash_bitmask(TopicFilter, Source, 0).
-spec compute_time_bitmask(keymapper()) -> integer().
compute_time_bitmask(#keymapper{source = Source}) ->
compute_time_bitmask(Source, 0).
hash(Input, Bits) -> hash(Input, Bits) ->
% at most 32 bits % at most 32 bits
erlang:phash2(Input, 1 bsl Bits). erlang:phash2(Input, 1 bsl Bits).
-spec compute_hash_bitmask(emqx_topic:words(), keymapper()) -> integer().
compute_hash_bitmask(TopicFilter, Keymapper) ->
compute_hash_bitmask(TopicFilter, Keymapper#keymapper.topic_bits_per_level, 0).
%%================================================================================ %%================================================================================
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
compute_topic_hash(LevelsRest, [Bits], Acc) -> compute_bitstring(Topic, Timestamp, [{timestamp, Offset, Size} | Rest], Acc) ->
Hash = hash(LevelsRest, Bits), I = (Timestamp bsr Offset) band ones(Size),
Acc bsl Bits + Hash; compute_bitstring(Topic, Timestamp, Rest, (Acc bsl Size) + I);
compute_topic_hash([], [Bits | BitsRest], Acc) -> compute_bitstring([], Timestamp, [{hash, level, Size} | Rest], Acc) ->
Hash = hash(<<"/">>, Bits), I = hash(<<"/">>, Size),
compute_topic_hash([], BitsRest, Acc bsl Bits + Hash); compute_bitstring([], Timestamp, Rest, (Acc bsl Size) + I);
compute_topic_hash([Level | LevelsRest], [Bits | BitsRest], Acc) -> compute_bitstring([Level | Tail], Timestamp, [{hash, level, Size} | Rest], Acc) ->
Hash = hash(Level, Bits), I = hash(Level, Size),
compute_topic_hash(LevelsRest, BitsRest, Acc bsl Bits + Hash). compute_bitstring(Tail, Timestamp, Rest, (Acc bsl Size) + I);
compute_bitstring(Tail, Timestamp, [{hash, levels, Size} | Rest], Acc) ->
I = hash(Tail, Size),
compute_bitstring(Tail, Timestamp, Rest, (Acc bsl Size) + I);
compute_bitstring(_, _, [], Acc) ->
Acc.
compute_hash_bitmask(['#'], BitsPerLevel, Acc) -> compute_hash_bitmask(Filter, [{timestamp, _, Size} | Rest], Acc) ->
Acc bsl lists:sum(BitsPerLevel) + 0; compute_hash_bitmask(Filter, Rest, (Acc bsl Size) + 0);
compute_hash_bitmask(['+' | LevelsRest], [Bits | BitsRest], Acc) -> compute_hash_bitmask(['#'], [{hash, _, Size} | Rest], Acc) ->
compute_hash_bitmask(LevelsRest, BitsRest, Acc bsl Bits + 0); compute_hash_bitmask(['#'], Rest, (Acc bsl Size) + 0);
compute_hash_bitmask(_, [Bits], Acc) -> compute_hash_bitmask(['+' | Tail], [{hash, _, Size} | Rest], Acc) ->
Acc bsl Bits + ones(Bits); compute_hash_bitmask(Tail, Rest, (Acc bsl Size) + 0);
compute_hash_bitmask([], [Bits | BitsRest], Acc) -> compute_hash_bitmask([], [{hash, level, Size} | Rest], Acc) ->
compute_hash_bitmask([], BitsRest, Acc bsl Bits + ones(Bits)); compute_hash_bitmask([], Rest, (Acc bsl Size) + ones(Size));
compute_hash_bitmask([_ | LevelsRest], [Bits | BitsRest], Acc) -> compute_hash_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) ->
compute_hash_bitmask(LevelsRest, BitsRest, Acc bsl Bits + ones(Bits)); compute_hash_bitmask(Tail, Rest, (Acc bsl Size) + ones(Size));
compute_hash_bitmask(_, [{hash, levels, Size} | Rest], Acc) ->
compute_hash_bitmask([], Rest, (Acc bsl Size) + ones(Size));
compute_hash_bitmask(_, [], Acc) -> compute_hash_bitmask(_, [], Acc) ->
Acc. Acc.
compute_time_bitmask([{timestamp, _, Size} | Rest], Acc) ->
compute_time_bitmask(Rest, (Acc bsl Size) + ones(Size));
compute_time_bitmask([{hash, _, Size} | Rest], Acc) ->
compute_time_bitmask(Rest, (Acc bsl Size) + 0);
compute_time_bitmask([], Acc) ->
Acc.
ones(Bits) -> ones(Bits) ->
1 bsl Bits - 1. 1 bsl Bits - 1.
@ -308,16 +345,16 @@ match_next(
It = #it{ It = #it{
keymapper = Keymapper, keymapper = Keymapper,
topic_filter = TopicFilter, topic_filter = TopicFilter,
hash_filter = HashFilter, hash_bitfilter = HashBitfilter,
hash_bitmask = HashBitmask, hash_bitmask = HashBitmask,
start_time = StartTime time_bitfilter = TimeBitfilter,
time_bitmask = TimeBitmask
}, },
TopicHash, Bitstring,
PublishedAt,
Value Value
) -> ) ->
HashMatches = (TopicHash band It#it.hash_bitmask) == It#it.hash_filter, HashMatches = (Bitstring band HashBitmask) == HashBitfilter,
TimeMatches = PublishedAt >= It#it.start_time, TimeMatches = (Bitstring band TimeBitmask) >= TimeBitfilter,
case HashMatches of case HashMatches of
true when TimeMatches -> true when TimeMatches ->
{Topic, MessagePayload} = unwrap_message_value(Value), {Topic, MessagePayload} = unwrap_message_value(Value),
@ -327,13 +364,20 @@ match_next(
false -> false ->
next(It#it{next_action = next}) next(It#it{next_action = next})
end; end;
true -> true when not TimeMatches ->
NextSeek = combine(TopicHash, StartTime, <<>>, Keymapper), NextBitstring = (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter,
NextSeek = combine(NextBitstring, <<>>, Keymapper),
next(It#it{next_action = {seek, NextSeek}}); next(It#it{next_action = {seek, NextSeek}});
false -> false ->
case compute_next_seek(TopicHash, HashFilter, HashBitmask, Keymapper) of % _ ->
NextHash when is_integer(NextHash) -> case compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) of
NextSeek = combine(NextHash, StartTime, <<>>, Keymapper), NextBitstring when is_integer(NextBitstring) ->
% ct:pal("Bitstring = ~32.16.0B", [Bitstring]),
% ct:pal("Bitfilter = ~32.16.0B", [Bitfilter]),
% ct:pal("HBitmask = ~32.16.0B", [HashBitmask]),
% ct:pal("TBitmask = ~32.16.0B", [TimeBitmask]),
% ct:pal("NextBitstring = ~32.16.0B", [NextBitstring]),
NextSeek = combine(NextBitstring, <<>>, Keymapper),
next(It#it{next_action = {seek, NextSeek}}); next(It#it{next_action = {seek, NextSeek}});
none -> none ->
stop_iteration(It) stop_iteration(It)
@ -344,10 +388,12 @@ stop_iteration(It) ->
ok = rocksdb:iterator_close(It#it.handle), ok = rocksdb:iterator_close(It#it.handle),
none. none.
compute_next_seek(TopicHash, HashFilter, HashBitmask, Keymapper = #keymapper{}) -> compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) ->
BitsPerLevel = Keymapper#keymapper.topic_bits_per_level, Sources = Keymapper#keymapper.source,
compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel); Size = Keymapper#keymapper.bitsize,
compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) -> compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size).
compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
% NOTE % NOTE
% Ok, this convoluted mess implements a sort of _increment operation_ for some % Ok, this convoluted mess implements a sort of _increment operation_ for some
% strange number in variable bit-width base. There are `Levels` "digits", those % strange number in variable bit-width base. There are `Levels` "digits", those
@ -356,66 +402,117 @@ compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) ->
% with exacly one possible value). % with exacly one possible value).
% TODO make at least remotely readable / optimize later % TODO make at least remotely readable / optimize later
Result = zipfoldr3( Result = zipfoldr3(
fun(LevelHash, Filter, LevelMask, Bits, Shift, {Carry, Acc}) -> fun(Source, Substring, Filter, LBitmask, Offset, {Carry, Acc}) ->
case LevelMask of case Source of
0 when Carry == 0 -> {hash, _, _} when LBitmask =:= 0, Carry =:= 0 ->
{0, Acc + (LevelHash bsl Shift)}; {0, Acc + (Substring bsl Offset)};
0 -> {hash, _, S} when LBitmask =:= 0 ->
LevelHash1 = LevelHash + Carry, Substring1 = Substring + Carry,
NextCarry = LevelHash1 bsr Bits, Carry1 = Substring1 bsr S,
NextAcc = (LevelHash1 band ones(Bits)) bsl Shift, Acc1 = (Substring1 band ones(S)) bsl Offset,
{NextCarry, NextAcc}; {Carry1, Acc1};
_ when (LevelHash + Carry) == Filter -> {hash, _, _} when LBitmask =/= 0, (Substring + Carry) =:= Filter ->
{0, Acc + (Filter bsl Shift)}; {0, Acc + (Filter bsl Offset)};
_ when (LevelHash + Carry) > Filter -> {hash, _, _} when LBitmask =/= 0, (Substring + Carry) > Filter ->
{1, Filter bsl Shift}; {1, Filter bsl Offset};
_ -> {hash, _, _} when LBitmask =/= 0 ->
{0, Filter bsl Shift} {0, Filter bsl Offset};
{timestamp, _, _} when Carry =:= 0 ->
{0, Acc + (Substring bsl Offset)};
{timestamp, _, S} ->
Substring1 = Substring + Carry,
Carry1 = Substring1 bsr S,
Acc1 = (Substring1 band ones(S)) bsl Offset,
{Carry1, Acc1}
end end
end, end,
% TODO
% We can put carry bit into the `Acc`'s MSB instead of wrapping it into a tuple.
% This could save us a heap alloc which might be imporatant in a hot path.
{1, 0}, {1, 0},
TopicHash, Bitstring,
HashFilter, HashBitfilter,
HashBitmask, HashBitmask,
BitsPerLevel Size,
Sources
), ),
case Result of case Result of
{_, {_Carry = 0, Next}} -> {_Carry = 0, Next} ->
Next bor HashFilter; Next bor (HashBitfilter band HashBitmask);
{_, {_Carry = 1, _}} -> {_Carry = 1, _} ->
% we got "carried away" past the range, time to stop iteration % we got "carried away" past the range, time to stop iteration
none none
end. end.
zipfoldr3(_FoldFun, Acc, _, _, _, []) -> zipfoldr3(_FoldFun, Acc, _, _, _, 0, []) ->
{0, Acc}; Acc;
zipfoldr3(FoldFun, Acc, I1, I2, I3, [Bits | Rest]) -> zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) ->
{Shift, AccNext} = zipfoldr3( OffsetNext = Offset - S,
FoldFun, AccNext = zipfoldr3(FoldFun, Acc, I1, I2, I3, OffsetNext, Rest),
Acc, FoldFun(
I1, Source,
I2, substring(I1, OffsetNext, S),
I3, substring(I2, OffsetNext, S),
Rest substring(I3, OffsetNext, S),
), OffsetNext,
{ AccNext
Shift + Bits, ).
FoldFun(
(I1 bsr Shift) band ones(Bits), substring(I, Offset, Size) ->
(I2 bsr Shift) band ones(Bits), (I bsr Offset) band ones(Size).
(I3 bsr Shift) band ones(Bits),
Bits,
Shift,
AccNext
)
}.
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
make_keymapper_test_() ->
[
?_assertEqual(
#keymapper{
source = [
{timestamp, 9, 23},
{hash, level, 2},
{hash, level, 4},
{hash, levels, 8},
{timestamp, 0, 9}
],
bitsize = 46,
tau = 512
},
make_keymapper(#{
timestamp_bits => 32,
topic_bits_per_level => [2, 4, 8],
max_tau => 1000
})
),
?_assertEqual(
#keymapper{
source = [
{timestamp, 0, 32},
{hash, levels, 16}
],
bitsize = 48,
tau = 1
},
make_keymapper(#{
timestamp_bits => 32,
topic_bits_per_level => [16],
max_tau => 1
})
)
].
compute_test_bitmask(TopicFilter) -> compute_test_bitmask(TopicFilter) ->
compute_hash_bitmask(TopicFilter, [3, 4, 5, 2], 0). compute_hash_bitmask(
TopicFilter,
[
{hash, level, 3},
{hash, level, 4},
{hash, level, 5},
{hash, levels, 2}
],
0
).
bitmask_test_() -> bitmask_test_() ->
[ [
@ -464,8 +561,19 @@ wildcard_bitmask_test_() ->
%% Key3 = |123|999|679|001| Seek = 1 |123|000|678|000| eos %% Key3 = |123|999|679|001| Seek = 1 |123|000|678|000| eos
%% Key4 = |125|011|179|017| Seek = 1 |123|000|678|000| eos %% Key4 = |125|011|179|017| Seek = 1 |123|000|678|000| eos
compute_test_next_seek(TopicHash, HashFilter, HashBitmask) -> compute_test_next_seek(Bitstring, Bitfilter, HBitmask) ->
compute_next_seek(TopicHash, HashFilter, HashBitmask, [8, 8, 16, 12]). compute_next_seek(
Bitstring,
Bitfilter,
HBitmask,
[
{hash, level, 8},
{hash, level, 8},
{hash, level, 16},
{hash, levels, 12}
],
8 + 8 + 16 + 12
).
next_seek_test_() -> next_seek_test_() ->
[ [

View File

@ -52,7 +52,7 @@ t_iterate(Config) ->
begin begin
{ok, It} = emqx_replay_message_storage:make_iterator(DB, Topic, 0), {ok, It} = emqx_replay_message_storage:make_iterator(DB, Topic, 0),
Values = iterate(It), Values = iterate(It),
?assertEqual(Values, lists:map(fun integer_to_binary/1, Timestamps)) ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
end end
|| Topic <- Topics || Topic <- Topics
], ],
@ -137,28 +137,30 @@ parse_topic(Topic) ->
t_prop_topic_hash_computes(_) -> t_prop_topic_hash_computes(_) ->
Keymapper = emqx_replay_message_storage:make_keymapper(#{ Keymapper = emqx_replay_message_storage:make_keymapper(#{
timestamp_bits => 32,
topic_bits_per_level => [8, 12, 16, 24], topic_bits_per_level => [8, 12, 16, 24],
timestamp_bits => 0 max_tau => 10000
}), }),
?assert( ?assert(
proper:quickcheck( proper:quickcheck(
?FORALL(Topic, topic(), begin ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin
Hash = emqx_replay_message_storage:compute_topic_hash(Topic, Keymapper), BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper),
is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8) is_integer(BS) andalso (BS < (1 bsl 92))
end) end)
) )
). ).
t_prop_hash_bitmask_computes(_) -> t_prop_hash_bitmask_computes(_) ->
Keymapper = emqx_replay_message_storage:make_keymapper(#{ Keymapper = emqx_replay_message_storage:make_keymapper(#{
topic_bits_per_level => [8, 12, 16, 24], timestamp_bits => 16,
timestamp_bits => 0 topic_bits_per_level => [8, 12, 16],
max_tau => 100
}), }),
?assert( ?assert(
proper:quickcheck( proper:quickcheck(
?FORALL(TopicFilter, topic_filter(), begin ?FORALL(TopicFilter, topic_filter(), begin
Hash = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter, Keymapper), Mask = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter, Keymapper),
is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8) is_integer(Mask) andalso (Mask < (1 bsl (36 + 6)))
end) end)
) )
). ).
@ -252,8 +254,9 @@ init_per_testcase(TC, Config) ->
{ok, DB} = emqx_replay_message_storage:open(Filename, #{ {ok, DB} = emqx_replay_message_storage:open(Filename, #{
column_family => {atom_to_list(TC), []}, column_family => {atom_to_list(TC), []},
keymapper => emqx_replay_message_storage:make_keymapper(#{ keymapper => emqx_replay_message_storage:make_keymapper(#{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 32, 16], topic_bits_per_level => [8, 8, 32, 16],
timestamp_bits => 64 max_tau => 5
}) })
}), }),
[{handle, DB} | Config]. [{handle, DB} | Config].