diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 3033b7965..a353766d4 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -24,6 +24,7 @@ /apps/emqx_prometheus/ @JimMoen @ieQu1 /apps/emqx_psk/ @lafirest @thalesmg @terry-xiaoyu /apps/emqx_resource/ @terry-xiaoyu @thalesmg +/apps/emqx_replay/ @ieQu1 /apps/emqx_retainer/ @lafirest @ieQu1 @thalesmg /apps/emqx_rule_engine/ @terry-xiaoyu @HJianBo @kjellwinblad /apps/emqx_slow_subs/ @lafirest @HJianBo diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index a622cdde0..dafaf8c68 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -101,7 +101,10 @@ -export([ make_message_key/4, compute_bitstring/3, - compute_hash_bitmask/2, + compute_topic_bitmask/2, + compute_next_seek/4, + compute_time_seek/3, + compute_topic_seek/4, hash/2 ]). @@ -140,16 +143,6 @@ cf_options => emqx_replay_local_store:db_cf_options() }. --define(DEFAULT_COLUMN_FAMILY, {"default", []}). - --define(DEFAULT_OPEN_OPTIONS, [ - {create_if_missing, true}, - {create_missing_column_families, true} -]). - --define(DEFAULT_WRITE_OPTIONS, [{sync, true}]). --define(DEFAULT_READ_OPTIONS, []). - %% Persistent configuration of the generation, it is used to create db %% record when the database is reopened -record(schema, {keymapper :: keymapper()}). 
@@ -231,12 +224,12 @@ make_keymapper(#{ topic_bits_per_level := BitsPerLevel, epoch := MaxEpoch }) -> - TimestampLSBs = floor(math:log2(MaxEpoch)), + TimestampLSBs = min(TimestampBits, floor(math:log2(MaxEpoch))), TimestampMSBs = TimestampBits - TimestampLSBs, NLevels = length(BitsPerLevel), {LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel), Source = lists:flatten([ - {timestamp, TimestampLSBs, TimestampMSBs}, + [{timestamp, TimestampLSBs, TimestampMSBs} || TimestampMSBs > 0], [{hash, level, Bits} || Bits <- LevelBits], {hash, levels, TailLevelsBits}, [{timestamp, 0, TimestampLSBs} || TimestampLSBs > 0] @@ -262,7 +255,7 @@ make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of {ok, ITHandle} -> Bitstring = compute_bitstring(TopicFilter, StartTime, DB#db.keymapper), - HashBitmask = compute_hash_bitmask(TopicFilter, DB#db.keymapper), + HashBitmask = compute_topic_bitmask(TopicFilter, DB#db.keymapper), TimeBitmask = compute_time_bitmask(DB#db.keymapper), HashBitfilter = Bitstring band HashBitmask, TimeBitfilter = Bitstring band TimeBitmask, @@ -322,9 +315,9 @@ extract(Key, #keymapper{bitsize = Size}) -> compute_bitstring(Topic, Timestamp, #keymapper{source = Source}) -> compute_bitstring(Topic, Timestamp, Source, 0). --spec compute_hash_bitmask(emqx_topic:words(), keymapper()) -> integer(). -compute_hash_bitmask(TopicFilter, #keymapper{source = Source}) -> - compute_hash_bitmask(TopicFilter, Source, 0). +-spec compute_topic_bitmask(emqx_topic:words(), keymapper()) -> integer(). +compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) -> + compute_topic_bitmask(TopicFilter, Source, 0). -spec compute_time_bitmask(keymapper()) -> integer(). compute_time_bitmask(#keymapper{source = Source}) -> @@ -353,19 +346,19 @@ compute_bitstring(Tail, Timestamp, [{hash, levels, Size} | Rest], Acc) -> compute_bitstring(_, _, [], Acc) -> Acc. 
-compute_hash_bitmask(Filter, [{timestamp, _, Size} | Rest], Acc) -> - compute_hash_bitmask(Filter, Rest, bitwise_concat(Acc, 0, Size)); -compute_hash_bitmask(['#'], [{hash, _, Size} | Rest], Acc) -> - compute_hash_bitmask(['#'], Rest, bitwise_concat(Acc, 0, Size)); -compute_hash_bitmask(['+' | Tail], [{hash, _, Size} | Rest], Acc) -> - compute_hash_bitmask(Tail, Rest, bitwise_concat(Acc, 0, Size)); -compute_hash_bitmask([], [{hash, level, Size} | Rest], Acc) -> - compute_hash_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size)); -compute_hash_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) -> - compute_hash_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size)); -compute_hash_bitmask(_, [{hash, levels, Size} | Rest], Acc) -> - compute_hash_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size)); -compute_hash_bitmask(_, [], Acc) -> +compute_topic_bitmask(Filter, [{timestamp, _, Size} | Rest], Acc) -> + compute_topic_bitmask(Filter, Rest, bitwise_concat(Acc, 0, Size)); +compute_topic_bitmask(['#'], [{hash, _, Size} | Rest], Acc) -> + compute_topic_bitmask(['#'], Rest, bitwise_concat(Acc, 0, Size)); +compute_topic_bitmask(['+' | Tail], [{hash, _, Size} | Rest], Acc) -> + compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, 0, Size)); +compute_topic_bitmask([], [{hash, level, Size} | Rest], Acc) -> + compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size)); +compute_topic_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) -> + compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size)); +compute_topic_bitmask(_, [{hash, levels, Size} | Rest], Acc) -> + compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size)); +compute_topic_bitmask(_, [], Acc) -> Acc. 
compute_time_bitmask([{timestamp, _, Size} | Rest], Acc) -> @@ -408,8 +401,8 @@ match_next( ) -> HashMatches = (Bitstring band HashBitmask) == HashBitfilter, TimeMatches = (Bitstring band TimeBitmask) >= TimeBitfilter, - case HashMatches of - true when TimeMatches -> + case HashMatches and TimeMatches of + true -> {Topic, MessagePayload} = unwrap_message_value(Value), case emqx_topic:match(Topic, TopicFilter) of true -> @@ -417,13 +410,8 @@ match_next( false -> next(It#it{next_action = next}) end; - true when not TimeMatches -> - NextBitstring = (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter, - NextSeek = combine(NextBitstring, <<>>, Keymapper), - next(It#it{next_action = {seek, NextSeek}}); false -> - % _ -> - case compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) of + case compute_next_seek(HashMatches, TimeMatches, Bitstring, It) of NextBitstring when is_integer(NextBitstring) -> % ct:pal("Bitstring = ~32.16.0B", [Bitstring]), % ct:pal("Bitfilter = ~32.16.0B", [Bitfilter]), @@ -441,62 +429,128 @@ stop_iteration(It) -> ok = rocksdb:iterator_close(It#it.handle), none. -compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) -> +%% `Bitstring` is out of the hash space defined by `HashBitfilter`. +compute_next_seek(_HashMatches = false, _, Bitstring, It) -> + NextBitstring = compute_topic_seek( + Bitstring, + It#it.hash_bitfilter, + It#it.hash_bitmask, + It#it.keymapper + ), + case NextBitstring of + none -> + none; + _ -> + TimeMatches = (NextBitstring band It#it.time_bitmask) >= It#it.time_bitfilter, + compute_next_seek(true, TimeMatches, NextBitstring, It) + end; +%% `Bitstring` is out of the time range defined by `TimeBitfilter`. +compute_next_seek(_HashMatches = true, _TimeMatches = false, Bitstring, It) -> + compute_time_seek(Bitstring, It#it.time_bitfilter, It#it.time_bitmask); +compute_next_seek(true, true, Bitstring, _It) -> + Bitstring. 
+ +compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask) -> + % Replace the bits of the timestamp in `Bitstring` with bits from `TimeBitfilter`. + (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter. + +%% Find the closest bitstring which is: +%% * greater than `Bitstring`, +%% * and falls into the hash space defined by `HashBitfilter`. +%% Note that the result can end up "back" in time and out of the time range. +compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) -> Sources = Keymapper#keymapper.source, Size = Keymapper#keymapper.bitsize, - compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size). + compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size). -compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) -> +compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) -> % NOTE - % Ok, this convoluted mess implements a sort of _increment operation_ for some - % strange number in variable bit-width base. There are `Levels` "digits", those - % with `0` level bitmask have `BitsPerLevel` bit-width and those with `111...` - % level bitmask have in some sense 0 bits (because they are fixed "digits" - % with exacly one possible value). - % TODO make at least remotely readable / optimize later - Result = zipfoldr3( - fun(Source, Substring, Filter, LBitmask, Offset, {Carry, Acc}) -> + % We're iterating through `Bitstring` here, in lockstep with `HashBitfilter` + % and `HashBitmask`, starting from least significant bits. Each bitsource in + % `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S` + % bits long which we interpret as a "digit". There are 2 flavors of those + % "digits": + % * regular digit with 2^S possible values, + % * degenerate digit with exactly 1 possible value U (represented with 0). + % Our goal here is to find a successor of `Bitstring` and perform a kind of + % digit-by-digit addition operation with carry propagation.
+ NextSeek = zipfoldr3( + fun(Source, Substring, Filter, LBitmask, Offset, Acc) -> case Source of - {hash, _, _} when LBitmask =:= 0, Carry =:= 0 -> - {0, Acc + (Substring bsl Offset)}; {hash, _, S} when LBitmask =:= 0 -> - Substring1 = Substring + Carry, - Carry1 = Substring1 bsr S, - Acc1 = (Substring1 band ones(S)) bsl Offset, - {Carry1, Acc1}; - {hash, _, _} when LBitmask =/= 0, (Substring + Carry) =:= Filter -> - {0, Acc + (Filter bsl Offset)}; - {hash, _, _} when LBitmask =/= 0, (Substring + Carry) > Filter -> - {1, Filter bsl Offset}; - {hash, _, _} when LBitmask =/= 0 -> - {0, Filter bsl Offset}; - {timestamp, _, _} when Carry =:= 0 -> - {0, Acc + (Substring bsl Offset)}; + % Regular case + bitwise_add_digit(Substring, Acc, S, Offset); + {hash, _, _} when LBitmask =/= 0, Substring < Filter -> + % Degenerate case, I_digit < U, no overflow. + % Successor is `U bsl Offset` which is equivalent to 0. + 0; + {hash, _, S} when LBitmask =/= 0, Substring > Filter -> + % Degenerate case, I_digit > U, overflow. + % Successor is `(1 bsl Size + U) bsl Offset`. + overflow_digit(S, Offset); + {hash, _, S} when LBitmask =/= 0 -> + % Degenerate case, I_digit = U + % Perform digit addition with I_digit = 0, assuming "digit" has + % 0 bits of information (but is `S` bits long at the same time). + % This will overflow only if the result of previous iteration + % was an overflow. + bitwise_add_digit(0, Acc, 0, S, Offset); {timestamp, _, S} -> - Substring1 = Substring + Carry, - Carry1 = Substring1 bsr S, - Acc1 = (Substring1 band ones(S)) bsl Offset, - {Carry1, Acc1} + % Regular case + bitwise_add_digit(Substring, Acc, S, Offset) end end, - % TODO - % We can put carry bit into the `Acc`'s MSB instead of wrapping it into a tuple. - % This could save us a heap alloc which might be imporatant in a hot path. 
- {1, 0}, + 0, Bitstring, HashBitfilter, HashBitmask, Size, Sources ), - case Result of - {_Carry = 0, Next} -> - Next bor (HashBitfilter band HashBitmask); - {_Carry = 1, _} -> - % we got "carried away" past the range, time to stop iteration + case NextSeek bsr Size of + _Carry = 0 -> + % Found the successor. + % We need to recover values of those degenerate digits which we + % represented with 0 during digit-by-digit iteration. + NextSeek bor (HashBitfilter band HashBitmask); + _Carry = 1 -> + % We got "carried away" past the range, time to stop iteration. none end. +bitwise_add_digit(Digit, Number, Width, Offset) -> + bitwise_add_digit(Digit, Number, Width, Width, Offset). + +%% Add "digit" (represented with integer `Digit`) to the `Number` assuming +%% this digit starts at `Offset` bits in `Number` and is `Width` bits long. +%% Perform an overflow if the result of addition would not fit into `Bits` +%% bits. +bitwise_add_digit(Digit, Number, Bits, Width, Offset) -> + Sum = (Digit bsl Offset) + Number, + case (Sum bsr Offset) < (1 bsl Bits) of + true -> Sum; + false -> overflow_digit(Width, Offset) + end. + +%% Construct a number which denotes an overflow of digit that starts at +%% `Offset` bits and is `Width` bits long. +overflow_digit(Width, Offset) -> + (1 bsl Width) bsl Offset. + +%% Iterate through sub-bitstrings of 3 integers in lockstep, starting from least +%% significant bits first. +%% +%% Each integer is assumed to be `Size` bits long. Lengths of sub-bitstrings are +%% specified in `Sources` list, in order from most significant bits to least +%% significant. Each iteration calls `FoldFun` with: +%% * bitsource that was used to extract sub-bitstrings, +%% * 3 sub-bitstrings in integer representation, +%% * bit offset into integers, +%% * current accumulator.
+-spec zipfoldr3(FoldFun, Acc, integer(), integer(), integer(), _Size :: bits(), [bitsource()]) -> + Acc +when + FoldFun :: fun((bitsource(), integer(), integer(), integer(), _Offset :: bits(), Acc) -> Acc). zipfoldr3(_FoldFun, Acc, _, _, _, 0, []) -> Acc; zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) -> @@ -561,7 +615,7 @@ make_keymapper_test_() -> ]. compute_test_bitmask(TopicFilter) -> - compute_hash_bitmask( + compute_topic_bitmask( TopicFilter, [ {hash, level, 3}, @@ -619,8 +673,8 @@ wildcard_bitmask_test_() -> %% Key3 = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos %% Key4 = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos -compute_test_next_seek(Bitstring, Bitfilter, HBitmask) -> - compute_next_seek( +compute_test_topic_seek(Bitstring, Bitfilter, HBitmask) -> + compute_topic_seek( Bitstring, Bitfilter, HBitmask, @@ -637,7 +691,7 @@ next_seek_test_() -> [ ?_assertMatch( none, - compute_test_next_seek( + compute_test_topic_seek( 16#FD_42_4242_043, 16#FD_42_4242_042, 16#FF_FF_FFFF_FFF @@ -645,7 +699,7 @@ next_seek_test_() -> ), ?_assertMatch( 16#FD_11_0678_000, - compute_test_next_seek( + compute_test_topic_seek( 16#FD_11_0108_121, 16#FD_00_0678_000, 16#FF_00_FFFF_000 @@ -653,7 +707,7 @@ next_seek_test_() -> ), ?_assertMatch( 16#FD_12_0678_000, - compute_test_next_seek( + compute_test_topic_seek( 16#FD_11_0679_919, 16#FD_00_0678_000, 16#FF_00_FFFF_000 @@ -661,7 +715,7 @@ next_seek_test_() -> ), ?_assertMatch( none, - compute_test_next_seek( + compute_test_topic_seek( 16#FD_FF_0679_001, 16#FD_00_0678_000, 16#FF_00_FFFF_000 @@ -669,7 +723,7 @@ next_seek_test_() -> ), ?_assertMatch( none, - compute_test_next_seek( + compute_test_topic_seek( 16#FE_11_0179_017, 16#FD_00_0678_000, 16#FF_00_FFFF_000 diff --git a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl index a0424541a..30850927b 100644 --- a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl +++ 
b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl @@ -154,7 +154,7 @@ t_prop_topic_hash_computes(_) -> ) ). -t_prop_hash_bitmask_computes(_) -> +t_prop_topic_bitmask_computes(_) -> Keymapper = emqx_replay_message_storage:make_keymapper(#{ timestamp_bits => 16, topic_bits_per_level => [8, 12, 16], @@ -163,13 +163,13 @@ t_prop_hash_bitmask_computes(_) -> ?assert( proper:quickcheck( ?FORALL(TopicFilter, topic_filter(), begin - Mask = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter, Keymapper), + Mask = emqx_replay_message_storage:compute_topic_bitmask(TopicFilter, Keymapper), is_integer(Mask) andalso (Mask < (1 bsl (36 + 6))) end) ) ). -t_prop_iterate_stored_messages(Config) -> +t_prop_iterate_stored_messages(_) -> ?assertEqual( true, proper:quickcheck( @@ -256,7 +256,7 @@ init_per_testcase(TC, Config) -> {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)), Config. -end_per_testcase(_TC, Config) -> +end_per_testcase(_TC, _Config) -> ok = application:stop(emqx_replay). zone(TC) ->