feat(iter): wildcard smoke tests
This commit is contained in:
parent
5e612c910c
commit
8707504245
|
@ -93,14 +93,14 @@ make_iterator(#db{handle = DBHandle}, TopicFilter, StartTime) ->
|
||||||
Hash = compute_topic_hash(TopicFilter),
|
Hash = compute_topic_hash(TopicFilter),
|
||||||
HashBitmask = make_bitmask(TopicFilter),
|
HashBitmask = make_bitmask(TopicFilter),
|
||||||
HashFilter = Hash band HashBitmask,
|
HashFilter = Hash band HashBitmask,
|
||||||
#it{
|
{ok, #it{
|
||||||
handle = ITHandle,
|
handle = ITHandle,
|
||||||
next_action = {seek, combine(HashFilter, StartTime, <<>>)},
|
next_action = {seek, combine(HashFilter, StartTime, <<>>)},
|
||||||
topic_filter = TopicFilter,
|
topic_filter = TopicFilter,
|
||||||
start_time = StartTime,
|
start_time = StartTime,
|
||||||
hash_filter = HashFilter,
|
hash_filter = HashFilter,
|
||||||
hash_bitmask = HashBitmask
|
hash_bitmask = HashBitmask
|
||||||
};
|
}};
|
||||||
Err ->
|
Err ->
|
||||||
Err
|
Err
|
||||||
end.
|
end.
|
||||||
|
@ -193,17 +193,6 @@ ones(Bits) ->
|
||||||
|
|
||||||
%% |123|056|678| & |fff|000|fff| = |123|000|678|.
|
%% |123|056|678| & |fff|000|fff| = |123|000|678|.
|
||||||
|
|
||||||
%% Filter = |123|***|678|
|
|
||||||
%% Key1 = |123|011|108| → Seek = |123|011|678|
|
|
||||||
%% Key1 = |123|011|679| → Seek = |123|012|678|
|
|
||||||
%% Key1 = |123|999|679| → Seek = 1|123|000|678| → eos
|
|
||||||
|
|
||||||
%% Filter = |123|***|678|***|
|
|
||||||
%% Key1 = |123|011|108|121| → Seek = |123|011|678|000|
|
|
||||||
%% Key1 = |123|011|679|919| → Seek = |123|012|678|000|
|
|
||||||
%% Key1 = |123|999|679|001| → Seek = 1|123|000|678|000| → eos
|
|
||||||
%% Key1 = |125|999|179|017| → Seek = 1|123|000|678|000| → eos
|
|
||||||
|
|
||||||
match_next(
|
match_next(
|
||||||
It = #it{
|
It = #it{
|
||||||
topic_filter = TopicFilter,
|
topic_filter = TopicFilter,
|
||||||
|
@ -256,8 +245,6 @@ compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) ->
|
||||||
% TODO make at least remotely readable / optimize later
|
% TODO make at least remotely readable / optimize later
|
||||||
Result = zipfoldr3(
|
Result = zipfoldr3(
|
||||||
fun(LevelHash, Filter, LevelMask, Bits, Shift, {Carry, Acc}) ->
|
fun(LevelHash, Filter, LevelMask, Bits, Shift, {Carry, Acc}) ->
|
||||||
% io:format(user, "~n *** LH: ~.16B / F: ~.16B / M: ~.16B / Bs: ~B / Sh: ~B~n", [LevelHash, Filter, LevelMask, Bits, Shift]),
|
|
||||||
% io:format(user, "~n *** Carry: ~B / Acc: ~.16B~n", [Carry, Acc]),
|
|
||||||
case LevelMask of
|
case LevelMask of
|
||||||
0 when Carry == 0 ->
|
0 when Carry == 0 ->
|
||||||
{0, Acc + (LevelHash bsl Shift)};
|
{0, Acc + (LevelHash bsl Shift)};
|
||||||
|
@ -288,12 +275,6 @@ compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) ->
|
||||||
none
|
none
|
||||||
end.
|
end.
|
||||||
|
|
||||||
% zipfoldr3(FoldFun, Acc, I1, I2, I3, Shift, [Bits]) ->
|
|
||||||
% { Shift + Bits
|
|
||||||
% , FoldFun( I1 band ones(Bits)
|
|
||||||
% , I2 band ones(Bits)
|
|
||||||
% , I3 band ones(Bits)
|
|
||||||
% , Bits, Acc ) };
|
|
||||||
zipfoldr3(_FoldFun, Acc, _, _, _, []) ->
|
zipfoldr3(_FoldFun, Acc, _, _, _, []) ->
|
||||||
{0, Acc};
|
{0, Acc};
|
||||||
zipfoldr3(FoldFun, Acc, I1, I2, I3, [Bits | Rest]) ->
|
zipfoldr3(FoldFun, Acc, I1, I2, I3, [Bits | Rest]) ->
|
||||||
|
@ -305,7 +286,6 @@ zipfoldr3(FoldFun, Acc, I1, I2, I3, [Bits | Rest]) ->
|
||||||
I3,
|
I3,
|
||||||
Rest
|
Rest
|
||||||
),
|
),
|
||||||
% { FoldFun(I1 band ones(Bits), I2 band ones(Bits), I3 band ones(Bits), Bits, AccNext).
|
|
||||||
{
|
{
|
||||||
Shift + Bits,
|
Shift + Bits,
|
||||||
FoldFun(
|
FoldFun(
|
||||||
|
@ -377,6 +357,14 @@ compute_test_next_seek(TopicHash, HashFilter, HashBitmask) ->
|
||||||
|
|
||||||
next_seek_test_() ->
|
next_seek_test_() ->
|
||||||
[
|
[
|
||||||
|
?_assertMatch(
|
||||||
|
none,
|
||||||
|
compute_test_next_seek(
|
||||||
|
16#FD_42_4242_043,
|
||||||
|
16#FD_42_4242_042,
|
||||||
|
16#FF_FF_FFFF_FFF
|
||||||
|
)
|
||||||
|
),
|
||||||
?_assertMatch(
|
?_assertMatch(
|
||||||
16#FD_11_0678_000,
|
16#FD_11_0678_000,
|
||||||
compute_test_next_seek(
|
compute_test_next_seek(
|
||||||
|
|
|
@ -22,8 +22,6 @@
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
-include_lib("proper/include/proper.hrl").
|
-include_lib("proper/include/proper.hrl").
|
||||||
|
|
||||||
-define(DB_FILE, ?MODULE_STRING).
|
|
||||||
|
|
||||||
%% Smoke test of store function
|
%% Smoke test of store function
|
||||||
t_store(Config) ->
|
t_store(Config) ->
|
||||||
DB = ?config(handle, Config),
|
DB = ?config(handle, Config),
|
||||||
|
@ -60,14 +58,77 @@ t_iterate(Config) ->
|
||||||
],
|
],
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% Smoke test for iteration with wildcard topic filter
|
||||||
|
t_iterate_wildcard(Config) ->
|
||||||
|
DB = ?config(handle, Config),
|
||||||
|
%% Prepare data:
|
||||||
|
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
|
||||||
|
Timestamps = lists:seq(1, 10),
|
||||||
|
_ = [
|
||||||
|
store(DB, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|
||||||
|
|| Topic <- Topics, PublishedAt <- Timestamps
|
||||||
|
],
|
||||||
|
?assertEqual(
|
||||||
|
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]),
|
||||||
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 0)])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
[],
|
||||||
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 10 + 1)])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]),
|
||||||
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 5)])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
lists:sort([
|
||||||
|
{Topic, PublishedAt}
|
||||||
|
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
|
||||||
|
]),
|
||||||
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/#", 0)])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]),
|
||||||
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/+", 0)])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
[],
|
||||||
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/+/bar", 0)])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
lists:sort([
|
||||||
|
{Topic, PublishedAt}
|
||||||
|
|| Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps
|
||||||
|
]),
|
||||||
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "+/bar/#", 0)])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]),
|
||||||
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "a/#", 0)])
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
store(DB, PublishedAt, Topic, Payload) ->
|
||||||
|
ID = emqx_guid:gen(),
|
||||||
|
emqx_replay_message_storage:store(DB, ID, PublishedAt, parse_topic(Topic), Payload).
|
||||||
|
|
||||||
|
iterate(DB, TopicFilter, StartTime) ->
|
||||||
|
{ok, It} = emqx_replay_message_storage:make_iterator(DB, parse_topic(TopicFilter), StartTime),
|
||||||
|
iterate(It).
|
||||||
|
|
||||||
iterate(It) ->
|
iterate(It) ->
|
||||||
case emqx_replay_message_storage:next(It) of
|
case emqx_replay_message_storage:next(It) of
|
||||||
{value, Val} ->
|
{value, Payload, ItNext} ->
|
||||||
[Val | iterate(It)];
|
[Payload | iterate(ItNext)];
|
||||||
none ->
|
none ->
|
||||||
[]
|
[]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
|
||||||
|
Topic;
|
||||||
|
parse_topic(Topic) ->
|
||||||
|
emqx_topic:words(iolist_to_binary(Topic)).
|
||||||
|
|
||||||
%% CT callbacks
|
%% CT callbacks
|
||||||
|
|
||||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
Loading…
Reference in New Issue