diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl
index 4b6fcbcdf..5c3ae42d8 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl
@@ -88,7 +88,8 @@
     bin_key_to_vector/2,
     next_range/3,
     key_to_bitstring/2,
-    bitstring_to_key/2
+    bitstring_to_key/2,
+    bitsize/1
 ]).

 -export_type([vector/0, key/0, dimension/0, offset/0, bitsize/0, bitsource/0, keymapper/0]).
@@ -146,7 +147,7 @@

 -opaque keymapper() :: #keymapper{}.

--type scalar_range() :: any | {'=', scalar()} | {'>=', scalar()}.
+-type scalar_range() :: any | {'=', scalar() | infinity} | {'>=', scalar()}.

 %%================================================================================
 %% API functions
@@ -179,6 +180,10 @@ make_keymapper(Bitsources) ->
         dim_sizeof = DimSizeof
     }.

+-spec bitsize(keymapper()) -> pos_integer().
+bitsize(#keymapper{size = Size}) ->
+    Size.
+
 %% @doc Map N-dimensional vector to a scalar key.
 %%
 %% Note: this function is not injective.
@@ -264,8 +269,12 @@ next_range(Keymapper, Filter0, PrevKey) ->

 -spec bitstring_to_key(keymapper(), bitstring()) -> key().
 bitstring_to_key(#keymapper{size = Size}, Bin) ->
-    <<Key:Size>> = Bin,
-    Key.
+    case Bin of
+        <<Key:Size>> ->
+            Key;
+        _ ->
+            error({invalid_key, Bin, Size})
+    end.

 -spec key_to_bitstring(keymapper(), key()) -> bitstring().
 key_to_bitstring(#keymapper{size = Size}, Key) ->
@@ -329,6 +338,9 @@ desugar_filter(#keymapper{dim_sizeof = DimSizeof}, Filter) ->
         fun
             ({any, Bitsize}) ->
                 {0, ones(Bitsize)};
+            ({{'=', infinity}, Bitsize}) ->
+                Val = ones(Bitsize),
+                {Val, Val};
             ({{'=', Val}, _Bitsize}) ->
                 {Val, Val};
             ({{'>=', Val}, Bitsize}) ->
@@ -470,6 +482,14 @@ vector_to_key4_test() ->
     Schema = [{1, 0, 8}, {2, 0, 8}, {1, 8, 8}, {2, 16, 8}],
     ?assertEqual(16#bb112211, vec2key(Schema, [16#aa1111, 16#bb2222])).

+%% Test with binaries:
+vector_to_key_bin_test() ->
+    Schema = [{1, 0, 8 * 4}, {2, 0, 8 * 5}, {3, 0, 8 * 5}],
+    Keymapper = make_keymapper(lists:reverse(Schema)),
+    ?assertMatch(
+        <<"wellhelloworld">>, bin_vector_to_key(Keymapper, [<<"well">>, <<"hello">>, <<"world">>])
+    ).
+
 key_to_vector0_test() ->
     Schema = [],
     key2vec(Schema, []).
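%% ----------------------------------------------------------------------------
%% Editor's sketch (not part of the patch): how the newly exported
%% `bitsize/1' and the defensive `bitstring_to_key/2' above are expected
%% to behave. The test name and the two-dimensional schema are made up
%% for illustration; the eunit assert macros are assumed to be in scope,
%% as in the module's own test section.
bitsize_roundtrip_test() ->
    %% Two dimensions, 8 bits each, give a 16-bit key:
    Keymapper = emqx_ds_bitmask_keymapper:make_keymapper([{1, 0, 8}, {2, 0, 8}]),
    ?assertEqual(16, emqx_ds_bitmask_keymapper:bitsize(Keymapper)),
    Key = emqx_ds_bitmask_keymapper:vector_to_key(Keymapper, [16#aa, 16#bb]),
    %% Keys survive a round trip through the bitstring representation:
    Bin = emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, Key),
    ?assertEqual(Key, emqx_ds_bitmask_keymapper:bitstring_to_key(Keymapper, Bin)),
    %% A bitstring of the wrong width now raises a descriptive error
    %% instead of a badmatch:
    ?assertError(
        {invalid_key, _, 16},
        emqx_ds_bitmask_keymapper:bitstring_to_key(Keymapper, <<1:8>>)
    ).
%% ----------------------------------------------------------------------------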
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
index e8bfdaa2e..7b8fbab0d 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
@@ -35,6 +35,7 @@
 -export_type([options/0]).

 -include_lib("emqx/include/emqx.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").

 %%================================================================================
 %% Type declarations
@@ -52,7 +53,7 @@
     #{
         bits_per_wildcard_level := pos_integer(),
         topic_index_bytes := pos_integer(),
-        epoch_bits := non_neg_integer(),
+        ts_bits := non_neg_integer(),
         ts_offset_bits := non_neg_integer()
     }.

@@ -80,6 +81,8 @@
     ((KEY band BITMASK) =:= BITFILTER)
 ).

+-define(COUNTER, emqx_ds_storage_bitfield_lts_counter).
+
 %%================================================================================
 %% API funcions
 %%================================================================================
@@ -92,20 +95,17 @@ create(_ShardId, DBHandle, GenId, Options) ->
     %% Get options:
     BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
-    TSOffsetBits = maps:get(epoch_bits, Options, 5),
+    TSOffsetBits = maps:get(epoch_bits, Options, 8), %% TODO: change to 10 to make it around ~1 sec
     %% Create column families:
     DataCFName = data_cf(GenId),
     TrieCFName = trie_cf(GenId),
     {ok, DataCFHandle} = rocksdb:create_column_family(DBHandle, DataCFName, []),
     {ok, TrieCFHandle} = rocksdb:create_column_family(DBHandle, TrieCFName, []),
     %% Create schema:
-
-    % Fixed size_of MQTT message timestamp
-    SizeOfTS = 64,
     Schema = #{
         bits_per_wildcard_level => BitsPerTopicLevel,
         topic_index_bytes => TopicIndexBytes,
-        epoch_bits => SizeOfTS - TSOffsetBits,
+        ts_bits => 64,
         ts_offset_bits => TSOffsetBits
     },
     {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
@@ -114,19 +114,19 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
     #{
         bits_per_wildcard_level := BitsPerTopicLevel,
         topic_index_bytes := TopicIndexBytes,
-        epoch_bits := EpochBits,
+        ts_bits := TSBits,
         ts_offset_bits := TSOffsetBits
     } = Schema,
     {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
     {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
     Trie = restore_trie(TopicIndexBytes, DBHandle, TrieCF),
-    %% If user's topics have more than learned 10 wildcard levels then
-    %% total carnage is going on; learned topic structure doesn't
-    %% really apply:
+    %% If user's topics have more than learned 10 wildcard levels,
+    %% then it's total carnage; learned topic structure won't help
+    %% much:
     MaxWildcardLevels = 10,
     Keymappers = array:from_list(
         [
-            make_keymapper(TopicIndexBytes, EpochBits, BitsPerTopicLevel, TSOffsetBits, N)
+            make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N)
         || N <- lists:seq(0, MaxWildcardLevels)
         ]
     ),
@@ -180,11 +180,18 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->
     % TODO: ugh, so ugly
     NVarying = length(KeyFilter) - 2,
     Keymapper = array:get(NVarying, Keymappers),
-    {ok, ITHandle} = rocksdb:iterator(DB, CF, []),
+    %% Calculate lower and upper bounds for iteration:
+    LowerBound = lower_bound(Keymapper, KeyFilter),
+    UpperBound = upper_bound(Keymapper, KeyFilter),
+    {ok, ITHandle} = rocksdb:iterator(DB, CF, [
+        {iterate_lower_bound, LowerBound}, {iterate_upper_bound, UpperBound}
+    ]),
     try
+        put(?COUNTER, 0),
         next_loop(ITHandle, Keymapper, It0, [], BatchSize)
     after
-        rocksdb:iterator_close(ITHandle)
+        rocksdb:iterator_close(ITHandle),
+        erase(?COUNTER)
     end.

 %%================================================================================
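%% ----------------------------------------------------------------------------
%% Editor's sketch (not part of the patch): the bounded-iteration
%% pattern used by `next/4' above, reduced to a self-contained helper.
%% `DB' and `CF' are assumed to be a rocksdb database and column-family
%% handle opened elsewhere. RocksDB treats `iterate_lower_bound' as
%% inclusive and `iterate_upper_bound' as exclusive, so the loop never
%% sees keys outside [LowerBound, UpperBound).
scan_bounded(DB, CF, LowerBound, UpperBound) ->
    {ok, IT} = rocksdb:iterator(DB, CF, [
        {iterate_lower_bound, LowerBound},
        {iterate_upper_bound, UpperBound}
    ]),
    try
        collect(IT, rocksdb:iterator_move(IT, first))
    after
        rocksdb:iterator_close(IT)
    end.

collect(IT, {ok, Key, Val}) ->
    [{Key, Val} | collect(IT, rocksdb:iterator_move(IT, next))];
collect(_IT, {error, invalid_iterator}) ->
    %% Both exhaustion and out-of-bounds seeks surface as `invalid_iterator':
    [].
%% ----------------------------------------------------------------------------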
@@ -193,35 +200,42 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->

 next_loop(_, _, It, Acc, 0) ->
     {ok, It, lists:reverse(Acc)};
-next_loop(ITHandle, KeyMapper, It0, Acc0, N0) ->
-    {Key1, Bitmask, Bitfilter} = next_range(KeyMapper, It0),
-    case iterator_move(KeyMapper, ITHandle, {seek, Key1}) of
-        {ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) ->
-            Msg = deserialize(Val),
-            It1 = It0#it{last_seen_key = Key},
-            case check_message(It1, Msg) of
-                true ->
-                    N1 = N0 - 1,
-                    Acc1 = [Msg | Acc0];
-                false ->
-                    N1 = N0,
-                    Acc1 = Acc0
-            end,
-            {N, It, Acc} = traverse_interval(
-                ITHandle, KeyMapper, Bitmask, Bitfilter, It1, Acc1, N1
-            ),
-            next_loop(ITHandle, KeyMapper, It, Acc, N);
-        {ok, Key, _Val} ->
-            It = It0#it{last_seen_key = Key},
-            next_loop(ITHandle, KeyMapper, It, Acc0, N0);
-        {error, invalid_iterator} ->
+next_loop(ITHandle, KeyMapper, It0 = #it{last_seen_key = Key0, key_filter = KeyFilter}, Acc0, N0) ->
+    inc_counter(),
+    case next_range(KeyMapper, It0) of
+        {Key1, Bitmask, Bitfilter} when Key1 > Key0 ->
+            case iterator_move(KeyMapper, ITHandle, {seek, Key1}) of
+                {ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) ->
+                    assert_progress(bitmask_match, KeyMapper, KeyFilter, Key0, Key1),
+                    Msg = deserialize(Val),
+                    It1 = It0#it{last_seen_key = Key},
+                    case check_message(It1, Msg) of
+                        true ->
+                            N1 = N0 - 1,
+                            Acc1 = [Msg | Acc0];
+                        false ->
+                            N1 = N0,
+                            Acc1 = Acc0
+                    end,
+                    {N, It, Acc} = traverse_interval(
+                        ITHandle, KeyMapper, Bitmask, Bitfilter, It1, Acc1, N1
+                    ),
+                    next_loop(ITHandle, KeyMapper, It, Acc, N);
+                {ok, Key, _Val} ->
+                    assert_progress(bitmask_miss, KeyMapper, KeyFilter, Key0, Key1),
+                    It = It0#it{last_seen_key = Key},
+                    next_loop(ITHandle, KeyMapper, It, Acc0, N0);
+                {error, invalid_iterator} ->
+                    {ok, It0, lists:reverse(Acc0)}
+            end;
+        _ ->
             {ok, It0, lists:reverse(Acc0)}
     end.

 traverse_interval(_, _, _, _, It, Acc, 0) ->
     {0, It, Acc};
 traverse_interval(ITHandle, KeyMapper, Bitmask, Bitfilter, It0, Acc, N) ->
-    %% TODO: supply the upper limit to rocksdb to the last extra seek:
+    inc_counter(),
     case iterator_move(KeyMapper, ITHandle, next) of
         {ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) ->
             Msg = deserialize(Val),
@@ -265,6 +279,28 @@ iterator_move(KeyMapper, ITHandle, Action0) ->
             Other
     end.

+assert_progress(_Msg, _KeyMapper, _KeyFilter, Key0, Key1) when Key1 > Key0 ->
+    ?tp_ignore_side_effects_in_prod(
+        emqx_ds_storage_bitfield_lts_iter_move,
+        #{ location => _Msg
+         , key0 => format_key(_KeyMapper, Key0)
+         , key1 => format_key(_KeyMapper, Key1)
+         }),
+    ok;
+assert_progress(Msg, KeyMapper, KeyFilter, Key0, Key1) ->
+    Str0 = format_key(KeyMapper, Key0),
+    Str1 = format_key(KeyMapper, Key1),
+    error(#{'$msg' => Msg, key0 => Str0, key1 => Str1, step => get(?COUNTER), keyfilter => lists:map(fun format_keyfilter/1, KeyFilter)}).
+
+format_key(KeyMapper, Key) ->
+    Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)],
+    lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).
+
+format_keyfilter(any) ->
+    any;
+format_keyfilter({Op, Val}) ->
+    {Op, integer_to_list(Val, 16)}.
+
 -spec make_key(#s{}, #message{}) -> {binary(), [binary()]}.
 make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) ->
     Tokens = emqx_topic:tokens(TopicBin),
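%% ----------------------------------------------------------------------------
%% Editor's sketch (not part of the patch): the `?QUICKCHECK_KEY'
%% predicate used by the loops above, spelled out. A key belongs to the
%% current range iff every bit selected by `Bitmask' equals the
%% corresponding bit of `Bitfilter'; the concrete values are made up.
quickcheck_key_example() ->
    Bitmask = 16#ff00,    %% constrain only the high byte...
    Bitfilter = 16#aa00,  %% ...which must be equal to 16#aa
    true = (16#aa42 band Bitmask) =:= Bitfilter,
    false = (16#ab42 band Bitmask) =:= Bitfilter,
    ok.
%% ----------------------------------------------------------------------------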
@@ -303,15 +339,33 @@ deserialize(Blob) ->

 -define(BYTE_SIZE, 8).

 %% erlfmt-ignore
-make_keymapper(TopicIndexBytes, EpochBits, BitsPerTopicLevel, TSOffsetBits, N) ->
+make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
     Bitsources =
     %% Dimension Offset        Bitsize
         [{1,     0,            TopicIndexBytes * ?BYTE_SIZE}, %% Topic index
-         {2,     TSOffsetBits, EpochBits                    }] ++ %% Timestamp epoch
+         {2,     TSOffsetBits, TSBits - TSOffsetBits        }] ++ %% Timestamp epoch
         [{2 + I, 0,            BitsPerTopicLevel            } %% Varying topic levels
         || I <- lists:seq(1, N)] ++
         [{2,     0,            TSOffsetBits                 }], %% Timestamp offset
-    emqx_ds_bitmask_keymapper:make_keymapper(Bitsources).
+    Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)),
+    %% Assert:
+    case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of
+        0 ->
+            ok;
+        _ ->
+            error(#{'$msg' => "Non-even key size", bitsources => Bitsources})
+    end,
+    Keymapper.
+
+upper_bound(Keymapper, [TopicIndex | Rest]) ->
+    filter_to_key(Keymapper, [TopicIndex | [{'=', infinity} || _ <- Rest]]).
+
+lower_bound(Keymapper, [TopicIndex | Rest]) ->
+    filter_to_key(Keymapper, [TopicIndex | [{'=', 0} || _ <- Rest]]).
+
+filter_to_key(KeyMapper, KeyFilter) ->
+    {Key, _, _} = emqx_ds_bitmask_keymapper:next_range(KeyMapper, KeyFilter, 0),
+    emqx_ds_bitmask_keymapper:key_to_bitstring(KeyMapper, Key).

 -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie().
 restore_trie(TopicIndexBytes, DB, CF) ->
@@ -335,6 +389,10 @@ read_persisted_trie(IT, {ok, KeyB, ValB}) ->
 read_persisted_trie(IT, {error, invalid_iterator}) ->
     [].

+inc_counter() ->
+    N = get(?COUNTER),
+    put(?COUNTER, N + 1).
+
 %% @doc Generate a column family ID for the MQTT messages
 -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
 data_cf(GenId) ->
diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl
index 22a608a7f..957383f30 100644
--- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl
+++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl
@@ -8,6 +8,7 @@

 -include_lib("emqx/include/emqx.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("stdlib/include/assert.hrl").

 -define(SHARD, shard(?FUNCTION_NAME)).
@@ -72,10 +73,10 @@ t_iterate(_Config) ->

 -define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).

-%% Smoke test that verifies that concrete topics become individual
-%% streams, unless there's too many of them
+%% Smoke test that verifies that concrete topics are mapped to
+%% individual streams, unless there's too many of them.
 t_get_streams(_Config) ->
-    %% Prepare data:
+    %% Prepare data (without wildcards):
     Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
     Timestamps = lists:seq(1, 10),
     Batch = [
@@ -91,11 +92,13 @@ t_get_streams(_Config) ->
     [FooBar = {_, _}] = GetStream(<<"foo/bar">>),
     [FooBarBaz] = GetStream(<<"foo/bar/baz">>),
     [A] = GetStream(<<"a">>),
-    %% Restart shard to make sure trie is persisted:
+    %% Restart shard to make sure trie is persisted and restored:
     ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
     {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}),
-    %% Test various wildcards:
+    %% Verify that there are no "ghost streams" for topics that don't
+    %% have any messages:
     [] = GetStream(<<"bar/foo">>),
+    %% Test some wildcard patterns:
     ?assertEqual([FooBar], GetStream("+/+")),
     ?assertSameSet([FooBar, FooBarBaz], GetStream(<<"foo/#">>)),
     ?assertSameSet([FooBar, FooBarBaz, A], GetStream(<<"#">>)),
@@ -110,11 +113,139 @@ t_get_streams(_Config) ->
     ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, []),
     %% Check that "foo/bar/baz" topic now appears in two streams:
     %% "foo/bar/baz" and "foo/bar/+":
-    NewStreams = lists:sort(GetStream(<<"foo/bar/baz">>)),
+    NewStreams = lists:sort(GetStream("foo/bar/baz")),
     ?assertMatch([_, _], NewStreams),
-    ?assertMatch([_], NewStreams -- [FooBarBaz]),
+    ?assert(lists:member(FooBarBaz, NewStreams)),
+    %% Verify that size of the trie is still relatively small, even
+    %% after processing 200+ topics:
+    AllStreams = GetStream("#"),
+    NTotal = length(AllStreams),
+    ?assert(NTotal < 30, {NTotal, '<', 30}),
+    ?assert(lists:member(FooBar, AllStreams)),
+    ?assert(lists:member(FooBarBaz, AllStreams)),
+    ?assert(lists:member(A, AllStreams)),
     ok.

+t_replay(_Config) ->
+    %% Create concrete topics:
+    Topics = [<<"foo/bar">>, <<"foo/bar/baz">>],
+    Timestamps = lists:seq(1, 10),
+    Batch1 = [
+        make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
+     || Topic <- Topics, PublishedAt <- Timestamps
+    ],
+    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []),
+    %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar':
+    Batch2 = [
+        begin
+            B = integer_to_binary(I),
+            make_message(
+                TS, <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>, integer_to_binary(TS)
+            )
+        end
+     || I <- lists:seq(1, 200), TS <- lists:seq(1, 10), Suffix <- [<<"foo">>, <<"bar">>]
+    ],
+    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []),
+    %% Check various topic filters:
+    Messages = Batch1 ++ Batch2,
+    %% Missing topics (no ghost messages):
+    ?assertNot(check(?SHARD, <<"missing/foo/bar">>, 0, Messages)),
+    %% Regular topics:
+    ?assert(check(?SHARD, <<"foo/bar">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"foo/bar/baz">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"foo/#">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"foo/+">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"foo/+/+">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"+/+/+">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)),
+    %% Learned wildcard topics:
+    ?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])),
+    ?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/100/suffix/foo">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/+/suffix/foo">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/1/suffix/+">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/100/suffix/+">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/#">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/1/#">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"wildcard/100/#">>, 0, Messages)),
+    ?assert(check(?SHARD, <<"#">>, 0, Messages)),
+    ok.
+
+check(Shard, TopicFilter, StartTime, ExpectedMessages) ->
+    ExpectedFiltered = lists:filter(
+        fun(#message{topic = Topic, timestamp = TS}) ->
+            emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime
+        end,
+        ExpectedMessages
+    ),
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            Dump = dump_messages(Shard, TopicFilter, StartTime),
+            verify_dump(TopicFilter, StartTime, Dump),
+            Missing = ExpectedFiltered -- Dump,
+            Extras = Dump -- ExpectedFiltered,
+            ?assertMatch(
+                #{missing := [], unexpected := []},
+                #{
+                    missing => Missing,
+                    unexpected => Extras,
+                    topic_filter => TopicFilter,
+                    start_time => StartTime
+                }
+            )
+        end,
+        []),
+    length(ExpectedFiltered) > 0.
+
+verify_dump(TopicFilter, StartTime, Dump) ->
+    lists:foldl(
+        fun(#message{topic = Topic, timestamp = TS}, Acc) ->
+            %% Verify that the topic of the message returned by the
+            %% iterator matches the expected topic filter:
+            ?assert(emqx_topic:match(Topic, TopicFilter), {unexpected_topic, Topic, TopicFilter}),
+            %% Verify that timestamp of the message is greater than
+            %% the StartTime of the iterator:
+            ?assert(TS >= StartTime, {start_time, TopicFilter, TS, StartTime}),
+            %% Verify that iterator didn't reorder messages
+            %% (timestamps for each topic are growing):
+            LastTopicTs = maps:get(Topic, Acc, -1),
+            ?assert(TS >= LastTopicTs, {topic_ts_reordering, Topic, TS, LastTopicTs}),
+            Acc#{Topic => TS}
+        end,
+        #{},
+        Dump
+    ).
+
+dump_messages(Shard, TopicFilter, StartTime) ->
+    Streams = emqx_ds_storage_layer:get_streams(Shard, parse_topic(TopicFilter), StartTime),
+    lists:flatmap(
+        fun({_Rank, Stream}) ->
+            dump_stream(Shard, Stream, TopicFilter, StartTime)
+        end,
+        Streams
+    ).
+
+dump_stream(Shard, Stream, TopicFilter, StartTime) ->
+    BatchSize = 3,
+    {ok, Iterator} = emqx_ds_storage_layer:make_iterator(
+        Shard, Stream, parse_topic(TopicFilter), StartTime
+    ),
+    Loop = fun
+        F(It, 0) ->
+            error({too_many_iterations, It});
+        F(It, N) ->
+            case emqx_ds_storage_layer:next(Shard, It, BatchSize) of
+                end_of_stream ->
+                    [];
+                {ok, _NextIt, []} ->
+                    [];
+                {ok, NextIt, Batch} ->
+                    Batch ++ F(NextIt, N - 1)
+            end
+    end,
+    MaxIterations = 1000,
+    Loop(Iterator, MaxIterations).
+
 %% Smoke test for iteration with wildcard topic filter
 %% t_iterate_wildcard(_Config) ->
 %%     %% Prepare data:
@@ -317,6 +448,7 @@ parse_topic(Topic) ->

 %% CT callbacks
 all() -> emqx_common_test_helpers:all(?MODULE).
+suite() -> [{timetrap, {seconds, 20}}].

 init_per_suite(Config) ->
     {ok, _} = application:ensure_all_started(emqx_durable_storage),