chore: attempt to make `compute_next_seek`'s logic clearer

This commit is contained in:
Andrew Mayorov 2023-01-03 18:29:06 +03:00
parent 917c8635e1
commit e248a18fd4
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 121 additions and 57 deletions

View File

@ -102,6 +102,9 @@
make_message_key/4, make_message_key/4,
compute_bitstring/3, compute_bitstring/3,
compute_hash_bitmask/2, compute_hash_bitmask/2,
compute_next_seek/4,
compute_time_seek/3,
compute_hash_seek/4,
hash/2 hash/2
]). ]).
@ -231,12 +234,12 @@ make_keymapper(#{
topic_bits_per_level := BitsPerLevel, topic_bits_per_level := BitsPerLevel,
epoch := MaxEpoch epoch := MaxEpoch
}) -> }) ->
TimestampLSBs = floor(math:log2(MaxEpoch)), TimestampLSBs = min(TimestampBits, floor(math:log2(MaxEpoch))),
TimestampMSBs = TimestampBits - TimestampLSBs, TimestampMSBs = TimestampBits - TimestampLSBs,
NLevels = length(BitsPerLevel), NLevels = length(BitsPerLevel),
{LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel), {LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel),
Source = lists:flatten([ Source = lists:flatten([
{timestamp, TimestampLSBs, TimestampMSBs}, [{timestamp, TimestampLSBs, TimestampMSBs} || TimestampMSBs > 0],
[{hash, level, Bits} || Bits <- LevelBits], [{hash, level, Bits} || Bits <- LevelBits],
{hash, levels, TailLevelsBits}, {hash, levels, TailLevelsBits},
[{timestamp, 0, TimestampLSBs} || TimestampLSBs > 0] [{timestamp, 0, TimestampLSBs} || TimestampLSBs > 0]
@ -408,8 +411,8 @@ match_next(
) -> ) ->
HashMatches = (Bitstring band HashBitmask) == HashBitfilter, HashMatches = (Bitstring band HashBitmask) == HashBitfilter,
TimeMatches = (Bitstring band TimeBitmask) >= TimeBitfilter, TimeMatches = (Bitstring band TimeBitmask) >= TimeBitfilter,
case HashMatches of case HashMatches and TimeMatches of
true when TimeMatches -> true ->
{Topic, MessagePayload} = unwrap_message_value(Value), {Topic, MessagePayload} = unwrap_message_value(Value),
case emqx_topic:match(Topic, TopicFilter) of case emqx_topic:match(Topic, TopicFilter) of
true -> true ->
@ -417,13 +420,8 @@ match_next(
false -> false ->
next(It#it{next_action = next}) next(It#it{next_action = next})
end; end;
true when not TimeMatches ->
NextBitstring = (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter,
NextSeek = combine(NextBitstring, <<>>, Keymapper),
next(It#it{next_action = {seek, NextSeek}});
false -> false ->
% _ -> case compute_next_seek(HashMatches, TimeMatches, Bitstring, It) of
case compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) of
NextBitstring when is_integer(NextBitstring) -> NextBitstring when is_integer(NextBitstring) ->
% ct:pal("Bitstring = ~32.16.0B", [Bitstring]), % ct:pal("Bitstring = ~32.16.0B", [Bitstring]),
% ct:pal("Bitfilter = ~32.16.0B", [Bitfilter]), % ct:pal("Bitfilter = ~32.16.0B", [Bitfilter]),
@ -441,62 +439,128 @@ stop_iteration(It) ->
ok = rocksdb:iterator_close(It#it.handle), ok = rocksdb:iterator_close(It#it.handle),
none. none.
compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) -> %% `Bitstring` is out of the hash space defined by `HashBitfilter`.
compute_next_seek(false, _TimeMatches, Bitstring, It) ->
    % The hash part is off: first jump to the closest following bitstring that
    % is back inside the hash space, then re-check that candidate against the
    % time range, since the jump may have landed before the requested time.
    case
        compute_hash_seek(
            Bitstring,
            It#it.hash_bitfilter,
            It#it.hash_bitmask,
            It#it.keymapper
        )
    of
        none ->
            % Nothing left inside the hash space.
            none;
        Candidate ->
            InTimeRange = (Candidate band It#it.time_bitmask) >= It#it.time_bitfilter,
            compute_next_seek(true, InTimeRange, Candidate, It)
    end;
%% The hash part matches, but `Bitstring` lies below the time range defined by
%% the iterator's time bitfilter: move the time bits up to the filter.
compute_next_seek(true, false, Bitstring, It) ->
    compute_time_seek(Bitstring, It#it.time_bitfilter, It#it.time_bitmask);
%% Both the hash part and the time part match: `Bitstring` is the answer as is.
compute_next_seek(true, true, Bitstring, _It) ->
    Bitstring.
%% Substitute the timestamp bits of `Bitstring` (those selected by
%% `TimeBitmask`) with the bits of `TimeBitfilter`; all other bits are kept.
compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask) ->
    % Clear the timestamp positions first...
    NonTimeBits = Bitstring band (bnot TimeBitmask),
    % ...then drop the filter's bits in.
    NonTimeBits bor TimeBitfilter.
%% Find the closest bitstring which is:
%% * greater than `Bitstring`,
%% * and falls into the hash space defined by `HashBitfilter`.
%% Note that the result can end up "back" in time and out of the time range.
compute_hash_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) ->
Sources = Keymapper#keymapper.source, Sources = Keymapper#keymapper.source,
Size = Keymapper#keymapper.bitsize, Size = Keymapper#keymapper.bitsize,
compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size). compute_hash_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size).
compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) -> compute_hash_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
% NOTE % NOTE
% Ok, this convoluted mess implements a sort of _increment operation_ for some % We're iterating through `Bitstring` here, in lockstep with `HashBitfilter`
% strange number in variable bit-width base. There are `Levels` "digits", those % and `HashBitmask`, starting from least significant bits. Each bitsource in
% with `0` level bitmask have `BitsPerLevel` bit-width and those with `111...` % `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S`
% level bitmask have in some sense 0 bits (because they are fixed "digits" % bits long which we interpret as a "digit". There are 2 flavors of those
% with exacly one possible value). % "digits":
% TODO make at least remotely readable / optimize later % * regular digit with 2^S possible values,
Result = zipfoldr3( % * degenerate digit with exactly 1 possible value U (represented with 0).
fun(Source, Substring, Filter, LBitmask, Offset, {Carry, Acc}) -> % Our goal here is to find a successor of `Bitstring` and perform a kind of
% digit-by-digit addition operation with carry propagation.
NextSeek = zipfoldr3(
fun(Source, Substring, Filter, LBitmask, Offset, Acc) ->
case Source of case Source of
{hash, _, _} when LBitmask =:= 0, Carry =:= 0 ->
{0, Acc + (Substring bsl Offset)};
{hash, _, S} when LBitmask =:= 0 -> {hash, _, S} when LBitmask =:= 0 ->
Substring1 = Substring + Carry, % Regular case
Carry1 = Substring1 bsr S, bitwise_add_digit(Substring, Acc, S, Offset);
Acc1 = (Substring1 band ones(S)) bsl Offset, {hash, _, _} when LBitmask =/= 0, Substring < Filter ->
{Carry1, Acc1}; % Degenerate case, I_digit < U, no overflow.
{hash, _, _} when LBitmask =/= 0, (Substring + Carry) =:= Filter -> % Successor is `U bsl Offset` which is equivalent to 0.
{0, Acc + (Filter bsl Offset)}; 0;
{hash, _, _} when LBitmask =/= 0, (Substring + Carry) > Filter -> {hash, _, S} when LBitmask =/= 0, Substring > Filter ->
{1, Filter bsl Offset}; % Degenerate case, I_digit > U, overflow.
{hash, _, _} when LBitmask =/= 0 -> % Successor is `(1 bsl Size + U) bsl Offset`.
{0, Filter bsl Offset}; overflow_digit(S, Offset);
{timestamp, _, _} when Carry =:= 0 -> {hash, _, S} when LBitmask =/= 0 ->
{0, Acc + (Substring bsl Offset)}; % Degenerate case, I_digit = U
% Perform digit addition with I_digit = 0, assuming "digit" has
% 0 bits of information (but is `S` bits long at the same time).
% This will overflow only if the result of previous iteration
% was an overflow.
bitwise_add_digit(0, Acc, 0, S, Offset);
{timestamp, _, S} -> {timestamp, _, S} ->
Substring1 = Substring + Carry, % Regular case
Carry1 = Substring1 bsr S, bitwise_add_digit(Substring, Acc, S, Offset)
Acc1 = (Substring1 band ones(S)) bsl Offset,
{Carry1, Acc1}
end end
end, end,
% TODO 0,
% We can put carry bit into the `Acc`'s MSB instead of wrapping it into a tuple.
% This could save us a heap alloc which might be imporatant in a hot path.
{1, 0},
Bitstring, Bitstring,
HashBitfilter, HashBitfilter,
HashBitmask, HashBitmask,
Size, Size,
Sources Sources
), ),
case Result of case NextSeek bsr Size of
{_Carry = 0, Next} -> _Carry = 0 ->
Next bor (HashBitfilter band HashBitmask); % Found the successor.
{_Carry = 1, _} -> % We need to recover values of those degenerate digits which we
% we got "carried away" past the range, time to stop iteration % represented with 0 during digit-by-digit iteration.
NextSeek bor (HashBitfilter band HashBitmask);
_Carry = 1 ->
% We got "carried away" past the range, time to stop iteration.
none none
end. end.
%% Shorthand for the common case where the digit may use all of its `Width`
%% bits: the capacity passed on (`Bits`) equals the digit's width.
bitwise_add_digit(Digit, Number, Width, Offset) ->
    Bits = Width,
    bitwise_add_digit(Digit, Number, Bits, Width, Offset).
%% Place `Digit` at bit position `Offset` and add it to `Number`.
%%
%% The digit occupies `Width` bits of `Number` but may only hold values that
%% fit into `Bits` bits. When the part of the sum at and above `Offset` no
%% longer fits into `Bits` bits, the sum is discarded and the overflow marker
%% for this digit is returned instead (see `overflow_digit/2`).
bitwise_add_digit(Digit, Number, Bits, Width, Offset) ->
    Sum = Number + (Digit bsl Offset),
    case (Sum bsr Offset) >= (1 bsl Bits) of
        true ->
            % Does not fit: signal a carry out of this digit.
            overflow_digit(Width, Offset);
        false ->
            Sum
    end.
%% Construct the number that denotes an overflow of a digit which starts at
%% `Offset` bits and is `Width` bits long: a single set bit located right
%% above the digit's most significant bit.
overflow_digit(Width, Offset) ->
    CarryBit = 1 bsl Width,
    CarryBit bsl Offset.
%% Iterate through sub-bitstrings of 3 integers in lockstep, starting from least
%% significant bits first.
%%
%% Each integer is assumed to be `Size` bits long. Lengths of sub-bitstring are
%% specified in `Sources` list, in order from most significant bits to least
%% significant. Each iteration calls `FoldFun` with:
%% * bitsource that was used to extract sub-bitstrings,
%% * 3 sub-bitstrings in integer representation,
%% * bit offset into integers,
%% * current accumulator.
-spec zipfoldr3(FoldFun, Acc, integer(), integer(), integer(), _Size :: bits(), [bitsource()]) ->
Acc
when
FoldFun :: fun((bitsource(), integer(), integer(), integer(), _Offset :: bits(), Acc) -> Acc).
zipfoldr3(_FoldFun, Acc, _, _, _, 0, []) -> zipfoldr3(_FoldFun, Acc, _, _, _, 0, []) ->
Acc; Acc;
zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) -> zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) ->
@ -619,8 +683,8 @@ wildcard_bitmask_test_() ->
%% Key3 = |123|999|679|001| Seek = 1 |123|000|678|000| eos %% Key3 = |123|999|679|001| Seek = 1 |123|000|678|000| eos
%% Key4 = |125|011|179|017| Seek = 1 |123|000|678|000| eos %% Key4 = |125|011|179|017| Seek = 1 |123|000|678|000| eos
compute_test_next_seek(Bitstring, Bitfilter, HBitmask) -> compute_test_hash_seek(Bitstring, Bitfilter, HBitmask) ->
compute_next_seek( compute_hash_seek(
Bitstring, Bitstring,
Bitfilter, Bitfilter,
HBitmask, HBitmask,
@ -637,7 +701,7 @@ next_seek_test_() ->
[ [
?_assertMatch( ?_assertMatch(
none, none,
compute_test_next_seek( compute_test_hash_seek(
16#FD_42_4242_043, 16#FD_42_4242_043,
16#FD_42_4242_042, 16#FD_42_4242_042,
16#FF_FF_FFFF_FFF 16#FF_FF_FFFF_FFF
@ -645,7 +709,7 @@ next_seek_test_() ->
), ),
?_assertMatch( ?_assertMatch(
16#FD_11_0678_000, 16#FD_11_0678_000,
compute_test_next_seek( compute_test_hash_seek(
16#FD_11_0108_121, 16#FD_11_0108_121,
16#FD_00_0678_000, 16#FD_00_0678_000,
16#FF_00_FFFF_000 16#FF_00_FFFF_000
@ -653,7 +717,7 @@ next_seek_test_() ->
), ),
?_assertMatch( ?_assertMatch(
16#FD_12_0678_000, 16#FD_12_0678_000,
compute_test_next_seek( compute_test_hash_seek(
16#FD_11_0679_919, 16#FD_11_0679_919,
16#FD_00_0678_000, 16#FD_00_0678_000,
16#FF_00_FFFF_000 16#FF_00_FFFF_000
@ -661,7 +725,7 @@ next_seek_test_() ->
), ),
?_assertMatch( ?_assertMatch(
none, none,
compute_test_next_seek( compute_test_hash_seek(
16#FD_FF_0679_001, 16#FD_FF_0679_001,
16#FD_00_0678_000, 16#FD_00_0678_000,
16#FF_00_FFFF_000 16#FF_00_FFFF_000
@ -669,7 +733,7 @@ next_seek_test_() ->
), ),
?_assertMatch( ?_assertMatch(
none, none,
compute_test_next_seek( compute_test_hash_seek(
16#FE_11_0179_017, 16#FE_11_0179_017,
16#FD_00_0678_000, 16#FD_00_0678_000,
16#FF_00_FFFF_000 16#FF_00_FFFF_000

View File

@ -169,7 +169,7 @@ t_prop_hash_bitmask_computes(_) ->
) )
). ).
t_prop_iterate_stored_messages(Config) -> t_prop_iterate_stored_messages(_) ->
?assertEqual( ?assertEqual(
true, true,
proper:quickcheck( proper:quickcheck(
@ -256,7 +256,7 @@ init_per_testcase(TC, Config) ->
{ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)), {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)),
Config. Config.
end_per_testcase(_TC, Config) -> end_per_testcase(_TC, _Config) ->
ok = application:stop(emqx_replay). ok = application:stop(emqx_replay).
zone(TC) -> zone(TC) ->