feat(ds): LTS bitfield storage passes all tests

parent 56b6b176c2
commit 164ae9e94a
@@ -88,7 +88,8 @@
     bin_key_to_vector/2,
     next_range/3,
     key_to_bitstring/2,
-    bitstring_to_key/2
+    bitstring_to_key/2,
+    bitsize/1
 ]).

 -export_type([vector/0, key/0, dimension/0, offset/0, bitsize/0, bitsource/0, keymapper/0]).
@@ -146,7 +147,7 @@

 -opaque keymapper() :: #keymapper{}.

--type scalar_range() :: any | {'=', scalar()} | {'>=', scalar()}.
+-type scalar_range() :: any | {'=', scalar() | infinity} | {'>=', scalar()}.

 %%================================================================================
 %% API functions
@@ -179,6 +180,10 @@ make_keymapper(Bitsources) ->
         dim_sizeof = DimSizeof
     }.

+-spec bitsize(keymapper()) -> pos_integer().
+bitsize(#keymapper{size = Size}) ->
+    Size.
+
 %% @doc Map N-dimensional vector to a scalar key.
 %%
 %% Note: this function is not injective.
@@ -264,8 +269,12 @@ next_range(Keymapper, Filter0, PrevKey) ->

 -spec bitstring_to_key(keymapper(), bitstring()) -> key().
 bitstring_to_key(#keymapper{size = Size}, Bin) ->
-    <<Key:Size>> = Bin,
-    Key.
+    case Bin of
+        <<Key:Size>> ->
+            Key;
+        _ ->
+            error({invalid_key, Bin, Size})
+    end.

 -spec key_to_bitstring(keymapper(), key()) -> bitstring().
 key_to_bitstring(#keymapper{size = Size}, Key) ->
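`bitstring_to_key/2` is now defensive: a binary whose width differs from the keymapper's total bit size raises a descriptive `{invalid_key, Bin, Size}` error instead of a bare badmatch. A hypothetical shell session illustrating the behavior (the two-dimension, 16-bit schema here is made up for this example):

    1> KM = emqx_ds_bitmask_keymapper:make_keymapper([{1, 0, 8}, {2, 0, 8}]).
    2> emqx_ds_bitmask_keymapper:bitstring_to_key(KM, <<16#BEEF:16>>).
    48879
    3> emqx_ds_bitmask_keymapper:bitstring_to_key(KM, <<1, 2, 3>>).
    ** exception error: {invalid_key,<<1,2,3>>,16}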
@@ -329,6 +338,9 @@ desugar_filter(#keymapper{dim_sizeof = DimSizeof}, Filter) ->
     fun
         ({any, Bitsize}) ->
             {0, ones(Bitsize)};
+        ({{'=', infinity}, Bitsize}) ->
+            Val = ones(Bitsize),
+            {Val, Val};
         ({{'=', Val}, _Bitsize}) ->
             {Val, Val};
         ({{'>=', Val}, Bitsize}) ->
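With `{'=', infinity}` now a legal `scalar_range()`, desugaring maps `infinity` to the all-ones value of the dimension, i.e. the largest point an N-bit field can address. A worked sketch of the rules above, assuming `ones/1` is the usual all-ones helper, `ones(Bitsize) -> (1 bsl Bitsize) - 1`:

    %% For an 8-bit dimension, each range form desugars to an inclusive
    %% {Min, Max} interval:
    %%   any             -> {0, 255}       (whole field)
    %%   {'=', infinity} -> {255, 255}     (topmost point only)
    %%   {'=', 16#AB}    -> {16#AB, 16#AB} (single point)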
@@ -470,6 +482,14 @@ vector_to_key4_test() ->
     Schema = [{1, 0, 8}, {2, 0, 8}, {1, 8, 8}, {2, 16, 8}],
     ?assertEqual(16#bb112211, vec2key(Schema, [16#aa1111, 16#bb2222])).

+%% Test with binaries:
+vector_to_key_bin_test() ->
+    Schema = [{1, 0, 8 * 4}, {2, 0, 8 * 5}, {3, 0, 8 * 5}],
+    Keymapper = make_keymapper(lists:reverse(Schema)),
+    ?assertMatch(
+        <<"wellhelloworld">>, bin_vector_to_key(Keymapper, [<<"well">>, <<"hello">>, <<"world">>])
+    ).
+
 key_to_vector0_test() ->
     Schema = [],
     key2vec(Schema, []).
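The new binary round-trip test also documents the packing rule: each `{Dimension, Offset, Bitsize}` bitsource copies `Bitsize` bits of the chosen vector element, starting at bit `Offset`, into the next slice of the key. Worked arithmetic for `vector_to_key4_test/0` above (my reading of the layout, with byte 0 the least significant):

    %% Schema: [{1, 0, 8}, {2, 0, 8}, {1, 8, 8}, {2, 16, 8}]
    %% Vector: [16#aa1111, 16#bb2222]
    %%   key byte 0 <- bits 0..7   of dim 1 = 16#11
    %%   key byte 1 <- bits 0..7   of dim 2 = 16#22
    %%   key byte 2 <- bits 8..15  of dim 1 = 16#11
    %%   key byte 3 <- bits 16..23 of dim 2 = 16#bb
    %% Key = 16#bb112211. Bits covered by no bitsource (the 16#aa byte of
    %% dim 1) are simply dropped, which is why vector_to_key is not injective.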
@@ -35,6 +35,7 @@
 -export_type([options/0]).

 -include_lib("emqx/include/emqx.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").

 %%================================================================================
 %% Type declarations
@@ -52,7 +53,7 @@
 #{
     bits_per_wildcard_level := pos_integer(),
     topic_index_bytes := pos_integer(),
-    epoch_bits := non_neg_integer(),
+    ts_bits := non_neg_integer(),
     ts_offset_bits := non_neg_integer()
 }.

@@ -80,6 +81,8 @@
     ((KEY band BITMASK) =:= BITFILTER)
 ).

+-define(COUNTER, emqx_ds_storage_bitfield_lts_counter).
+
 %%================================================================================
 %% API funcions
 %%================================================================================
@@ -92,20 +95,17 @@ create(_ShardId, DBHandle, GenId, Options) ->
     %% Get options:
     BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
-    TSOffsetBits = maps:get(epoch_bits, Options, 5),
+    TSOffsetBits = maps:get(epoch_bits, Options, 8), %% TODO: change to 10 to make it around ~1 sec
     %% Create column families:
     DataCFName = data_cf(GenId),
     TrieCFName = trie_cf(GenId),
     {ok, DataCFHandle} = rocksdb:create_column_family(DBHandle, DataCFName, []),
     {ok, TrieCFHandle} = rocksdb:create_column_family(DBHandle, TrieCFName, []),
     %% Create schema:
-
-    % Fixed size_of MQTT message timestamp
-    SizeOfTS = 64,
     Schema = #{
         bits_per_wildcard_level => BitsPerTopicLevel,
         topic_index_bytes => TopicIndexBytes,
-        epoch_bits => SizeOfTS - TSOffsetBits,
+        ts_bits => 64,
         ts_offset_bits => TSOffsetBits
     },
     {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
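Storing the full timestamp width (`ts_bits`) rather than a precomputed `epoch_bits` lets the epoch width be derived as `ts_bits - ts_offset_bits` wherever it is needed. With the new default of 8 offset bits, every 256 consecutive timestamp units share one epoch. A minimal sketch of the split; `split_ts/1` is a hypothetical name, not part of the module:

    %% With ts_bits = 64 and ts_offset_bits = 8:
    split_ts(TS) ->
        Epoch = TS bsr 8,        %% upper 56 bits, shared across the epoch
        Offset = TS band 16#FF,  %% lower 8 bits, position within the epoch
        {Epoch, Offset}.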
@@ -114,19 +114,19 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
     #{
         bits_per_wildcard_level := BitsPerTopicLevel,
         topic_index_bytes := TopicIndexBytes,
-        epoch_bits := EpochBits,
+        ts_bits := TSBits,
         ts_offset_bits := TSOffsetBits
     } = Schema,
     {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
     {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
     Trie = restore_trie(TopicIndexBytes, DBHandle, TrieCF),
-    %% If user's topics have more than learned 10 wildcard levels then
-    %% total carnage is going on; learned topic structure doesn't
-    %% really apply:
+    %% If user's topics have more than learned 10 wildcard levels,
+    %% then it's total carnage; learned topic structure won't help
+    %% much:
     MaxWildcardLevels = 10,
     Keymappers = array:from_list(
         [
-            make_keymapper(TopicIndexBytes, EpochBits, BitsPerTopicLevel, TSOffsetBits, N)
+            make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N)
          || N <- lists:seq(0, MaxWildcardLevels)
         ]
     ),
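Because the key layout depends on the number of varying (wildcard) levels in a filter, `open/5` pre-builds eleven keymappers, one per wildcard count from 0 through `MaxWildcardLevels`, and `next/4` selects one by index. The lookup, as it appears later in this diff:

    %% KeyFilter is [TopicIndex, TimestampRange | VaryingLevels], hence the
    %% two fixed dimensions subtracted from its length:
    NVarying = length(KeyFilter) - 2,
    Keymapper = array:get(NVarying, Keymappers),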
@@ -180,11 +180,18 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->
     % TODO: ugh, so ugly
     NVarying = length(KeyFilter) - 2,
     Keymapper = array:get(NVarying, Keymappers),
-    {ok, ITHandle} = rocksdb:iterator(DB, CF, []),
+    %% Calculate lower and upper bounds for iteration:
+    LowerBound = lower_bound(Keymapper, KeyFilter),
+    UpperBound = upper_bound(Keymapper, KeyFilter),
+    {ok, ITHandle} = rocksdb:iterator(DB, CF, [
+        {iterate_lower_bound, LowerBound}, {iterate_upper_bound, UpperBound}
+    ]),
     try
+        put(?COUNTER, 0),
         next_loop(ITHandle, Keymapper, It0, [], BatchSize)
     after
-        rocksdb:iterator_close(ITHandle)
+        rocksdb:iterator_close(ITHandle),
+        erase(?COUNTER)
     end.

 %%================================================================================
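The bounded iterator confines the RocksDB scan to the slice of the column family that can possibly match the filter, which stops seeks from wandering into unrelated topic streams. Conceptually (helper names from further down in this diff):

    %% For a filter [TopicIndex | Rest]:
    %%   lower_bound/2 substitutes {'=', 0} for every remaining dimension,
    %%     yielding the smallest candidate key;
    %%   upper_bound/2 substitutes {'=', infinity} (all ones after
    %%     desugaring), yielding the largest candidate key.
    %% Both are serialized with key_to_bitstring/2 before being handed to
    %% rocksdb:iterator/3 as iterate_lower_bound / iterate_upper_bound.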
@@ -193,35 +200,42 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->

 next_loop(_, _, It, Acc, 0) ->
     {ok, It, lists:reverse(Acc)};
-next_loop(ITHandle, KeyMapper, It0, Acc0, N0) ->
-    {Key1, Bitmask, Bitfilter} = next_range(KeyMapper, It0),
-    case iterator_move(KeyMapper, ITHandle, {seek, Key1}) of
-        {ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) ->
-            Msg = deserialize(Val),
-            It1 = It0#it{last_seen_key = Key},
-            case check_message(It1, Msg) of
-                true ->
-                    N1 = N0 - 1,
-                    Acc1 = [Msg | Acc0];
-                false ->
-                    N1 = N0,
-                    Acc1 = Acc0
-            end,
-            {N, It, Acc} = traverse_interval(
-                ITHandle, KeyMapper, Bitmask, Bitfilter, It1, Acc1, N1
-            ),
-            next_loop(ITHandle, KeyMapper, It, Acc, N);
-        {ok, Key, _Val} ->
-            It = It0#it{last_seen_key = Key},
-            next_loop(ITHandle, KeyMapper, It, Acc0, N0);
-        {error, invalid_iterator} ->
-            {ok, It0, lists:reverse(Acc0)}
-    end.
+next_loop(ITHandle, KeyMapper, It0 = #it{last_seen_key = Key0, key_filter = KeyFilter}, Acc0, N0) ->
+    inc_counter(),
+    case next_range(KeyMapper, It0) of
+        {Key1, Bitmask, Bitfilter} when Key1 > Key0 ->
+            case iterator_move(KeyMapper, ITHandle, {seek, Key1}) of
+                {ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) ->
+                    assert_progress(bitmask_match, KeyMapper, KeyFilter, Key0, Key1),
+                    Msg = deserialize(Val),
+                    It1 = It0#it{last_seen_key = Key},
+                    case check_message(It1, Msg) of
+                        true ->
+                            N1 = N0 - 1,
+                            Acc1 = [Msg | Acc0];
+                        false ->
+                            N1 = N0,
+                            Acc1 = Acc0
+                    end,
+                    {N, It, Acc} = traverse_interval(
+                        ITHandle, KeyMapper, Bitmask, Bitfilter, It1, Acc1, N1
+                    ),
+                    next_loop(ITHandle, KeyMapper, It, Acc, N);
+                {ok, Key, _Val} ->
+                    assert_progress(bitmask_miss, KeyMapper, KeyFilter, Key0, Key1),
+                    It = It0#it{last_seen_key = Key},
+                    next_loop(ITHandle, KeyMapper, It, Acc0, N0);
+                {error, invalid_iterator} ->
+                    {ok, It0, lists:reverse(Acc0)}
+            end;
+        _ ->
+            {ok, It0, lists:reverse(Acc0)}
+    end.

 traverse_interval(_, _, _, _, It, Acc, 0) ->
     {0, It, Acc};
 traverse_interval(ITHandle, KeyMapper, Bitmask, Bitfilter, It0, Acc, N) ->
     %% TODO: supply the upper limit to rocksdb to the last extra seek:
+    inc_counter(),
     case iterator_move(KeyMapper, ITHandle, next) of
         {ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) ->
             Msg = deserialize(Val),
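The reworked loop threads two safety nets through the iteration: the new `Key1 > Key0` guard makes the candidates from `next_range/2` strictly monotonic (falling through to the `_` clause ends the batch instead of spinning forever), and `assert_progress/5` records or aborts any step that fails to advance. The invariant, spelled out as a reading aid rather than code from the module:

    %% Per step of next_loop/5:
    %%   Key0 = It0#it.last_seen_key        -- where the previous step stopped
    %%   Key1 = candidate from next_range/2 -- accepted only when Key1 > Key0
    %%   Key  = where RocksDB lands after {seek, Key1}
    %% assert_progress/5 traces the step when Key1 > Key0 and crashes with a
    %% diagnostic map (including the ?COUNTER step number) otherwise.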
@@ -265,6 +279,28 @@ iterator_move(KeyMapper, ITHandle, Action0) ->
             Other
     end.

+assert_progress(_Msg, _KeyMapper, _KeyFilter, Key0, Key1) when Key1 > Key0 ->
+    ?tp_ignore_side_effects_in_prod(
+        emqx_ds_storage_bitfield_lts_iter_move,
+        #{ location => _Msg
+         , key0 => format_key(_KeyMapper, Key0)
+         , key1 => format_key(_KeyMapper, Key1)
+         }),
+    ok;
+assert_progress(Msg, KeyMapper, KeyFilter, Key0, Key1) ->
+    Str0 = format_key(KeyMapper, Key0),
+    Str1 = format_key(KeyMapper, Key1),
+    error(#{'$msg' => Msg, key0 => Str0, key1 => Str1, step => get(?COUNTER), keyfilter => lists:map(fun format_keyfilter/1, KeyFilter)}).
+
+format_key(KeyMapper, Key) ->
+    Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)],
+    lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).
+
+format_keyfilter(any) ->
+    any;
+format_keyfilter({Op, Val}) ->
+    {Op, integer_to_list(Val, 16)}.
+
 -spec make_key(#s{}, #message{}) -> {binary(), [binary()]}.
 make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) ->
     Tokens = emqx_topic:tokens(TopicBin),
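`format_key/2` renders a key both as the raw scalar and as its decoded per-dimension vector, all in hex, which makes the `assert_progress/5` crash reports and traces readable. A hypothetical rendering under the made-up two-byte schema from earlier:

    %% For a keymapper over [{1, 0, 8}, {2, 0, 8}]:
    %% format_key(KM, 16#BB11) -> "BB11 (11,BB)"
    %% i.e. the key itself, then each dimension recovered by key_to_vector/2.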
@@ -303,15 +339,33 @@ deserialize(Blob) ->
 -define(BYTE_SIZE, 8).

 %% erlfmt-ignore
-make_keymapper(TopicIndexBytes, EpochBits, BitsPerTopicLevel, TSOffsetBits, N) ->
+make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
     Bitsources =
         %% Dimension          Offset        Bitsize
         [{1,                  0,            TopicIndexBytes * ?BYTE_SIZE},  %% Topic index
-         {2,                  TSOffsetBits, EpochBits              }] ++    %% Timestamp epoch
+         {2,                  TSOffsetBits, TSBits - TSOffsetBits  }] ++    %% Timestamp epoch
         [{2 + I,              0,            BitsPerTopicLevel      }        %% Varying topic levels
          || I <- lists:seq(1, N)] ++
         [{2,                  0,            TSOffsetBits           }],      %% Timestamp offset
-    emqx_ds_bitmask_keymapper:make_keymapper(Bitsources).
+    Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)),
+    %% Assert:
+    case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of
+        0 ->
+            ok;
+        _ ->
+            error(#{'$msg' => "Non-even key size", bitsources => Bitsources})
+    end,
+    Keymapper.

+upper_bound(Keymapper, [TopicIndex | Rest]) ->
+    filter_to_key(Keymapper, [TopicIndex | [{'=', infinity} || _ <- Rest]]).
+
+lower_bound(Keymapper, [TopicIndex | Rest]) ->
+    filter_to_key(Keymapper, [TopicIndex | [{'=', 0} || _ <- Rest]]).
+
+filter_to_key(KeyMapper, KeyFilter) ->
+    {Key, _, _} = emqx_ds_bitmask_keymapper:next_range(KeyMapper, KeyFilter, 0),
+    emqx_ds_bitmask_keymapper:key_to_bitstring(KeyMapper, Key).
+
 -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie().
 restore_trie(TopicIndexBytes, DB, CF) ->
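Two things changed here. First, the bitsource list is reversed before building the keymapper so that, reading the packing order off the unit tests above, the topic index lands in the most significant bits and the timestamp offset in the least significant; keys therefore sort by topic index, then epoch, then varying levels, then time within the epoch. Second, a new assertion rejects any layout whose total width is not a whole number of bytes, since keys are stored as binaries. Worked size arithmetic under the defaults, with N = 2 varying levels chosen for illustration:

    %% TopicIndexBytes = 4, TSBits = 64, TSOffsetBits = 8, BitsPerTopicLevel = 64:
    %%   topic index       4 * 8  =  32 bits
    %%   timestamp epoch   64 - 8 =  56 bits
    %%   varying levels    2 * 64 = 128 bits
    %%   timestamp offset         =   8 bits
    %%   total                    = 224 bits = 28 bytes; 224 rem 8 =:= 0, ok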
@@ -335,6 +389,10 @@ read_persisted_trie(IT, {ok, KeyB, ValB}) ->
 read_persisted_trie(IT, {error, invalid_iterator}) ->
     [].

+inc_counter() ->
+    N = get(?COUNTER),
+    put(?COUNTER, N + 1).
+
 %% @doc Generate a column family ID for the MQTT messages
 -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
 data_cf(GenId) ->
@@ -8,6 +8,7 @@

 -include_lib("emqx/include/emqx.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("stdlib/include/assert.hrl").

 -define(SHARD, shard(?FUNCTION_NAME)).
@@ -72,10 +73,10 @@ t_iterate(_Config) ->

 -define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).

-%% Smoke test that verifies that concrete topics become individual
-%% streams, unless there's too many of them
+%% Smoke test that verifies that concrete topics are mapped to
+%% individual streams, unless there's too many of them.
 t_get_streams(_Config) ->
-    %% Prepare data:
+    %% Prepare data (without wildcards):
     Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
     Timestamps = lists:seq(1, 10),
     Batch = [
@@ -91,11 +92,13 @@ t_get_streams(_Config) ->
     [FooBar = {_, _}] = GetStream(<<"foo/bar">>),
     [FooBarBaz] = GetStream(<<"foo/bar/baz">>),
     [A] = GetStream(<<"a">>),
-    %% Restart shard to make sure trie is persisted:
+    %% Restart shard to make sure trie is persisted and restored:
     ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
     {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}),
-    %% Test various wildcards:
+    %% Verify that there are no "ghost streams" for topics that don't
+    %% have any messages:
+    [] = GetStream(<<"bar/foo">>),
+    %% Test some wildcard patterns:
     ?assertEqual([FooBar], GetStream("+/+")),
     ?assertSameSet([FooBar, FooBarBaz], GetStream(<<"foo/#">>)),
     ?assertSameSet([FooBar, FooBarBaz, A], GetStream(<<"#">>)),
@@ -110,11 +113,139 @@ t_get_streams(_Config) ->
     ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, []),
     %% Check that "foo/bar/baz" topic now appears in two streams:
     %% "foo/bar/baz" and "foo/bar/+":
-    NewStreams = lists:sort(GetStream(<<"foo/bar/baz">>)),
+    NewStreams = lists:sort(GetStream("foo/bar/baz")),
     ?assertMatch([_, _], NewStreams),
-    ?assertMatch([_], NewStreams -- [FooBarBaz]),
+    ?assert(lists:member(FooBarBaz, NewStreams)),
+    %% Verify that size of the trie is still relatively small, even
+    %% after processing 200+ topics:
+    AllStreams = GetStream("#"),
+    NTotal = length(AllStreams),
+    ?assert(NTotal < 30, {NTotal, '<', 30}),
+    ?assert(lists:member(FooBar, AllStreams)),
+    ?assert(lists:member(FooBarBaz, AllStreams)),
+    ?assert(lists:member(A, AllStreams)),
     ok.

+t_replay(_Config) ->
+    %% Create concrete topics:
+    Topics = [<<"foo/bar">>, <<"foo/bar/baz">>],
+    Timestamps = lists:seq(1, 10),
+    Batch1 = [
+        make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
+        || Topic <- Topics, PublishedAt <- Timestamps
+    ],
+    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []),
+    %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar':
+    Batch2 = [
+        begin
+            B = integer_to_binary(I),
+            make_message(
+                TS, <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>, integer_to_binary(TS)
+            )
+        end
+        || I <- lists:seq(1, 200), TS <- lists:seq(1, 10), Suffix <- [<<"foo">>, <<"bar">>]
+    ],
+    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []),
+    %% Check various topic filters:
+    Messages = Batch1 ++ Batch2,
+    %% Missing topics (no ghost messages):
+    ?assertNot(check(?SHARD, <<"missing/foo/bar">>, 0, Messages)),
+    %% Regular topics:
+    ?assert(check(?SHARD, <<"foo/bar">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"foo/bar/baz">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"foo/#">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"foo/+">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"foo/+/+">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"+/+/+">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)),
+    %% Learned wildcard topics:
+    ?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])),
+    ?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/100/suffix/foo">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/+/suffix/foo">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/1/suffix/+">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/100/suffix/+">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/#">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/1/#">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/100/#">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"#">>, 0, Messages)),
+    ok.
+
+check(Shard, TopicFilter, StartTime, ExpectedMessages) ->
+    ExpectedFiltered = lists:filter(
+        fun(#message{topic = Topic, timestamp = TS}) ->
+            emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime
+        end,
+        ExpectedMessages
+    ),
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            Dump = dump_messages(Shard, TopicFilter, StartTime),
+            verify_dump(TopicFilter, StartTime, Dump),
+            Missing = ExpectedFiltered -- Dump,
+            Extras = Dump -- ExpectedFiltered,
+            ?assertMatch(
+                #{missing := [], unexpected := []},
+                #{
+                    missing => Missing,
+                    unexpected => Extras,
+                    topic_filter => TopicFilter,
+                    start_time => StartTime
+                }
+            )
+        end,
+        []
+    ),
+    length(ExpectedFiltered) > 0.
+
+verify_dump(TopicFilter, StartTime, Dump) ->
+    lists:foldl(
+        fun(#message{topic = Topic, timestamp = TS}, Acc) ->
+            %% Verify that the topic of the message returned by the
+            %% iterator matches the expected topic filter:
+            ?assert(emqx_topic:match(Topic, TopicFilter), {unexpected_topic, Topic, TopicFilter}),
+            %% Verify that timestamp of the message is greater than
+            %% the StartTime of the iterator:
+            ?assert(TS >= StartTime, {start_time, TopicFilter, TS, StartTime}),
+            %% Verify that iterator didn't reorder messages
+            %% (timestamps for each topic are growing):
+            LastTopicTs = maps:get(Topic, Acc, -1),
+            ?assert(TS >= LastTopicTs, {topic_ts_reordering, Topic, TS, LastTopicTs}),
+            Acc#{Topic => TS}
+        end,
+        #{},
+        Dump
+    ).
+
+dump_messages(Shard, TopicFilter, StartTime) ->
+    Streams = emqx_ds_storage_layer:get_streams(Shard, parse_topic(TopicFilter), StartTime),
+    lists:flatmap(
+        fun({_Rank, Stream}) ->
+            dump_stream(Shard, Stream, TopicFilter, StartTime)
+        end,
+        Streams
+    ).
+
+dump_stream(Shard, Stream, TopicFilter, StartTime) ->
+    BatchSize = 3,
+    {ok, Iterator} = emqx_ds_storage_layer:make_iterator(
+        Shard, Stream, parse_topic(TopicFilter), StartTime
+    ),
+    Loop = fun
+        F(It, 0) ->
+            error({too_many_iterations, It});
+        F(It, N) ->
+            case emqx_ds_storage_layer:next(Shard, It, BatchSize) of
+                end_of_stream ->
+                    [];
+                {ok, _NextIt, []} ->
+                    [];
+                {ok, NextIt, Batch} ->
+                    Batch ++ F(NextIt, N - 1)
+            end
+    end,
+    MaxIterations = 1000,
+    Loop(Iterator, MaxIterations).
+
 %% Smoke test for iteration with wildcard topic filter
 %% t_iterate_wildcard(_Config) ->
 %%     %% Prepare data:
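The suite code above leans on a `make_message/3` helper that this hunk does not show; presumably it just wraps its arguments into a `#message{}` record. A hypothetical sketch, enough to read the tests by:

    %% Hypothetical; the real helper lives elsewhere in the suite:
    make_message(PublishedAt, Topic, Payload) ->
        #message{
            id = emqx_guid:gen(),
            topic = Topic,
            timestamp = PublishedAt,
            payload = Payload
        }.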
@@ -317,6 +448,7 @@ parse_topic(Topic) ->
 %% CT callbacks

 all() -> emqx_common_test_helpers:all(?MODULE).
+suite() -> [{timetrap, {seconds, 20}}].

 init_per_suite(Config) ->
     {ok, _} = application:ensure_all_started(emqx_durable_storage),