From cd12338c3f2219c6778ac090412b06e35a0485c2 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 23 Dec 2022 19:21:04 +0300 Subject: [PATCH] feat(ds): Smoke tests for wildcard iterator scans --- .../src/emqx_replay_message_storage.erl | 32 +++------ .../test/emqx_replay_storage_SUITE.erl | 69 +++++++++++++++++-- 2 files changed, 75 insertions(+), 26 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index e867ad850..2e5fb95f2 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -93,14 +93,14 @@ make_iterator(#db{handle = DBHandle}, TopicFilter, StartTime) -> Hash = compute_topic_hash(TopicFilter), HashBitmask = make_bitmask(TopicFilter), HashFilter = Hash band HashBitmask, - #it{ + {ok, #it{ handle = ITHandle, next_action = {seek, combine(HashFilter, StartTime, <<>>)}, topic_filter = TopicFilter, start_time = StartTime, hash_filter = HashFilter, hash_bitmask = HashBitmask - }; + }}; Err -> Err end. @@ -193,17 +193,6 @@ ones(Bits) -> %% |123|056|678| & |fff|000|fff| = |123|000|678|. -%% Filter = |123|***|678| -%% Key1 = |123|011|108| → Seek = |123|011|678| -%% Key1 = |123|011|679| → Seek = |123|012|678| -%% Key1 = |123|999|679| → Seek = 1|123|000|678| → eos - -%% Filter = |123|***|678|***| -%% Key1 = |123|011|108|121| → Seek = |123|011|678|000| -%% Key1 = |123|011|679|919| → Seek = |123|012|678|000| -%% Key1 = |123|999|679|001| → Seek = 1|123|000|678|000| → eos -%% Key1 = |125|999|179|017| → Seek = 1|123|000|678|000| → eos - match_next( It = #it{ topic_filter = TopicFilter, @@ -256,8 +245,6 @@ compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) -> % TODO make at least remotely readable / optimize later Result = zipfoldr3( fun(LevelHash, Filter, LevelMask, Bits, Shift, {Carry, Acc}) -> - % io:format(user, "~n *** LH: ~.16B / F: ~.16B / M: ~.16B / Bs: ~B / Sh: ~B~n", [LevelHash, Filter, LevelMask, Bits, Shift]), - % io:format(user, "~n *** Carry: ~B / Acc: ~.16B~n", [Carry, Acc]), case LevelMask of 0 when Carry == 0 -> {0, Acc + (LevelHash bsl Shift)}; @@ -288,12 +275,6 @@ compute_next_seek(TopicHash, HashFilter, HashBitmask, BitsPerLevel) -> none end. -% zipfoldr3(FoldFun, Acc, I1, I2, I3, Shift, [Bits]) -> -% { Shift + Bits -% , FoldFun( I1 band ones(Bits) -% , I2 band ones(Bits) -% , I3 band ones(Bits) -% , Bits, Acc ) }; zipfoldr3(_FoldFun, Acc, _, _, _, []) -> {0, Acc}; zipfoldr3(FoldFun, Acc, I1, I2, I3, [Bits | Rest]) -> @@ -305,7 +286,6 @@ zipfoldr3(FoldFun, Acc, I1, I2, I3, [Bits | Rest]) -> I3, Rest ), - % { FoldFun(I1 band ones(Bits), I2 band ones(Bits), I3 band ones(Bits), Bits, AccNext). { Shift + Bits, FoldFun( @@ -377,6 +357,14 @@ compute_test_next_seek(TopicHash, HashFilter, HashBitmask) -> next_seek_test_() -> [ + ?_assertMatch( + none, + compute_test_next_seek( + 16#FD_42_4242_043, + 16#FD_42_4242_042, + 16#FF_FF_FFFF_FFF + ) + ), ?_assertMatch( 16#FD_11_0678_000, compute_test_next_seek( diff --git a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl index 583665eba..e565df455 100644 --- a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl @@ -22,8 +22,6 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("proper/include/proper.hrl"). --define(DB_FILE, ?MODULE_STRING). - %% Smoke test of store function t_store(Config) -> DB = ?config(handle, Config), @@ -60,14 +58,77 @@ t_iterate(Config) -> ], ok. +%% Smoke test for iteration with wildcard topic filter +t_iterate_wildcard(Config) -> + DB = ?config(handle, Config), + %% Prepare data: + Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], + Timestamps = lists:seq(1, 10), + _ = [ + store(DB, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + || Topic <- Topics, PublishedAt <- Timestamps + ], + ?assertEqual( + lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 0)]) + ), + ?assertEqual( + [], + lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 10 + 1)]) + ), + ?assertEqual( + lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "#", 5)]) + ), + ?assertEqual( + lists:sort([ + {Topic, PublishedAt} + || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps + ]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/#", 0)]) + ), + ?assertEqual( + lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/+", 0)]) + ), + ?assertEqual( + [], + lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "foo/+/bar", 0)]) + ), + ?assertEqual( + lists:sort([ + {Topic, PublishedAt} + || Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps + ]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "+/bar/#", 0)]) + ), + ?assertEqual( + lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "a/#", 0)]) + ), + ok. + +store(DB, PublishedAt, Topic, Payload) -> + ID = emqx_guid:gen(), + emqx_replay_message_storage:store(DB, ID, PublishedAt, parse_topic(Topic), Payload). + +iterate(DB, TopicFilter, StartTime) -> + {ok, It} = emqx_replay_message_storage:make_iterator(DB, parse_topic(TopicFilter), StartTime), + iterate(It). + iterate(It) -> case emqx_replay_message_storage:next(It) of - {value, Val} -> - [Val | iterate(It)]; + {value, Payload, ItNext} -> + [Payload | iterate(ItNext)]; none -> [] end. +parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> + Topic; +parse_topic(Topic) -> + emqx_topic:words(iolist_to_binary(Topic)). + %% CT callbacks all() -> emqx_common_test_helpers:all(?MODULE).