diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index dd6af9a03..b1a003e93 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -34,6 +34,8 @@ -export([]). -export_type([ + create_db_opts/0, + builtin_db_opts/0, db/0, time/0, topic_filter/0, @@ -58,7 +60,7 @@ %% Parsed topic filter. -type topic_filter() :: list(binary() | '+' | '#' | ''). --type stream_rank() :: {integer(), integer()}. +-type stream_rank() :: {term(), integer()}. -opaque stream() :: emqx_ds_replication_layer:stream(). @@ -83,9 +85,14 @@ -type message_store_opts() :: #{}. +-type builtin_db_opts() :: + #{ + backend := builtin, + storage := emqx_ds_storage_layer:prototype() + }. + -type create_db_opts() :: - %% TODO: keyspace - #{}. + builtin_db_opts(). -type message_id() :: emqx_ds_replication_layer:message_id(). @@ -96,7 +103,7 @@ %% @doc Different DBs are completely independent from each other. They %% could represent something like different tenants. -spec open_db(db(), create_db_opts()) -> ok. -open_db(DB, Opts) -> +open_db(DB, Opts = #{backend := builtin}) -> emqx_ds_replication_layer:open_db(DB, Opts). %% @doc TODO: currently if one or a few shards are down, they won't be @@ -109,8 +116,7 @@ drop_db(DB) -> store_batch(DB, Msgs, Opts) -> emqx_ds_replication_layer:store_batch(DB, Msgs, Opts). -%% TODO: Do we really need to return message IDs? It's extra work... --spec store_batch(db(), [emqx_types:message()]) -> {ok, [message_id()]} | {error, _}. +-spec store_batch(db(), [emqx_types:message()]) -> store_batch_result(). store_batch(DB, Msgs) -> store_batch(DB, Msgs, #{}). diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl index 2f28de293..4b6fcbcdf 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl @@ -80,20 +80,24 @@ %%================================================================================ %% API: --export([make_keymapper/1, vector_to_key/2, key_to_vector/2, next_range/3]). - -%% behavior callbacks: --export([]). - -%% internal exports: --export([]). +-export([ + make_keymapper/1, + vector_to_key/2, + bin_vector_to_key/2, + key_to_vector/2, + bin_key_to_vector/2, + next_range/3, + key_to_bitstring/2, + bitstring_to_key/2 +]). -export_type([vector/0, key/0, dimension/0, offset/0, bitsize/0, bitsource/0, keymapper/0]). -compile( {inline, [ ones/1, - extract/2 + extract/2, + extract_inv/2 ]} ). @@ -118,7 +122,7 @@ -type bitsize() :: pos_integer(). %% The resulting 1D key: --type key() :: binary(). +-type key() :: non_neg_integer(). -type bitsource() :: %% Consume `_Size` bits from timestamp starting at `_Offset`th @@ -148,7 +152,8 @@ %% API functions %%================================================================================ -%% @doc +%% @doc Create a keymapper object that stores the "schema" of the +%% transformation from a list of bitsources. %% %% Note: Dimension is 1-based. -spec make_keymapper([bitsource()]) -> keymapper(). @@ -183,6 +188,19 @@ vector_to_key(#keymapper{scanner = []}, []) -> vector_to_key(#keymapper{scanner = [Actions | Scanner]}, [Coord | Vector]) -> do_vector_to_key(Actions, Scanner, Coord, Vector, 0). +%% @doc Same as `vector_to_key', but it works with binaries, and outputs a binary. +-spec bin_vector_to_key(keymapper(), [binary()]) -> binary(). +bin_vector_to_key(Keymapper = #keymapper{dim_sizeof = DimSizeof, size = Size}, Binaries) -> + Vec = lists:map( + fun({Bin, SizeOf}) -> + <> = Bin, + Int + end, + lists:zip(Binaries, DimSizeof) + ), + Key = vector_to_key(Keymapper, Vec), + <>. + %% @doc Map key to a vector. %% %% Note: `vector_to_key(key_to_vector(K)) = K' but @@ -202,6 +220,18 @@ key_to_vector(#keymapper{scanner = Scanner}, Key) -> Scanner ). +%% @doc Same as `key_to_vector', but it works with binaries. +-spec bin_key_to_vector(keymapper(), binary()) -> [binary()]. +bin_key_to_vector(Keymapper = #keymapper{dim_sizeof = DimSizeof, size = Size}, BinKey) -> + <> = BinKey, + Vector = key_to_vector(Keymapper, Key), + lists:map( + fun({Elem, SizeOf}) -> + <> + end, + lists:zip(Vector, DimSizeof) + ). + %% @doc Given a keymapper, a filter, and a key, return a triple containing: %% %% 1. `NextKey', a key that is greater than the given one, and is @@ -232,6 +262,15 @@ next_range(Keymapper, Filter0, PrevKey) -> {NewKey, Bitmask, Bitfilter} end. +-spec bitstring_to_key(keymapper(), bitstring()) -> key(). +bitstring_to_key(#keymapper{size = Size}, Bin) -> + <> = Bin, + Key. + +-spec key_to_bitstring(keymapper(), key()) -> bitstring(). +key_to_bitstring(#keymapper{size = Size}, Key) -> + <>. + %%================================================================================ %% Internal functions %%================================================================================ @@ -311,7 +350,6 @@ fold_bitsources(Fun, InitAcc, Bitsources) -> Bitsources ). -%% Specialized version of fold: do_vector_to_key([], [], _Coord, [], Acc) -> Acc; do_vector_to_key([], [NewActions | Scanner], _Coord, [NewCoord | Vector], Acc) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index e9d3124f9..a6e67c069 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -24,7 +24,7 @@ %% Debug: -export([trie_next/3, trie_insert/3, dump_to_dot/2]). --export_type([static_key/0, trie/0]). +-export_type([options/0, static_key/0, trie/0]). -include_lib("stdlib/include/ms_transform.hrl"). @@ -43,12 +43,12 @@ -type edge() :: binary() | ?EOT | ?PLUS. %% Fixed size binary --type static_key() :: binary(). +-type static_key() :: non_neg_integer(). -define(PREFIX, prefix). -type state() :: static_key() | ?PREFIX. --type varying() :: [binary()]. +-type varying() :: [binary() | ?PLUS]. -type msg_storage_key() :: {static_key(), varying()}. @@ -56,8 +56,15 @@ -type persist_callback() :: fun((_Key, _Val) -> ok). +-type options() :: + #{ + persist_callback => persist_callback(), + static_key_size => pos_integer() + }. + -record(trie, { persist :: persist_callback(), + static_key_size :: pos_integer(), trie :: ets:tid(), stats :: ets:tid() }). @@ -74,32 +81,40 @@ %%================================================================================ %% @doc Create an empty trie --spec trie_create(persist_callback()) -> trie(). -trie_create(Persist) -> - Trie = ets:new(trie, [{keypos, #trans.key}, set]), - Stats = ets:new(stats, [{keypos, 1}, set]), +-spec trie_create(options()) -> trie(). +trie_create(UserOpts) -> + Defaults = #{ + persist_callback => fun(_, _) -> ok end, + static_key_size => 8 + }, + #{ + persist_callback := Persist, + static_key_size := StaticKeySize + } = maps:merge(Defaults, UserOpts), + Trie = ets:new(trie, [{keypos, #trans.key}, set, public]), + Stats = ets:new(stats, [{keypos, 1}, set, public]), #trie{ persist = Persist, + static_key_size = StaticKeySize, trie = Trie, stats = Stats }. -spec trie_create() -> trie(). trie_create() -> - trie_create(fun(_, _) -> - ok - end). + trie_create(#{}). %% @doc Restore trie from a dump --spec trie_restore(persist_callback(), [{_Key, _Val}]) -> trie(). -trie_restore(Persist, Dump) -> - Trie = trie_create(Persist), +-spec trie_restore(options(), [{_Key, _Val}]) -> trie(). +trie_restore(Options, Dump) -> + Trie = trie_create(Options), lists:foreach( fun({{StateFrom, Token}, StateTo}) -> trie_insert(Trie, StateFrom, Token, StateTo) end, Dump - ). + ), + Trie. %% @doc Lookup the topic key. Create a new one, if not found. -spec topic_key(trie(), threshold_fun(), [binary()]) -> msg_storage_key(). @@ -113,7 +128,7 @@ lookup_topic_key(Trie, Tokens) -> %% @doc Return list of keys of topics that match a given topic filter -spec match_topics(trie(), [binary() | '+' | '#']) -> - [{static_key(), _Varying :: binary() | ?PLUS}]. + [msg_storage_key()]. match_topics(Trie, TopicFilter) -> do_match_topics(Trie, ?PREFIX, [], TopicFilter). @@ -189,7 +204,7 @@ trie_next(#trie{trie = Trie}, State, Token) -> NChildren :: non_neg_integer(), Updated :: false | NChildren. trie_insert(Trie, State, Token) -> - trie_insert(Trie, State, Token, get_id_for_key(State, Token)). + trie_insert(Trie, State, Token, get_id_for_key(Trie, State, Token)). %%================================================================================ %% Internal functions @@ -220,8 +235,8 @@ trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token, {false, NextState} end. --spec get_id_for_key(state(), edge()) -> static_key(). -get_id_for_key(_State, _Token) -> +-spec get_id_for_key(trie(), state(), edge()) -> static_key(). +get_id_for_key(#trie{static_key_size = Size}, _State, _Token) -> %% Requirements for the return value: %% %% It should be globally unique for the `{State, Token}` pair. Other @@ -235,7 +250,8 @@ get_id_for_key(_State, _Token) -> %% If we want to impress computer science crowd, sorry, I mean to %% minimize storage requirements, we can even employ Huffman coding %% based on the frequency of messages. - crypto:strong_rand_bytes(8). + <> = crypto:strong_rand_bytes(Size), + Int. %% erlfmt-ignore -spec do_match_topics(trie(), state(), [binary() | '+'], [binary() | '+' | '#']) -> @@ -492,7 +508,7 @@ topic_key_test() -> end, lists:seq(1, 100)) after - dump_to_dot(T, atom_to_list(?FUNCTION_NAME) ++ ".dot") + dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot")) end. %% erlfmt-ignore @@ -539,7 +555,7 @@ topic_match_test() -> {S2_1_, ['+', '+']}]), ok after - dump_to_dot(T, atom_to_list(?FUNCTION_NAME) ++ ".dot") + dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot")) end. -define(keys_history, topic_key_history). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 5b4ad8666..06cead725 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -119,7 +119,7 @@ get_streams(DB, TopicFilter, StartTime) -> Streams = emqx_ds_proto_v1:get_streams(Node, Shard, TopicFilter, StartTime), lists:map( fun({RankY, Stream}) -> - RankX = erlang:phash2(Shard, 255), + RankX = Shard, Rank = {RankX, RankY}, {Rank, #stream{ shard = Shard, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl new file mode 100644 index 000000000..e8bfdaa2e --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -0,0 +1,346 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Reference implementation of the storage. +%% +%% Trivial, extremely slow and inefficient. It also doesn't handle +%% restart of the Erlang node properly, so obviously it's only to be +%% used for testing. +-module(emqx_ds_storage_bitfield_lts). + +-behavior(emqx_ds_storage_layer). + +%% API: +-export([]). + +%% behavior callbacks: +-export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]). + +%% internal exports: +-export([]). + +-export_type([options/0]). + +-include_lib("emqx/include/emqx.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-type options() :: + #{ + bits_per_wildcard_level => pos_integer(), + topic_index_bytes => pos_integer(), + epoch_bits => non_neg_integer() + }. + +%% Permanent state: +-type schema() :: + #{ + bits_per_wildcard_level := pos_integer(), + topic_index_bytes := pos_integer(), + epoch_bits := non_neg_integer(), + ts_offset_bits := non_neg_integer() + }. + +%% Runtime state: +-record(s, { + db :: rocksdb:db_handle(), + data :: rocksdb:cf_handle(), + trie :: emqx_ds_lts:trie(), + keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()) +}). + +-record(stream, { + storage_key :: emqx_ds_lts:msg_storage_key() +}). + +-record(it, { + topic_filter :: emqx_ds:topic_filter(), + start_time :: emqx_ds:time(), + storage_key :: emqx_ds_lts:msg_storage_key(), + last_seen_key = 0 :: emqx_ds_bitmask_keymapper:key(), + key_filter :: [emqx_ds_bitmask_keymapper:scalar_range()] +}). + +-define(QUICKCHECK_KEY(KEY, BITMASK, BITFILTER), + ((KEY band BITMASK) =:= BITFILTER) +). + +%%================================================================================ +%% API funcions +%%================================================================================ + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +create(_ShardId, DBHandle, GenId, Options) -> + %% Get options: + BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64), + TopicIndexBytes = maps:get(topic_index_bytes, Options, 4), + TSOffsetBits = maps:get(epoch_bits, Options, 5), + %% Create column families: + DataCFName = data_cf(GenId), + TrieCFName = trie_cf(GenId), + {ok, DataCFHandle} = rocksdb:create_column_family(DBHandle, DataCFName, []), + {ok, TrieCFHandle} = rocksdb:create_column_family(DBHandle, TrieCFName, []), + %% Create schema: + + % Fixed size_of MQTT message timestamp + SizeOfTS = 64, + Schema = #{ + bits_per_wildcard_level => BitsPerTopicLevel, + topic_index_bytes => TopicIndexBytes, + epoch_bits => SizeOfTS - TSOffsetBits, + ts_offset_bits => TSOffsetBits + }, + {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}. + +open(_Shard, DBHandle, GenId, CFRefs, Schema) -> + #{ + bits_per_wildcard_level := BitsPerTopicLevel, + topic_index_bytes := TopicIndexBytes, + epoch_bits := EpochBits, + ts_offset_bits := TSOffsetBits + } = Schema, + {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs), + {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs), + Trie = restore_trie(TopicIndexBytes, DBHandle, TrieCF), + %% If user's topics have more than learned 10 wildcard levels then + %% total carnage is going on; learned topic structure doesn't + %% really apply: + MaxWildcardLevels = 10, + Keymappers = array:from_list( + [ + make_keymapper(TopicIndexBytes, EpochBits, BitsPerTopicLevel, TSOffsetBits, N) + || N <- lists:seq(0, MaxWildcardLevels) + ] + ), + #s{db = DBHandle, data = DataCF, trie = Trie, keymappers = Keymappers}. + +store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> + lists:foreach( + fun(Msg) -> + {Key, _} = make_key(S, Msg), + Val = serialize(Msg), + rocksdb:put(DB, Data, Key, Val, []) + end, + Messages + ). + +get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> + Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter), + [ + #stream{ + storage_key = I + } + || I <- Indexes + ]. + +make_iterator(_Shard, _Data, #stream{storage_key = StorageKey}, TopicFilter, StartTime) -> + {TopicIndex, Varying} = StorageKey, + Filter = [ + {'=', TopicIndex}, + {'>=', StartTime} + | lists:map( + fun + ('+') -> + any; + (TopicLevel) when is_binary(TopicLevel) -> + {'=', hash_topic_level(TopicLevel)} + end, + Varying + ) + ], + {ok, #it{ + topic_filter = TopicFilter, + start_time = StartTime, + storage_key = StorageKey, + key_filter = Filter + }}. + +next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) -> + #it{ + key_filter = KeyFilter + } = It0, + % TODO: ugh, so ugly + NVarying = length(KeyFilter) - 2, + Keymapper = array:get(NVarying, Keymappers), + {ok, ITHandle} = rocksdb:iterator(DB, CF, []), + try + next_loop(ITHandle, Keymapper, It0, [], BatchSize) + after + rocksdb:iterator_close(ITHandle) + end. + +%%================================================================================ +%% Internal functions +%%================================================================================ + +next_loop(_, _, It, Acc, 0) -> + {ok, It, lists:reverse(Acc)}; +next_loop(ITHandle, KeyMapper, It0, Acc0, N0) -> + {Key1, Bitmask, Bitfilter} = next_range(KeyMapper, It0), + case iterator_move(KeyMapper, ITHandle, {seek, Key1}) of + {ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) -> + Msg = deserialize(Val), + It1 = It0#it{last_seen_key = Key}, + case check_message(It1, Msg) of + true -> + N1 = N0 - 1, + Acc1 = [Msg | Acc0]; + false -> + N1 = N0, + Acc1 = Acc0 + end, + {N, It, Acc} = traverse_interval( + ITHandle, KeyMapper, Bitmask, Bitfilter, It1, Acc1, N1 + ), + next_loop(ITHandle, KeyMapper, It, Acc, N); + {ok, Key, _Val} -> + It = It0#it{last_seen_key = Key}, + next_loop(ITHandle, KeyMapper, It, Acc0, N0); + {error, invalid_iterator} -> + {ok, It0, lists:reverse(Acc0)} + end. + +traverse_interval(_, _, _, _, It, Acc, 0) -> + {0, It, Acc}; +traverse_interval(ITHandle, KeyMapper, Bitmask, Bitfilter, It0, Acc, N) -> + %% TODO: supply the upper limit to rocksdb to the last extra seek: + case iterator_move(KeyMapper, ITHandle, next) of + {ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) -> + Msg = deserialize(Val), + It = It0#it{last_seen_key = Key}, + case check_message(It, Msg) of + true -> + traverse_interval( + ITHandle, KeyMapper, Bitmask, Bitfilter, It, [Msg | Acc], N - 1 + ); + false -> + traverse_interval(ITHandle, KeyMapper, Bitmask, Bitfilter, It, Acc, N) + end; + {ok, Key, _Val} -> + It = It0#it{last_seen_key = Key}, + {N, It, Acc}; + {error, invalid_iterator} -> + {0, It0, Acc} + end. + +next_range(KeyMapper, #it{key_filter = KeyFilter, last_seen_key = PrevKey}) -> + emqx_ds_bitmask_keymapper:next_range(KeyMapper, KeyFilter, PrevKey). + +check_message(_Iterator, _Msg) -> + %% TODO. + true. + +iterator_move(KeyMapper, ITHandle, Action0) -> + Action = + case Action0 of + next -> + next; + {seek, Int} -> + {seek, emqx_ds_bitmask_keymapper:key_to_bitstring(KeyMapper, Int)} + end, + case rocksdb:iterator_move(ITHandle, Action) of + {ok, KeyBin, Val} -> + {ok, emqx_ds_bitmask_keymapper:bitstring_to_key(KeyMapper, KeyBin), Val}; + {ok, KeyBin} -> + {ok, emqx_ds_bitmask_keymapper:bitstring_to_key(KeyMapper, KeyBin)}; + Other -> + Other + end. + +-spec make_key(#s{}, #message{}) -> {binary(), [binary()]}. +make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) -> + Tokens = emqx_topic:tokens(TopicBin), + {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), + VaryingHashes = [hash_topic_level(I) || I <- Varying], + KeyMapper = array:get(length(Varying), KeyMappers), + KeyBin = make_key(KeyMapper, TopicIndex, Timestamp, VaryingHashes), + {KeyBin, Varying}. + +-spec make_key(emqx_ds_bitmask_keymapper:keymapper(), emqx_ds_lts:static_key(), emqx_ds:time(), [ + non_neg_integer() +]) -> + binary(). +make_key(KeyMapper, TopicIndex, Timestamp, Varying) -> + emqx_ds_bitmask_keymapper:key_to_bitstring( + KeyMapper, + emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, [TopicIndex, Timestamp | Varying]) + ). + +%% TODO: don't hardcode the thresholds +threshold_fun(0) -> + 100; +threshold_fun(_) -> + 20. + +hash_topic_level(TopicLevel) -> + <> = erlang:md5(TopicLevel), + Int. + +serialize(Msg) -> + term_to_binary(Msg). + +deserialize(Blob) -> + binary_to_term(Blob). + +-define(BYTE_SIZE, 8). + +%% erlfmt-ignore +make_keymapper(TopicIndexBytes, EpochBits, BitsPerTopicLevel, TSOffsetBits, N) -> + Bitsources = + %% Dimension Offset Bitsize + [{1, 0, TopicIndexBytes * ?BYTE_SIZE}, %% Topic index + {2, TSOffsetBits, EpochBits }] ++ %% Timestamp epoch + [{2 + I, 0, BitsPerTopicLevel } %% Varying topic levels + || I <- lists:seq(1, N)] ++ + [{2, 0, TSOffsetBits }], %% Timestamp offset + emqx_ds_bitmask_keymapper:make_keymapper(Bitsources). + +-spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie(). +restore_trie(TopicIndexBytes, DB, CF) -> + PersistCallback = fun(Key, Val) -> + rocksdb:put(DB, CF, term_to_binary(Key), term_to_binary(Val), []) + end, + {ok, IT} = rocksdb:iterator(DB, CF, []), + try + Dump = read_persisted_trie(IT, rocksdb:iterator_move(IT, first)), + TrieOpts = #{persist_callback => PersistCallback, static_key_size => TopicIndexBytes}, + emqx_ds_lts:trie_restore(TrieOpts, Dump) + after + rocksdb:iterator_close(IT) + end. + +read_persisted_trie(IT, {ok, KeyB, ValB}) -> + [ + {binary_to_term(KeyB), binary_to_term(ValB)} + | read_persisted_trie(IT, rocksdb:iterator_move(IT, next)) + ]; +read_persisted_trie(IT, {error, invalid_iterator}) -> + []. + +%% @doc Generate a column family ID for the MQTT messages +-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. +data_cf(GenId) -> + "emqx_ds_storage_bitfield_lts_data" ++ integer_to_list(GenId). + +%% @doc Generate a column family ID for the trie +-spec trie_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. +trie_cf(GenId) -> + "emqx_ds_storage_bitfield_lts_trie" ++ integer_to_list(GenId). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 744ac869f..bce976559 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -32,6 +32,10 @@ %% Type declarations %%================================================================================ +-type prototype() :: + {emqx_ds_storage_reference, emqx_ds_storage_reference:options()} + | {emqx_ds_storage_bitfield_lts, emqx_ds_storage_bitfield_lts:options()}. + -type shard_id() :: emqx_ds_replication_layer:shard_id(). -type cf_refs() :: [{string(), rocksdb:cf_handle()}]. @@ -107,7 +111,7 @@ _Data. -callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) -> - ok. + emqx_ds:store_batch_result(). -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> [_Stream]. @@ -122,7 +126,7 @@ %% API for the replication layer %%================================================================================ --spec open_shard(shard_id(), emqx_ds:create_db_opts()) -> ok. +-spec open_shard(shard_id(), emqx_ds:builtin_db_opts()) -> ok. open_shard(Shard, Options) -> emqx_ds_storage_layer_sup:ensure_shard(Shard, Options). @@ -195,7 +199,7 @@ next(Shard, Iter = #it{generation = GenId, enc = GenIter0}, BatchSize) -> -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). --spec start_link(shard_id(), emqx_ds:create_db_opts()) -> +-spec start_link(shard_id(), emqx_ds:builtin_db_opts()) -> {ok, pid()}. start_link(Shard, Options) -> gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []). @@ -224,7 +228,8 @@ init({ShardId, Options}) -> {Schema, CFRefs} = case get_schema_persistent(DB) of not_found -> - create_new_shard_schema(ShardId, DB, CFRefs0, Options); + Prototype = maps:get(storage, Options), + create_new_shard_schema(ShardId, DB, CFRefs0, Prototype); Scm -> {Scm, CFRefs0} end, @@ -300,14 +305,14 @@ open_generation(ShardId, DB, CFRefs, GenId, GenSchema) -> RuntimeData = Mod:open(ShardId, DB, GenId, CFRefs, Schema), GenSchema#{data => RuntimeData}. --spec create_new_shard_schema(shard_id(), rocksdb:db_handle(), cf_refs(), _Options) -> +-spec create_new_shard_schema(shard_id(), rocksdb:db_handle(), cf_refs(), prototype()) -> {shard_schema(), cf_refs()}. -create_new_shard_schema(ShardId, DB, CFRefs, Options) -> - ?tp(notice, ds_create_new_shard_schema, #{shard => ShardId, options => Options}), +create_new_shard_schema(ShardId, DB, CFRefs, Prototype) -> + ?tp(notice, ds_create_new_shard_schema, #{shard => ShardId, prototype => Prototype}), %% TODO: read prototype from options/config Schema0 = #{ current_generation => 0, - prototype => {emqx_ds_storage_reference, #{}} + prototype => Prototype }, {_NewGenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema0, _Since = 0), {Schema, NewCFRefs ++ CFRefs}. @@ -331,7 +336,7 @@ commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB ok = put_schema_persistent(DB, Schema), put_schema_runtime(ShardId, Runtime). --spec rocksdb_open(shard_id(), emqx_ds:create_db_opts()) -> +-spec rocksdb_open(shard_id(), emqx_ds:builtin_db_opts()) -> {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}. rocksdb_open(Shard, Options) -> DBOptions = [ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index bf73e3ac8..fac7204bf 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -25,7 +25,7 @@ start_link() -> supervisor:start_link({local, ?SUP}, ?MODULE, []). --spec start_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> +-spec start_shard(emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) -> supervisor:startchild_ret(). start_shard(Shard, Options) -> supervisor:start_child(?SUP, shard_child_spec(Shard, Options)). @@ -63,7 +63,7 @@ init([]) -> %% Internal functions %%================================================================================ --spec shard_child_spec(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> +-spec shard_child_spec(emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) -> supervisor:child_spec(). shard_child_spec(Shard, Options) -> #{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 5a91f9ecd..9c7fc3158 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -32,7 +32,7 @@ %% internal exports: -export([]). --export_type([]). +-export_type([options/0]). -include_lib("emqx/include/emqx.hrl"). @@ -40,6 +40,8 @@ %% Type declarations %%================================================================================ +-type options() :: #{}. + %% Permanent state: -record(schema, {}). @@ -134,4 +136,4 @@ do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) -> %% @doc Generate a column family ID for the MQTT messages -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. data_cf(GenId) -> - ?MODULE_STRING ++ integer_to_list(GenId). + "emqx_ds_storage_reference" ++ integer_to_list(GenId). diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 2dc77c563..9637431d3 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -23,19 +23,25 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +opts() -> + #{ + backend => builtin, + storage => {emqx_ds_storage_reference, #{}} + }. + %% A simple smoke test that verifies that opening/closing the DB %% doesn't crash, and not much else t_00_smoke_open_drop(_Config) -> DB = 'DB', - ?assertMatch(ok, emqx_ds:open_db(DB, #{})), - ?assertMatch(ok, emqx_ds:open_db(DB, #{})), + ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + ?assertMatch(ok, emqx_ds:open_db(DB, opts())), ?assertMatch(ok, emqx_ds:drop_db(DB)). %% A simple smoke test that verifies that storing the messages doesn't %% crash t_01_smoke_store(_Config) -> DB = default, - ?assertMatch(ok, emqx_ds:open_db(DB, #{})), + ?assertMatch(ok, emqx_ds:open_db(DB, opts())), Msg = message(<<"foo/bar">>, <<"foo">>, 0), ?assertMatch(ok, emqx_ds:store_batch(DB, [Msg])). @@ -43,7 +49,7 @@ t_01_smoke_store(_Config) -> %% doesn't crash and that iterators can be opened. t_02_smoke_get_streams_start_iter(_Config) -> DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, #{})), + ?assertMatch(ok, emqx_ds:open_db(DB, opts())), StartTime = 0, TopicFilter = ['#'], [{Rank, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), @@ -54,7 +60,7 @@ t_02_smoke_get_streams_start_iter(_Config) -> %% over messages. t_03_smoke_iterate(_Config) -> DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, #{})), + ?assertMatch(ok, emqx_ds:open_db(DB, opts())), StartTime = 0, TopicFilter = ['#'], Msgs = [ @@ -75,7 +81,7 @@ t_03_smoke_iterate(_Config) -> %% they are left off. t_04_restart(_Config) -> DB = ?FUNCTION_NAME, - ?assertMatch(ok, emqx_ds:open_db(DB, #{})), + ?assertMatch(ok, emqx_ds:open_db(DB, opts())), TopicFilter = ['#'], StartTime = 0, Msgs = [ @@ -90,7 +96,7 @@ t_04_restart(_Config) -> ?tp(warning, emqx_ds_SUITE_restart_app, #{}), ok = application:stop(emqx_durable_storage), {ok, _} = application:ensure_all_started(emqx_durable_storage), - ok = emqx_ds:open_db(DB, #{}), + ok = emqx_ds:open_db(DB, opts()), %% The old iterator should be still operational: {ok, Iter, Batch} = iterate(Iter0, 1), ?assertEqual(Msgs, Batch, {Iter0, Iter}). diff --git a/apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl_ b/apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl_ deleted file mode 100644 index 599bd6c7b..000000000 --- a/apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl_ +++ /dev/null @@ -1,188 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- --module(emqx_ds_message_storage_bitmask_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("stdlib/include/assert.hrl"). - --import(emqx_ds_message_storage_bitmask, [ - make_keymapper/1, - keymapper_info/1, - compute_topic_bitmask/2, - compute_time_bitmask/1, - compute_topic_seek/4 -]). - -all() -> emqx_common_test_helpers:all(?MODULE). - -t_make_keymapper(_) -> - ?assertMatch( - #{ - source := [ - {timestamp, 9, 23}, - {hash, level, 2}, - {hash, level, 4}, - {hash, levels, 8}, - {timestamp, 0, 9} - ], - bitsize := 46, - epoch := 512 - }, - keymapper_info( - make_keymapper(#{ - timestamp_bits => 32, - topic_bits_per_level => [2, 4, 8], - epoch => 1000 - }) - ) - ). - -t_make_keymapper_single_hash_level(_) -> - ?assertMatch( - #{ - source := [ - {timestamp, 0, 32}, - {hash, levels, 16} - ], - bitsize := 48, - epoch := 1 - }, - keymapper_info( - make_keymapper(#{ - timestamp_bits => 32, - topic_bits_per_level => [16], - epoch => 1 - }) - ) - ). - -t_make_keymapper_no_timestamp(_) -> - ?assertMatch( - #{ - source := [ - {hash, level, 4}, - {hash, level, 8}, - {hash, levels, 16} - ], - bitsize := 28, - epoch := 1 - }, - keymapper_info( - make_keymapper(#{ - timestamp_bits => 0, - topic_bits_per_level => [4, 8, 16], - epoch => 42 - }) - ) - ). - -t_compute_topic_bitmask(_) -> - KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}), - ?assertEqual( - 2#111_1111_11111_11, - compute_topic_bitmask([<<"foo">>, <<"bar">>], KM) - ), - ?assertEqual( - 2#111_0000_11111_11, - compute_topic_bitmask([<<"foo">>, '+'], KM) - ), - ?assertEqual( - 2#111_0000_00000_11, - compute_topic_bitmask([<<"foo">>, '+', '+'], KM) - ), - ?assertEqual( - 2#111_0000_11111_00, - compute_topic_bitmask([<<"foo">>, '+', <<"bar">>, '+'], KM) - ). - -t_compute_topic_bitmask_wildcard(_) -> - KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}), - ?assertEqual( - 2#000_0000_00000_00, - compute_topic_bitmask(['#'], KM) - ), - ?assertEqual( - 2#111_0000_00000_00, - compute_topic_bitmask([<<"foo">>, '#'], KM) - ), - ?assertEqual( - 2#111_1111_11111_00, - compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, '#'], KM) - ). - -t_compute_topic_bitmask_wildcard_long_tail(_) -> - KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}), - ?assertEqual( - 2#111_1111_11111_11, - compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, <<"xyzzy">>], KM) - ), - ?assertEqual( - 2#111_1111_11111_00, - compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, '#'], KM) - ). - -t_compute_time_bitmask(_) -> - KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 200}), - ?assertEqual(2#111_000000_1111111, compute_time_bitmask(KM)). - -t_compute_time_bitmask_epoch_only(_) -> - KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 1}), - ?assertEqual(2#1111111111_000000, compute_time_bitmask(KM)). - -%% Filter = |123|***|678|***| -%% Mask = |123|***|678|***| -%% Key1 = |123|011|108|121| → Seek = 0 |123|011|678|000| -%% Key2 = |123|011|679|919| → Seek = 0 |123|012|678|000| -%% Key3 = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos -%% Key4 = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos - -t_compute_next_topic_seek(_) -> - KM = make_keymapper(#{topic_bits_per_level => [8, 8, 16, 12], timestamp_bits => 0, epoch => 1}), - ?assertMatch( - none, - compute_topic_seek( - 16#FD_42_4242_043, - 16#FD_42_4242_042, - 16#FF_FF_FFFF_FFF, - KM - ) - ), - ?assertMatch( - 16#FD_11_0678_000, - compute_topic_seek( - 16#FD_11_0108_121, - 16#FD_00_0678_000, - 16#FF_00_FFFF_000, - KM - ) - ), - ?assertMatch( - 16#FD_12_0678_000, - compute_topic_seek( - 16#FD_11_0679_919, - 16#FD_00_0678_000, - 16#FF_00_FFFF_000, - KM - ) - ), - ?assertMatch( - none, - compute_topic_seek( - 16#FD_FF_0679_001, - 16#FD_00_0678_000, - 16#FF_00_FFFF_000, - KM - ) - ), - ?assertMatch( - none, - compute_topic_seek( - 16#FE_11_0179_017, - 16#FD_00_0678_000, - 16#FF_00_FFFF_000, - KM - ) - ). diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl new file mode 100644 index 000000000..f9a7b02c4 --- /dev/null +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -0,0 +1,343 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ds_storage_bitfield_lts_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-define(SHARD, shard(?FUNCTION_NAME)). + +-define(DEFAULT_CONFIG, #{ + backend => builtin, + storage => {emqx_ds_storage_bitfield_lts, #{}} +}). + +-define(COMPACT_CONFIG, #{ + backend => builtin, + storage => + {emqx_ds_storage_bitfield_lts, #{ + bits_per_wildcard_level => 8 + }} +}). + +%% Smoke test for opening and reopening the database +t_open(_Config) -> + ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}). + +%% Smoke test of store function +t_store(_Config) -> + MessageID = emqx_guid:gen(), + PublishedAt = 1000, + Topic = <<"foo/bar">>, + Payload = <<"message">>, + Msg = #message{ + id = MessageID, + topic = Topic, + payload = Payload, + timestamp = PublishedAt + }, + ?assertMatch(ok, emqx_ds_storage_layer:store_batch(?SHARD, [Msg], #{})). + +%% Smoke test for iteration through a concrete topic +t_iterate(_Config) -> + %% Prepare data: + Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>], + Timestamps = lists:seq(1, 10), + Batch = [ + make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) + || Topic <- Topics, PublishedAt <- Timestamps + ], + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + %% Iterate through individual topics: + [ + begin + [{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0), + {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0), + {ok, NextIt, Messages} = emqx_ds_storage_layer:next(?SHARD, It, 100), + ?assertEqual( + lists:map(fun integer_to_binary/1, Timestamps), + payloads(Messages) + ), + {ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100) + end + || Topic <- Topics + ], + ok. + +-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))). + +%% Smoke test that verifies that concrete topics become individual +%% streams, unless there's too many of them +t_get_streams(_Config) -> + %% Prepare data: + Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>], + Timestamps = lists:seq(1, 10), + Batch = [ + make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) + || Topic <- Topics, PublishedAt <- Timestamps + ], + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + GetStream = fun(Topic) -> + StartTime = 0, + emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), StartTime) + end, + %% Get streams for individual topics to use as a reference for later: + [FooBar = {_, _}] = GetStream(<<"foo/bar">>), + [FooBarBaz] = GetStream(<<"foo/bar/baz">>), + [A] = GetStream(<<"a">>), + %% Restart shard to make sure trie is persisted: + ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}), + %% Test various wildcards: + [] = GetStream(<<"bar/foo">>), + ?assertEqual([FooBar], GetStream("+/+")), + ?assertSameSet([FooBar, FooBarBaz], GetStream(<<"foo/#">>)), + ?assertSameSet([FooBar, FooBarBaz, A], GetStream(<<"#">>)), + %% Now insert a bunch of messages with different topics to create wildcards: + NewBatch = [ + begin + B = integer_to_binary(I), + make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>) + end + || I <- lists:seq(1, 200) + ], + ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, []), + %% Check that "foo/bar/baz" topic now appears in two streams: + %% "foo/bar/baz" and "foo/bar/+": + NewStreams = lists:sort(GetStream(<<"foo/bar/baz">>)), + ?assertMatch([_, _], NewStreams), + ?assertMatch([_], NewStreams -- [FooBarBaz]), + ok. + +%% Smoke test for iteration with wildcard topic filter +%% t_iterate_wildcard(_Config) -> +%% %% Prepare data: +%% Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], +%% Timestamps = lists:seq(1, 10), +%% _ = [ +%% store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) +%% || Topic <- Topics, PublishedAt <- Timestamps +%% ], +%% ?assertEqual( +%% lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]), +%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 0)]) +%% ), +%% ?assertEqual( +%% [], +%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 10 + 1)]) +%% ), +%% ?assertEqual( +%% lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]), +%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 5)]) +%% ), +%% ?assertEqual( +%% lists:sort([ +%% {Topic, PublishedAt} +%% || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps +%% ]), +%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)]) +%% ), +%% ?assertEqual( +%% lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]), +%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+", 0)]) +%% ), +%% ?assertEqual( +%% [], +%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+/bar", 0)]) +%% ), +%% ?assertEqual( +%% lists:sort([ +%% {Topic, PublishedAt} +%% || Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps +%% ]), +%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "+/bar/#", 0)]) +%% ), +%% ?assertEqual( +%% lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]), +%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 0)]) +%% ), +%% ?assertEqual( +%% [], +%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/+/+", 0)]) +%% ), +%% ok. + + +%% t_create_gen(_Config) -> +%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), +%% ?assertEqual( +%% {error, nonmonotonic}, +%% emqx_ds_storage_layer:create_generation(?SHARD, 1, ?DEFAULT_CONFIG) +%% ), +%% ?assertEqual( +%% {error, nonmonotonic}, +%% emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG) +%% ), +%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), +%% Topics = ["foo/bar", "foo/bar/baz"], +%% Timestamps = lists:seq(1, 100), +%% [ +%% ?assertMatch({ok, [_]}, store(?SHARD, PublishedAt, Topic, <<>>)) +%% || Topic <- Topics, PublishedAt <- Timestamps +%% ]. + +%% t_iterate_multigen(_Config) -> +%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), +%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), +%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), +%% Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], +%% Timestamps = lists:seq(1, 100), +%% _ = [ +%% store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) +%% || Topic <- Topics, PublishedAt <- Timestamps +%% ], +%% ?assertEqual( +%% lists:sort([ +%% {Topic, PublishedAt} +%% || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps +%% ]), +%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)]) +%% ), +%% ?assertEqual( +%% lists:sort([ +%% {Topic, PublishedAt} +%% || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100) +%% ]), +%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)]) +%% ). + +%% t_iterate_multigen_preserve_restore(_Config) -> +%% ReplayID = atom_to_binary(?FUNCTION_NAME), +%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), +%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), +%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), +%% Topics = ["foo/bar", "foo/bar/baz", "a/bar"], +%% Timestamps = lists:seq(1, 100), +%% TopicFilter = "foo/#", +%% TopicsMatching = ["foo/bar", "foo/bar/baz"], +%% _ = [ +%% store(?SHARD, TS, Topic, term_to_binary({Topic, TS})) +%% || Topic <- Topics, TS <- Timestamps +%% ], +%% It0 = iterator(?SHARD, TopicFilter, 0), +%% {It1, Res10} = iterate(It0, 10), +%% % preserve mid-generation +%% ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID), +%% {ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID), +%% {It3, Res100} = iterate(It2, 88), +%% % preserve on the generation boundary +%% ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID), +%% {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID), +%% {It5, Res200} = iterate(It4, 1000), +%% ?assertEqual({end_of_stream, []}, iterate(It5, 1)), +%% ?assertEqual( +%% lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]), +%% lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200]) +%% ), +%% ?assertEqual( +%% ok, +%% emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID) +%% ), +%% ?assertEqual( +%% {error, not_found}, +%% emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID) +%% ). + +make_message(PublishedAt, Topic, Payload) when is_list(Topic) -> + make_message(PublishedAt, list_to_binary(Topic), Payload); +make_message(PublishedAt, Topic, Payload) when is_binary(Topic) -> + ID = emqx_guid:gen(), + #message{ + id = ID, + topic = Topic, + timestamp = PublishedAt, + payload = Payload + }. + +store(Shard, PublishedAt, TopicL, Payload) when is_list(TopicL) -> + store(Shard, PublishedAt, list_to_binary(TopicL), Payload); +store(Shard, PublishedAt, Topic, Payload) -> + ID = emqx_guid:gen(), + Msg = #message{ + id = ID, + topic = Topic, + timestamp = PublishedAt, + payload = Payload + }, + emqx_ds_storage_layer:message_store(Shard, [Msg], #{}). + +%% iterate(Shard, TopicFilter, StartTime) -> +%% Streams = emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime), +%% lists:flatmap( +%% fun(Stream) -> +%% iterate(Shard, iterator(Shard, Stream, TopicFilter, StartTime)) +%% end, +%% Streams). + +%% iterate(Shard, It) -> +%% case emqx_ds_storage_layer:next(Shard, It) of +%% {ok, ItNext, [#message{payload = Payload}]} -> +%% [Payload | iterate(Shard, ItNext)]; +%% end_of_stream -> +%% [] +%% end. + +%% iterate(_Shard, end_of_stream, _N) -> +%% {end_of_stream, []}; +%% iterate(Shard, It, N) -> +%% case emqx_ds_storage_layer:next(Shard, It, N) of +%% {ok, ItFinal, Messages} -> +%% {ItFinal, [Payload || #message{payload = Payload} <- Messages]}; +%% end_of_stream -> +%% {end_of_stream, []} +%% end. + +%% iterator(Shard, Stream, TopicFilter, StartTime) -> +%% {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Stream, parse_topic(TopicFilter), StartTime), +%% It. + +payloads(Messages) -> + lists:map( + fun(#message{payload = P}) -> + P + end, + Messages + ). + +parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> + Topic; +parse_topic(Topic) -> + emqx_topic:words(iolist_to_binary(Topic)). + +%% CT callbacks + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(emqx_durable_storage), + Config. + +end_per_suite(_Config) -> + ok = application:stop(emqx_durable_storage). + +init_per_testcase(TC, Config) -> + {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), ?DEFAULT_CONFIG), + Config. + +end_per_testcase(TC, _Config) -> + ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)). + +shard(TC) -> + {?MODULE, TC}. + +keyspace(TC) -> + TC. + +set_keyspace_config(Keyspace, Config) -> + ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}). diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl_ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl_ deleted file mode 100644 index 25198cfd7..000000000 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl_ +++ /dev/null @@ -1,292 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- --module(emqx_ds_storage_layer_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("emqx/include/emqx.hrl"). --include_lib("common_test/include/ct.hrl"). --include_lib("stdlib/include/assert.hrl"). - --define(SHARD, shard(?FUNCTION_NAME)). - --define(DEFAULT_CONFIG, - {emqx_ds_message_storage_bitmask, #{ - timestamp_bits => 64, - topic_bits_per_level => [8, 8, 32, 16], - epoch => 5, - iteration => #{ - iterator_refresh => {every, 5} - } - }} -). - --define(COMPACT_CONFIG, - {emqx_ds_message_storage_bitmask, #{ - timestamp_bits => 16, - topic_bits_per_level => [16, 16], - epoch => 10 - }} -). - -%% Smoke test for opening and reopening the database -t_open(_Config) -> - ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}). - -%% Smoke test of store function -t_store(_Config) -> - MessageID = emqx_guid:gen(), - PublishedAt = 1000, - Topic = <<"foo/bar">>, - Payload = <<"message">>, - Msg = #message{ - id = MessageID, - topic = Topic, - payload = Payload, - timestamp = PublishedAt - }, - ?assertMatch({ok, [_]}, emqx_ds_storage_layer:message_store(?SHARD, [Msg], #{})). - -%% Smoke test for iteration through a concrete topic -t_iterate(_Config) -> - %% Prepare data: - Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>], - Timestamps = lists:seq(1, 10), - [ - store( - ?SHARD, - PublishedAt, - Topic, - integer_to_binary(PublishedAt) - ) - || Topic <- Topics, PublishedAt <- Timestamps - ], - %% Iterate through individual topics: - [ - begin - {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, {parse_topic(Topic), 0}), - Values = iterate(It), - ?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values) - end - || Topic <- Topics - ], - ok. - -%% Smoke test for iteration with wildcard topic filter -t_iterate_wildcard(_Config) -> - %% Prepare data: - Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], - Timestamps = lists:seq(1, 10), - _ = [ - store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) - || Topic <- Topics, PublishedAt <- Timestamps - ], - ?assertEqual( - lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 0)]) - ), - ?assertEqual( - [], - lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 10 + 1)]) - ), - ?assertEqual( - lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 5)]) - ), - ?assertEqual( - lists:sort([ - {Topic, PublishedAt} - || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps - ]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)]) - ), - ?assertEqual( - lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+", 0)]) - ), - ?assertEqual( - [], - lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+/bar", 0)]) - ), - ?assertEqual( - lists:sort([ - {Topic, PublishedAt} - || Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps - ]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "+/bar/#", 0)]) - ), - ?assertEqual( - lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 0)]) - ), - ?assertEqual( - [], - lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/+/+", 0)]) - ), - ok. - -t_iterate_long_tail_wildcard(_Config) -> - Topic = "b/c/d/e/f/g", - TopicFilter = "b/c/d/e/+/+", - Timestamps = lists:seq(1, 100), - _ = [ - store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) - || PublishedAt <- Timestamps - ], - ?assertEqual( - lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, TopicFilter, 50)]) - ). - -t_create_gen(_Config) -> - {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), - ?assertEqual( - {error, nonmonotonic}, - emqx_ds_storage_layer:create_generation(?SHARD, 1, ?DEFAULT_CONFIG) - ), - ?assertEqual( - {error, nonmonotonic}, - emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG) - ), - {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), - Topics = ["foo/bar", "foo/bar/baz"], - Timestamps = lists:seq(1, 100), - [ - ?assertMatch({ok, [_]}, store(?SHARD, PublishedAt, Topic, <<>>)) - || Topic <- Topics, PublishedAt <- Timestamps - ]. - -t_iterate_multigen(_Config) -> - {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), - Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], - Timestamps = lists:seq(1, 100), - _ = [ - store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) - || Topic <- Topics, PublishedAt <- Timestamps - ], - ?assertEqual( - lists:sort([ - {Topic, PublishedAt} - || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps - ]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)]) - ), - ?assertEqual( - lists:sort([ - {Topic, PublishedAt} - || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100) - ]), - lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)]) - ). - -t_iterate_multigen_preserve_restore(_Config) -> - ReplayID = atom_to_binary(?FUNCTION_NAME), - {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), - {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), - {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), - Topics = ["foo/bar", "foo/bar/baz", "a/bar"], - Timestamps = lists:seq(1, 100), - TopicFilter = "foo/#", - TopicsMatching = ["foo/bar", "foo/bar/baz"], - _ = [ - store(?SHARD, TS, Topic, term_to_binary({Topic, TS})) - || Topic <- Topics, TS <- Timestamps - ], - It0 = iterator(?SHARD, TopicFilter, 0), - {It1, Res10} = iterate(It0, 10), - % preserve mid-generation - ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID), - {ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID), - {It3, Res100} = iterate(It2, 88), - % preserve on the generation boundary - ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID), - {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID), - {It5, Res200} = iterate(It4, 1000), - ?assertEqual({end_of_stream, []}, iterate(It5, 1)), - ?assertEqual( - lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]), - lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200]) - ), - ?assertEqual( - ok, - emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID) - ), - ?assertEqual( - {error, not_found}, - emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID) - ). - -store(Shard, PublishedAt, TopicL, Payload) when is_list(TopicL) -> - store(Shard, PublishedAt, list_to_binary(TopicL), Payload); -store(Shard, PublishedAt, Topic, Payload) -> - ID = emqx_guid:gen(), - Msg = #message{ - id = ID, - topic = Topic, - timestamp = PublishedAt, - payload = Payload - }, - emqx_ds_storage_layer:message_store(Shard, [Msg], #{}). - -iterate(DB, TopicFilter, StartTime) -> - iterate(iterator(DB, TopicFilter, StartTime)). - -iterate(It) -> - case emqx_ds_storage_layer:next(It) of - {ok, ItNext, [#message{payload = Payload}]} -> - [Payload | iterate(ItNext)]; - end_of_stream -> - [] - end. - -iterate(end_of_stream, _N) -> - {end_of_stream, []}; -iterate(It, N) -> - case emqx_ds_storage_layer:next(It, N) of - {ok, ItFinal, Messages} -> - {ItFinal, [Payload || #message{payload = Payload} <- Messages]}; - end_of_stream -> - {end_of_stream, []} - end. - -iterator(DB, TopicFilter, StartTime) -> - {ok, It} = emqx_ds_storage_layer:make_iterator(DB, {parse_topic(TopicFilter), StartTime}), - It. - -parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> - Topic; -parse_topic(Topic) -> - emqx_topic:words(iolist_to_binary(Topic)). - -%% CT callbacks - -all() -> emqx_common_test_helpers:all(?MODULE). - -init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(emqx_durable_storage), - Config. - -end_per_suite(_Config) -> - ok = application:stop(emqx_durable_storage). - -init_per_testcase(TC, Config) -> - ok = set_keyspace_config(keyspace(TC), ?DEFAULT_CONFIG), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), #{}), - Config. - -end_per_testcase(TC, _Config) -> - ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)). - -shard(TC) -> - iolist_to_binary([?MODULE_STRING, "_", atom_to_list(TC)]). - -keyspace(TC) -> - TC. - -set_keyspace_config(Keyspace, Config) -> - ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}).