test(ds): Explore full range of keys when testing ratchet function
This commit is contained in:
parent
87689890ff
commit
e745e42093
|
@ -1,189 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_ds).
|
||||
|
||||
-include_lib("stdlib/include/ms_transform.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
%% API:
|
||||
-export([ensure_shard/2]).
|
||||
%% Messages:
|
||||
-export([message_store/2, message_store/1, message_stats/0]).
|
||||
%% Iterator:
|
||||
-export([get_streams/3, open_iterator/1, next/2]).
|
||||
|
||||
%% internal exports:
|
||||
-export([]).
|
||||
|
||||
-export_type([
|
||||
stream/0,
|
||||
keyspace/0,
|
||||
message_id/0,
|
||||
message_stats/0,
|
||||
message_store_opts/0,
|
||||
replay/0,
|
||||
replay_id/0,
|
||||
%iterator_id/0,
|
||||
iterator/0,
|
||||
topic/0,
|
||||
topic_filter/0,
|
||||
time/0
|
||||
]).
|
||||
|
||||
%%================================================================================
|
||||
%% Type declarations
|
||||
%%================================================================================
|
||||
|
||||
%% This record encapsulates the stream entity from the storage level.
|
||||
%%
|
||||
%% TODO: currently the stream is hardwired to only support the
|
||||
%% internal rocksdb storage. In the future we want to add other
|
||||
%% implementations for emqx_ds, so this type has to take this into
|
||||
%% account.
|
||||
%% `shard' is the `{Keyspace, ShardId}' pair the stream belongs to;
%% `stream' holds the storage-layer stream returned for that shard.
-record(stream,
        { shard :: emqx_ds:shard()
        , stream :: emqx_ds_storage_layer:stream()
        }).
|
||||
|
||||
-opaque stream() :: #stream{}.
|
||||
|
||||
-type iterator() :: term().
|
||||
|
||||
%-type iterator_id() :: binary().
|
||||
|
||||
-type message_store_opts() :: #{}.
|
||||
|
||||
-type message_stats() :: #{}.
|
||||
|
||||
-type message_id() :: binary().
|
||||
|
||||
%% Parsed topic.
|
||||
-type topic() :: list(binary()).
|
||||
|
||||
%% Parsed topic filter.
|
||||
-type topic_filter() :: list(binary() | '+' | '#' | '').
|
||||
|
||||
-type keyspace() :: atom().
|
||||
-type shard_id() :: binary().
|
||||
-type shard() :: {keyspace(), shard_id()}.
|
||||
|
||||
%% Timestamp
|
||||
%% Earliest possible timestamp is 0.
|
||||
%% TODO granularity? Currently, we should always use micro second, as that's the unit we
|
||||
%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
|
||||
-type time() :: non_neg_integer().
|
||||
|
||||
-type replay_id() :: binary().
|
||||
|
||||
-type replay() :: {
|
||||
_TopicFilter :: topic_filter(),
|
||||
_StartTime :: time()
|
||||
}.
|
||||
|
||||
%%================================================================================
|
||||
%% API functions
|
||||
%%================================================================================
|
||||
|
||||
%% @doc Get a list of streams needed for replaying a topic filter.
|
||||
%%
|
||||
%% Motivation: under the hood, EMQX may store different topics at
|
||||
%% different locations or even in different databases. A wildcard
|
||||
%% topic filter may require pulling data from any number of locations.
|
||||
%%
|
||||
%% Stream is an abstraction exposed by `emqx_ds' that reflects the
|
||||
%% notion that different topics can be stored differently, but hides
|
||||
%% the implementation details.
|
||||
%%
|
||||
%% Rules:
|
||||
%%
|
||||
%% 1. New streams matching the topic filter can appear without notice,
|
||||
%% so the replayer must periodically call this function to get the
|
||||
%% updated list of streams.
|
||||
%%
|
||||
%% 2. Streams may depend on one another. Therefore, care should be
|
||||
%% taken while replaying them in parallel to avoid out-of-order
|
||||
%% replay. This function returns stream together with its
|
||||
%% "coordinates": `{X, T, Stream}'. If X coordinate of two streams is
|
||||
%% different, then they can be replayed in parallel. If it's the
|
||||
%% same, then the stream with smaller T coordinate should be replayed
|
||||
%% first.
|
||||
%% NOTE(review): the spec advertises `{X, T, Stream}' triples, while the
%% body below returns bare `#stream{}' records. Confirm which one is
%% intended before relying on the coordinates.
-spec get_streams(keyspace(), topic_filter(), time()) -> [{integer(), integer(), stream()}].
get_streams(Keyspace, TopicFilter, StartTime) ->
    ShardIds = emqx_ds_replication_layer:get_all_shards(Keyspace),
    lists:flatmap(
        fun(ShardId) ->
            %% Each shard is queried on the node that currently hosts it.
            Node = emqx_ds_replication_layer:shard_to_node(ShardId),
            try
                Streams = emqx_persistent_session_ds_proto_v1:get_streams(
                    Node, Keyspace, ShardId, TopicFilter, StartTime
                ),
                [
                    #stream{shard = {Keyspace, ShardId}, stream = Stream}
                 || Stream <- Streams
                ]
            catch
                error:{erpc, _} ->
                    %% The caller has to periodically refresh the
                    %% list of streams anyway, so it's ok to ignore
                    %% transient errors.
                    []
            end
        end,
        ShardIds
    ).
|
||||
|
||||
%% @doc Make sure the given shard has been started.
%%
%% Returns `ok' both when this call started the shard and when it was
%% already running; any other start failure is returned unchanged.
-spec ensure_shard(shard(), emqx_ds_storage_layer:options()) ->
    ok | {error, _Reason}.
ensure_shard(Shard, Options) ->
    case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of
        {ok, _Pid} ->
            ok;
        {error, {already_started, _Pid}} ->
            ok;
        {error, Reason} ->
            {error, Reason}
    end.
|
||||
|
||||
%%--------------------------------------------------------------------------------
|
||||
%% Message
|
||||
%%--------------------------------------------------------------------------------
|
||||
|
||||
%% @doc Store a batch of messages with the given options.
%%
%% TODO: storing is not wired up to a backend here yet. The function
%% reports that explicitly instead of calling itself with the same
%% arguments, which would never return.
-spec message_store([emqx_types:message()], message_store_opts()) ->
    {ok, [message_id()]} | {error, _}.
message_store(_Msg, _Opts) ->
    {error, not_implemented}.

%% @doc Same as `message_store/2' with an empty options map.
-spec message_store([emqx_types:message()]) -> {ok, [message_id()]} | {error, _}.
message_store(Msg) ->
    message_store(Msg, #{}).
|
||||
|
||||
%% @doc Return message statistics; currently always an empty map.
-spec message_stats() -> message_stats().
message_stats() ->
    maps:new().
|
||||
|
||||
%%--------------------------------------------------------------------------------
|
||||
%% Iterator (pull API)
|
||||
%%--------------------------------------------------------------------------------
|
||||
|
||||
%% @doc Open an iterator over a stream. Not implemented yet: any
%% well-formed stream record raises `error:todo'.
-spec open_iterator(stream()) -> {ok, iterator()}.
open_iterator(#stream{shard = {_Keyspace, _ShardId}, stream = _StorageSpecificStream}) ->
    erlang:error(todo).
|
||||
|
||||
%% @doc Fetch the next batch of messages from an iterator.
%% Not implemented yet: always raises `error:todo'.
-spec next(iterator(), non_neg_integer()) ->
    {ok, iterator(), [emqx_types:message()]}
    | end_of_stream.
next(_Iterator, _BatchSize) ->
    erlang:error(todo).
|
||||
|
||||
%%================================================================================
|
||||
%% Internal functions
|
||||
%%================================================================================
|
|
@ -699,26 +699,44 @@ ratchet2_test() ->
|
|||
?assertEqual(16#aaddcc00, ratchet(F2, 0)),
|
||||
?assertEqual(16#aa_de_cc_00, ratchet(F2, 16#aa_dd_cd_11)).
|
||||
|
||||
ratchet3_test() ->
|
||||
?assert(proper:quickcheck(ratchet1_prop(), 100)).
|
||||
|
||||
%% erlfmt-ignore
|
||||
ratchet1_prop() ->
|
||||
ratchet3_test_() ->
|
||||
EpochBits = 4,
|
||||
Bitsources = [{1, 0, 2}, %% Static topic index
|
||||
{2, EpochBits, 4}, %% Epoch
|
||||
{3, 0, 2}, %% Varying topic hash
|
||||
{2, 0, EpochBits}], %% Timestamp offset
|
||||
M = make_keymapper(lists:reverse(Bitsources)),
|
||||
F1 = make_filter(M, [{'=', 2#10}, any, {'=', 2#01}]),
|
||||
?FORALL(N, integer(0, ones(12)),
|
||||
ratchet_prop(F1, N)).
|
||||
Keymapper = make_keymapper(lists:reverse(Bitsources)),
|
||||
Filter1 = make_filter(Keymapper, [{'=', 2#10}, any, {'=', 2#01}]),
|
||||
Filter2 = make_filter(Keymapper, [{'=', 2#01}, any, any]),
|
||||
Filter3 = make_filter(Keymapper, [{'=', 2#01}, {'>=', 16#aa}, any]),
|
||||
{timeout, 15,
|
||||
[?_assert(test_iterate(Filter1, 0)),
|
||||
?_assert(test_iterate(Filter2, 0)),
|
||||
%% Not starting from 0 here for simplicity, since the beginning
|
||||
%% of a >= interval can't be properly checked with a bitmask:
|
||||
?_assert(test_iterate(Filter3, ratchet(Filter3, 1)))
|
||||
]}.
|
||||
|
||||
ratchet_prop(Filter = #filter{bitfilter = Bitfilter, bitmask = Bitmask, size = Size}, Key0) ->
|
||||
Key = ratchet(Filter, Key0),
|
||||
%% Note: this function iterates through the full range of keys, so its
|
||||
%% complexity grows _exponentially_ with the total size of the
|
||||
%% keymapper.
|
||||
%% Step through the keys produced by `ratchet/2' for `Filter', starting
%% right after `Key0', and validate every step with `ratchet_prop/3'.
%% Iteration stops once `ratchet/2' returns `overflow'.
test_iterate(_Filter, overflow) ->
    true;
test_iterate(Filter, Key0) ->
    Key = ratchet(Filter, Key0 + 1),
    ?assert(ratchet_prop(Filter, Key0, Key)),
    test_iterate(Filter, Key).
|
||||
|
||||
ratchet_prop(Filter = #filter{bitfilter = Bitfilter, bitmask = Bitmask, size = Size}, Key0, Key) ->
|
||||
%% Validate basic properties of the generated key. It must be
|
||||
%% greater than the old key, and match the bitmask:
|
||||
?assert(Key =:= overflow orelse (Key band Bitmask =:= Bitfilter)),
|
||||
?assert(Key >= Key0, {Key, '>=', Key}),
|
||||
?assert(Key > Key0, {Key, '>=', Key}),
|
||||
IMax = ones(Size),
|
||||
%% Iterate through all keys between `Key0 + 1' and `Key' and
|
||||
%% validate that none of them match the bitmask. Ultimately, it
|
||||
%% means that `ratchet' function doesn't skip over any valid keys:
|
||||
CheckGaps = fun
|
||||
F(I) when I >= Key; I > IMax ->
|
||||
true;
|
||||
|
@ -729,7 +747,7 @@ ratchet_prop(Filter = #filter{bitfilter = Bitfilter, bitmask = Bitmask, size = S
|
|||
),
|
||||
F(I + 1)
|
||||
end,
|
||||
CheckGaps(Key0).
|
||||
CheckGaps(Key0 + 1).
|
||||
|
||||
mkbmask(Keymapper, Filter0) ->
|
||||
Filter = inequations_to_ranges(Keymapper, Filter0),
|
||||
|
|
|
@ -4,9 +4,11 @@
|
|||
|
||||
-module(emqx_ds_message_storage_bitmask_shim).
|
||||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
|
||||
-export([open/0]).
|
||||
-export([close/1]).
|
||||
-export([store/5]).
|
||||
-export([store/2]).
|
||||
-export([iterate/2]).
|
||||
|
||||
-type topic() :: list(binary()).
|
||||
|
@ -25,20 +27,21 @@ close(Tab) ->
|
|||
true = ets:delete(Tab),
|
||||
ok.
|
||||
|
||||
-spec store(t(), emqx_guid:guid(), time(), topic(), binary()) ->
|
||||
-spec store(t(), emqx_types:message()) ->
|
||||
ok | {error, _TODO}.
|
||||
store(Tab, MessageID, PublishedAt, Topic, Payload) ->
|
||||
true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}),
|
||||
store(Tab, Msg = #message{id = MessageID, timestamp = PublishedAt}) ->
|
||||
true = ets:insert(Tab, {{PublishedAt, MessageID}, Msg}),
|
||||
ok.
|
||||
|
||||
-spec iterate(t(), emqx_ds:replay()) ->
|
||||
[binary()].
|
||||
iterate(Tab, {TopicFilter, StartTime}) ->
|
||||
iterate(Tab, {TopicFilter0, StartTime}) ->
|
||||
TopicFilter = iolist_to_binary(lists:join("/", TopicFilter0)),
|
||||
ets:foldr(
|
||||
fun({{PublishedAt, _}, Topic, Payload}, Acc) ->
|
||||
fun({{PublishedAt, _}, Msg = #message{topic = Topic}}, Acc) ->
|
||||
case emqx_topic:match(Topic, TopicFilter) of
|
||||
true when PublishedAt >= StartTime ->
|
||||
[Payload | Acc];
|
||||
[Msg | Acc];
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
|
|
|
@ -1,463 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(prop_replay_message_storage).
|
||||
|
||||
-include_lib("proper/include/proper.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(WORK_DIR, ["_build", "test"]).
|
||||
-define(RUN_ID, {?MODULE, testrun_id}).
|
||||
|
||||
-define(KEYSPACE, ?MODULE).
|
||||
-define(SHARD_ID, <<"shard">>).
|
||||
-define(SHARD, {?KEYSPACE, ?SHARD_ID}).
|
||||
-define(GEN_ID, 42).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Properties
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
prop_bitstring_computes() ->
|
||||
?FORALL(
|
||||
Keymapper,
|
||||
keymapper(),
|
||||
?FORALL({Topic, Timestamp}, {topic(), integer()}, begin
|
||||
BS = emqx_ds_message_storage_bitmask:compute_bitstring(Topic, Timestamp, Keymapper),
|
||||
is_integer(BS) andalso (BS < (1 bsl get_keymapper_bitsize(Keymapper)))
|
||||
end)
|
||||
).
|
||||
|
||||
prop_topic_bitmask_computes() ->
|
||||
Keymapper = make_keymapper(16, [8, 12, 16], 100),
|
||||
?FORALL(TopicFilter, topic_filter(), begin
|
||||
Mask = emqx_ds_message_storage_bitmask:compute_topic_bitmask(TopicFilter, Keymapper),
|
||||
% topic bits + timestamp LSBs
|
||||
is_integer(Mask) andalso (Mask < (1 bsl (36 + 6)))
|
||||
end).
|
||||
|
||||
prop_next_seek_monotonic() ->
|
||||
?FORALL(
|
||||
{TopicFilter, StartTime, Keymapper},
|
||||
{topic_filter(), pos_integer(), keymapper()},
|
||||
begin
|
||||
Filter = emqx_ds_message_storage_bitmask:make_keyspace_filter(
|
||||
{TopicFilter, StartTime},
|
||||
Keymapper
|
||||
),
|
||||
?FORALL(
|
||||
Bitstring,
|
||||
bitstr(get_keymapper_bitsize(Keymapper)),
|
||||
emqx_ds_message_storage_bitmask:compute_next_seek(Bitstring, Filter) >= Bitstring
|
||||
)
|
||||
end
|
||||
).
|
||||
|
||||
prop_next_seek_eq_initial_seek() ->
|
||||
?FORALL(
|
||||
Filter,
|
||||
keyspace_filter(),
|
||||
emqx_ds_message_storage_bitmask:compute_initial_seek(Filter) =:=
|
||||
emqx_ds_message_storage_bitmask:compute_next_seek(0, Filter)
|
||||
).
|
||||
|
||||
prop_iterate_messages() ->
|
||||
TBPL = [4, 8, 12],
|
||||
Options = #{
|
||||
timestamp_bits => 32,
|
||||
topic_bits_per_level => TBPL,
|
||||
epoch => 200
|
||||
},
|
||||
% TODO
|
||||
% Shrinking is too unpredictable and leaves a LOT of garbage in the scratch dir.
|
||||
?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin
|
||||
Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)),
|
||||
{DB, Handle} = open_db(Filepath, Options),
|
||||
Shim = emqx_ds_message_storage_bitmask_shim:open(),
|
||||
ok = store_db(DB, Stream),
|
||||
ok = store_shim(Shim, Stream),
|
||||
?FORALL(
|
||||
{
|
||||
{Topic, _},
|
||||
Pattern,
|
||||
StartTime
|
||||
},
|
||||
{
|
||||
nth(Stream),
|
||||
topic_filter_pattern(),
|
||||
start_time()
|
||||
},
|
||||
begin
|
||||
TopicFilter = make_topic_filter(Pattern, Topic),
|
||||
Iteration = {TopicFilter, StartTime},
|
||||
Messages = iterate_db(DB, Iteration),
|
||||
Reference = iterate_shim(Shim, Iteration),
|
||||
ok = close_db(Handle),
|
||||
ok = emqx_ds_message_storage_bitmask_shim:close(Shim),
|
||||
?WHENFAIL(
|
||||
begin
|
||||
io:format(user, " *** Filepath = ~s~n", [Filepath]),
|
||||
io:format(user, " *** TopicFilter = ~p~n", [TopicFilter]),
|
||||
io:format(user, " *** StartTime = ~p~n", [StartTime])
|
||||
end,
|
||||
is_list(Messages) andalso equals(Messages -- Reference, Reference -- Messages)
|
||||
)
|
||||
end
|
||||
)
|
||||
end).
|
||||
|
||||
prop_iterate_eq_iterate_with_preserve_restore() ->
|
||||
TBPL = [4, 8, 16, 12],
|
||||
Options = #{
|
||||
timestamp_bits => 32,
|
||||
topic_bits_per_level => TBPL,
|
||||
epoch => 500
|
||||
},
|
||||
{DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options),
|
||||
?FORALL(Stream, non_empty(messages(topic(TBPL))), begin
|
||||
% TODO
|
||||
% This proptest is impure because messages from testruns assumed to be
|
||||
% independent of each other are accumulated in the same storage. This
|
||||
% would probably confuse shrinker in the event a testrun fails.
|
||||
ok = store_db(DB, Stream),
|
||||
?FORALL(
|
||||
{
|
||||
{Topic, _},
|
||||
Pat,
|
||||
StartTime,
|
||||
Commands
|
||||
},
|
||||
{
|
||||
nth(Stream),
|
||||
topic_filter_pattern(),
|
||||
start_time(),
|
||||
shuffled(flat([non_empty(list({preserve, restore})), list(iterate)]))
|
||||
},
|
||||
begin
|
||||
Replay = {make_topic_filter(Pat, Topic), StartTime},
|
||||
Iterator = make_iterator(DB, Replay),
|
||||
Ctx = #{db => DB, replay => Replay},
|
||||
Messages = run_iterator_commands(Commands, Iterator, Ctx),
|
||||
equals(Messages, iterate_db(DB, Replay))
|
||||
end
|
||||
)
|
||||
end).
|
||||
|
||||
prop_iterate_eq_iterate_with_refresh() ->
|
||||
TBPL = [4, 8, 16, 12],
|
||||
Options = #{
|
||||
timestamp_bits => 32,
|
||||
topic_bits_per_level => TBPL,
|
||||
epoch => 500
|
||||
},
|
||||
{DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options),
|
||||
?FORALL(Stream, non_empty(messages(topic(TBPL))), begin
|
||||
% TODO
|
||||
% This proptest is also impure, see above.
|
||||
ok = store_db(DB, Stream),
|
||||
?FORALL(
|
||||
{
|
||||
{Topic, _},
|
||||
Pat,
|
||||
StartTime,
|
||||
RefreshEvery
|
||||
},
|
||||
{
|
||||
nth(Stream),
|
||||
topic_filter_pattern(),
|
||||
start_time(),
|
||||
pos_integer()
|
||||
},
|
||||
?TIMEOUT(5000, begin
|
||||
Replay = {make_topic_filter(Pat, Topic), StartTime},
|
||||
IterationOptions = #{iterator_refresh => {every, RefreshEvery}},
|
||||
Iterator = make_iterator(DB, Replay, IterationOptions),
|
||||
Messages = iterate_db(Iterator),
|
||||
equals(Messages, iterate_db(DB, Replay))
|
||||
end)
|
||||
)
|
||||
end).
|
||||
|
||||
% store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) ->
|
||||
% MessageID = emqx_guid:gen(),
|
||||
% PublishedAt = ChunkNum,
|
||||
% MessageID, PublishedAt, Topic
|
||||
% ]),
|
||||
% ok = emqx_ds_message_storage_bitmask:store(DB, MessageID, PublishedAt, Topic, Payload),
|
||||
% store_message_stream(DB, payload_gen:next(Rest));
|
||||
% store_message_stream(_Zone, []) ->
|
||||
% ok.
|
||||
|
||||
%% Write every `{Topic, {MessageID, Timestamp, _}}' entry to the bitmask
%% storage. The whole payload tuple is serialized with `term_to_binary/1'
%% and stored as the message body.
store_db(DB, Messages) ->
    StoreOne =
        fun({Topic, {MessageID, Timestamp, _} = Payload}) ->
            Encoded = term_to_binary(Payload),
            emqx_ds_message_storage_bitmask:store(DB, MessageID, Timestamp, Topic, Encoded)
        end,
    lists:foreach(StoreOne, Messages).
|
||||
|
||||
%% Build an iterator for `Iteration' on `DB' and drain it.
iterate_db(DB, Iteration) ->
    It = make_iterator(DB, Iteration),
    iterate_db(It).

%% Drain an iterator, decoding each stored payload back into a term.
%% Payloads are returned in the order the iterator yields them.
iterate_db(It) ->
    case emqx_ds_message_storage_bitmask:next(It) of
        none ->
            [];
        {value, Encoded, ItNext} ->
            [binary_to_term(Encoded) | iterate_db(ItNext)]
    end.
|
||||
|
||||
make_iterator(DB, Replay) ->
|
||||
{ok, It} = emqx_ds_message_storage_bitmask:make_iterator(DB, Replay),
|
||||
It.
|
||||
|
||||
make_iterator(DB, Replay, Options) ->
|
||||
{ok, It} = emqx_ds_message_storage_bitmask:make_iterator(DB, Replay, Options),
|
||||
It.
|
||||
|
||||
%% Interpret a command list against an iterator:
%%   `iterate'             - pull one message (stops early when exhausted);
%%   `{preserve, restore}' - serialize the iterator and restore it from
%%                           the serialized form before continuing.
%% Once the commands run out, the rest of the iterator is drained.
run_iterator_commands([], It, _Ctx) ->
    iterate_db(It);
run_iterator_commands([iterate | Cmds], It, Ctx) ->
    case emqx_ds_message_storage_bitmask:next(It) of
        none ->
            [];
        {value, Encoded, ItNext} ->
            [binary_to_term(Encoded) | run_iterator_commands(Cmds, ItNext, Ctx)]
    end;
run_iterator_commands([{preserve, restore} | Cmds], It, Ctx) ->
    #{db := DB} = Ctx,
    Preserved = emqx_ds_message_storage_bitmask:preserve_iterator(It),
    {ok, Restored} = emqx_ds_message_storage_bitmask:restore_iterator(DB, Preserved),
    run_iterator_commands(Cmds, Restored, Ctx).
|
||||
|
||||
%% Mirror of `store_db/2' for the reference shim: every entry is stored
%% with its payload tuple serialized via `term_to_binary/1'.
store_shim(Shim, Messages) ->
    StoreOne =
        fun({Topic, {MessageID, Timestamp, _} = Payload}) ->
            Encoded = term_to_binary(Payload),
            emqx_ds_message_storage_bitmask_shim:store(Shim, MessageID, Timestamp, Topic, Encoded)
        end,
    lists:foreach(StoreOne, Messages).
|
||||
|
||||
%% Run `Iteration' against the reference shim and decode every returned
%% binary back into the term that was stored.
iterate_shim(Shim, Iteration) ->
    Encoded = emqx_ds_message_storage_bitmask_shim:iterate(Shim, Iteration),
    [binary_to_term(Bin) || Bin <- Encoded].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Setup / teardown
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
open_db(Filepath, Options) ->
|
||||
{ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]),
|
||||
{Schema, CFRefs} = emqx_ds_message_storage_bitmask:create_new(Handle, ?GEN_ID, Options),
|
||||
DB = emqx_ds_message_storage_bitmask:open(?SHARD, Handle, ?GEN_ID, CFRefs, Schema),
|
||||
{DB, Handle}.
|
||||
|
||||
close_db(Handle) ->
|
||||
rocksdb:close(Handle).
|
||||
|
||||
%% Path for a testcase database, using instance id 0.
make_filepath(TC) ->
    make_filepath(TC, 0).

%% Build `<WORK_DIR>/proper/runs/<run id>/<module>/<TC>.<InstID>' and make
%% sure its parent directories exist before returning the path.
make_filepath(TC, InstID) ->
    Leaf = io_lib:format("~0p.~0p", [TC, InstID]),
    Segments = ?WORK_DIR ++ ["proper", "runs", get_run_id(), ?MODULE_STRING, Leaf],
    Path = filename:join(Segments),
    ok = filelib:ensure_dir(Path),
    Path.
|
||||
|
||||
%% Return the id of the current test run. The id is generated on first use
%% and cached in `persistent_term' under `?RUN_ID'.
get_run_id() ->
    case persistent_term:get(?RUN_ID, undefined) of
        undefined ->
            Fresh = make_run_id(),
            ok = persistent_term:put(?RUN_ID, Fresh),
            Fresh;
        Cached ->
            Cached
    end.
|
||||
|
||||
%% Produce a run id from the current system time (second resolution),
%% rendered as an RFC 3339 string with a literal "Z" offset.
make_run_id() ->
    Now = erlang:system_time(second),
    calendar:system_time_to_rfc3339(Now, [{offset, "Z"}]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Type generators
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
topic() ->
|
||||
non_empty(list(topic_level())).
|
||||
|
||||
topic(EntropyWeights) ->
|
||||
?LET(L, scaled(1 / 4, list(1)), begin
|
||||
EWs = lists:sublist(EntropyWeights ++ L, length(L)),
|
||||
?SIZED(S, [oneof([topic_level(S * EW), topic_level_fixed()]) || EW <- EWs])
|
||||
end).
|
||||
|
||||
topic_filter() ->
|
||||
?SUCHTHAT(
|
||||
L,
|
||||
non_empty(
|
||||
list(
|
||||
frequency([
|
||||
{5, topic_level()},
|
||||
{2, '+'},
|
||||
{1, '#'}
|
||||
])
|
||||
)
|
||||
),
|
||||
not lists:member('#', L) orelse lists:last(L) == '#'
|
||||
).
|
||||
|
||||
topic_level_pattern() ->
|
||||
frequency([
|
||||
{5, level},
|
||||
{2, '+'},
|
||||
{1, '#'}
|
||||
]).
|
||||
|
||||
topic_filter_pattern() ->
|
||||
list(topic_level_pattern()).
|
||||
|
||||
topic_filter(Topic) ->
|
||||
?LET({T, Pat}, {Topic, topic_filter_pattern()}, make_topic_filter(Pat, T)).
|
||||
|
||||
%% Turn a pattern (a list of `level' | '+' | '#') and a concrete topic into
%% a topic filter:
%%   `level' keeps the topic level at that position,
%%   '+' replaces it with a single-level wildcard,
%%   '#' ends the filter with a multi-level wildcard.
%% The result stops as soon as either the pattern or the topic runs out.
make_topic_filter(Pattern, Topic) when Pattern =:= []; Topic =:= [] ->
    [];
make_topic_filter(['#' | _], _) ->
    ['#'];
make_topic_filter(['+' | PatTail], [_ | TopicTail]) ->
    ['+' | make_topic_filter(PatTail, TopicTail)];
make_topic_filter([level | PatTail], [Level | TopicTail]) ->
    [Level | make_topic_filter(PatTail, TopicTail)].
|
||||
|
||||
% topic() ->
|
||||
% ?LAZY(?SIZED(S, frequency([
|
||||
% {S, [topic_level() | topic()]},
|
||||
% {1, []}
|
||||
% ]))).
|
||||
|
||||
% topic_filter() ->
|
||||
% ?LAZY(?SIZED(S, frequency([
|
||||
% {round(S / 3 * 2), [topic_level() | topic_filter()]},
|
||||
% {round(S / 3 * 1), ['+' | topic_filter()]},
|
||||
% {1, []},
|
||||
% {1, ['#']}
|
||||
% ]))).
|
||||
|
||||
topic_level() ->
|
||||
?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)).
|
||||
|
||||
topic_level(Entropy) ->
|
||||
S = floor(1 + math:log2(Entropy) / 4),
|
||||
?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))).
|
||||
|
||||
topic_level_fixed() ->
|
||||
oneof([
|
||||
<<"foo">>,
|
||||
<<"bar">>,
|
||||
<<"baz">>,
|
||||
<<"xyzzy">>
|
||||
]).
|
||||
|
||||
keymapper() ->
|
||||
?LET(
|
||||
{TimestampBits, TopicBits, Epoch},
|
||||
{
|
||||
range(0, 128),
|
||||
non_empty(list(range(1, 32))),
|
||||
pos_integer()
|
||||
},
|
||||
make_keymapper(TimestampBits, TopicBits, Epoch * 100)
|
||||
).
|
||||
|
||||
keyspace_filter() ->
|
||||
?LET(
|
||||
{TopicFilter, StartTime, Keymapper},
|
||||
{topic_filter(), pos_integer(), keymapper()},
|
||||
emqx_ds_message_storage_bitmask:make_keyspace_filter({TopicFilter, StartTime}, Keymapper)
|
||||
).
|
||||
|
||||
messages(Topic) ->
|
||||
?LET(
|
||||
Ts,
|
||||
list(Topic),
|
||||
interleaved(
|
||||
?LET(Messages, vector(length(Ts), scaled(4, list(message()))), lists:zip(Ts, Messages))
|
||||
)
|
||||
).
|
||||
|
||||
message() ->
|
||||
?LET({Timestamp, Payload}, {timestamp(), binary()}, {emqx_guid:gen(), Timestamp, Payload}).
|
||||
|
||||
message_streams(Topic) ->
|
||||
?LET(Topics, list(Topic), [{T, payload_gen:binary_stream_gen(64)} || T <- Topics]).
|
||||
|
||||
timestamp() ->
|
||||
scaled(20, pos_integer()).
|
||||
|
||||
start_time() ->
|
||||
scaled(10, pos_integer()).
|
||||
|
||||
bitstr(Size) ->
|
||||
?LET(B, binary(1 + (Size div 8)), binary:decode_unsigned(B) band (1 bsl Size - 1)).
|
||||
|
||||
nth(L) ->
|
||||
?LET(I, range(1, length(L)), lists:nth(I, L)).
|
||||
|
||||
scaled(Factor, T) ->
|
||||
?SIZED(S, resize(ceil(S * Factor), T)).
|
||||
|
||||
interleaved(T) ->
|
||||
?LET({L, Seed}, {T, integer()}, interleave(L, rand:seed_s(exsss, Seed))).
|
||||
|
||||
shuffled(T) ->
|
||||
?LET({L, Seed}, {T, integer()}, shuffle(L, rand:seed_s(exsss, Seed))).
|
||||
|
||||
flat(T) ->
|
||||
?LET(L, T, lists:flatten(L)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
make_keymapper(TimestampBits, TopicBits, MaxEpoch) ->
|
||||
emqx_ds_message_storage_bitmask:make_keymapper(#{
|
||||
timestamp_bits => TimestampBits,
|
||||
topic_bits_per_level => TopicBits,
|
||||
epoch => MaxEpoch
|
||||
}).
|
||||
|
||||
get_keymapper_bitsize(Keymapper) ->
|
||||
maps:get(bitsize, emqx_ds_message_storage_bitmask:keymapper_info(Keymapper)).
|
||||
|
||||
%% Merge several tagged sequences into one list of `{Tag, Element}' pairs.
%% At each step one of the remaining sequences is picked at random with
%% `Rng' and its head is emitted, so elements that share a tag keep their
%% relative order.
-spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}).
interleave(Seqs, Rng) ->
    interleave(Seqs, length(Seqs), Rng).

%% `Remaining' is the number of sequences still present in `Seqs'.
interleave([], 0, _Rng) ->
    [];
interleave(Seqs, Remaining, Rng0) when Remaining > 0 ->
    {Pick, Rng1} = rand:uniform_s(Remaining, Rng0),
    {Before, From} = lists:split(Pick - 1, Seqs),
    case From of
        [{_, []} | After] ->
            %% The picked sequence is exhausted: drop it and pick again.
            interleave(Before ++ After, Remaining - 1, Rng1);
        [{Tag, [Item | Items]} | After] ->
            [{Tag, Item} | interleave(Before ++ [{Tag, Items} | After], Remaining, Rng1)]
    end.
|
||||
|
||||
%% Return a permutation of `L' determined by `Rng': every element is
%% paired with a random number and the pairs are sorted.
-spec shuffle(list(E), rand:state()) -> list(E).
shuffle(L, Rng) ->
    {Keys, _RngFinal} = randoms(length(L), Rng),
    Sorted = lists:sort(lists:zip(Keys, L)),
    {_, Shuffled} = lists:unzip(Sorted),
    Shuffled.

%% Draw `N' random floats from `Rng', in generation order, and return
%% them together with the advanced generator state. A non-positive `N'
%% yields an empty list and the untouched state.
randoms(N, Rng) ->
    randoms(N, Rng, []).

randoms(N, Rng, Acc) when N > 0 ->
    {Rand, RngNext} = rand:uniform_s(Rng),
    randoms(N - 1, RngNext, [Rand | Acc]);
randoms(_, Rng, Acc) ->
    {lists:reverse(Acc), Rng}.
|
Loading…
Reference in New Issue