From e745e42093656e4f123ae6780d3257649d1d5001 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 18 Oct 2023 01:07:21 +0200 Subject: [PATCH] test(ds): Explore full range of keys when testing ratchet function --- apps/emqx_durable_storage/src/emqx_ds.erl_ | 189 ------- .../src/emqx_ds_bitmask_keymapper.erl | 42 +- .../emqx_ds_message_storage_bitmask_shim.erl | 17 +- .../props/prop_replay_message_storage.erl | 463 ------------------ 4 files changed, 40 insertions(+), 671 deletions(-) delete mode 100644 apps/emqx_durable_storage/src/emqx_ds.erl_ delete mode 100644 apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl_ b/apps/emqx_durable_storage/src/emqx_ds.erl_ deleted file mode 100644 index 1acbcc7c7..000000000 --- a/apps/emqx_durable_storage/src/emqx_ds.erl_ +++ /dev/null @@ -1,189 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_ds). - --include_lib("stdlib/include/ms_transform.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). - -%% API: --export([ensure_shard/2]). -%% Messages: --export([message_store/2, message_store/1, message_stats/0]). -%% Iterator: --export([get_streams/3, open_iterator/1, next/2]). - -%% internal exports: --export([]). - --export_type([ - stream/0, - keyspace/0, - message_id/0, - message_stats/0, - message_store_opts/0, - replay/0, - replay_id/0, - %iterator_id/0, - iterator/0, - topic/0, - topic_filter/0, - time/0 -]). - -%%================================================================================ -%% Type declarations -%%================================================================================ - -%% This record enapsulates the stream entity from the storage level. -%% -%% TODO: currently the stream is hardwired to only support the -%% internal rocksdb storage. In t he future we want to add another -%% implementations for emqx_ds, so this type has to take this into -%% account. --record(stream, - { shard :: emqx_ds:shard() - , :: emqx_ds_storage_layer:stream() - }). - --opaque stream() :: #stream{}. - --type iterator() :: term(). - -%-type iterator_id() :: binary(). - --type message_store_opts() :: #{}. - --type message_stats() :: #{}. - --type message_id() :: binary(). - -%% Parsed topic. --type topic() :: list(binary()). - -%% Parsed topic filter. --type topic_filter() :: list(binary() | '+' | '#' | ''). - --type keyspace() :: atom(). --type shard_id() :: binary(). --type shard() :: {keyspace(), shard_id()}. - -%% Timestamp -%% Earliest possible timestamp is 0. -%% TODO granularity? Currently, we should always use micro second, as that's the unit we -%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps. --type time() :: non_neg_integer(). - --type replay_id() :: binary(). - --type replay() :: { - _TopicFilter :: topic_filter(), - _StartTime :: time() -}. - -%%================================================================================ -%% API funcions -%%================================================================================ - -%% @doc Get a list of streams needed for replaying a topic filter. -%% -%% Motivation: under the hood, EMQX may store different topics at -%% different locations or even in different databases. A wildcard -%% topic filter may require pulling data from any number of locations. -%% -%% Stream is an abstraction exposed by `emqx_ds' that reflects the -%% notion that different topics can be stored differently, but hides -%% the implementation details. -%% -%% Rules: -%% -%% 1. New streams matching the topic filter can appear without notice, -%% so the replayer must periodically call this function to get the -%% updated list of streams. -%% -%% 2. Streams may depend on one another. Therefore, care should be -%% taken while replaying them in parallel to avoid out-of-order -%% replay. This function returns stream together with its -%% "coordinates": `{X, T, Stream}'. If X coordinate of two streams is -%% different, then they can be replayed in parallel. If it's the -%% same, then the stream with smaller T coordinate should be replayed -%% first. --spec get_streams(keyspace(), topic_filter(), time()) -> [{integer(), integer(), stream()}]. -get_streams(Keyspace, TopicFilter, StartTime) -> - ShardIds = emqx_ds_replication_layer:get_all_shards(Keyspace), - lists:flatmap( - fun(Shard) -> - Node = emqx_ds_replication_layer:shard_to_node(Shard), - try - Streams = emqx_persistent_session_ds_proto_v1:get_streams(Node, Keyspace, Shard, TopicFilter, StartTime), - [#stream{ shard = {Keyspace, ShardId} - , stream = Stream - } || Stream <- Streams] - catch - error:{erpc, _} -> - %% The caller has to periodically refresh the - %% list of streams anyway, so it's ok to ignore - %% transient errors. - [] - end - end, - ShardIds). - --spec ensure_shard(shard(), emqx_ds_storage_layer:options()) -> - ok | {error, _Reason}. -ensure_shard(Sharzd, Options) -> - case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of - {ok, _Pid} -> - ok; - {error, {already_started, _Pid}} -> - ok; - {error, Reason} -> - {error, Reason} - end. - -%%-------------------------------------------------------------------------------- -%% Message -%%-------------------------------------------------------------------------------- - --spec message_store([emqx_types:message()], message_store_opts()) -> - {ok, [message_id()]} | {error, _}. -message_store(Msg, Opts) -> - message_store(Msg, Opts). - --spec message_store([emqx_types:message()]) -> {ok, [message_id()]} | {error, _}. -message_store(Msg) -> - message_store(Msg, #{}). - --spec message_stats() -> message_stats(). -message_stats() -> - #{}. - -%%-------------------------------------------------------------------------------- -%% Iterator (pull API) -%%-------------------------------------------------------------------------------- - --spec open_iterator(stream()) -> {ok, iterator()}. -open_iterator(#stream{shard = {_Keyspace, _ShardId}, stream = _StorageSpecificStream}) -> - error(todo). - --spec next(iterator(), non_neg_integer()) -> - {ok, iterator(), [emqx_types:message()]} - | end_of_stream. -next(_Iterator, _BatchSize) -> - error(todo). - -%%================================================================================ -%% Internal functions -%%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl index e18c8498d..90c381104 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl @@ -699,26 +699,44 @@ ratchet2_test() -> ?assertEqual(16#aaddcc00, ratchet(F2, 0)), ?assertEqual(16#aa_de_cc_00, ratchet(F2, 16#aa_dd_cd_11)). -ratchet3_test() -> - ?assert(proper:quickcheck(ratchet1_prop(), 100)). - %% erlfmt-ignore -ratchet1_prop() -> +ratchet3_test_() -> EpochBits = 4, Bitsources = [{1, 0, 2}, %% Static topic index {2, EpochBits, 4}, %% Epoch {3, 0, 2}, %% Varying topic hash {2, 0, EpochBits}], %% Timestamp offset - M = make_keymapper(lists:reverse(Bitsources)), - F1 = make_filter(M, [{'=', 2#10}, any, {'=', 2#01}]), - ?FORALL(N, integer(0, ones(12)), - ratchet_prop(F1, N)). + Keymapper = make_keymapper(lists:reverse(Bitsources)), + Filter1 = make_filter(Keymapper, [{'=', 2#10}, any, {'=', 2#01}]), + Filter2 = make_filter(Keymapper, [{'=', 2#01}, any, any]), + Filter3 = make_filter(Keymapper, [{'=', 2#01}, {'>=', 16#aa}, any]), + {timeout, 15, + [?_assert(test_iterate(Filter1, 0)), + ?_assert(test_iterate(Filter2, 0)), + %% Not starting from 0 here for simplicity, since the beginning + %% of a >= interval can't be properly checked with a bitmask: + ?_assert(test_iterate(Filter3, ratchet(Filter3, 1))) + ]}. -ratchet_prop(Filter = #filter{bitfilter = Bitfilter, bitmask = Bitmask, size = Size}, Key0) -> - Key = ratchet(Filter, Key0), +%% Note: this function iterates through the full range of keys, so its +%% complexity grows _exponentially_ with the total size of the +%% keymapper. +test_iterate(Filter, overflow) -> + true; +test_iterate(Filter, Key0) -> + Key = ratchet(Filter, Key0 + 1), + ?assert(ratchet_prop(Filter, Key0, Key)), + test_iterate(Filter, Key). + +ratchet_prop(Filter = #filter{bitfilter = Bitfilter, bitmask = Bitmask, size = Size}, Key0, Key) -> + %% Validate basic properties of the generated key. It must be + %% greater than the old key, and match the bitmask: ?assert(Key =:= overflow orelse (Key band Bitmask =:= Bitfilter)), - ?assert(Key >= Key0, {Key, '>=', Key}), + ?assert(Key > Key0, {Key, '>=', Key}), IMax = ones(Size), + %% Iterate through all keys between `Key0 + 1' and `Key' and + %% validate that none of them match the bitmask. Ultimately, it + %% means that `ratchet' function doesn't skip over any valid keys: CheckGaps = fun F(I) when I >= Key; I > IMax -> true; @@ -729,7 +747,7 @@ ratchet_prop(Filter = #filter{bitfilter = Bitfilter, bitmask = Bitmask, size = S ), F(I + 1) end, - CheckGaps(Key0). + CheckGaps(Key0 + 1). mkbmask(Keymapper, Filter0) -> Filter = inequations_to_ranges(Keymapper, Filter0), diff --git a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl index e9daf2581..9b5af9428 100644 --- a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl +++ b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl @@ -4,9 +4,11 @@ -module(emqx_ds_message_storage_bitmask_shim). +-include_lib("emqx/include/emqx.hrl"). + -export([open/0]). -export([close/1]). --export([store/5]). +-export([store/2]). -export([iterate/2]). -type topic() :: list(binary()). @@ -25,20 +27,21 @@ close(Tab) -> true = ets:delete(Tab), ok. --spec store(t(), emqx_guid:guid(), time(), topic(), binary()) -> +-spec store(t(), emqx_types:message()) -> ok | {error, _TODO}. -store(Tab, MessageID, PublishedAt, Topic, Payload) -> - true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}), +store(Tab, Msg = #message{id = MessageID, timestamp = PublishedAt}) -> + true = ets:insert(Tab, {{PublishedAt, MessageID}, Msg}), ok. -spec iterate(t(), emqx_ds:replay()) -> [binary()]. -iterate(Tab, {TopicFilter, StartTime}) -> +iterate(Tab, {TopicFilter0, StartTime}) -> + TopicFilter = iolist_to_binary(lists:join("/", TopicFilter0)), ets:foldr( - fun({{PublishedAt, _}, Topic, Payload}, Acc) -> + fun({{PublishedAt, _}, Msg = #message{topic = Topic}}, Acc) -> case emqx_topic:match(Topic, TopicFilter) of true when PublishedAt >= StartTime -> - [Payload | Acc]; + [Msg | Acc]; _ -> Acc end diff --git a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl b/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl deleted file mode 100644 index d96996534..000000000 --- a/apps/emqx_durable_storage/test/props/prop_replay_message_storage.erl +++ /dev/null @@ -1,463 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(prop_replay_message_storage). - --include_lib("proper/include/proper.hrl"). --include_lib("eunit/include/eunit.hrl"). - --define(WORK_DIR, ["_build", "test"]). --define(RUN_ID, {?MODULE, testrun_id}). - --define(KEYSPACE, ?MODULE). --define(SHARD_ID, <<"shard">>). --define(SHARD, {?KEYSPACE, ?SHARD_ID}). --define(GEN_ID, 42). - -%%-------------------------------------------------------------------- -%% Properties -%%-------------------------------------------------------------------- - -prop_bitstring_computes() -> - ?FORALL( - Keymapper, - keymapper(), - ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin - BS = emqx_ds_message_storage_bitmask:compute_bitstring(Topic, Timestamp, Keymapper), - is_integer(BS) andalso (BS < (1 bsl get_keymapper_bitsize(Keymapper))) - end) - ). - -prop_topic_bitmask_computes() -> - Keymapper = make_keymapper(16, [8, 12, 16], 100), - ?FORALL(TopicFilter, topic_filter(), begin - Mask = emqx_ds_message_storage_bitmask:compute_topic_bitmask(TopicFilter, Keymapper), - % topic bits + timestamp LSBs - is_integer(Mask) andalso (Mask < (1 bsl (36 + 6))) - end). - -prop_next_seek_monotonic() -> - ?FORALL( - {TopicFilter, StartTime, Keymapper}, - {topic_filter(), pos_integer(), keymapper()}, - begin - Filter = emqx_ds_message_storage_bitmask:make_keyspace_filter( - {TopicFilter, StartTime}, - Keymapper - ), - ?FORALL( - Bitstring, - bitstr(get_keymapper_bitsize(Keymapper)), - emqx_ds_message_storage_bitmask:compute_next_seek(Bitstring, Filter) >= Bitstring - ) - end - ). - -prop_next_seek_eq_initial_seek() -> - ?FORALL( - Filter, - keyspace_filter(), - emqx_ds_message_storage_bitmask:compute_initial_seek(Filter) =:= - emqx_ds_message_storage_bitmask:compute_next_seek(0, Filter) - ). - -prop_iterate_messages() -> - TBPL = [4, 8, 12], - Options = #{ - timestamp_bits => 32, - topic_bits_per_level => TBPL, - epoch => 200 - }, - % TODO - % Shrinking is too unpredictable and leaves a LOT of garbage in the scratch dit. - ?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin - Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)), - {DB, Handle} = open_db(Filepath, Options), - Shim = emqx_ds_message_storage_bitmask_shim:open(), - ok = store_db(DB, Stream), - ok = store_shim(Shim, Stream), - ?FORALL( - { - {Topic, _}, - Pattern, - StartTime - }, - { - nth(Stream), - topic_filter_pattern(), - start_time() - }, - begin - TopicFilter = make_topic_filter(Pattern, Topic), - Iteration = {TopicFilter, StartTime}, - Messages = iterate_db(DB, Iteration), - Reference = iterate_shim(Shim, Iteration), - ok = close_db(Handle), - ok = emqx_ds_message_storage_bitmask_shim:close(Shim), - ?WHENFAIL( - begin - io:format(user, " *** Filepath = ~s~n", [Filepath]), - io:format(user, " *** TopicFilter = ~p~n", [TopicFilter]), - io:format(user, " *** StartTime = ~p~n", [StartTime]) - end, - is_list(Messages) andalso equals(Messages -- Reference, Reference -- Messages) - ) - end - ) - end). - -prop_iterate_eq_iterate_with_preserve_restore() -> - TBPL = [4, 8, 16, 12], - Options = #{ - timestamp_bits => 32, - topic_bits_per_level => TBPL, - epoch => 500 - }, - {DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options), - ?FORALL(Stream, non_empty(messages(topic(TBPL))), begin - % TODO - % This proptest is impure because messages from testruns assumed to be - % independent of each other are accumulated in the same storage. This - % would probably confuse shrinker in the event a testrun fails. - ok = store_db(DB, Stream), - ?FORALL( - { - {Topic, _}, - Pat, - StartTime, - Commands - }, - { - nth(Stream), - topic_filter_pattern(), - start_time(), - shuffled(flat([non_empty(list({preserve, restore})), list(iterate)])) - }, - begin - Replay = {make_topic_filter(Pat, Topic), StartTime}, - Iterator = make_iterator(DB, Replay), - Ctx = #{db => DB, replay => Replay}, - Messages = run_iterator_commands(Commands, Iterator, Ctx), - equals(Messages, iterate_db(DB, Replay)) - end - ) - end). - -prop_iterate_eq_iterate_with_refresh() -> - TBPL = [4, 8, 16, 12], - Options = #{ - timestamp_bits => 32, - topic_bits_per_level => TBPL, - epoch => 500 - }, - {DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options), - ?FORALL(Stream, non_empty(messages(topic(TBPL))), begin - % TODO - % This proptest is also impure, see above. - ok = store_db(DB, Stream), - ?FORALL( - { - {Topic, _}, - Pat, - StartTime, - RefreshEvery - }, - { - nth(Stream), - topic_filter_pattern(), - start_time(), - pos_integer() - }, - ?TIMEOUT(5000, begin - Replay = {make_topic_filter(Pat, Topic), StartTime}, - IterationOptions = #{iterator_refresh => {every, RefreshEvery}}, - Iterator = make_iterator(DB, Replay, IterationOptions), - Messages = iterate_db(Iterator), - equals(Messages, iterate_db(DB, Replay)) - end) - ) - end). - -% store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) -> -% MessageID = emqx_guid:gen(), -% PublishedAt = ChunkNum, -% MessageID, PublishedAt, Topic -% ]), -% ok = emqx_ds_message_storage_bitmask:store(DB, MessageID, PublishedAt, Topic, Payload), -% store_message_stream(DB, payload_gen:next(Rest)); -% store_message_stream(_Zone, []) -> -% ok. - -store_db(DB, Messages) -> - lists:foreach( - fun({Topic, Payload = {MessageID, Timestamp, _}}) -> - Bin = term_to_binary(Payload), - emqx_ds_message_storage_bitmask:store(DB, MessageID, Timestamp, Topic, Bin) - end, - Messages - ). - -iterate_db(DB, Iteration) -> - iterate_db(make_iterator(DB, Iteration)). - -iterate_db(It) -> - case emqx_ds_message_storage_bitmask:next(It) of - {value, Payload, ItNext} -> - [binary_to_term(Payload) | iterate_db(ItNext)]; - none -> - [] - end. - -make_iterator(DB, Replay) -> - {ok, It} = emqx_ds_message_storage_bitmask:make_iterator(DB, Replay), - It. - -make_iterator(DB, Replay, Options) -> - {ok, It} = emqx_ds_message_storage_bitmask:make_iterator(DB, Replay, Options), - It. - -run_iterator_commands([iterate | Rest], It, Ctx) -> - case emqx_ds_message_storage_bitmask:next(It) of - {value, Payload, ItNext} -> - [binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, Ctx)]; - none -> - [] - end; -run_iterator_commands([{preserve, restore} | Rest], It, Ctx) -> - #{db := DB} = Ctx, - Serial = emqx_ds_message_storage_bitmask:preserve_iterator(It), - {ok, ItNext} = emqx_ds_message_storage_bitmask:restore_iterator(DB, Serial), - run_iterator_commands(Rest, ItNext, Ctx); -run_iterator_commands([], It, _Ctx) -> - iterate_db(It). - -store_shim(Shim, Messages) -> - lists:foreach( - fun({Topic, Payload = {MessageID, Timestamp, _}}) -> - Bin = term_to_binary(Payload), - emqx_ds_message_storage_bitmask_shim:store(Shim, MessageID, Timestamp, Topic, Bin) - end, - Messages - ). - -iterate_shim(Shim, Iteration) -> - lists:map( - fun binary_to_term/1, - emqx_ds_message_storage_bitmask_shim:iterate(Shim, Iteration) - ). - -%%-------------------------------------------------------------------- -%% Setup / teardown -%%-------------------------------------------------------------------- - -open_db(Filepath, Options) -> - {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]), - {Schema, CFRefs} = emqx_ds_message_storage_bitmask:create_new(Handle, ?GEN_ID, Options), - DB = emqx_ds_message_storage_bitmask:open(?SHARD, Handle, ?GEN_ID, CFRefs, Schema), - {DB, Handle}. - -close_db(Handle) -> - rocksdb:close(Handle). - -make_filepath(TC) -> - make_filepath(TC, 0). - -make_filepath(TC, InstID) -> - Name = io_lib:format("~0p.~0p", [TC, InstID]), - Path = filename:join(?WORK_DIR ++ ["proper", "runs", get_run_id(), ?MODULE_STRING, Name]), - ok = filelib:ensure_dir(Path), - Path. - -get_run_id() -> - case persistent_term:get(?RUN_ID, undefined) of - RunID when RunID /= undefined -> - RunID; - undefined -> - RunID = make_run_id(), - ok = persistent_term:put(?RUN_ID, RunID), - RunID - end. - -make_run_id() -> - calendar:system_time_to_rfc3339(erlang:system_time(second), [{offset, "Z"}]). - -%%-------------------------------------------------------------------- -%% Type generators -%%-------------------------------------------------------------------- - -topic() -> - non_empty(list(topic_level())). - -topic(EntropyWeights) -> - ?LET(L, scaled(1 / 4, list(1)), begin - EWs = lists:sublist(EntropyWeights ++ L, length(L)), - ?SIZED(S, [oneof([topic_level(S * EW), topic_level_fixed()]) || EW <- EWs]) - end). - -topic_filter() -> - ?SUCHTHAT( - L, - non_empty( - list( - frequency([ - {5, topic_level()}, - {2, '+'}, - {1, '#'} - ]) - ) - ), - not lists:member('#', L) orelse lists:last(L) == '#' - ). - -topic_level_pattern() -> - frequency([ - {5, level}, - {2, '+'}, - {1, '#'} - ]). - -topic_filter_pattern() -> - list(topic_level_pattern()). - -topic_filter(Topic) -> - ?LET({T, Pat}, {Topic, topic_filter_pattern()}, make_topic_filter(Pat, T)). - -make_topic_filter([], _) -> - []; -make_topic_filter(_, []) -> - []; -make_topic_filter(['#' | _], _) -> - ['#']; -make_topic_filter(['+' | Rest], [_ | Levels]) -> - ['+' | make_topic_filter(Rest, Levels)]; -make_topic_filter([level | Rest], [L | Levels]) -> - [L | make_topic_filter(Rest, Levels)]. - -% topic() -> -% ?LAZY(?SIZED(S, frequency([ -% {S, [topic_level() | topic()]}, -% {1, []} -% ]))). - -% topic_filter() -> -% ?LAZY(?SIZED(S, frequency([ -% {round(S / 3 * 2), [topic_level() | topic_filter()]}, -% {round(S / 3 * 1), ['+' | topic_filter()]}, -% {1, []}, -% {1, ['#']} -% ]))). - -topic_level() -> - ?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)). - -topic_level(Entropy) -> - S = floor(1 + math:log2(Entropy) / 4), - ?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))). - -topic_level_fixed() -> - oneof([ - <<"foo">>, - <<"bar">>, - <<"baz">>, - <<"xyzzy">> - ]). - -keymapper() -> - ?LET( - {TimestampBits, TopicBits, Epoch}, - { - range(0, 128), - non_empty(list(range(1, 32))), - pos_integer() - }, - make_keymapper(TimestampBits, TopicBits, Epoch * 100) - ). - -keyspace_filter() -> - ?LET( - {TopicFilter, StartTime, Keymapper}, - {topic_filter(), pos_integer(), keymapper()}, - emqx_ds_message_storage_bitmask:make_keyspace_filter({TopicFilter, StartTime}, Keymapper) - ). - -messages(Topic) -> - ?LET( - Ts, - list(Topic), - interleaved( - ?LET(Messages, vector(length(Ts), scaled(4, list(message()))), lists:zip(Ts, Messages)) - ) - ). - -message() -> - ?LET({Timestamp, Payload}, {timestamp(), binary()}, {emqx_guid:gen(), Timestamp, Payload}). - -message_streams(Topic) -> - ?LET(Topics, list(Topic), [{T, payload_gen:binary_stream_gen(64)} || T <- Topics]). - -timestamp() -> - scaled(20, pos_integer()). - -start_time() -> - scaled(10, pos_integer()). - -bitstr(Size) -> - ?LET(B, binary(1 + (Size div 8)), binary:decode_unsigned(B) band (1 bsl Size - 1)). - -nth(L) -> - ?LET(I, range(1, length(L)), lists:nth(I, L)). - -scaled(Factor, T) -> - ?SIZED(S, resize(ceil(S * Factor), T)). - -interleaved(T) -> - ?LET({L, Seed}, {T, integer()}, interleave(L, rand:seed_s(exsss, Seed))). - -shuffled(T) -> - ?LET({L, Seed}, {T, integer()}, shuffle(L, rand:seed_s(exsss, Seed))). - -flat(T) -> - ?LET(L, T, lists:flatten(L)). - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -make_keymapper(TimestampBits, TopicBits, MaxEpoch) -> - emqx_ds_message_storage_bitmask:make_keymapper(#{ - timestamp_bits => TimestampBits, - topic_bits_per_level => TopicBits, - epoch => MaxEpoch - }). - -get_keymapper_bitsize(Keymapper) -> - maps:get(bitsize, emqx_ds_message_storage_bitmask:keymapper_info(Keymapper)). - --spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}). -interleave(Seqs, Rng) -> - interleave(Seqs, length(Seqs), Rng). - -interleave(Seqs, L, Rng) when L > 0 -> - {N, RngNext} = rand:uniform_s(L, Rng), - {SeqHead, SeqTail} = lists:split(N - 1, Seqs), - case SeqTail of - [{Tag, [M | Rest]} | SeqRest] -> - [{Tag, M} | interleave(SeqHead ++ [{Tag, Rest} | SeqRest], L, RngNext)]; - [{_, []} | SeqRest] -> - interleave(SeqHead ++ SeqRest, L - 1, RngNext) - end; -interleave([], 0, _) -> - []. - --spec shuffle(list(E), rand:state()) -> list(E). -shuffle(L, Rng) -> - {Rands, _} = randoms(length(L), Rng), - [E || {_, E} <- lists:sort(lists:zip(Rands, L))]. - -randoms(N, Rng) when N > 0 -> - {Rand, RngNext} = rand:uniform_s(Rng), - {Tail, RngFinal} = randoms(N - 1, RngNext), - {[Rand | Tail], RngFinal}; -randoms(_, Rng) -> - {[], Rng}.