feat(ds): Bitfield + Learned Topic Structure

ieQu1 2023-10-11 20:53:34 +02:00
parent ac91dbc58f
commit 7428e7037b
12 changed files with 821 additions and 539 deletions

View File

@ -34,6 +34,8 @@
-export([]).
-export_type([
create_db_opts/0,
builtin_db_opts/0,
db/0,
time/0,
topic_filter/0,
@ -58,7 +60,7 @@
%% Parsed topic filter.
-type topic_filter() :: list(binary() | '+' | '#' | '').
-type stream_rank() :: {integer(), integer()}.
-type stream_rank() :: {term(), integer()}.
-opaque stream() :: emqx_ds_replication_layer:stream().
@ -83,9 +85,14 @@
-type message_store_opts() :: #{}.
-type builtin_db_opts() ::
#{
backend := builtin,
storage := emqx_ds_storage_layer:prototype()
}.
-type create_db_opts() ::
%% TODO: keyspace
#{}.
builtin_db_opts().
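%% Example (mirrors the opts() helper in the test suite): a builtin
%% DB backed by the new bitfield+LTS storage is described by
%%
%%   #{backend => builtin,
%%     storage => {emqx_ds_storage_bitfield_lts, #{}}}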
-type message_id() :: emqx_ds_replication_layer:message_id().
@ -96,7 +103,7 @@
%% @doc Different DBs are completely independent from each other. They
%% could represent something like different tenants.
-spec open_db(db(), create_db_opts()) -> ok.
open_db(DB, Opts) ->
open_db(DB, Opts = #{backend := builtin}) ->
emqx_ds_replication_layer:open_db(DB, Opts).
%% @doc TODO: currently if one or a few shards are down, they won't be
@ -109,8 +116,7 @@ drop_db(DB) ->
store_batch(DB, Msgs, Opts) ->
emqx_ds_replication_layer:store_batch(DB, Msgs, Opts).
%% TODO: Do we really need to return message IDs? It's extra work...
-spec store_batch(db(), [emqx_types:message()]) -> {ok, [message_id()]} | {error, _}.
-spec store_batch(db(), [emqx_types:message()]) -> store_batch_result().
store_batch(DB, Msgs) ->
store_batch(DB, Msgs, #{}).

View File

@ -80,20 +80,24 @@
%%================================================================================
%% API:
-export([make_keymapper/1, vector_to_key/2, key_to_vector/2, next_range/3]).
%% behavior callbacks:
-export([]).
%% internal exports:
-export([]).
-export([
make_keymapper/1,
vector_to_key/2,
bin_vector_to_key/2,
key_to_vector/2,
bin_key_to_vector/2,
next_range/3,
key_to_bitstring/2,
bitstring_to_key/2
]).
-export_type([vector/0, key/0, dimension/0, offset/0, bitsize/0, bitsource/0, keymapper/0]).
-compile(
{inline, [
ones/1,
extract/2
extract/2,
extract_inv/2
]}
).
@ -118,7 +122,7 @@
-type bitsize() :: pos_integer().
%% The resulting 1D key:
-type key() :: binary().
-type key() :: non_neg_integer().
-type bitsource() ::
%% Consume `_Size` bits of the `_Dimension`th vector element,
%% starting from the `_Offset`th bit:
@ -148,7 +152,8 @@
%% API functions
%%================================================================================
%% @doc
%% @doc Create a keymapper object that stores the "schema" of the
%% transformation from a list of bitsources.
%%
%% Note: Dimension is 1-based.
-spec make_keymapper([bitsource()]) -> keymapper().
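%% Illustrative usage (not part of this commit), assuming the
%% `{Dimension, Offset, Size}' bitsource semantics documented above:
%%
%%   KM = make_keymapper([{1, 0, 4},    %% low 4 bits of coordinate 1
%%                        {2, 4, 8},    %% bits 4..11 of coordinate 2
%%                        {2, 0, 4}]),  %% low 4 bits of coordinate 2
%%   Key = vector_to_key(KM, [2#1010, 2#1100_0011_0101]),
%%   %% Every bit of both coordinates is consumed, so the mapping is
%%   %% invertible in this case:
%%   [2#1010, 2#1100_0011_0101] = key_to_vector(KM, Key).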
@ -183,6 +188,19 @@ vector_to_key(#keymapper{scanner = []}, []) ->
vector_to_key(#keymapper{scanner = [Actions | Scanner]}, [Coord | Vector]) ->
do_vector_to_key(Actions, Scanner, Coord, Vector, 0).
%% @doc Same as `vector_to_key', but it works with binaries, and outputs a binary.
-spec bin_vector_to_key(keymapper(), [binary()]) -> binary().
bin_vector_to_key(Keymapper = #keymapper{dim_sizeof = DimSizeof, size = Size}, Binaries) ->
Vec = lists:map(
fun({Bin, SizeOf}) ->
<<Int:SizeOf, _/binary>> = Bin,
Int
end,
lists:zip(Binaries, DimSizeof)
),
Key = vector_to_key(Keymapper, Vec),
<<Key:Size>>.
%% @doc Map key to a vector.
%%
%% Note: `vector_to_key(key_to_vector(K)) = K' but
@ -202,6 +220,18 @@ key_to_vector(#keymapper{scanner = Scanner}, Key) ->
Scanner
).
%% @doc Same as `key_to_vector', but it works with binaries.
-spec bin_key_to_vector(keymapper(), binary()) -> [binary()].
bin_key_to_vector(Keymapper = #keymapper{dim_sizeof = DimSizeof, size = Size}, BinKey) ->
<<Key:Size>> = BinKey,
Vector = key_to_vector(Keymapper, Key),
lists:map(
fun({Elem, SizeOf}) ->
<<Elem:SizeOf>>
end,
lists:zip(Vector, DimSizeof)
).
%% @doc Given a keymapper, a filter, and a key, return a triple containing:
%%
%% 1. `NextKey', a key that is greater than the given one, and is
@ -232,6 +262,15 @@ next_range(Keymapper, Filter0, PrevKey) ->
{NewKey, Bitmask, Bitfilter}
end.
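%% Illustrative use of the returned triple (a sketch, not part of this
%% commit): a caller seeks to `NextKey' and then accepts only keys
%% whose masked bits match the filter, which is exactly the
%% ?QUICKCHECK_KEY check performed in emqx_ds_storage_bitfield_lts:
%%
%%   {NextKey, Bitmask, Bitfilter} = next_range(KM, Filter, PrevKey),
%%   Matches = fun(Key) -> (Key band Bitmask) =:= Bitfilter end.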
-spec bitstring_to_key(keymapper(), bitstring()) -> key().
bitstring_to_key(#keymapper{size = Size}, Bin) ->
<<Key:Size>> = Bin,
Key.
-spec key_to_bitstring(keymapper(), key()) -> bitstring().
key_to_bitstring(#keymapper{size = Size}, Key) ->
<<Key:Size>>.
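%% A round-trip sketch: these helpers let callers work with integer
%% keys internally while the storage engine sees fixed-width binaries,
%% and they are inverses of each other:
%%
%%   Key = bitstring_to_key(KM, key_to_bitstring(KM, Key)).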
%%================================================================================
%% Internal functions
%%================================================================================
@ -311,7 +350,6 @@ fold_bitsources(Fun, InitAcc, Bitsources) ->
Bitsources
).
%% Specialized version of fold:
do_vector_to_key([], [], _Coord, [], Acc) ->
Acc;
do_vector_to_key([], [NewActions | Scanner], _Coord, [NewCoord | Vector], Acc) ->

View File

@ -24,7 +24,7 @@
%% Debug:
-export([trie_next/3, trie_insert/3, dump_to_dot/2]).
-export_type([static_key/0, trie/0]).
-export_type([options/0, static_key/0, trie/0]).
-include_lib("stdlib/include/ms_transform.hrl").
@ -43,12 +43,12 @@
-type edge() :: binary() | ?EOT | ?PLUS.
%% Fixed size binary
-type static_key() :: binary().
-type static_key() :: non_neg_integer().
-define(PREFIX, prefix).
-type state() :: static_key() | ?PREFIX.
-type varying() :: [binary()].
-type varying() :: [binary() | ?PLUS].
-type msg_storage_key() :: {static_key(), varying()}.
@ -56,8 +56,15 @@
-type persist_callback() :: fun((_Key, _Val) -> ok).
-type options() ::
#{
persist_callback => persist_callback(),
static_key_size => pos_integer()
}.
-record(trie, {
persist :: persist_callback(),
static_key_size :: pos_integer(),
trie :: ets:tid(),
stats :: ets:tid()
}).
@ -74,32 +81,40 @@
%%================================================================================
%% @doc Create an empty trie
-spec trie_create(persist_callback()) -> trie().
trie_create(Persist) ->
Trie = ets:new(trie, [{keypos, #trans.key}, set]),
Stats = ets:new(stats, [{keypos, 1}, set]),
-spec trie_create(options()) -> trie().
trie_create(UserOpts) ->
Defaults = #{
persist_callback => fun(_, _) -> ok end,
static_key_size => 8
},
#{
persist_callback := Persist,
static_key_size := StaticKeySize
} = maps:merge(Defaults, UserOpts),
Trie = ets:new(trie, [{keypos, #trans.key}, set, public]),
Stats = ets:new(stats, [{keypos, 1}, set, public]),
#trie{
persist = Persist,
static_key_size = StaticKeySize,
trie = Trie,
stats = Stats
}.
-spec trie_create() -> trie().
trie_create() ->
trie_create(fun(_, _) ->
ok
end).
trie_create(#{}).
%% @doc Restore trie from a dump
-spec trie_restore(persist_callback(), [{_Key, _Val}]) -> trie().
trie_restore(Persist, Dump) ->
Trie = trie_create(Persist),
-spec trie_restore(options(), [{_Key, _Val}]) -> trie().
trie_restore(Options, Dump) ->
Trie = trie_create(Options),
lists:foreach(
fun({{StateFrom, Token}, StateTo}) ->
trie_insert(Trie, StateFrom, Token, StateTo)
end,
Dump
).
),
Trie.
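%% Usage sketch (illustrative; `save/2' and `load_all/0' are
%% hypothetical): the persist callback receives every new trie
%% transition, and replaying the dumped pairs through trie_restore/2
%% rebuilds an equivalent trie:
%%
%%   Trie = trie_create(#{persist_callback => fun save/2,
%%                        static_key_size => 8}),
%%   ...
%%   Trie2 = trie_restore(#{static_key_size => 8}, load_all()).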
%% @doc Lookup the topic key. Create a new one, if not found.
-spec topic_key(trie(), threshold_fun(), [binary()]) -> msg_storage_key().
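%% Hedged example of the learning behavior (mirrors the t_get_streams
%% test of the bitfield_lts suite): while a node has few children,
%% each concrete topic gets its own static key; once the threshold is
%% exceeded, a `foo/bar/+'-style structure is learned and the varying
%% level is returned separately:
%%
%%   {StaticA, []} = topic_key(Trie, ThresholdFun, [<<"foo">>, <<"bar">>, <<"1">>]),
%%   %% ...after enough distinct siblings under foo/bar/:
%%   {StaticB, [<<"1001">>]} = topic_key(Trie, ThresholdFun, [<<"foo">>, <<"bar">>, <<"1001">>]).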
@ -113,7 +128,7 @@ lookup_topic_key(Trie, Tokens) ->
%% @doc Return list of keys of topics that match a given topic filter
-spec match_topics(trie(), [binary() | '+' | '#']) ->
[{static_key(), _Varying :: binary() | ?PLUS}].
[msg_storage_key()].
match_topics(Trie, TopicFilter) ->
do_match_topics(Trie, ?PREFIX, [], TopicFilter).
@ -189,7 +204,7 @@ trie_next(#trie{trie = Trie}, State, Token) ->
NChildren :: non_neg_integer(),
Updated :: false | NChildren.
trie_insert(Trie, State, Token) ->
trie_insert(Trie, State, Token, get_id_for_key(State, Token)).
trie_insert(Trie, State, Token, get_id_for_key(Trie, State, Token)).
%%================================================================================
%% Internal functions
@ -220,8 +235,8 @@ trie_insert(#trie{trie = Trie, stats = Stats, persist = Persist}, State, Token,
{false, NextState}
end.
-spec get_id_for_key(state(), edge()) -> static_key().
get_id_for_key(_State, _Token) ->
-spec get_id_for_key(trie(), state(), edge()) -> static_key().
get_id_for_key(#trie{static_key_size = Size}, _State, _Token) ->
%% Requirements for the return value:
%%
%% It should be globally unique for the `{State, Token}` pair. Other
@ -235,7 +250,8 @@ get_id_for_key(_State, _Token) ->
%% If we want to impress the computer science crowd, sorry, I mean
%% to minimize storage requirements, we can even employ Huffman
%% coding based on the frequency of messages.
crypto:strong_rand_bytes(8).
<<Int:(Size * 8)>> = crypto:strong_rand_bytes(Size),
Int.
%% erlfmt-ignore
-spec do_match_topics(trie(), state(), [binary() | '+'], [binary() | '+' | '#']) ->
@ -492,7 +508,7 @@ topic_key_test() ->
end,
lists:seq(1, 100))
after
dump_to_dot(T, atom_to_list(?FUNCTION_NAME) ++ ".dot")
dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot"))
end.
%% erlfmt-ignore
@ -539,7 +555,7 @@ topic_match_test() ->
{S2_1_, ['+', '+']}]),
ok
after
dump_to_dot(T, atom_to_list(?FUNCTION_NAME) ++ ".dot")
dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot"))
end.
-define(keys_history, topic_key_history).

View File

@ -119,7 +119,7 @@ get_streams(DB, TopicFilter, StartTime) ->
Streams = emqx_ds_proto_v1:get_streams(Node, Shard, TopicFilter, StartTime),
lists:map(
fun({RankY, Stream}) ->
RankX = erlang:phash2(Shard, 255),
RankX = Shard,
Rank = {RankX, RankY},
{Rank, #stream{
shard = Shard,

View File

@ -0,0 +1,346 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc A storage layout based on the learned topic structure (LTS)
%% trie and a bitfield keymapper.
%%
%% Messages are stored in RocksDB under keys that combine the static
%% topic index, the timestamp epoch, hashes of the varying topic
%% levels, and the timestamp offset within the epoch.
-module(emqx_ds_storage_bitfield_lts).
-behavior(emqx_ds_storage_layer).
%% API:
-export([]).
%% behavior callbacks:
-export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]).
%% internal exports:
-export([]).
-export_type([options/0]).
-include_lib("emqx/include/emqx.hrl").
%%================================================================================
%% Type declarations
%%================================================================================
-type options() ::
#{
bits_per_wildcard_level => pos_integer(),
topic_index_bytes => pos_integer(),
epoch_bits => non_neg_integer()
}.
%% Permanent state:
-type schema() ::
#{
bits_per_wildcard_level := pos_integer(),
topic_index_bytes := pos_integer(),
epoch_bits := non_neg_integer(),
ts_offset_bits := non_neg_integer()
}.
%% Runtime state:
-record(s, {
db :: rocksdb:db_handle(),
data :: rocksdb:cf_handle(),
trie :: emqx_ds_lts:trie(),
keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper())
}).
-record(stream, {
storage_key :: emqx_ds_lts:msg_storage_key()
}).
-record(it, {
topic_filter :: emqx_ds:topic_filter(),
start_time :: emqx_ds:time(),
storage_key :: emqx_ds_lts:msg_storage_key(),
last_seen_key = 0 :: emqx_ds_bitmask_keymapper:key(),
key_filter :: [emqx_ds_bitmask_keymapper:scalar_range()]
}).
-define(QUICKCHECK_KEY(KEY, BITMASK, BITFILTER),
((KEY band BITMASK) =:= BITFILTER)
).
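%% E.g. (illustrative values): with Bitmask = 2#110 and
%% Bitfilter = 2#100, keys 2#100 and 2#101 pass the check, while
%% 2#110 and 2#010 do not.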
%%================================================================================
%% API functions
%%================================================================================
%%================================================================================
%% behavior callbacks
%%================================================================================
create(_ShardId, DBHandle, GenId, Options) ->
%% Get options:
BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
TSOffsetBits = maps:get(epoch_bits, Options, 5),
%% Create column families:
DataCFName = data_cf(GenId),
TrieCFName = trie_cf(GenId),
{ok, DataCFHandle} = rocksdb:create_column_family(DBHandle, DataCFName, []),
{ok, TrieCFHandle} = rocksdb:create_column_family(DBHandle, TrieCFName, []),
%% Create schema:
%% Fixed size (in bits) of the MQTT message timestamp:
SizeOfTS = 64,
Schema = #{
bits_per_wildcard_level => BitsPerTopicLevel,
topic_index_bytes => TopicIndexBytes,
epoch_bits => SizeOfTS - TSOffsetBits,
ts_offset_bits => TSOffsetBits
},
{Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
#{
bits_per_wildcard_level := BitsPerTopicLevel,
topic_index_bytes := TopicIndexBytes,
epoch_bits := EpochBits,
ts_offset_bits := TSOffsetBits
} = Schema,
{_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
{_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
Trie = restore_trie(TopicIndexBytes, DBHandle, TrieCF),
%% If the user's topics have more than 10 learned wildcard levels,
%% then total carnage is going on; the learned topic structure
%% doesn't really apply:
MaxWildcardLevels = 10,
Keymappers = array:from_list(
[
make_keymapper(TopicIndexBytes, EpochBits, BitsPerTopicLevel, TSOffsetBits, N)
|| N <- lists:seq(0, MaxWildcardLevels)
]
),
#s{db = DBHandle, data = DataCF, trie = Trie, keymappers = Keymappers}.
store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
lists:foreach(
fun(Msg) ->
{Key, _} = make_key(S, Msg),
Val = serialize(Msg),
rocksdb:put(DB, Data, Key, Val, [])
end,
Messages
).
get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter),
[
#stream{
storage_key = I
}
|| I <- Indexes
].
make_iterator(_Shard, _Data, #stream{storage_key = StorageKey}, TopicFilter, StartTime) ->
{TopicIndex, Varying} = StorageKey,
Filter = [
{'=', TopicIndex},
{'>=', StartTime}
| lists:map(
fun
('+') ->
any;
(TopicLevel) when is_binary(TopicLevel) ->
{'=', hash_topic_level(TopicLevel)}
end,
Varying
)
],
{ok, #it{
topic_filter = TopicFilter,
start_time = StartTime,
storage_key = StorageKey,
key_filter = Filter
}}.
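%% For example (hypothetical values): a stream with storage key
%% {TopicIndex, [<<"dev1">>, '+']} and StartTime = 1000 produces the
%% key filter
%%
%%   [{'=', TopicIndex}, {'>=', 1000},
%%    {'=', hash_topic_level(<<"dev1">>)}, any]
%%
%% i.e. one scalar constraint per key dimension, in keymapper order.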
next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->
#it{
key_filter = KeyFilter
} = It0,
% TODO: ugh, so ugly
%% The key filter has two fixed dimensions (the topic index and the
%% timestamp) plus one entry per varying topic level, hence:
NVarying = length(KeyFilter) - 2,
Keymapper = array:get(NVarying, Keymappers),
{ok, ITHandle} = rocksdb:iterator(DB, CF, []),
try
next_loop(ITHandle, Keymapper, It0, [], BatchSize)
after
rocksdb:iterator_close(ITHandle)
end.
%%================================================================================
%% Internal functions
%%================================================================================
next_loop(_, _, It, Acc, 0) ->
{ok, It, lists:reverse(Acc)};
next_loop(ITHandle, KeyMapper, It0, Acc0, N0) ->
{Key1, Bitmask, Bitfilter} = next_range(KeyMapper, It0),
case iterator_move(KeyMapper, ITHandle, {seek, Key1}) of
{ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) ->
Msg = deserialize(Val),
It1 = It0#it{last_seen_key = Key},
case check_message(It1, Msg) of
true ->
N1 = N0 - 1,
Acc1 = [Msg | Acc0];
false ->
N1 = N0,
Acc1 = Acc0
end,
{N, It, Acc} = traverse_interval(
ITHandle, KeyMapper, Bitmask, Bitfilter, It1, Acc1, N1
),
next_loop(ITHandle, KeyMapper, It, Acc, N);
{ok, Key, _Val} ->
It = It0#it{last_seen_key = Key},
next_loop(ITHandle, KeyMapper, It, Acc0, N0);
{error, invalid_iterator} ->
{ok, It0, lists:reverse(Acc0)}
end.
traverse_interval(_, _, _, _, It, Acc, 0) ->
{0, It, Acc};
traverse_interval(ITHandle, KeyMapper, Bitmask, Bitfilter, It0, Acc, N) ->
%% TODO: supply the upper limit to rocksdb to avoid the last extra seek:
case iterator_move(KeyMapper, ITHandle, next) of
{ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) ->
Msg = deserialize(Val),
It = It0#it{last_seen_key = Key},
case check_message(It, Msg) of
true ->
traverse_interval(
ITHandle, KeyMapper, Bitmask, Bitfilter, It, [Msg | Acc], N - 1
);
false ->
traverse_interval(ITHandle, KeyMapper, Bitmask, Bitfilter, It, Acc, N)
end;
{ok, Key, _Val} ->
It = It0#it{last_seen_key = Key},
{N, It, Acc};
{error, invalid_iterator} ->
{0, It0, Acc}
end.
next_range(KeyMapper, #it{key_filter = KeyFilter, last_seen_key = PrevKey}) ->
emqx_ds_bitmask_keymapper:next_range(KeyMapper, KeyFilter, PrevKey).
check_message(_Iterator, _Msg) ->
%% TODO.
true.
iterator_move(KeyMapper, ITHandle, Action0) ->
Action =
case Action0 of
next ->
next;
{seek, Int} ->
{seek, emqx_ds_bitmask_keymapper:key_to_bitstring(KeyMapper, Int)}
end,
case rocksdb:iterator_move(ITHandle, Action) of
{ok, KeyBin, Val} ->
{ok, emqx_ds_bitmask_keymapper:bitstring_to_key(KeyMapper, KeyBin), Val};
{ok, KeyBin} ->
{ok, emqx_ds_bitmask_keymapper:bitstring_to_key(KeyMapper, KeyBin)};
Other ->
Other
end.
-spec make_key(#s{}, #message{}) -> {binary(), [binary()]}.
make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) ->
Tokens = emqx_topic:tokens(TopicBin),
{TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
VaryingHashes = [hash_topic_level(I) || I <- Varying],
KeyMapper = array:get(length(Varying), KeyMappers),
KeyBin = make_key(KeyMapper, TopicIndex, Timestamp, VaryingHashes),
{KeyBin, Varying}.
-spec make_key(emqx_ds_bitmask_keymapper:keymapper(), emqx_ds_lts:static_key(), emqx_ds:time(), [
non_neg_integer()
]) ->
binary().
make_key(KeyMapper, TopicIndex, Timestamp, Varying) ->
emqx_ds_bitmask_keymapper:key_to_bitstring(
KeyMapper,
emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, [TopicIndex, Timestamp | Varying])
).
%% TODO: don't hardcode the thresholds
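%% The threshold is, roughly, the number of distinct children a trie
%% node may accumulate before the LTS trie learns a wildcard at that
%% level: 100 for the root level, 20 for deeper levels.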
threshold_fun(0) ->
100;
threshold_fun(_) ->
20.
hash_topic_level(TopicLevel) ->
<<Int:64, _/binary>> = erlang:md5(TopicLevel),
Int.
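%% Note: the first 64 bits of the MD5 digest are kept; the keymapper
%% consumes only `bits_per_wildcard_level' of them (64 by default,
%% 8 in the COMPACT_CONFIG of the test suite).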
serialize(Msg) ->
term_to_binary(Msg).
deserialize(Blob) ->
binary_to_term(Blob).
-define(BYTE_SIZE, 8).
%% erlfmt-ignore
make_keymapper(TopicIndexBytes, EpochBits, BitsPerTopicLevel, TSOffsetBits, N) ->
Bitsources =
%% Dimension Offset Bitsize
[{1, 0, TopicIndexBytes * ?BYTE_SIZE}, %% Topic index
{2, TSOffsetBits, EpochBits }] ++ %% Timestamp epoch
[{2 + I, 0, BitsPerTopicLevel } %% Varying topic levels
|| I <- lists:seq(1, N)] ++
[{2, 0, TSOffsetBits }], %% Timestamp offset
emqx_ds_bitmask_keymapper:make_keymapper(Bitsources).
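%% Illustrative layout (assuming the default options from create/4:
%% TopicIndexBytes = 4, TSOffsetBits = 5, BitsPerTopicLevel = 64,
%% hence EpochBits = 59) with N = 2 varying levels, the bitsources are
%%
%%   [{1, 0, 32}, {2, 5, 59}, {3, 0, 64}, {4, 0, 64}, {2, 0, 5}]
%%
%% i.e. a 224-bit key laid out as
%% |topic index|epoch|level-1 hash|level-2 hash|ts offset|, so all
%% messages within one epoch that share a topic index occupy a
%% contiguous key range.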
-spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie().
restore_trie(TopicIndexBytes, DB, CF) ->
PersistCallback = fun(Key, Val) ->
rocksdb:put(DB, CF, term_to_binary(Key), term_to_binary(Val), [])
end,
{ok, IT} = rocksdb:iterator(DB, CF, []),
try
Dump = read_persisted_trie(IT, rocksdb:iterator_move(IT, first)),
TrieOpts = #{persist_callback => PersistCallback, static_key_size => TopicIndexBytes},
emqx_ds_lts:trie_restore(TrieOpts, Dump)
after
rocksdb:iterator_close(IT)
end.
read_persisted_trie(IT, {ok, KeyB, ValB}) ->
[
{binary_to_term(KeyB), binary_to_term(ValB)}
| read_persisted_trie(IT, rocksdb:iterator_move(IT, next))
];
read_persisted_trie(IT, {error, invalid_iterator}) ->
[].
%% @doc Generate a column family ID for the MQTT messages
-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
data_cf(GenId) ->
"emqx_ds_storage_bitfield_lts_data" ++ integer_to_list(GenId).
%% @doc Generate a column family ID for the trie
-spec trie_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
trie_cf(GenId) ->
"emqx_ds_storage_bitfield_lts_trie" ++ integer_to_list(GenId).

View File

@ -32,6 +32,10 @@
%% Type declarations
%%================================================================================
-type prototype() ::
{emqx_ds_storage_reference, emqx_ds_storage_reference:options()}
| {emqx_ds_storage_bitfield_lts, emqx_ds_storage_bitfield_lts:options()}.
-type shard_id() :: emqx_ds_replication_layer:shard_id().
-type cf_refs() :: [{string(), rocksdb:cf_handle()}].
@ -107,7 +111,7 @@
_Data.
-callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) ->
ok.
emqx_ds:store_batch_result().
-callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) ->
[_Stream].
@ -122,7 +126,7 @@
%% API for the replication layer
%%================================================================================
-spec open_shard(shard_id(), emqx_ds:create_db_opts()) -> ok.
-spec open_shard(shard_id(), emqx_ds:builtin_db_opts()) -> ok.
open_shard(Shard, Options) ->
emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
@ -195,7 +199,7 @@ next(Shard, Iter = #it{generation = GenId, enc = GenIter0}, BatchSize) ->
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
-spec start_link(shard_id(), emqx_ds:create_db_opts()) ->
-spec start_link(shard_id(), emqx_ds:builtin_db_opts()) ->
{ok, pid()}.
start_link(Shard, Options) ->
gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
@ -224,7 +228,8 @@ init({ShardId, Options}) ->
{Schema, CFRefs} =
case get_schema_persistent(DB) of
not_found ->
create_new_shard_schema(ShardId, DB, CFRefs0, Options);
Prototype = maps:get(storage, Options),
create_new_shard_schema(ShardId, DB, CFRefs0, Prototype);
Scm ->
{Scm, CFRefs0}
end,
@ -300,14 +305,14 @@ open_generation(ShardId, DB, CFRefs, GenId, GenSchema) ->
RuntimeData = Mod:open(ShardId, DB, GenId, CFRefs, Schema),
GenSchema#{data => RuntimeData}.
-spec create_new_shard_schema(shard_id(), rocksdb:db_handle(), cf_refs(), _Options) ->
-spec create_new_shard_schema(shard_id(), rocksdb:db_handle(), cf_refs(), prototype()) ->
{shard_schema(), cf_refs()}.
create_new_shard_schema(ShardId, DB, CFRefs, Options) ->
?tp(notice, ds_create_new_shard_schema, #{shard => ShardId, options => Options}),
create_new_shard_schema(ShardId, DB, CFRefs, Prototype) ->
?tp(notice, ds_create_new_shard_schema, #{shard => ShardId, prototype => Prototype}),
%% TODO: read prototype from options/config
Schema0 = #{
current_generation => 0,
prototype => {emqx_ds_storage_reference, #{}}
prototype => Prototype
},
{_NewGenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema0, _Since = 0),
{Schema, NewCFRefs ++ CFRefs}.
@ -331,7 +336,7 @@ commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB
ok = put_schema_persistent(DB, Schema),
put_schema_runtime(ShardId, Runtime).
-spec rocksdb_open(shard_id(), emqx_ds:create_db_opts()) ->
-spec rocksdb_open(shard_id(), emqx_ds:builtin_db_opts()) ->
{ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}.
rocksdb_open(Shard, Options) ->
DBOptions = [

View File

@ -25,7 +25,7 @@
start_link() ->
supervisor:start_link({local, ?SUP}, ?MODULE, []).
-spec start_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
-spec start_shard(emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) ->
supervisor:startchild_ret().
start_shard(Shard, Options) ->
supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
@ -63,7 +63,7 @@ init([]) ->
%% Internal functions
%%================================================================================
-spec shard_child_spec(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
-spec shard_child_spec(emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) ->
supervisor:child_spec().
shard_child_spec(Shard, Options) ->
#{

View File

@ -32,7 +32,7 @@
%% internal exports:
-export([]).
-export_type([]).
-export_type([options/0]).
-include_lib("emqx/include/emqx.hrl").
@ -40,6 +40,8 @@
%% Type declarations
%%================================================================================
-type options() :: #{}.
%% Permanent state:
-record(schema, {}).
@ -134,4 +136,4 @@ do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
%% @doc Generate a column family ID for the MQTT messages
-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
data_cf(GenId) ->
?MODULE_STRING ++ integer_to_list(GenId).
"emqx_ds_storage_reference" ++ integer_to_list(GenId).

View File

@ -23,19 +23,25 @@
-include_lib("stdlib/include/assert.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
opts() ->
#{
backend => builtin,
storage => {emqx_ds_storage_reference, #{}}
}.
%% A simple smoke test that verifies that opening/closing the DB
%% doesn't crash, and not much else
t_00_smoke_open_drop(_Config) ->
DB = 'DB',
?assertMatch(ok, emqx_ds:open_db(DB, #{})),
?assertMatch(ok, emqx_ds:open_db(DB, #{})),
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
?assertMatch(ok, emqx_ds:drop_db(DB)).
%% A simple smoke test that verifies that storing the messages doesn't
%% crash
t_01_smoke_store(_Config) ->
DB = default,
?assertMatch(ok, emqx_ds:open_db(DB, #{})),
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
Msg = message(<<"foo/bar">>, <<"foo">>, 0),
?assertMatch(ok, emqx_ds:store_batch(DB, [Msg])).
@ -43,7 +49,7 @@ t_01_smoke_store(_Config) ->
%% doesn't crash and that iterators can be opened.
t_02_smoke_get_streams_start_iter(_Config) ->
DB = ?FUNCTION_NAME,
?assertMatch(ok, emqx_ds:open_db(DB, #{})),
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
StartTime = 0,
TopicFilter = ['#'],
[{Rank, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
@ -54,7 +60,7 @@ t_02_smoke_get_streams_start_iter(_Config) ->
%% over messages.
t_03_smoke_iterate(_Config) ->
DB = ?FUNCTION_NAME,
?assertMatch(ok, emqx_ds:open_db(DB, #{})),
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
StartTime = 0,
TopicFilter = ['#'],
Msgs = [
@ -75,7 +81,7 @@ t_03_smoke_iterate(_Config) ->
%% they are left off.
t_04_restart(_Config) ->
DB = ?FUNCTION_NAME,
?assertMatch(ok, emqx_ds:open_db(DB, #{})),
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
TopicFilter = ['#'],
StartTime = 0,
Msgs = [
@ -90,7 +96,7 @@ t_04_restart(_Config) ->
?tp(warning, emqx_ds_SUITE_restart_app, #{}),
ok = application:stop(emqx_durable_storage),
{ok, _} = application:ensure_all_started(emqx_durable_storage),
ok = emqx_ds:open_db(DB, #{}),
ok = emqx_ds:open_db(DB, opts()),
%% The old iterator should be still operational:
{ok, Iter, Batch} = iterate(Iter0, 1),
?assertEqual(Msgs, Batch, {Iter0, Iter}).

View File

@ -1,188 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_message_storage_bitmask_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("stdlib/include/assert.hrl").
-import(emqx_ds_message_storage_bitmask, [
make_keymapper/1,
keymapper_info/1,
compute_topic_bitmask/2,
compute_time_bitmask/1,
compute_topic_seek/4
]).
all() -> emqx_common_test_helpers:all(?MODULE).
t_make_keymapper(_) ->
?assertMatch(
#{
source := [
{timestamp, 9, 23},
{hash, level, 2},
{hash, level, 4},
{hash, levels, 8},
{timestamp, 0, 9}
],
bitsize := 46,
epoch := 512
},
keymapper_info(
make_keymapper(#{
timestamp_bits => 32,
topic_bits_per_level => [2, 4, 8],
epoch => 1000
})
)
).
t_make_keymapper_single_hash_level(_) ->
?assertMatch(
#{
source := [
{timestamp, 0, 32},
{hash, levels, 16}
],
bitsize := 48,
epoch := 1
},
keymapper_info(
make_keymapper(#{
timestamp_bits => 32,
topic_bits_per_level => [16],
epoch => 1
})
)
).
t_make_keymapper_no_timestamp(_) ->
?assertMatch(
#{
source := [
{hash, level, 4},
{hash, level, 8},
{hash, levels, 16}
],
bitsize := 28,
epoch := 1
},
keymapper_info(
make_keymapper(#{
timestamp_bits => 0,
topic_bits_per_level => [4, 8, 16],
epoch => 42
})
)
).
t_compute_topic_bitmask(_) ->
KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}),
?assertEqual(
2#111_1111_11111_11,
compute_topic_bitmask([<<"foo">>, <<"bar">>], KM)
),
?assertEqual(
2#111_0000_11111_11,
compute_topic_bitmask([<<"foo">>, '+'], KM)
),
?assertEqual(
2#111_0000_00000_11,
compute_topic_bitmask([<<"foo">>, '+', '+'], KM)
),
?assertEqual(
2#111_0000_11111_00,
compute_topic_bitmask([<<"foo">>, '+', <<"bar">>, '+'], KM)
).
t_compute_topic_bitmask_wildcard(_) ->
KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}),
?assertEqual(
2#000_0000_00000_00,
compute_topic_bitmask(['#'], KM)
),
?assertEqual(
2#111_0000_00000_00,
compute_topic_bitmask([<<"foo">>, '#'], KM)
),
?assertEqual(
2#111_1111_11111_00,
compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, '#'], KM)
).
t_compute_topic_bitmask_wildcard_long_tail(_) ->
KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}),
?assertEqual(
2#111_1111_11111_11,
compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, <<"xyzzy">>], KM)
),
?assertEqual(
2#111_1111_11111_00,
compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, '#'], KM)
).
t_compute_time_bitmask(_) ->
KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 200}),
?assertEqual(2#111_000000_1111111, compute_time_bitmask(KM)).
t_compute_time_bitmask_epoch_only(_) ->
KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 1}),
?assertEqual(2#1111111111_000000, compute_time_bitmask(KM)).
%% Filter = |123|***|678|***|
%% Mask = |123|***|678|***|
%% Key1 = |123|011|108|121| → Seek = 0 |123|011|678|000|
%% Key2 = |123|011|679|919| → Seek = 0 |123|012|678|000|
%% Key3 = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos
%% Key4 = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos
t_compute_next_topic_seek(_) ->
KM = make_keymapper(#{topic_bits_per_level => [8, 8, 16, 12], timestamp_bits => 0, epoch => 1}),
?assertMatch(
none,
compute_topic_seek(
16#FD_42_4242_043,
16#FD_42_4242_042,
16#FF_FF_FFFF_FFF,
KM
)
),
?assertMatch(
16#FD_11_0678_000,
compute_topic_seek(
16#FD_11_0108_121,
16#FD_00_0678_000,
16#FF_00_FFFF_000,
KM
)
),
?assertMatch(
16#FD_12_0678_000,
compute_topic_seek(
16#FD_11_0679_919,
16#FD_00_0678_000,
16#FF_00_FFFF_000,
KM
)
),
?assertMatch(
none,
compute_topic_seek(
16#FD_FF_0679_001,
16#FD_00_0678_000,
16#FF_00_FFFF_000,
KM
)
),
?assertMatch(
none,
compute_topic_seek(
16#FE_11_0179_017,
16#FD_00_0678_000,
16#FF_00_FFFF_000,
KM
)
).

View File

@ -0,0 +1,343 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_storage_bitfield_lts_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(SHARD, shard(?FUNCTION_NAME)).
-define(DEFAULT_CONFIG, #{
backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}}
}).
-define(COMPACT_CONFIG, #{
backend => builtin,
storage =>
{emqx_ds_storage_bitfield_lts, #{
bits_per_wildcard_level => 8
}}
}).
%% Smoke test for opening and reopening the database
t_open(_Config) ->
ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
{ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}).
%% Smoke test of store function
t_store(_Config) ->
MessageID = emqx_guid:gen(),
PublishedAt = 1000,
Topic = <<"foo/bar">>,
Payload = <<"message">>,
Msg = #message{
id = MessageID,
topic = Topic,
payload = Payload,
timestamp = PublishedAt
},
?assertMatch(ok, emqx_ds_storage_layer:store_batch(?SHARD, [Msg], #{})).
%% Smoke test for iteration through a concrete topic
t_iterate(_Config) ->
%% Prepare data:
Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
Timestamps = lists:seq(1, 10),
Batch = [
make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
|| Topic <- Topics, PublishedAt <- Timestamps
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
%% Iterate through individual topics:
[
begin
[{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0),
{ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0),
{ok, NextIt, Messages} = emqx_ds_storage_layer:next(?SHARD, It, 100),
?assertEqual(
lists:map(fun integer_to_binary/1, Timestamps),
payloads(Messages)
),
{ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100)
end
|| Topic <- Topics
],
ok.
-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).
%% Smoke test that verifies that concrete topics become individual
%% streams, unless there are too many of them
t_get_streams(_Config) ->
%% Prepare data:
Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
Timestamps = lists:seq(1, 10),
Batch = [
make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
|| Topic <- Topics, PublishedAt <- Timestamps
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
GetStream = fun(Topic) ->
StartTime = 0,
emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), StartTime)
end,
%% Get streams for individual topics to use as a reference for later:
[FooBar = {_, _}] = GetStream(<<"foo/bar">>),
[FooBarBaz] = GetStream(<<"foo/bar/baz">>),
[A] = GetStream(<<"a">>),
%% Restart shard to make sure trie is persisted:
ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
{ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}),
%% Test various wildcards:
[] = GetStream(<<"bar/foo">>),
?assertEqual([FooBar], GetStream("+/+")),
?assertSameSet([FooBar, FooBarBaz], GetStream(<<"foo/#">>)),
?assertSameSet([FooBar, FooBarBaz, A], GetStream(<<"#">>)),
%% Now insert a bunch of messages with different topics to create wildcards:
NewBatch = [
begin
B = integer_to_binary(I),
make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>)
end
|| I <- lists:seq(1, 200)
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, []),
%% Check that "foo/bar/baz" topic now appears in two streams:
%% "foo/bar/baz" and "foo/bar/+":
NewStreams = lists:sort(GetStream(<<"foo/bar/baz">>)),
?assertMatch([_, _], NewStreams),
?assertMatch([_], NewStreams -- [FooBarBaz]),
ok.
%% Smoke test for iteration with wildcard topic filter
%% t_iterate_wildcard(_Config) ->
%% %% Prepare data:
%% Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
%% Timestamps = lists:seq(1, 10),
%% _ = [
%% store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
%% || Topic <- Topics, PublishedAt <- Timestamps
%% ],
%% ?assertEqual(
%% lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 0)])
%% ),
%% ?assertEqual(
%% [],
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 10 + 1)])
%% ),
%% ?assertEqual(
%% lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 5)])
%% ),
%% ?assertEqual(
%% lists:sort([
%% {Topic, PublishedAt}
%% || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
%% ]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
%% ),
%% ?assertEqual(
%% lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+", 0)])
%% ),
%% ?assertEqual(
%% [],
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+/bar", 0)])
%% ),
%% ?assertEqual(
%% lists:sort([
%% {Topic, PublishedAt}
%% || Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps
%% ]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "+/bar/#", 0)])
%% ),
%% ?assertEqual(
%% lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 0)])
%% ),
%% ?assertEqual(
%% [],
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/+/+", 0)])
%% ),
%% ok.
%% t_create_gen(_Config) ->
%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
%% ?assertEqual(
%% {error, nonmonotonic},
%% emqx_ds_storage_layer:create_generation(?SHARD, 1, ?DEFAULT_CONFIG)
%% ),
%% ?assertEqual(
%% {error, nonmonotonic},
%% emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG)
%% ),
%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
%% Topics = ["foo/bar", "foo/bar/baz"],
%% Timestamps = lists:seq(1, 100),
%% [
%% ?assertMatch({ok, [_]}, store(?SHARD, PublishedAt, Topic, <<>>))
%% || Topic <- Topics, PublishedAt <- Timestamps
%% ].
%% t_iterate_multigen(_Config) ->
%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
%% Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
%% Timestamps = lists:seq(1, 100),
%% _ = [
%% store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
%% || Topic <- Topics, PublishedAt <- Timestamps
%% ],
%% ?assertEqual(
%% lists:sort([
%% {Topic, PublishedAt}
%% || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
%% ]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
%% ),
%% ?assertEqual(
%% lists:sort([
%% {Topic, PublishedAt}
%% || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100)
%% ]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)])
%% ).
%% t_iterate_multigen_preserve_restore(_Config) ->
%% ReplayID = atom_to_binary(?FUNCTION_NAME),
%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
%% Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
%% Timestamps = lists:seq(1, 100),
%% TopicFilter = "foo/#",
%% TopicsMatching = ["foo/bar", "foo/bar/baz"],
%% _ = [
%% store(?SHARD, TS, Topic, term_to_binary({Topic, TS}))
%% || Topic <- Topics, TS <- Timestamps
%% ],
%% It0 = iterator(?SHARD, TopicFilter, 0),
%% {It1, Res10} = iterate(It0, 10),
%% % preserve mid-generation
%% ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID),
%% {ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
%% {It3, Res100} = iterate(It2, 88),
%% % preserve on the generation boundary
%% ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID),
%% {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
%% {It5, Res200} = iterate(It4, 1000),
%% ?assertEqual({end_of_stream, []}, iterate(It5, 1)),
%% ?assertEqual(
%% lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]),
%% lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200])
%% ),
%% ?assertEqual(
%% ok,
%% emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID)
%% ),
%% ?assertEqual(
%% {error, not_found},
%% emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID)
%% ).
make_message(PublishedAt, Topic, Payload) when is_list(Topic) ->
make_message(PublishedAt, list_to_binary(Topic), Payload);
make_message(PublishedAt, Topic, Payload) when is_binary(Topic) ->
ID = emqx_guid:gen(),
#message{
id = ID,
topic = Topic,
timestamp = PublishedAt,
payload = Payload
}.
store(Shard, PublishedAt, TopicL, Payload) when is_list(TopicL) ->
store(Shard, PublishedAt, list_to_binary(TopicL), Payload);
store(Shard, PublishedAt, Topic, Payload) ->
ID = emqx_guid:gen(),
Msg = #message{
id = ID,
topic = Topic,
timestamp = PublishedAt,
payload = Payload
},
emqx_ds_storage_layer:message_store(Shard, [Msg], #{}).
%% iterate(Shard, TopicFilter, StartTime) ->
%% Streams = emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime),
%% lists:flatmap(
%% fun(Stream) ->
%% iterate(Shard, iterator(Shard, Stream, TopicFilter, StartTime))
%% end,
%% Streams).
%% iterate(Shard, It) ->
%% case emqx_ds_storage_layer:next(Shard, It) of
%% {ok, ItNext, [#message{payload = Payload}]} ->
%% [Payload | iterate(Shard, ItNext)];
%% end_of_stream ->
%% []
%% end.
%% iterate(_Shard, end_of_stream, _N) ->
%% {end_of_stream, []};
%% iterate(Shard, It, N) ->
%% case emqx_ds_storage_layer:next(Shard, It, N) of
%% {ok, ItFinal, Messages} ->
%% {ItFinal, [Payload || #message{payload = Payload} <- Messages]};
%% end_of_stream ->
%% {end_of_stream, []}
%% end.
%% iterator(Shard, Stream, TopicFilter, StartTime) ->
%% {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Stream, parse_topic(TopicFilter), StartTime),
%% It.
payloads(Messages) ->
lists:map(
fun(#message{payload = P}) ->
P
end,
Messages
).
parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
Topic;
parse_topic(Topic) ->
emqx_topic:words(iolist_to_binary(Topic)).
%% CT callbacks
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(emqx_durable_storage),
Config.
end_per_suite(_Config) ->
ok = application:stop(emqx_durable_storage).
init_per_testcase(TC, Config) ->
{ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), ?DEFAULT_CONFIG),
Config.
end_per_testcase(TC, _Config) ->
ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
shard(TC) ->
{?MODULE, TC}.
keyspace(TC) ->
TC.
set_keyspace_config(Keyspace, Config) ->
ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}).

View File

@ -1,292 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_storage_layer_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(SHARD, shard(?FUNCTION_NAME)).
-define(DEFAULT_CONFIG,
{emqx_ds_message_storage_bitmask, #{
timestamp_bits => 64,
topic_bits_per_level => [8, 8, 32, 16],
epoch => 5,
iteration => #{
iterator_refresh => {every, 5}
}
}}
).
-define(COMPACT_CONFIG,
{emqx_ds_message_storage_bitmask, #{
timestamp_bits => 16,
topic_bits_per_level => [16, 16],
epoch => 10
}}
).
%% Smoke test for opening and reopening the database
t_open(_Config) ->
ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
{ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}).
%% Smoke test of store function
t_store(_Config) ->
MessageID = emqx_guid:gen(),
PublishedAt = 1000,
Topic = <<"foo/bar">>,
Payload = <<"message">>,
Msg = #message{
id = MessageID,
topic = Topic,
payload = Payload,
timestamp = PublishedAt
},
?assertMatch({ok, [_]}, emqx_ds_storage_layer:message_store(?SHARD, [Msg], #{})).
%% Smoke test for iteration through a concrete topic
t_iterate(_Config) ->
%% Prepare data:
Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
Timestamps = lists:seq(1, 10),
[
store(
?SHARD,
PublishedAt,
Topic,
integer_to_binary(PublishedAt)
)
|| Topic <- Topics, PublishedAt <- Timestamps
],
%% Iterate through individual topics:
[
begin
{ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, {parse_topic(Topic), 0}),
Values = iterate(It),
?assertEqual(lists:map(fun integer_to_binary/1, Timestamps), Values)
end
|| Topic <- Topics
],
ok.
%% Smoke test for iteration with wildcard topic filter
t_iterate_wildcard(_Config) ->
%% Prepare data:
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
Timestamps = lists:seq(1, 10),
_ = [
store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| Topic <- Topics, PublishedAt <- Timestamps
],
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 10 + 1)])
),
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 5)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
),
?assertEqual(
lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+/bar", 0)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "+/bar/#", 0)])
),
?assertEqual(
lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/+/+", 0)])
),
ok.
t_iterate_long_tail_wildcard(_Config) ->
Topic = "b/c/d/e/f/g",
TopicFilter = "b/c/d/e/+/+",
Timestamps = lists:seq(1, 100),
_ = [
store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| PublishedAt <- Timestamps
],
?assertEqual(
lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, TopicFilter, 50)])
).
t_create_gen(_Config) ->
{ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
?assertEqual(
{error, nonmonotonic},
emqx_ds_storage_layer:create_generation(?SHARD, 1, ?DEFAULT_CONFIG)
),
?assertEqual(
{error, nonmonotonic},
emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG)
),
{ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz"],
Timestamps = lists:seq(1, 100),
[
?assertMatch({ok, [_]}, store(?SHARD, PublishedAt, Topic, <<>>))
|| Topic <- Topics, PublishedAt <- Timestamps
].
t_iterate_multigen(_Config) ->
{ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
{ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
Timestamps = lists:seq(1, 100),
_ = [
store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|| Topic <- Topics, PublishedAt <- Timestamps
],
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
),
?assertEqual(
lists:sort([
{Topic, PublishedAt}
|| Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100)
]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)])
).
t_iterate_multigen_preserve_restore(_Config) ->
ReplayID = atom_to_binary(?FUNCTION_NAME),
{ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
{ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
Timestamps = lists:seq(1, 100),
TopicFilter = "foo/#",
TopicsMatching = ["foo/bar", "foo/bar/baz"],
_ = [
store(?SHARD, TS, Topic, term_to_binary({Topic, TS}))
|| Topic <- Topics, TS <- Timestamps
],
It0 = iterator(?SHARD, TopicFilter, 0),
{It1, Res10} = iterate(It0, 10),
% preserve mid-generation
ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID),
{ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
{It3, Res100} = iterate(It2, 88),
% preserve on the generation boundary
ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID),
{ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
{It5, Res200} = iterate(It4, 1000),
?assertEqual({end_of_stream, []}, iterate(It5, 1)),
?assertEqual(
lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200])
),
?assertEqual(
ok,
emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID)
),
?assertEqual(
{error, not_found},
emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID)
).
store(Shard, PublishedAt, TopicL, Payload) when is_list(TopicL) ->
store(Shard, PublishedAt, list_to_binary(TopicL), Payload);
store(Shard, PublishedAt, Topic, Payload) ->
ID = emqx_guid:gen(),
Msg = #message{
id = ID,
topic = Topic,
timestamp = PublishedAt,
payload = Payload
},
emqx_ds_storage_layer:message_store(Shard, [Msg], #{}).
iterate(DB, TopicFilter, StartTime) ->
iterate(iterator(DB, TopicFilter, StartTime)).
iterate(It) ->
case emqx_ds_storage_layer:next(It) of
{ok, ItNext, [#message{payload = Payload}]} ->
[Payload | iterate(ItNext)];
end_of_stream ->
[]
end.
iterate(end_of_stream, _N) ->
{end_of_stream, []};
iterate(It, N) ->
case emqx_ds_storage_layer:next(It, N) of
{ok, ItFinal, Messages} ->
{ItFinal, [Payload || #message{payload = Payload} <- Messages]};
end_of_stream ->
{end_of_stream, []}
end.
iterator(DB, TopicFilter, StartTime) ->
{ok, It} = emqx_ds_storage_layer:make_iterator(DB, {parse_topic(TopicFilter), StartTime}),
It.
parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
Topic;
parse_topic(Topic) ->
emqx_topic:words(iolist_to_binary(Topic)).
%% CT callbacks
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(emqx_durable_storage),
Config.
end_per_suite(_Config) ->
ok = application:stop(emqx_durable_storage).
init_per_testcase(TC, Config) ->
ok = set_keyspace_config(keyspace(TC), ?DEFAULT_CONFIG),
{ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), #{}),
Config.
end_per_testcase(TC, _Config) ->
ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
shard(TC) ->
iolist_to_binary([?MODULE_STRING, "_", atom_to_list(TC)]).
keyspace(TC) ->
TC.
set_keyspace_config(Keyspace, Config) ->
ok = application:set_env(emqx_ds, keyspace_config, #{Keyspace => Config}).