Merge pull request #9669 from keynslug/chore/rocksdb-replay-queue/simplify-next-seek
chore: attempt to make compute_next_seek's logic clearer
This commit is contained in:
commit
3de384e806
|
@ -24,6 +24,7 @@
|
|||
/apps/emqx_prometheus/ @JimMoen @ieQu1
|
||||
/apps/emqx_psk/ @lafirest @thalesmg @terry-xiaoyu
|
||||
/apps/emqx_resource/ @terry-xiaoyu @thalesmg
|
||||
/apps/emqx_replay/ @ieQu1
|
||||
/apps/emqx_retainer/ @lafirest @ieQu1 @thalesmg
|
||||
/apps/emqx_rule_engine/ @terry-xiaoyu @HJianBo @kjellwinblad
|
||||
/apps/emqx_slow_subs/ @lafirest @HJianBo
|
||||
|
|
|
@ -101,7 +101,10 @@
|
|||
-export([
|
||||
make_message_key/4,
|
||||
compute_bitstring/3,
|
||||
compute_hash_bitmask/2,
|
||||
compute_topic_bitmask/2,
|
||||
compute_next_seek/4,
|
||||
compute_time_seek/3,
|
||||
compute_topic_seek/4,
|
||||
hash/2
|
||||
]).
|
||||
|
||||
|
@ -140,16 +143,6 @@
|
|||
cf_options => emqx_replay_local_store:db_cf_options()
|
||||
}.
|
||||
|
||||
-define(DEFAULT_COLUMN_FAMILY, {"default", []}).
|
||||
|
||||
-define(DEFAULT_OPEN_OPTIONS, [
|
||||
{create_if_missing, true},
|
||||
{create_missing_column_families, true}
|
||||
]).
|
||||
|
||||
-define(DEFAULT_WRITE_OPTIONS, [{sync, true}]).
|
||||
-define(DEFAULT_READ_OPTIONS, []).
|
||||
|
||||
%% Persistent configuration of the generation, it is used to create db
|
||||
%% record when the database is reopened
|
||||
-record(schema, {keymapper :: keymapper()}).
|
||||
|
@ -231,12 +224,12 @@ make_keymapper(#{
|
|||
topic_bits_per_level := BitsPerLevel,
|
||||
epoch := MaxEpoch
|
||||
}) ->
|
||||
TimestampLSBs = floor(math:log2(MaxEpoch)),
|
||||
TimestampLSBs = min(TimestampBits, floor(math:log2(MaxEpoch))),
|
||||
TimestampMSBs = TimestampBits - TimestampLSBs,
|
||||
NLevels = length(BitsPerLevel),
|
||||
{LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel),
|
||||
Source = lists:flatten([
|
||||
{timestamp, TimestampLSBs, TimestampMSBs},
|
||||
[{timestamp, TimestampLSBs, TimestampMSBs} || TimestampMSBs > 0],
|
||||
[{hash, level, Bits} || Bits <- LevelBits],
|
||||
{hash, levels, TailLevelsBits},
|
||||
[{timestamp, 0, TimestampLSBs} || TimestampLSBs > 0]
|
||||
|
@ -262,7 +255,7 @@ make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime
|
|||
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
|
||||
{ok, ITHandle} ->
|
||||
Bitstring = compute_bitstring(TopicFilter, StartTime, DB#db.keymapper),
|
||||
HashBitmask = compute_hash_bitmask(TopicFilter, DB#db.keymapper),
|
||||
HashBitmask = compute_topic_bitmask(TopicFilter, DB#db.keymapper),
|
||||
TimeBitmask = compute_time_bitmask(DB#db.keymapper),
|
||||
HashBitfilter = Bitstring band HashBitmask,
|
||||
TimeBitfilter = Bitstring band TimeBitmask,
|
||||
|
@ -322,9 +315,9 @@ extract(Key, #keymapper{bitsize = Size}) ->
|
|||
compute_bitstring(Topic, Timestamp, #keymapper{source = Source}) ->
|
||||
compute_bitstring(Topic, Timestamp, Source, 0).
|
||||
|
||||
-spec compute_hash_bitmask(emqx_topic:words(), keymapper()) -> integer().
|
||||
compute_hash_bitmask(TopicFilter, #keymapper{source = Source}) ->
|
||||
compute_hash_bitmask(TopicFilter, Source, 0).
|
||||
-spec compute_topic_bitmask(emqx_topic:words(), keymapper()) -> integer().
|
||||
compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) ->
|
||||
compute_topic_bitmask(TopicFilter, Source, 0).
|
||||
|
||||
-spec compute_time_bitmask(keymapper()) -> integer().
|
||||
compute_time_bitmask(#keymapper{source = Source}) ->
|
||||
|
@ -353,19 +346,19 @@ compute_bitstring(Tail, Timestamp, [{hash, levels, Size} | Rest], Acc) ->
|
|||
compute_bitstring(_, _, [], Acc) ->
|
||||
Acc.
|
||||
|
||||
compute_hash_bitmask(Filter, [{timestamp, _, Size} | Rest], Acc) ->
|
||||
compute_hash_bitmask(Filter, Rest, bitwise_concat(Acc, 0, Size));
|
||||
compute_hash_bitmask(['#'], [{hash, _, Size} | Rest], Acc) ->
|
||||
compute_hash_bitmask(['#'], Rest, bitwise_concat(Acc, 0, Size));
|
||||
compute_hash_bitmask(['+' | Tail], [{hash, _, Size} | Rest], Acc) ->
|
||||
compute_hash_bitmask(Tail, Rest, bitwise_concat(Acc, 0, Size));
|
||||
compute_hash_bitmask([], [{hash, level, Size} | Rest], Acc) ->
|
||||
compute_hash_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size));
|
||||
compute_hash_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) ->
|
||||
compute_hash_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size));
|
||||
compute_hash_bitmask(_, [{hash, levels, Size} | Rest], Acc) ->
|
||||
compute_hash_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size));
|
||||
compute_hash_bitmask(_, [], Acc) ->
|
||||
compute_topic_bitmask(Filter, [{timestamp, _, Size} | Rest], Acc) ->
|
||||
compute_topic_bitmask(Filter, Rest, bitwise_concat(Acc, 0, Size));
|
||||
compute_topic_bitmask(['#'], [{hash, _, Size} | Rest], Acc) ->
|
||||
compute_topic_bitmask(['#'], Rest, bitwise_concat(Acc, 0, Size));
|
||||
compute_topic_bitmask(['+' | Tail], [{hash, _, Size} | Rest], Acc) ->
|
||||
compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, 0, Size));
|
||||
compute_topic_bitmask([], [{hash, level, Size} | Rest], Acc) ->
|
||||
compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size));
|
||||
compute_topic_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) ->
|
||||
compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size));
|
||||
compute_topic_bitmask(_, [{hash, levels, Size} | Rest], Acc) ->
|
||||
compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size));
|
||||
compute_topic_bitmask(_, [], Acc) ->
|
||||
Acc.
|
||||
|
||||
compute_time_bitmask([{timestamp, _, Size} | Rest], Acc) ->
|
||||
|
@ -408,8 +401,8 @@ match_next(
|
|||
) ->
|
||||
HashMatches = (Bitstring band HashBitmask) == HashBitfilter,
|
||||
TimeMatches = (Bitstring band TimeBitmask) >= TimeBitfilter,
|
||||
case HashMatches of
|
||||
true when TimeMatches ->
|
||||
case HashMatches and TimeMatches of
|
||||
true ->
|
||||
{Topic, MessagePayload} = unwrap_message_value(Value),
|
||||
case emqx_topic:match(Topic, TopicFilter) of
|
||||
true ->
|
||||
|
@ -417,13 +410,8 @@ match_next(
|
|||
false ->
|
||||
next(It#it{next_action = next})
|
||||
end;
|
||||
true when not TimeMatches ->
|
||||
NextBitstring = (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter,
|
||||
NextSeek = combine(NextBitstring, <<>>, Keymapper),
|
||||
next(It#it{next_action = {seek, NextSeek}});
|
||||
false ->
|
||||
% _ ->
|
||||
case compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) of
|
||||
case compute_next_seek(HashMatches, TimeMatches, Bitstring, It) of
|
||||
NextBitstring when is_integer(NextBitstring) ->
|
||||
% ct:pal("Bitstring = ~32.16.0B", [Bitstring]),
|
||||
% ct:pal("Bitfilter = ~32.16.0B", [Bitfilter]),
|
||||
|
@ -441,62 +429,128 @@ stop_iteration(It) ->
|
|||
ok = rocksdb:iterator_close(It#it.handle),
|
||||
none.
|
||||
|
||||
compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) ->
|
||||
%% `Bitstring` is out of the hash space defined by `HashBitfilter`.
|
||||
compute_next_seek(_HashMatches = false, _, Bitstring, It) ->
|
||||
NextBitstring = compute_topic_seek(
|
||||
Bitstring,
|
||||
It#it.hash_bitfilter,
|
||||
It#it.hash_bitmask,
|
||||
It#it.keymapper
|
||||
),
|
||||
case NextBitstring of
|
||||
none ->
|
||||
none;
|
||||
_ ->
|
||||
TimeMatches = (NextBitstring band It#it.time_bitmask) >= It#it.time_bitfilter,
|
||||
compute_next_seek(true, TimeMatches, NextBitstring, It)
|
||||
end;
|
||||
%% `Bitstring` is out of the time range defined by `TimeBitfilter`.
|
||||
compute_next_seek(_HashMatches = true, _TimeMatches = false, Bitstring, It) ->
|
||||
compute_time_seek(Bitstring, It#it.time_bitfilter, It#it.time_bitmask);
|
||||
compute_next_seek(true, true, Bitstring, _It) ->
|
||||
Bitstring.
|
||||
|
||||
compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask) ->
|
||||
% Replace the bits of the timestamp in `Bitstring` with bits from `TimeBitfilter`.
|
||||
(Bitstring band (bnot TimeBitmask)) bor TimeBitfilter.
|
||||
|
||||
%% Find the closest bitstring which is:
|
||||
%% * greater than `Bitstring`,
|
||||
%% * and falls into the hash space defined by `HashBitfilter`.
|
||||
%% Note that the result can end up "back" in time and out of the time range.
|
||||
compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) ->
|
||||
Sources = Keymapper#keymapper.source,
|
||||
Size = Keymapper#keymapper.bitsize,
|
||||
compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size).
|
||||
compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size).
|
||||
|
||||
compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
|
||||
compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
|
||||
% NOTE
|
||||
% Ok, this convoluted mess implements a sort of _increment operation_ for some
|
||||
% strange number in variable bit-width base. There are `Levels` "digits", those
|
||||
% with `0` level bitmask have `BitsPerLevel` bit-width and those with `111...`
|
||||
% level bitmask have in some sense 0 bits (because they are fixed "digits"
|
||||
% with exactly one possible value).
|
||||
% TODO make at least remotely readable / optimize later
|
||||
Result = zipfoldr3(
|
||||
fun(Source, Substring, Filter, LBitmask, Offset, {Carry, Acc}) ->
|
||||
% We're iterating through `Substring` here, in lockstep with `HashBitfilter`
|
||||
% and `HashBitmask`, starting from least significant bits. Each bitsource in
|
||||
% `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S`
|
||||
% bits long which we interpret as a "digit". There are 2 flavors of those
|
||||
% "digits":
|
||||
% * regular digit with 2^S possible values,
|
||||
% * degenerate digit with exactly 1 possible value U (represented with 0).
|
||||
% Our goal here is to find a successor of `Bitstring` and perform a kind of
|
||||
% digit-by-digit addition operation with carry propagation.
|
||||
NextSeek = zipfoldr3(
|
||||
fun(Source, Substring, Filter, LBitmask, Offset, Acc) ->
|
||||
case Source of
|
||||
{hash, _, _} when LBitmask =:= 0, Carry =:= 0 ->
|
||||
{0, Acc + (Substring bsl Offset)};
|
||||
{hash, _, S} when LBitmask =:= 0 ->
|
||||
Substring1 = Substring + Carry,
|
||||
Carry1 = Substring1 bsr S,
|
||||
Acc1 = (Substring1 band ones(S)) bsl Offset,
|
||||
{Carry1, Acc1};
|
||||
{hash, _, _} when LBitmask =/= 0, (Substring + Carry) =:= Filter ->
|
||||
{0, Acc + (Filter bsl Offset)};
|
||||
{hash, _, _} when LBitmask =/= 0, (Substring + Carry) > Filter ->
|
||||
{1, Filter bsl Offset};
|
||||
{hash, _, _} when LBitmask =/= 0 ->
|
||||
{0, Filter bsl Offset};
|
||||
{timestamp, _, _} when Carry =:= 0 ->
|
||||
{0, Acc + (Substring bsl Offset)};
|
||||
% Regular case
|
||||
bitwise_add_digit(Substring, Acc, S, Offset);
|
||||
{hash, _, _} when LBitmask =/= 0, Substring < Filter ->
|
||||
% Degenerate case, I_digit < U, no overflow.
|
||||
% Successor is `U bsl Offset` which is equivalent to 0.
|
||||
0;
|
||||
{hash, _, S} when LBitmask =/= 0, Substring > Filter ->
|
||||
% Degenerate case, I_digit > U, overflow.
|
||||
% Successor is `(1 bsl Size + U) bsl Offset`.
|
||||
overflow_digit(S, Offset);
|
||||
{hash, _, S} when LBitmask =/= 0 ->
|
||||
% Degenerate case, I_digit = U
|
||||
% Perform digit addition with I_digit = 0, assuming "digit" has
|
||||
% 0 bits of information (but is `S` bits long at the same time).
|
||||
% This will overflow only if the result of previous iteration
|
||||
% was an overflow.
|
||||
bitwise_add_digit(0, Acc, 0, S, Offset);
|
||||
{timestamp, _, S} ->
|
||||
Substring1 = Substring + Carry,
|
||||
Carry1 = Substring1 bsr S,
|
||||
Acc1 = (Substring1 band ones(S)) bsl Offset,
|
||||
{Carry1, Acc1}
|
||||
% Regular case
|
||||
bitwise_add_digit(Substring, Acc, S, Offset)
|
||||
end
|
||||
end,
|
||||
% TODO
|
||||
% We can put carry bit into the `Acc`'s MSB instead of wrapping it into a tuple.
|
||||
% This could save us a heap alloc which might be important in a hot path.
|
||||
{1, 0},
|
||||
0,
|
||||
Bitstring,
|
||||
HashBitfilter,
|
||||
HashBitmask,
|
||||
Size,
|
||||
Sources
|
||||
),
|
||||
case Result of
|
||||
{_Carry = 0, Next} ->
|
||||
Next bor (HashBitfilter band HashBitmask);
|
||||
{_Carry = 1, _} ->
|
||||
% we got "carried away" past the range, time to stop iteration
|
||||
case NextSeek bsr Size of
|
||||
_Carry = 0 ->
|
||||
% Found the successor.
|
||||
% We need to recover values of those degenerate digits which we
|
||||
% represented with 0 during digit-by-digit iteration.
|
||||
NextSeek bor (HashBitfilter band HashBitmask);
|
||||
_Carry = 1 ->
|
||||
% We got "carried away" past the range, time to stop iteration.
|
||||
none
|
||||
end.
|
||||
|
||||
bitwise_add_digit(Digit, Number, Width, Offset) ->
|
||||
bitwise_add_digit(Digit, Number, Width, Width, Offset).
|
||||
|
||||
%% Add "digit" (represented with integer `Digit`) to the `Number` assuming
|
||||
%% this digit starts at `Offset` bits in `Number` and is `Width` bits long.
|
||||
%% Perform an overflow if the result of addition would not fit into `Bits`
|
||||
%% bits.
|
||||
bitwise_add_digit(Digit, Number, Bits, Width, Offset) ->
|
||||
Sum = (Digit bsl Offset) + Number,
|
||||
case (Sum bsr Offset) < (1 bsl Bits) of
|
||||
true -> Sum;
|
||||
false -> overflow_digit(Width, Offset)
|
||||
end.
|
||||
|
||||
%% Construct a number which denotes an overflow of digit that starts at
|
||||
%% `Offset` bits and is `Width` bits long.
|
||||
overflow_digit(Width, Offset) ->
|
||||
(1 bsl Width) bsl Offset.
|
||||
|
||||
%% Iterate through sub-bitstrings of 3 integers in lockstep, starting from least
|
||||
%% significant bits first.
|
||||
%%
|
||||
%% Each integer is assumed to be `Size` bits long. Lengths of sub-bitstring are
|
||||
%% specified in `Sources` list, in order from most significant bits to least
|
||||
%% significant. Each iteration calls `FoldFun` with:
|
||||
%% * bitsource that was used to extract sub-bitstrings,
|
||||
%% * 3 sub-bitstrings in integer representation,
|
||||
%% * bit offset into integers,
|
||||
%% * current accumulator.
|
||||
-spec zipfoldr3(FoldFun, Acc, integer(), integer(), integer(), _Size :: bits(), [bitsource()]) ->
|
||||
Acc
|
||||
when
|
||||
FoldFun :: fun((bitsource(), integer(), integer(), integer(), _Offset :: bits(), Acc) -> Acc).
|
||||
zipfoldr3(_FoldFun, Acc, _, _, _, 0, []) ->
|
||||
Acc;
|
||||
zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) ->
|
||||
|
@ -561,7 +615,7 @@ make_keymapper_test_() ->
|
|||
].
|
||||
|
||||
compute_test_bitmask(TopicFilter) ->
|
||||
compute_hash_bitmask(
|
||||
compute_topic_bitmask(
|
||||
TopicFilter,
|
||||
[
|
||||
{hash, level, 3},
|
||||
|
@ -619,8 +673,8 @@ wildcard_bitmask_test_() ->
|
|||
%% Key3 = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos
|
||||
%% Key4 = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos
|
||||
|
||||
compute_test_next_seek(Bitstring, Bitfilter, HBitmask) ->
|
||||
compute_next_seek(
|
||||
compute_test_topic_seek(Bitstring, Bitfilter, HBitmask) ->
|
||||
compute_topic_seek(
|
||||
Bitstring,
|
||||
Bitfilter,
|
||||
HBitmask,
|
||||
|
@ -637,7 +691,7 @@ next_seek_test_() ->
|
|||
[
|
||||
?_assertMatch(
|
||||
none,
|
||||
compute_test_next_seek(
|
||||
compute_test_topic_seek(
|
||||
16#FD_42_4242_043,
|
||||
16#FD_42_4242_042,
|
||||
16#FF_FF_FFFF_FFF
|
||||
|
@ -645,7 +699,7 @@ next_seek_test_() ->
|
|||
),
|
||||
?_assertMatch(
|
||||
16#FD_11_0678_000,
|
||||
compute_test_next_seek(
|
||||
compute_test_topic_seek(
|
||||
16#FD_11_0108_121,
|
||||
16#FD_00_0678_000,
|
||||
16#FF_00_FFFF_000
|
||||
|
@ -653,7 +707,7 @@ next_seek_test_() ->
|
|||
),
|
||||
?_assertMatch(
|
||||
16#FD_12_0678_000,
|
||||
compute_test_next_seek(
|
||||
compute_test_topic_seek(
|
||||
16#FD_11_0679_919,
|
||||
16#FD_00_0678_000,
|
||||
16#FF_00_FFFF_000
|
||||
|
@ -661,7 +715,7 @@ next_seek_test_() ->
|
|||
),
|
||||
?_assertMatch(
|
||||
none,
|
||||
compute_test_next_seek(
|
||||
compute_test_topic_seek(
|
||||
16#FD_FF_0679_001,
|
||||
16#FD_00_0678_000,
|
||||
16#FF_00_FFFF_000
|
||||
|
@ -669,7 +723,7 @@ next_seek_test_() ->
|
|||
),
|
||||
?_assertMatch(
|
||||
none,
|
||||
compute_test_next_seek(
|
||||
compute_test_topic_seek(
|
||||
16#FE_11_0179_017,
|
||||
16#FD_00_0678_000,
|
||||
16#FF_00_FFFF_000
|
||||
|
|
|
@ -154,7 +154,7 @@ t_prop_topic_hash_computes(_) ->
|
|||
)
|
||||
).
|
||||
|
||||
t_prop_hash_bitmask_computes(_) ->
|
||||
t_prop_topic_bitmask_computes(_) ->
|
||||
Keymapper = emqx_replay_message_storage:make_keymapper(#{
|
||||
timestamp_bits => 16,
|
||||
topic_bits_per_level => [8, 12, 16],
|
||||
|
@ -163,13 +163,13 @@ t_prop_hash_bitmask_computes(_) ->
|
|||
?assert(
|
||||
proper:quickcheck(
|
||||
?FORALL(TopicFilter, topic_filter(), begin
|
||||
Mask = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter, Keymapper),
|
||||
Mask = emqx_replay_message_storage:compute_topic_bitmask(TopicFilter, Keymapper),
|
||||
is_integer(Mask) andalso (Mask < (1 bsl (36 + 6)))
|
||||
end)
|
||||
)
|
||||
).
|
||||
|
||||
t_prop_iterate_stored_messages(Config) ->
|
||||
t_prop_iterate_stored_messages(_) ->
|
||||
?assertEqual(
|
||||
true,
|
||||
proper:quickcheck(
|
||||
|
@ -256,7 +256,7 @@ init_per_testcase(TC, Config) ->
|
|||
{ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_TC, Config) ->
|
||||
end_per_testcase(_TC, _Config) ->
|
||||
ok = application:stop(emqx_replay).
|
||||
|
||||
zone(TC) ->
|
||||
|
|
Loading…
Reference in New Issue