From 0cfeee0df7d4c053b70a8156c04a06c91ab97ef8 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 30 Dec 2022 13:35:04 +0300 Subject: [PATCH] feat: implement keyspace partitioning across time --- .../src/emqx_replay_message_storage.erl | 354 ++++++++++++------ .../test/emqx_replay_storage_SUITE.erl | 23 +- 2 files changed, 244 insertions(+), 133 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index b591157f4..66668b23b 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -27,9 +27,8 @@ %% Debug/troubleshooting: -export([ make_message_key/4, - compute_topic_hash/2, + compute_bitstring/3, compute_hash_bitmask/2, - combine/4, hash/2 ]). @@ -104,19 +103,27 @@ keymapper :: keymapper(), next_action :: {seek, binary()} | next, topic_filter :: emqx_topic:words(), - hash_filter :: integer(), + hash_bitfilter :: integer(), hash_bitmask :: integer(), - start_time :: time() + time_bitfilter :: integer(), + time_bitmask :: integer() }). % NOTE % Keymapper decides how to map messages into RocksDB column family keyspace. -record(keymapper, { - topic_bits :: bits(), - topic_bits_per_level :: bits_per_level(), - timestamp_bits :: bits() + source :: [bitsource(), ...], + bitsize :: bits(), + tau :: non_neg_integer() }). +-type bitsource() :: + %% Consume `_Size` bits from timestamp starting at `_Offset`th bit. + %% TODO consistency + {timestamp, _Offset :: bits(), _Size :: bits()} + %% Consume next topic level (either one or all of them) and compute `_Size` bits-wide hash. + | {hash, level | levels, _Size :: bits()}. + -opaque db() :: #db{}. -opaque iterator() :: #it{}. -type keymapper() :: #keymapper{}. @@ -162,18 +169,32 @@ close(#db{handle = DB}) -> -spec make_keymapper(Options) -> keymapper() when Options :: #{ - %% Number of bits in a key allocated to a message timestamp. + %% Number of bits in a message timestamp. timestamp_bits := bits(), %% Number of bits in a key allocated to each level in a message topic. - topic_bits_per_level := bits_per_level() + topic_bits_per_level := bits_per_level(), + %% Maximum granularity of iteration over time. + max_tau := time() }. -make_keymapper(Options) -> - TimestampBits = maps:get(timestamp_bits, Options), - TopicBitsPerLevel = maps:get(topic_bits_per_level, Options), +make_keymapper(#{ + timestamp_bits := TimestampBits, + topic_bits_per_level := BitsPerLevel, + max_tau := MaxTau +}) -> + TimestampLSBs = floor(math:log2(MaxTau)), + TimestampMSBs = TimestampBits - TimestampLSBs, + NLevels = length(BitsPerLevel), + {LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel), + Source = lists:flatten([ + {timestamp, TimestampLSBs, TimestampMSBs}, + [{hash, level, Bits} || Bits <- LevelBits], + {hash, levels, TailLevelsBits}, + [{timestamp, 0, TimestampLSBs} || TimestampLSBs > 0] + ]), #keymapper{ - timestamp_bits = TimestampBits, - topic_bits = lists:sum(TopicBitsPerLevel), - topic_bits_per_level = TopicBitsPerLevel + source = Source, + bitsize = lists:sum([S || {_, _, S} <- Source]), + tau = 1 bsl TimestampLSBs }. -spec store(db(), emqx_guid:guid(), time(), topic(), binary()) -> @@ -190,18 +211,21 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) -> case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of {ok, ITHandle} -> - Hash = compute_topic_hash(TopicFilter, DB#db.keymapper), + Bitstring = compute_bitstring(TopicFilter, StartTime, DB#db.keymapper), HashBitmask = compute_hash_bitmask(TopicFilter, DB#db.keymapper), - HashFilter = Hash band HashBitmask, - InitialSeek = combine(HashFilter, StartTime, <<>>, DB#db.keymapper), + TimeBitmask = compute_time_bitmask(DB#db.keymapper), + HashBitfilter = Bitstring band HashBitmask, + TimeBitfilter = Bitstring band TimeBitmask, + InitialSeek = combine(HashBitfilter bor TimeBitfilter, <<>>, DB#db.keymapper), {ok, #it{ handle = ITHandle, keymapper = DB#db.keymapper, next_action = {seek, InitialSeek}, topic_filter = TopicFilter, - start_time = StartTime, - hash_filter = HashFilter, - hash_bitmask = HashBitmask + hash_bitfilter = HashBitfilter, + hash_bitmask = HashBitmask, + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask }}; Err -> Err @@ -212,8 +236,8 @@ next(It = #it{next_action = Action}) -> case rocksdb:iterator_move(It#it.handle, Action) of % spec says `{ok, Key}` is also possible but the implementation says it's not {ok, Key, Value} -> - {TopicHash, PublishedAt} = extract(Key, It#it.keymapper), - match_next(It, TopicHash, PublishedAt, Value); + Bitstring = extract(Key, It#it.keymapper), + match_next(It, Bitstring, Value); {error, invalid_iterator} -> stop_iteration(It); {error, iterator_closed} -> @@ -225,7 +249,7 @@ next(It = #it{next_action = Action}) -> %%================================================================================ make_message_key(Topic, PublishedAt, MessageID, Keymapper) -> - combine(compute_topic_hash(Topic, Keymapper), PublishedAt, MessageID, Keymapper). + combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper). make_message_value(Topic, MessagePayload) -> term_to_binary({Topic, MessagePayload}). @@ -233,61 +257,74 @@ make_message_value(Topic, MessagePayload) -> unwrap_message_value(Binary) -> binary_to_term(Binary). --spec combine(_TopicHash :: integer(), time(), emqx_guid:guid(), keymapper()) -> +-spec combine(_Bitstring :: integer(), emqx_guid:guid(), keymapper()) -> key(). -combine(TopicHash, PublishedAt, MessageID, #keymapper{ - timestamp_bits = TimestampBits, - topic_bits = TopicBits -}) -> - <>. +combine(Bitstring, MessageID, #keymapper{bitsize = Size}) -> + <>. -spec extract(key(), keymapper()) -> - {_TopicHash :: integer(), time()}. -extract(Key, #keymapper{ - timestamp_bits = TimestampBits, - topic_bits = TopicBits -}) -> - <> = Key, - {TopicHash, PublishedAt}. + _Bitstring :: integer(). +extract(Key, #keymapper{bitsize = Size}) -> + <> = Key, + Bitstring. -compute_topic_hash(Topic, Keymapper) -> - compute_topic_hash(Topic, Keymapper#keymapper.topic_bits_per_level, 0). +-spec compute_bitstring(topic(), time(), keymapper()) -> integer(). +compute_bitstring(Topic, Timestamp, #keymapper{source = Source}) -> + compute_bitstring(Topic, Timestamp, Source, 0). + +-spec compute_hash_bitmask(emqx_topic:words(), keymapper()) -> integer(). +compute_hash_bitmask(TopicFilter, #keymapper{source = Source}) -> + compute_hash_bitmask(TopicFilter, Source, 0). + +-spec compute_time_bitmask(keymapper()) -> integer(). +compute_time_bitmask(#keymapper{source = Source}) -> + compute_time_bitmask(Source, 0). hash(Input, Bits) -> % at most 32 bits erlang:phash2(Input, 1 bsl Bits). --spec compute_hash_bitmask(emqx_topic:words(), keymapper()) -> integer(). -compute_hash_bitmask(TopicFilter, Keymapper) -> - compute_hash_bitmask(TopicFilter, Keymapper#keymapper.topic_bits_per_level, 0). - %%================================================================================ %% Internal functions %%================================================================================ -compute_topic_hash(LevelsRest, [Bits], Acc) -> - Hash = hash(LevelsRest, Bits), - Acc bsl Bits + Hash; -compute_topic_hash([], [Bits | BitsRest], Acc) -> - Hash = hash(<<"/">>, Bits), - compute_topic_hash([], BitsRest, Acc bsl Bits + Hash); -compute_topic_hash([Level | LevelsRest], [Bits | BitsRest], Acc) -> - Hash = hash(Level, Bits), - compute_topic_hash(LevelsRest, BitsRest, Acc bsl Bits + Hash). +compute_bitstring(Topic, Timestamp, [{timestamp, Offset, Size} | Rest], Acc) -> + I = (Timestamp bsr Offset) band ones(Size), + compute_bitstring(Topic, Timestamp, Rest, (Acc bsl Size) + I); +compute_bitstring([], Timestamp, [{hash, level, Size} | Rest], Acc) -> + I = hash(<<"/">>, Size), + compute_bitstring([], Timestamp, Rest, (Acc bsl Size) + I); +compute_bitstring([Level | Tail], Timestamp, [{hash, level, Size} | Rest], Acc) -> + I = hash(Level, Size), + compute_bitstring(Tail, Timestamp, Rest, (Acc bsl Size) + I); +compute_bitstring(Tail, Timestamp, [{hash, levels, Size} | Rest], Acc) -> + I = hash(Tail, Size), + compute_bitstring(Tail, Timestamp, Rest, (Acc bsl Size) + I); +compute_bitstring(_, _, [], Acc) -> + Acc. -compute_hash_bitmask(['#'], BitsPerLevel, Acc) -> - Acc bsl lists:sum(BitsPerLevel) + 0; -compute_hash_bitmask(['+' | LevelsRest], [Bits | BitsRest], Acc) -> - compute_hash_bitmask(LevelsRest, BitsRest, Acc bsl Bits + 0); -compute_hash_bitmask(_, [Bits], Acc) -> - Acc bsl Bits + ones(Bits); -compute_hash_bitmask([], [Bits | BitsRest], Acc) -> - compute_hash_bitmask([], BitsRest, Acc bsl Bits + ones(Bits)); -compute_hash_bitmask([_ | LevelsRest], [Bits | BitsRest], Acc) -> - compute_hash_bitmask(LevelsRest, BitsRest, Acc bsl Bits + ones(Bits)); +compute_hash_bitmask(Filter, [{timestamp, _, Size} | Rest], Acc) -> + compute_hash_bitmask(Filter, Rest, (Acc bsl Size) + 0); +compute_hash_bitmask(['#'], [{hash, _, Size} | Rest], Acc) -> + compute_hash_bitmask(['#'], Rest, (Acc bsl Size) + 0); +compute_hash_bitmask(['+' | Tail], [{hash, _, Size} | Rest], Acc) -> + compute_hash_bitmask(Tail, Rest, (Acc bsl Size) + 0); +compute_hash_bitmask([], [{hash, level, Size} | Rest], Acc) -> + compute_hash_bitmask([], Rest, (Acc bsl Size) + ones(Size)); +compute_hash_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) -> + compute_hash_bitmask(Tail, Rest, (Acc bsl Size) + ones(Size)); +compute_hash_bitmask(_, [{hash, levels, Size} | Rest], Acc) -> + compute_hash_bitmask([], Rest, (Acc bsl Size) + ones(Size)); compute_hash_bitmask(_, [], Acc) -> Acc. +compute_time_bitmask([{timestamp, _, Size} | Rest], Acc) -> + compute_time_bitmask(Rest, (Acc bsl Size) + ones(Size)); +compute_time_bitmask([{hash, _, Size} | Rest], Acc) -> + compute_time_bitmask(Rest, (Acc bsl Size) + 0); +compute_time_bitmask([], Acc) -> + Acc. + ones(Bits) -> 1 bsl Bits - 1. @@ -308,16 +345,16 @@ match_next( It = #it{ keymapper = Keymapper, topic_filter = TopicFilter, - hash_filter = HashFilter, + hash_bitfilter = HashBitfilter, hash_bitmask = HashBitmask, - start_time = StartTime + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask }, - TopicHash, - PublishedAt, + Bitstring, Value ) -> - HashMatches = (TopicHash band It#it.hash_bitmask) == It#it.hash_filter, - TimeMatches = PublishedAt >= It#it.start_time, + HashMatches = (Bitstring band HashBitmask) == HashBitfilter, + TimeMatches = (Bitstring band TimeBitmask) >= TimeBitfilter, case HashMatches of true when TimeMatches -> {Topic, MessagePayload} = unwrap_message_value(Value), @@ -327,13 +364,20 @@ match_next( false -> next(It#it{next_action = next}) end; - true -> - NextSeek = combine(TopicHash, StartTime, <<>>, Keymapper), + true when not TimeMatches -> + NextBitstring = (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter, + NextSeek = combine(NextBitstring, <<>>, Keymapper), next(It#it{next_action = {seek, NextSeek}}); false -> - case compute_next_seek(TopicHash, HashFilter, HashBitmask, Keymapper) of - NextHash when is_integer(NextHash) -> - NextSeek = combine(NextHash, StartTime, <<>>, Keymapper), + % _ -> + case compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) of + NextBitstring when is_integer(NextBitstring) -> + % ct:pal("Bitstring = ~32.16.0B", [Bitstring]), + % ct:pal("Bitfilter = ~32.16.0B", [Bitfilter]), + % ct:pal("HBitmask = ~32.16.0B", [HashBitmask]), + % ct:pal("TBitmask = ~32.16.0B", [TimeBitmask]), + % ct:pal("NextBitstring = ~32.16.0B", [NextBitstring]), + NextSeek = combine(NextBitstring, <<>>, Keymapper), next(It#it{next_action = {seek, NextSeek}}); none -> stop_iteration(It) @@ -344,10 +388,12 @@ stop_iteration(It) -> ok = rocksdb:iterator_close(It#it.handle), none. -compute_next_seek(TopicHash, HashFilter, HashBitmask, Keymapper = #keymapper{}) -> - BitsPerLevel = Keymapper#keymapper.topic_bits_per_level, - compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel); -compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) -> +compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) -> + Sources = Keymapper#keymapper.source, + Size = Keymapper#keymapper.bitsize, + compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size). + +compute_next_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) -> % NOTE % Ok, this convoluted mess implements a sort of _increment operation_ for some % strange number in variable bit-width base. There are `Levels` "digits", those @@ -356,66 +402,117 @@ compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) -> % with exacly one possible value). % TODO make at least remotely readable / optimize later Result = zipfoldr3( - fun(LevelHash, Filter, LevelMask, Bits, Shift, {Carry, Acc}) -> - case LevelMask of - 0 when Carry == 0 -> - {0, Acc + (LevelHash bsl Shift)}; - 0 -> - LevelHash1 = LevelHash + Carry, - NextCarry = LevelHash1 bsr Bits, - NextAcc = (LevelHash1 band ones(Bits)) bsl Shift, - {NextCarry, NextAcc}; - _ when (LevelHash + Carry) == Filter -> - {0, Acc + (Filter bsl Shift)}; - _ when (LevelHash + Carry) > Filter -> - {1, Filter bsl Shift}; - _ -> - {0, Filter bsl Shift} + fun(Source, Substring, Filter, LBitmask, Offset, {Carry, Acc}) -> + case Source of + {hash, _, _} when LBitmask =:= 0, Carry =:= 0 -> + {0, Acc + (Substring bsl Offset)}; + {hash, _, S} when LBitmask =:= 0 -> + Substring1 = Substring + Carry, + Carry1 = Substring1 bsr S, + Acc1 = (Substring1 band ones(S)) bsl Offset, + {Carry1, Acc1}; + {hash, _, _} when LBitmask =/= 0, (Substring + Carry) =:= Filter -> + {0, Acc + (Filter bsl Offset)}; + {hash, _, _} when LBitmask =/= 0, (Substring + Carry) > Filter -> + {1, Filter bsl Offset}; + {hash, _, _} when LBitmask =/= 0 -> + {0, Filter bsl Offset}; + {timestamp, _, _} when Carry =:= 0 -> + {0, Acc + (Substring bsl Offset)}; + {timestamp, _, S} -> + Substring1 = Substring + Carry, + Carry1 = Substring1 bsr S, + Acc1 = (Substring1 band ones(S)) bsl Offset, + {Carry1, Acc1} end end, + % TODO + % We can put carry bit into the `Acc`'s MSB instead of wrapping it into a tuple. + % This could save us a heap alloc which might be imporatant in a hot path. {1, 0}, - TopicHash, - HashFilter, + Bitstring, + HashBitfilter, HashBitmask, - BitsPerLevel + Size, + Sources ), case Result of - {_, {_Carry = 0, Next}} -> - Next bor HashFilter; - {_, {_Carry = 1, _}} -> + {_Carry = 0, Next} -> + Next bor (HashBitfilter band HashBitmask); + {_Carry = 1, _} -> % we got "carried away" past the range, time to stop iteration none end. -zipfoldr3(_FoldFun, Acc, _, _, _, []) -> - {0, Acc}; -zipfoldr3(FoldFun, Acc, I1, I2, I3, [Bits | Rest]) -> - {Shift, AccNext} = zipfoldr3( - FoldFun, - Acc, - I1, - I2, - I3, - Rest - ), - { - Shift + Bits, - FoldFun( - (I1 bsr Shift) band ones(Bits), - (I2 bsr Shift) band ones(Bits), - (I3 bsr Shift) band ones(Bits), - Bits, - Shift, - AccNext - ) - }. +zipfoldr3(_FoldFun, Acc, _, _, _, 0, []) -> + Acc; +zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) -> + OffsetNext = Offset - S, + AccNext = zipfoldr3(FoldFun, Acc, I1, I2, I3, OffsetNext, Rest), + FoldFun( + Source, + substring(I1, OffsetNext, S), + substring(I2, OffsetNext, S), + substring(I3, OffsetNext, S), + OffsetNext, + AccNext + ). + +substring(I, Offset, Size) -> + (I bsr Offset) band ones(Size). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +make_keymapper_test_() -> + [ + ?_assertEqual( + #keymapper{ + source = [ + {timestamp, 9, 23}, + {hash, level, 2}, + {hash, level, 4}, + {hash, levels, 8}, + {timestamp, 0, 9} + ], + bitsize = 46, + tau = 512 + }, + make_keymapper(#{ + timestamp_bits => 32, + topic_bits_per_level => [2, 4, 8], + max_tau => 1000 + }) + ), + ?_assertEqual( + #keymapper{ + source = [ + {timestamp, 0, 32}, + {hash, levels, 16} + ], + bitsize = 48, + tau = 1 + }, + make_keymapper(#{ + timestamp_bits => 32, + topic_bits_per_level => [16], + max_tau => 1 + }) + ) + ]. + compute_test_bitmask(TopicFilter) -> - compute_hash_bitmask(TopicFilter, [3, 4, 5, 2], 0). + compute_hash_bitmask( + TopicFilter, + [ + {hash, level, 3}, + {hash, level, 4}, + {hash, level, 5}, + {hash, levels, 2} + ], + 0 + ). bitmask_test_() -> [ @@ -464,8 +561,19 @@ wildcard_bitmask_test_() -> %% Key3 = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos %% Key4 = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos -compute_test_next_seek(TopicHash, HashFilter, HashBitmask) -> - compute_next_seek(TopicHash, HashFilter, HashBitmask, [8, 8, 16, 12]). +compute_test_next_seek(Bitstring, Bitfilter, HBitmask) -> + compute_next_seek( + Bitstring, + Bitfilter, + HBitmask, + [ + {hash, level, 8}, + {hash, level, 8}, + {hash, level, 16}, + {hash, levels, 12} + ], + 8 + 8 + 16 + 12 + ). next_seek_test_() -> [ diff --git a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl index b1a8a396b..5608f6008 100644 --- a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl @@ -52,7 +52,7 @@ t_iterate(Config) -> begin {ok, It} = emqx_replay_message_storage:make_iterator(DB, Topic, 0), Values = iterate(It), - ?assertEqual(Values, lists:map(fun integer_to_binary/1, Timestamps)) + ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values) end || Topic <- Topics ], @@ -137,28 +137,30 @@ parse_topic(Topic) -> t_prop_topic_hash_computes(_) -> Keymapper = emqx_replay_message_storage:make_keymapper(#{ + timestamp_bits => 32, topic_bits_per_level => [8, 12, 16, 24], - timestamp_bits => 0 + max_tau => 10000 }), ?assert( proper:quickcheck( - ?FORALL(Topic, topic(), begin - Hash = emqx_replay_message_storage:compute_topic_hash(Topic, Keymapper), - is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8) + ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin + BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper), + is_integer(BS) andalso (BS < (1 bsl 92)) end) ) ). t_prop_hash_bitmask_computes(_) -> Keymapper = emqx_replay_message_storage:make_keymapper(#{ - topic_bits_per_level => [8, 12, 16, 24], - timestamp_bits => 0 + timestamp_bits => 16, + topic_bits_per_level => [8, 12, 16], + max_tau => 100 }), ?assert( proper:quickcheck( ?FORALL(TopicFilter, topic_filter(), begin - Hash = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter, Keymapper), - is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8) + Mask = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter, Keymapper), + is_integer(Mask) andalso (Mask < (1 bsl (36 + 6))) end) ) ). @@ -252,8 +254,9 @@ init_per_testcase(TC, Config) -> {ok, DB} = emqx_replay_message_storage:open(Filename, #{ column_family => {atom_to_list(TC), []}, keymapper => emqx_replay_message_storage:make_keymapper(#{ + timestamp_bits => 64, topic_bits_per_level => [8, 8, 32, 16], - timestamp_bits => 64 + max_tau => 5 }) }), [{handle, DB} | Config].