refactor: introduce keyspace filter concept

So we could conveniently test it separately.
This commit is contained in:
Andrew Mayorov 2023-01-04 22:02:53 +03:00
parent 3de384e806
commit 4b8dbca232
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 125 additions and 60 deletions

View File

@ -98,16 +98,24 @@
-export([next/1]).
%% Debug/troubleshooting:
%% Keymappers
-export([
make_message_key/4,
bitsize/1,
compute_bitstring/3,
compute_topic_bitmask/2,
compute_next_seek/4,
compute_time_seek/3,
compute_topic_seek/4,
compute_time_bitmask/1,
hash/2
]).
%% Keyspace filters
-export([
make_keyspace_filter/3,
compute_initial_seek/1,
compute_next_seek/2,
compute_time_seek/3,
compute_topic_seek/4
]).
-export_type([db/0, iterator/0, schema/0]).
-compile({inline, [ones/1, bitwise_concat/3]}).
@ -159,8 +167,12 @@
-record(it, {
handle :: rocksdb:itr_handle(),
filter :: keyspace_filter(),
next_action :: {seek, binary()} | next
}).
-record(filter, {
keymapper :: keymapper(),
next_action :: {seek, binary()} | next,
topic_filter :: emqx_topic:words(),
hash_bitfilter :: integer(),
hash_bitmask :: integer(),
@ -186,6 +198,7 @@
-opaque db() :: #db{}.
-opaque iterator() :: #it{}.
-type keymapper() :: #keymapper{}.
-type keyspace_filter() :: #filter{}.
%%================================================================================
%% API funcions
@ -254,33 +267,35 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) ->
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
{ok, ITHandle} ->
Bitstring = compute_bitstring(TopicFilter, StartTime, DB#db.keymapper),
HashBitmask = compute_topic_bitmask(TopicFilter, DB#db.keymapper),
TimeBitmask = compute_time_bitmask(DB#db.keymapper),
HashBitfilter = Bitstring band HashBitmask,
TimeBitfilter = Bitstring band TimeBitmask,
InitialSeek = combine(HashBitfilter bor TimeBitfilter, <<>>, DB#db.keymapper),
% TODO earliest
Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper),
InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper),
{ok, #it{
handle = ITHandle,
keymapper = DB#db.keymapper,
next_action = {seek, InitialSeek},
topic_filter = TopicFilter,
hash_bitfilter = HashBitfilter,
hash_bitmask = HashBitmask,
time_bitfilter = TimeBitfilter,
time_bitmask = TimeBitmask
filter = Filter,
next_action = {seek, InitialSeek}
}};
Err ->
Err
end.
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
next(It = #it{next_action = Action}) ->
case rocksdb:iterator_move(It#it.handle, Action) of
next(It = #it{filter = #filter{keymapper = Keymapper}}) ->
case rocksdb:iterator_move(It#it.handle, It#it.next_action) of
% spec says `{ok, Key}` is also possible but the implementation says it's not
{ok, Key, Value} ->
Bitstring = extract(Key, It#it.keymapper),
match_next(It, Bitstring, Value);
Bitstring = extract(Key, Keymapper),
case match_next(Bitstring, Value, It#it.filter) of
{_Topic, Payload} ->
{value, Payload, It#it{next_action = next}};
next ->
next(It#it{next_action = next});
NextBitstring when is_integer(NextBitstring) ->
NextSeek = combine(NextBitstring, <<>>, Keymapper),
next(It#it{next_action = {seek, NextSeek}});
none ->
stop_iteration(It)
end;
{error, invalid_iterator} ->
stop_iteration(It);
{error, iterator_closed} ->
@ -291,6 +306,18 @@ next(It = #it{next_action = Action}) ->
%% Internal exports
%%================================================================================
-define(topic_hash_matches(Bitstring, HashBitfilter, HashBitmask),
(Bitstring band HashBitmask) == HashBitfilter
).
-define(time_matches(Bitstring, TimeBitfilter, TimeBitmask),
(Bitstring band TimeBitmask) >= TimeBitfilter
).
-spec bitsize(keymapper()) -> bits().
bitsize(#keymapper{bitsize = Bitsize}) ->
Bitsize.
make_message_key(Topic, PublishedAt, MessageID, Keymapper) ->
combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper).
@ -323,10 +350,46 @@ compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) ->
compute_time_bitmask(#keymapper{source = Source}) ->
compute_time_bitmask(Source, 0).
-spec hash(term(), bits()) -> integer().
hash(Input, Bits) ->
% at most 32 bits
erlang:phash2(Input, 1 bsl Bits).
-spec make_keyspace_filter(emqx_topic:words(), time(), keymapper()) -> keyspace_filter().
make_keyspace_filter(TopicFilter, StartTime, Keymapper) ->
Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
TimeBitmask = compute_time_bitmask(Keymapper),
HashBitfilter = Bitstring band HashBitmask,
TimeBitfilter = Bitstring band TimeBitmask,
#filter{
keymapper = Keymapper,
topic_filter = TopicFilter,
hash_bitfilter = HashBitfilter,
hash_bitmask = HashBitmask,
time_bitfilter = TimeBitfilter,
time_bitmask = TimeBitmask
}.
-spec compute_initial_seek(keyspace_filter()) -> integer().
compute_initial_seek(#filter{hash_bitfilter = HashBitfilter, time_bitfilter = TimeBitfilter}) ->
% Should be the same as `compute_initial_seek(0, Filter)`.
HashBitfilter bor TimeBitfilter.
-spec compute_next_seek(integer(), keyspace_filter()) -> integer().
compute_next_seek(
Bitstring,
Filter = #filter{
hash_bitfilter = HashBitfilter,
hash_bitmask = HashBitmask,
time_bitfilter = TimeBitfilter,
time_bitmask = TimeBitmask
}
) ->
HashMatches = ?topic_hash_matches(Bitstring, HashBitfilter, HashBitmask),
TimeMatches = ?time_matches(Bitstring, TimeBitfilter, TimeBitmask),
compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter).
%%================================================================================
%% Internal functions
%%================================================================================
@ -388,65 +451,63 @@ ones(Bits) ->
%% |123|056|678| & |fff|000|fff| = |123|000|678|.
match_next(
It = #it{
keymapper = Keymapper,
Bitstring,
Value,
Filter = #filter{
topic_filter = TopicFilter,
hash_bitfilter = HashBitfilter,
hash_bitmask = HashBitmask,
time_bitfilter = TimeBitfilter,
time_bitmask = TimeBitmask
},
Bitstring,
Value
}
) ->
HashMatches = (Bitstring band HashBitmask) == HashBitfilter,
TimeMatches = (Bitstring band TimeBitmask) >= TimeBitfilter,
HashMatches = ?topic_hash_matches(Bitstring, HashBitfilter, HashBitmask),
TimeMatches = ?time_matches(Bitstring, TimeBitfilter, TimeBitmask),
case HashMatches and TimeMatches of
true ->
{Topic, MessagePayload} = unwrap_message_value(Value),
Message = {Topic, _Payload} = unwrap_message_value(Value),
case emqx_topic:match(Topic, TopicFilter) of
true ->
{value, MessagePayload, It#it{next_action = next}};
Message;
false ->
next(It#it{next_action = next})
next
end;
false ->
case compute_next_seek(HashMatches, TimeMatches, Bitstring, It) of
NextBitstring when is_integer(NextBitstring) ->
% ct:pal("Bitstring = ~32.16.0B", [Bitstring]),
% ct:pal("Bitfilter = ~32.16.0B", [Bitfilter]),
% ct:pal("HBitmask = ~32.16.0B", [HashBitmask]),
% ct:pal("TBitmask = ~32.16.0B", [TimeBitmask]),
% ct:pal("NextBitstring = ~32.16.0B", [NextBitstring]),
NextSeek = combine(NextBitstring, <<>>, Keymapper),
next(It#it{next_action = {seek, NextSeek}});
none ->
stop_iteration(It)
end
compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter)
end.
stop_iteration(It) ->
ok = rocksdb:iterator_close(It#it.handle),
none.
%% `Bitstring` is out of the hash space defined by `HashBitfilter`.
compute_next_seek(_HashMatches = false, _, Bitstring, It) ->
NextBitstring = compute_topic_seek(
Bitstring,
It#it.hash_bitfilter,
It#it.hash_bitmask,
It#it.keymapper
),
compute_next_seek(
_HashMatches = false,
_TimeMatches,
Bitstring,
Filter = #filter{
keymapper = Keymapper,
hash_bitfilter = HashBitfilter,
hash_bitmask = HashBitmask,
time_bitfilter = TimeBitfilter,
time_bitmask = TimeBitmask
}
) ->
NextBitstring = compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper),
case NextBitstring of
none ->
none;
_ ->
TimeMatches = (NextBitstring band It#it.time_bitmask) >= It#it.time_bitfilter,
compute_next_seek(true, TimeMatches, NextBitstring, It)
TimeMatches = ?time_matches(NextBitstring, TimeBitfilter, TimeBitmask),
compute_next_seek(true, TimeMatches, NextBitstring, Filter)
end;
%% `Bitstring` is out of the time range defined by `TimeBitfilter`.
compute_next_seek(_HashMatches = true, _TimeMatches = false, Bitstring, It) ->
compute_time_seek(Bitstring, It#it.time_bitfilter, It#it.time_bitmask);
compute_next_seek(
_HashMatches = true,
_TimeMatches = false,
Bitstring,
#filter{
time_bitfilter = TimeBitfilter,
time_bitmask = TimeBitmask
}
) ->
compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask);
compute_next_seek(true, true, Bitstring, _It) ->
Bitstring.
@ -466,7 +527,7 @@ compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) ->
compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
% NOTE
% We're iterating through `Substring` here, in lockstep with `HashBitfilter`
% and`HashBitmask`, starting from least signigicant bits. Each bitsource in
% and `HashBitmask`, starting from least signigicant bits. Each bitsource in
% `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S`
% bits long which we interpret as a "digit". There are 2 flavors of those
% "digits":
@ -573,6 +634,10 @@ substring(I, Offset, Size) ->
data_cf(GenId) ->
?MODULE_STRING ++ integer_to_list(GenId).
stop_iteration(It) ->
ok = rocksdb:iterator_close(It#it.handle),
none.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").