From 4b8dbca232800b99f08660abeac4565b42219d70 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 4 Jan 2023 22:02:53 +0300 Subject: [PATCH] refactor: introduce keyspace filter concept So we could conveniently test it separately. --- .../src/emqx_replay_message_storage.erl | 185 ++++++++++++------ 1 file changed, 125 insertions(+), 60 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index dafaf8c68..3988e97dc 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -98,16 +98,24 @@ -export([next/1]). %% Debug/troubleshooting: +%% Keymappers -export([ - make_message_key/4, + bitsize/1, compute_bitstring/3, compute_topic_bitmask/2, - compute_next_seek/4, - compute_time_seek/3, - compute_topic_seek/4, + compute_time_bitmask/1, hash/2 ]). +%% Keyspace filters +-export([ + make_keyspace_filter/3, + compute_initial_seek/1, + compute_next_seek/2, + compute_time_seek/3, + compute_topic_seek/4 +]). + -export_type([db/0, iterator/0, schema/0]). -compile({inline, [ones/1, bitwise_concat/3]}). @@ -159,8 +167,12 @@ -record(it, { handle :: rocksdb:itr_handle(), + filter :: keyspace_filter(), + next_action :: {seek, binary()} | next +}). + +-record(filter, { keymapper :: keymapper(), - next_action :: {seek, binary()} | next, topic_filter :: emqx_topic:words(), hash_bitfilter :: integer(), hash_bitmask :: integer(), @@ -186,6 +198,7 @@ -opaque db() :: #db{}. -opaque iterator() :: #it{}. -type keymapper() :: #keymapper{}. +-type keyspace_filter() :: #filter{}. %%================================================================================ %% API funcions @@ -254,33 +267,35 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) -> case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of {ok, ITHandle} -> - Bitstring = compute_bitstring(TopicFilter, StartTime, DB#db.keymapper), - HashBitmask = compute_topic_bitmask(TopicFilter, DB#db.keymapper), - TimeBitmask = compute_time_bitmask(DB#db.keymapper), - HashBitfilter = Bitstring band HashBitmask, - TimeBitfilter = Bitstring band TimeBitmask, - InitialSeek = combine(HashBitfilter bor TimeBitfilter, <<>>, DB#db.keymapper), + % TODO earliest + Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper), + InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper), {ok, #it{ handle = ITHandle, - keymapper = DB#db.keymapper, - next_action = {seek, InitialSeek}, - topic_filter = TopicFilter, - hash_bitfilter = HashBitfilter, - hash_bitmask = HashBitmask, - time_bitfilter = TimeBitfilter, - time_bitmask = TimeBitmask + filter = Filter, + next_action = {seek, InitialSeek} }}; Err -> Err end. -spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. -next(It = #it{next_action = Action}) -> - case rocksdb:iterator_move(It#it.handle, Action) of +next(It = #it{filter = #filter{keymapper = Keymapper}}) -> + case rocksdb:iterator_move(It#it.handle, It#it.next_action) of % spec says `{ok, Key}` is also possible but the implementation says it's not {ok, Key, Value} -> - Bitstring = extract(Key, It#it.keymapper), - match_next(It, Bitstring, Value); + Bitstring = extract(Key, Keymapper), + case match_next(Bitstring, Value, It#it.filter) of + {_Topic, Payload} -> + {value, Payload, It#it{next_action = next}}; + next -> + next(It#it{next_action = next}); + NextBitstring when is_integer(NextBitstring) -> + NextSeek = combine(NextBitstring, <<>>, Keymapper), + next(It#it{next_action = {seek, NextSeek}}); + none -> + stop_iteration(It) + end; {error, invalid_iterator} -> stop_iteration(It); {error, iterator_closed} -> @@ -291,6 +306,18 @@ next(It = #it{next_action = Action}) -> %% Internal exports %%================================================================================ +-define(topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), + (Bitstring band HashBitmask) == HashBitfilter +). + +-define(time_matches(Bitstring, TimeBitfilter, TimeBitmask), + (Bitstring band TimeBitmask) >= TimeBitfilter +). + +-spec bitsize(keymapper()) -> bits(). +bitsize(#keymapper{bitsize = Bitsize}) -> + Bitsize. + make_message_key(Topic, PublishedAt, MessageID, Keymapper) -> combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper). @@ -323,10 +350,46 @@ compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) -> compute_time_bitmask(#keymapper{source = Source}) -> compute_time_bitmask(Source, 0). +-spec hash(term(), bits()) -> integer(). hash(Input, Bits) -> % at most 32 bits erlang:phash2(Input, 1 bsl Bits). +-spec make_keyspace_filter(emqx_topic:words(), time(), keymapper()) -> keyspace_filter(). +make_keyspace_filter(TopicFilter, StartTime, Keymapper) -> + Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper), + HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper), + TimeBitmask = compute_time_bitmask(Keymapper), + HashBitfilter = Bitstring band HashBitmask, + TimeBitfilter = Bitstring band TimeBitmask, + #filter{ + keymapper = Keymapper, + topic_filter = TopicFilter, + hash_bitfilter = HashBitfilter, + hash_bitmask = HashBitmask, + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask + }. + +-spec compute_initial_seek(keyspace_filter()) -> integer(). +compute_initial_seek(#filter{hash_bitfilter = HashBitfilter, time_bitfilter = TimeBitfilter}) -> + % Should be the same as `compute_initial_seek(0, Filter)`. + HashBitfilter bor TimeBitfilter. + +-spec compute_next_seek(integer(), keyspace_filter()) -> integer(). +compute_next_seek( + Bitstring, + Filter = #filter{ + hash_bitfilter = HashBitfilter, + hash_bitmask = HashBitmask, + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask + } +) -> + HashMatches = ?topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), + TimeMatches = ?time_matches(Bitstring, TimeBitfilter, TimeBitmask), + compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter). + %%================================================================================ %% Internal functions %%================================================================================ @@ -388,65 +451,63 @@ ones(Bits) -> %% |123|056|678| & |fff|000|fff| = |123|000|678|. match_next( - It = #it{ - keymapper = Keymapper, + Bitstring, + Value, + Filter = #filter{ topic_filter = TopicFilter, hash_bitfilter = HashBitfilter, hash_bitmask = HashBitmask, time_bitfilter = TimeBitfilter, time_bitmask = TimeBitmask - }, - Bitstring, - Value + } ) -> - HashMatches = (Bitstring band HashBitmask) == HashBitfilter, - TimeMatches = (Bitstring band TimeBitmask) >= TimeBitfilter, + HashMatches = ?topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), + TimeMatches = ?time_matches(Bitstring, TimeBitfilter, TimeBitmask), case HashMatches and TimeMatches of true -> - {Topic, MessagePayload} = unwrap_message_value(Value), + Message = {Topic, _Payload} = unwrap_message_value(Value), case emqx_topic:match(Topic, TopicFilter) of true -> - {value, MessagePayload, It#it{next_action = next}}; + Message; false -> - next(It#it{next_action = next}) + next end; false -> - case compute_next_seek(HashMatches, TimeMatches, Bitstring, It) of - NextBitstring when is_integer(NextBitstring) -> - % ct:pal("Bitstring = ~32.16.0B", [Bitstring]), - % ct:pal("Bitfilter = ~32.16.0B", [Bitfilter]), - % ct:pal("HBitmask = ~32.16.0B", [HashBitmask]), - % ct:pal("TBitmask = ~32.16.0B", [TimeBitmask]), - % ct:pal("NextBitstring = ~32.16.0B", [NextBitstring]), - NextSeek = combine(NextBitstring, <<>>, Keymapper), - next(It#it{next_action = {seek, NextSeek}}); - none -> - stop_iteration(It) - end + compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter) end. -stop_iteration(It) -> - ok = rocksdb:iterator_close(It#it.handle), - none. - %% `Bitstring` is out of the hash space defined by `HashBitfilter`. -compute_next_seek(_HashMatches = false, _, Bitstring, It) -> - NextBitstring = compute_topic_seek( - Bitstring, - It#it.hash_bitfilter, - It#it.hash_bitmask, - It#it.keymapper - ), +compute_next_seek( + _HashMatches = false, + _TimeMatches, + Bitstring, + Filter = #filter{ + keymapper = Keymapper, + hash_bitfilter = HashBitfilter, + hash_bitmask = HashBitmask, + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask + } +) -> + NextBitstring = compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper), case NextBitstring of none -> none; _ -> - TimeMatches = (NextBitstring band It#it.time_bitmask) >= It#it.time_bitfilter, - compute_next_seek(true, TimeMatches, NextBitstring, It) + TimeMatches = ?time_matches(NextBitstring, TimeBitfilter, TimeBitmask), + compute_next_seek(true, TimeMatches, NextBitstring, Filter) end; %% `Bitstring` is out of the time range defined by `TimeBitfilter`. -compute_next_seek(_HashMatches = true, _TimeMatches = false, Bitstring, It) -> - compute_time_seek(Bitstring, It#it.time_bitfilter, It#it.time_bitmask); +compute_next_seek( + _HashMatches = true, + _TimeMatches = false, + Bitstring, + #filter{ + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask + } +) -> + compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask); compute_next_seek(true, true, Bitstring, _It) -> Bitstring. @@ -466,7 +527,7 @@ compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) -> compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) -> % NOTE % We're iterating through `Substring` here, in lockstep with `HashBitfilter` - % and`HashBitmask`, starting from least signigicant bits. Each bitsource in + % and `HashBitmask`, starting from least signigicant bits. Each bitsource in % `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S` % bits long which we interpret as a "digit". There are 2 flavors of those % "digits": @@ -573,6 +634,10 @@ substring(I, Offset, Size) -> data_cf(GenId) -> ?MODULE_STRING ++ integer_to_list(GenId). +stop_iteration(It) -> + ok = rocksdb:iterator_close(It#it.handle), + none. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl").