feat(ds): Smoke tests for wildcard iterator scans

This commit is contained in:
Andrew Mayorov 2022-12-23 19:21:04 +03:00 committed by ieQu1
parent 9c1cd4911d
commit cd12338c3f
2 changed files with 75 additions and 26 deletions

View File

@ -93,14 +93,14 @@ make_iterator(#db{handle = DBHandle}, TopicFilter, StartTime) ->
Hash = compute_topic_hash(TopicFilter),
HashBitmask = make_bitmask(TopicFilter),
HashFilter = Hash band HashBitmask,
#it{
{ok, #it{
handle = ITHandle,
next_action = {seek, combine(HashFilter, StartTime, <<>>)},
topic_filter = TopicFilter,
start_time = StartTime,
hash_filter = HashFilter,
hash_bitmask = HashBitmask
};
}};
Err ->
Err
end.
@ -193,17 +193,6 @@ ones(Bits) ->
%% |123|056|678| & |fff|000|fff| = |123|000|678|.
%% Filter = |123|***|678|
%% Key1 = |123|011|108| Seek = |123|011|678|
%% Key1 = |123|011|679| Seek = |123|012|678|
%% Key1 = |123|999|679| Seek = 1|123|000|678| eos
%% Filter = |123|***|678|***|
%% Key1 = |123|011|108|121| Seek = |123|011|678|000|
%% Key1 = |123|011|679|919| Seek = |123|012|678|000|
%% Key1 = |123|999|679|001| Seek = 1|123|000|678|000| eos
%% Key1 = |125|999|179|017| Seek = 1|123|000|678|000| eos
match_next(
It = #it{
topic_filter = TopicFilter,
@ -256,8 +245,6 @@ compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) ->
% TODO make at least remotely readable / optimize later
Result = zipfoldr3(
fun(LevelHash, Filter, LevelMask, Bits, Shift, {Carry, Acc}) ->
% io:format(user, "~n *** LH: ~.16B / F: ~.16B / M: ~.16B / Bs: ~B / Sh: ~B~n", [LevelHash, Filter, LevelMask, Bits, Shift]),
% io:format(user, "~n *** Carry: ~B / Acc: ~.16B~n", [Carry, Acc]),
case LevelMask of
0 when Carry == 0 ->
{0, Acc + (LevelHash bsl Shift)};
@ -288,12 +275,6 @@ compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) ->
none
end.
% zipfoldr3(FoldFun, Acc, I1, I2, I3, Shift, [Bits]) ->
% { Shift + Bits
% , FoldFun( I1 band ones(Bits)
% , I2 band ones(Bits)
% , I3 band ones(Bits)
% , Bits, Acc ) };
zipfoldr3(_FoldFun, Acc, _, _, _, []) ->
{0, Acc};
zipfoldr3(FoldFun, Acc, I1, I2, I3, [Bits | Rest]) ->
@ -305,7 +286,6 @@ zipfoldr3(FoldFun, Acc, I1, I2, I3, [Bits | Rest]) ->
I3,
Rest
),
% { FoldFun(I1 band ones(Bits), I2 band ones(Bits), I3 band ones(Bits), Bits, AccNext).
{
Shift + Bits,
FoldFun(
@ -377,6 +357,14 @@ compute_test_next_seek(TopicHash, HashFilter, HashBitmask) ->
next_seek_test_() ->
[
?_assertMatch(
none,
compute_test_next_seek(
16#FD_42_4242_043,
16#FD_42_4242_042,
16#FF_FF_FFFF_FFF
)
),
?_assertMatch(
16#FD_11_0678_000,
compute_test_next_seek(

View File

@ -22,8 +22,6 @@
-include_lib("stdlib/include/assert.hrl").
-include_lib("proper/include/proper.hrl").
-define(DB_FILE, ?MODULE_STRING).
%% Smoke test of store function
t_store(Config) ->
DB = ?config(handle, Config),
@ -60,14 +58,77 @@ t_iterate(Config) ->
],
ok.
%% Smoke test for iteration with wildcard topic filter
t_iterate_wildcard(Config) ->
DB = ?config(handle, Config),
%% Prepare data:
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
Timestamps = lists:seq(1, 10),
_ = [
store(DB, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| Topic <- Topics, PublishedAt <- Timestamps
],
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 10 + 1)])
),
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 5)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/#", 0)])
),
?assertEqual(
lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/+", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/+/bar", 0)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "+/bar/#", 0)])
),
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "a/#", 0)])
),
ok.
store(DB, PublishedAt, Topic, Payload) ->
ID = emqx_guid:gen(),
emqx_replay_message_storage:store(DB, ID, PublishedAt, parse_topic(Topic), Payload).
iterate(DB, TopicFilter, StartTime) ->
{ok, It} = emqx_replay_message_storage:make_iterator(DB, parse_topic(TopicFilter), StartTime),
iterate(It).
iterate(It) ->
case emqx_replay_message_storage:next(It) of
{value, Val} ->
[Val | iterate(It)];
{value, Payload, ItNext} ->
[Payload | iterate(ItNext)];
none ->
[]
end.
parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
Topic;
parse_topic(Topic) ->
emqx_topic:words(iolist_to_binary(Topic)).
%% CT callbacks
all() -> emqx_common_test_helpers:all(?MODULE).