test: add basic property tests

This commit is contained in:
Andrew Mayorov 2022-12-27 13:47:38 +03:00
parent fcc8a4bcce
commit 7e13753ea5
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 483 additions and 0 deletions

View File

@ -106,6 +106,10 @@ t_iterate_wildcard(Config) ->
lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "a/#", 0)])
),
?assertEqual(
[],
lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "a/+/+", 0)])
),
ok.
store(DB, PublishedAt, Topic, Payload) ->
@ -129,6 +133,108 @@ parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
parse_topic(Topic) ->
emqx_topic:words(iolist_to_binary(Topic)).
%%
%% Property: for every topic produced by `topic/0',
%% `emqx_replay_message_storage:compute_topic_hash/1' returns an integer
%% whose unsigned binary encoding is at most 8 bytes long.
t_prop_topic_hash_computes(_) ->
    Prop = ?FORALL(
        Topic,
        topic(),
        begin
            Hash = emqx_replay_message_storage:compute_topic_hash(Topic),
            is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8)
        end
    ),
    ?assert(proper:quickcheck(Prop)).
%% Property: for every filter produced by `topic_filter/0',
%% `emqx_replay_message_storage:compute_hash_bitmask/1' returns an integer
%% whose unsigned binary encoding is at most 8 bytes long.
t_prop_hash_bitmask_computes(_) ->
    Prop = ?FORALL(
        TopicFilter,
        topic_filter(),
        begin
            Hash = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter),
            is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8)
        end
    ),
    ?assert(proper:quickcheck(Prop)).
%% Property: every interleaved message stream generated by `messages/0' can
%% be stored into the DB handle taken from the test `Config' without errors.
t_prop_iterate_stored_messages(Config) ->
    DB = ?config(handle, Config),
    ?assertEqual(
        true,
        proper:quickcheck(
            ?FORALL(
                Streams,
                messages(),
                begin
                    Stream = payload_gen:interleave_streams(Streams),
                    ok = store_message_stream(DB, Stream),
                    % TODO actually verify some property
                    % A property body has to evaluate to a boolean; the bare
                    % `ok' of the match above is not accepted as a passing
                    % result by PropEr.
                    true
                end
            )
        )
    ).
%% Walks a tagged binary stream and stores each element through
%% `emqx_replay_message_storage:store/5'. The message ID is the chunk number
%% encoded as a 32-bit integer; the publish timestamp is picked at random
%% from `1..ChunkNum'. Returns `ok' once the stream is exhausted.
store_message_stream(_DB, []) ->
    ok;
store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Cont]) ->
    PublishedAt = rand:uniform(ChunkNum),
    ok = emqx_replay_message_storage:store(DB, <<ChunkNum:32>>, PublishedAt, Topic, Payload),
    % The tail may be a lazy continuation, so it has to be forced first.
    Next = payload_gen:next(Cont),
    store_message_stream(DB, Next).
%% Generator of `[{Topic, StreamGenerator}]' pairs: a list of topics from
%% `topic/0', each paired with `payload_gen:binary_stream_gen(64)'.
messages() ->
    ?LET(
        Topics,
        list(topic()),
        lists:map(
            fun(Topic) -> {Topic, payload_gen:binary_stream_gen(64)} end,
            Topics
        )
    ).
%% Generator of topics: a non-empty list of levels from `topic_level/0'.
topic() ->
    % TODO
    % Somehow generate topic levels with variance according to the entropy distribution?
    Level = topic_level(),
    non_empty(list(Level)).
%% Generator of topics whose levels come from `topic_level/1'.
%% The number of levels equals the length of a generated list of `1's. The
%% weight of each level is taken from `EntropyWeights', padded with `1' when
%% `EntropyWeights' is shorter than the topic. Each weight is multiplied by
%% the current PropEr size before being handed to `topic_level/1'.
topic(EntropyWeights) ->
    ?LET(
        Ones,
        list(1),
        % Earlier drafts, kept for reference:
        % ?SIZED(S, [topic(S * nth(I, EntropyWeights, 1)) || I <- lists:seq(1, Len)])
        % [topic(10 * nth(I, EntropyWeights, 1)) || I <- lists:seq(1, Len)]
        ?SIZED(Size, begin
            Weights = lists:sublist(EntropyWeights ++ Ones, length(Ones)),
            [topic_level(Size * Weight) || Weight <- Weights]
        end)
    ).
%% Generator of topic filters: a non-empty list whose elements are either
%% plain levels from `topic_level/0' or the wildcards `'+'' and `'#''.
%% Levels are 5 times, and `'+'' 2 times, as likely as `'#''.
%%
%% The multi-level wildcard `'#'' is only allowed as the very last element.
topic_filter() ->
    ?SUCHTHAT(
        L,
        non_empty(
            list(
                frequency([
                    {5, topic_level()},
                    {2, '+'},
                    {1, '#'}
                ])
            )
        ),
        % Checking only that the last element is '#' would still accept
        % filters such as ['#', <<"a">>, '#']. Instead, require that no
        % element before the last one is '#'. `L' is non-empty, so
        % `lists:droplast/1' is safe here.
        not lists:member('#', lists:droplast(L))
    ).
% topic() ->
% ?LAZY(?SIZED(S, frequency([
% {S, [topic_level() | topic()]},
% {1, []}
% ]))).
% topic_filter() ->
% ?LAZY(?SIZED(S, frequency([
% {round(S / 3 * 2), [topic_level() | topic_filter()]},
% {round(S / 3 * 1), ['+' | topic_filter()]},
% {1, []},
% {1, ['#']}
% ]))).
%% Generator of a single topic level: a (possibly empty) binary made of
%% characters from `a-z' and `0-9'.
topic_level() ->
    Char = oneof([range($a, $z), range($0, $9)]),
    ?LET(Chars, list(Char), iolist_to_binary(Chars)).
%% Generator of a topic level drawn from `Entropy' distinct values: an
%% integer in `1..Entropy' rendered in base 16 and zero-padded to a fixed
%% width derived from `Entropy' (`floor(1 + log2(Entropy) / 4)').
%% NOTE(review): `range/2' presumably requires `Entropy' to be a positive
%% integer -- confirm against callers passing scaled weights.
topic_level(Entropy) ->
    Width = floor(1 + math:log2(Entropy) / 4),
    ?LET(
        I,
        range(1, Entropy),
        iolist_to_binary(io_lib:format("~*.16.0B", [Width, I]))
    ).
%% CT callbacks

%% Common Test `all/0' callback: delegates to
%% `emqx_common_test_helpers:all/1', passing this module.
all() ->
    emqx_common_test_helpers:all(?MODULE).

View File

@ -0,0 +1,377 @@
%% @doc This module provides lazy, composable producer streams that
%% can be considered counterparts to Archiver's consumer pipes and
%% therefore can facilitate testing
%%
%% Also it comes with an implementation of binary data stream which is
%% able to produce sufficiently large amounts of plausibly
%% pseudorandom binary payload in a deterministic way. It also
%% contains routines to check binary blobs via sampling
-module(payload_gen).
-define(end_of_stream, []).
-dialyzer(no_improper_lists).
%% Generic stream API:
-export([
interleave_streams/1,
retransmits/2,
next/1,
consume/2,
consume/1
]).
%% Binary payload generator API:
-export([
interleave_chunks/2,
interleave_chunks/1,
mb/1,
generator_fun/2,
generate_chunks/3,
generate_chunk/2,
check_consistency/3,
check_file_consistency/3,
get_byte/2
]).
%% List to stream generator API:
-export([list_to_stream/1]).
%% Proper generators:
-export([
binary_stream_gen/1,
interleaved_streams_gen/1,
interleaved_binary_gen/1,
interleaved_list_gen/1
]).
-export_type([payload/0, binary_payload/0]).
-define(hash_size, 16).
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
%% Payload descriptor: the seed selects the pseudorandom byte sequence, the
%% size is the payload length.
-type payload() :: {Seed :: term(), Size :: integer()}.

%% Element of a binary stream: the chunk data together with the 1-based
%% chunk number and the total number of chunks, see `generate_chunk/4'.
-type binary_payload() :: {
    binary(), _ChunkNum :: non_neg_integer(), _ChunkCnt :: non_neg_integer()
}.

%% For performance reasons we treat regular lists as streams, see `next/1'
-opaque cont(Data) ::
    fun(() -> stream(Data))
    | stream(Data).

-type stream(Data) ::
    maybe_improper_list(Data, cont(Data))
    | ?end_of_stream.

%% Stream built by `interleave_chunks/1,2': each element is a chunk tuple
%% (not the internal generator state) tagged with the seed of its payload.
-type tagged_binstream() :: stream({Tag :: term(), Payload :: binary_payload()}).

%% Internal state of a lazily generated binary stream.
-record(chunk_state, {
    seed :: term(),
    payload_size :: non_neg_integer(),
    offset :: non_neg_integer(),
    chunk_size :: non_neg_integer()
}).

-opaque chunk_state() :: #chunk_state{}.

%% Internal state of `interleave_streams/1': the streams not yet exhausted.
-record(interleave_state, {streams :: [{Tag :: term(), Stream :: term()}]}).

-opaque interleave_state() :: #interleave_state{}.
%% =============================================================================
%% API functions
%% =============================================================================
%% -----------------------------------------------------------------------------
%% Proper generators
%% -----------------------------------------------------------------------------
%% @doc PropEr generator of binary streams. Draws a natural number as the
%% seed and a payload size in `1..16#100000', then builds the stream with
%% `generate_chunk/2'. Only defined for `ChunkSize' values that are
%% multiples of `?hash_size'.
-spec binary_stream_gen(_ChunkSize :: non_neg_integer()) -> proper_types:type().
binary_stream_gen(ChunkSize) when ChunkSize rem ?hash_size =:= 0 ->
    PayloadGen = {nat(), range(1, 16#100000)},
    ?LET(Payload, PayloadGen, generate_chunk(Payload, ChunkSize)).
%% @equiv interleaved_streams_gen(10, Type)
-spec interleaved_streams_gen(proper_types:type()) -> proper_types:type().
interleaved_streams_gen(Type) ->
    %% At most 10 streams are generated by default.
    MaxNStreams = 10,
    interleaved_streams_gen(MaxNStreams, Type).
%% @doc PropEr generator of a term of type
%% ```[{_Tag :: binary(), stream()}]''' that can be passed directly to
%% `interleave_streams/1'. Between 1 and `MaxNStreams' streams of
%% `StreamType' are generated; the tag of the I-th stream is the
%% single-byte binary `<<I>>'.
-spec interleaved_streams_gen(non_neg_integer(), proper_types:type()) ->
    proper_types:type().
interleaved_streams_gen(MaxNStreams, StreamType) ->
    ?LET(
        Count,
        range(1, MaxNStreams),
        ?LET(
            Generated,
            vector(Count, StreamType),
            begin
                Indices = lists:seq(1, length(Generated)),
                [{<<I/integer>>, Stream} || {I, Stream} <- lists:zip(Indices, Generated)]
            end
        )
    ).
%% @doc PropEr generator of tagged binary streams: every stream is produced
%% by `binary_stream_gen/1' with the given `ChunkSize'.
-spec interleaved_binary_gen(non_neg_integer()) -> proper_types:type().
interleaved_binary_gen(ChunkSize) ->
    StreamType = binary_stream_gen(ChunkSize),
    interleaved_streams_gen(StreamType).
%% @doc PropEr generator of tagged list streams: every stream is a
%% non-empty list of elements of `Type'.
-spec interleaved_list_gen(proper_types:type()) -> proper_types:type().
interleaved_list_gen(Type) ->
    StreamType = non_empty(list(Type)),
    interleaved_streams_gen(StreamType).
%% -----------------------------------------------------------------------------
%% Generic streams
%% -----------------------------------------------------------------------------
%% @doc Force a stream continuation: a zero-arity fun is called and its
%% result returned; anything else (e.g. a regular list) is returned as is.
-spec next(cont(A)) -> stream(A).
next(Cont) ->
    case is_function(Cont, 0) of
        true -> Cont();
        false -> Cont
    end.
%% @doc Take a list of tagged streams and return a single stream in which
%% the elements of the input streams appear tagged and randomly
%% interleaved.
%%
%% Note: this function is more or less generic and it's compatible
%% with this module's `generate_chunks' function family, as well as
%% `ets:next', lists and what not
%%
%% Consider using simplified versions of this function
-spec interleave_streams([{Tag, stream(Data)}]) -> stream({Tag, Data}).
interleave_streams(Streams) ->
    State = #interleave_state{streams = Streams},
    do_interleave_streams(State).
%% @doc Take an arbitrary stream and return a stream in which each element
%% is emitted twice in a row whenever a fresh `rand:uniform/0' draw is
%% below `Probability'.
%% TODO: Make retransmissions of arbitrary length
-spec retransmits(stream(Data), float()) -> stream(Data).
retransmits(Stream, Probability) ->
    case Stream of
        ?end_of_stream ->
            ?end_of_stream;
        [Data | Rest] ->
            Cont = fun() -> retransmits(next(Rest), Probability) end,
            Repeat = rand:uniform() < Probability,
            if
                Repeat -> [Data, Data | Cont];
                true -> [Data | Cont]
            end
    end.
%% @doc Run the stream to its end, applying `Callback' (e.g. brod:produce)
%% to every element. Returns the callback results in stream order.
-spec consume(
    stream(A),
    fun((A) -> Ret)
) -> [Ret].
consume(Stream, Callback) ->
    case Stream of
        ?end_of_stream ->
            [];
        [Item | Cont] ->
            Result = Callback(Item),
            [Result | consume(next(Cont), Callback)]
    end.
%% @equiv consume(Stream, fun(A) -> A end)
-spec consume(stream(A)) -> [A].
consume(Stream) ->
    Identity = fun(Elem) -> Elem end,
    consume(Stream, Identity).
%% -----------------------------------------------------------------------------
%% Misc functions
%% -----------------------------------------------------------------------------
%% @doc Convert `N' megabytes to bytes (1 megabyte = 1024 * 1024 bytes).
-spec mb(integer()) -> integer().
mb(N) ->
    N * (1024 * 1024).
%% -----------------------------------------------------------------------------
%% List streams
%% -----------------------------------------------------------------------------
%% @doc Turn a list into a stream. Regular lists already are valid
%% streams, so the list is returned unchanged.
-spec list_to_stream([A]) -> stream(A).
list_to_stream(List) ->
    List.
%% -----------------------------------------------------------------------------
%% Binary streams
%% -----------------------------------------------------------------------------
%% @doc Produce one block of pseudorandom data: the MD5 digest of the block
%% number `N' (encoded as a 32-bit integer) followed by the binary `Seed'.
%% This implementation is hardly efficient, but it was chosen for
%% clarity reasons
-spec generator_fun(integer(), binary()) -> binary().
generator_fun(N, Seed) ->
    Input = <<N:32, Seed/binary>>,
    crypto:hash(md5, Input).
%% @doc Return the byte at offset `N' of the payload identified by `Seed'.
%% The seed is hashed with `seed_hash/1' before the lookup.
-spec get_byte(integer(), term()) -> byte().
get_byte(N, Seed) ->
    SeedHash = seed_hash(Seed),
    do_get_byte(N, SeedHash).
%% @doc Stream of binary chunks for the given payload. Limitation: both
%% the payload size and `ChunkSize' should be divisible by `?hash_size'
%% (only the latter is enforced by the guard).
-spec generate_chunk(payload(), integer()) -> stream(binary_payload()).
generate_chunk({Seed, Size}, ChunkSize) when ChunkSize rem ?hash_size =:= 0 ->
    %% Generation starts at the very beginning of the payload.
    generate_chunk(#chunk_state{
        seed = Seed,
        payload_size = Size,
        offset = 0,
        chunk_size = ChunkSize
    }).
%% @doc Take a list of `{payload(), ChunkSize}' pairs and return a stream
%% in which the chunks of all payloads are randomly interleaved. The seed
%% of each payload is used as the tag of its chunks.
%% @see interleave_streams/1
-spec interleave_chunks([{payload(), ChunkSize :: non_neg_integer()}]) ->
    tagged_binstream().
interleave_chunks(Streams0) ->
    Tagged = [
        {Tag, generate_chunk(Payload, ChunkSize)}
     || {{Tag, _} = Payload, ChunkSize} <- Streams0
    ],
    interleave_streams(Tagged).
%% @doc Take a list of `payload()'s and return a stream in which the
%% chunks of all payloads are randomly interleaved. The seed of each
%% payload is used as the tag of its chunks. All streams use the same
%% chunk size
%% @see interleave_streams/1
-spec interleave_chunks(
    [payload()],
    non_neg_integer()
) -> tagged_binstream().
interleave_chunks(Streams0, ChunkSize) ->
    Tagged = [
        {Seed, generate_chunk(Payload, ChunkSize)}
     || {Seed, _Size} = Payload <- Streams0
    ],
    interleave_streams(Tagged).
%% @doc Generate all chunks of `Payload' and feed them, in order, into
%% `Callback'. The callback receives the whole stream element, i.e. a
%% `{Data, ChunkNum, ChunkCnt}' tuple, not just the chunk data. Returns the
%% list of callback results.
-spec generate_chunks(
    payload(),
    integer(),
    fun((binary_payload()) -> A)
) -> [A].
generate_chunks(Payload, ChunkSize, Callback) ->
    consume(generate_chunk(Payload, ChunkSize), Callback).
%% @doc Check a stored copy of a payload by sampling. `Callback' is called
%% with a byte offset and must return `{ok, Byte}' for offsets inside the
%% payload and `undefined' for offsets past its end. The first byte, the
%% last byte, the first offset past the end and `SampleSize' random
%% offsets are checked; a mismatch raises an EUnit assertion error.
-spec check_consistency(
    payload(),
    integer(),
    fun((integer()) -> {ok, binary()} | undefined)
) -> ok.
check_consistency({Seed, Size}, SampleSize, Callback) ->
    SeedHash = seed_hash(Seed),
    Random = [rand:uniform(Size) - 1 || _ <- lists:seq(1, SampleSize)],
    %% Always check first and last bytes, and one that should not exist:
    Samples = [0, Size - 1, Size | Random],
    CheckSample = fun(N) ->
        case N < Size of
            true ->
                Expected = do_get_byte(N, SeedHash),
                ?assertEqual(
                    {N, {ok, Expected}},
                    {N, Callback(N)}
                );
            false ->
                ?assertMatch(undefined, Callback(N))
        end
    end,
    lists:foreach(CheckSample, Samples).
%% @doc Check that the file `FileName' holds the given payload, sampling
%% single bytes with `file:pread/2' and comparing them via
%% `check_consistency/3'. The file is closed whether or not the check
%% succeeds.
-spec check_file_consistency(
    payload(),
    integer(),
    file:filename()
) -> ok.
check_file_consistency(Payload, SampleSize, FileName) ->
    {ok, FD} = file:open(FileName, [read, raw]),
    %% The file is not opened in `binary' mode, so a successful one-byte
    %% read comes back as a one-element list.
    ReadByte = fun(N) ->
        case file:pread(FD, [{N, 1}]) of
            {ok, [eof]} -> undefined;
            {ok, [[X]]} -> {ok, X}
        end
    end,
    try
        check_consistency(Payload, SampleSize, ReadByte)
    after
        file:close(FD)
    end.
%% =============================================================================
%% Internal functions
%% =============================================================================
%% @private Pick one of the remaining streams at random and emit its head
%% element tagged with the stream's tag. Exhausted streams are dropped; the
%% result ends once no streams are left.
-spec do_interleave_streams(interleave_state()) -> stream(_Data).
do_interleave_streams(#interleave_state{streams = []}) ->
    ?end_of_stream;
do_interleave_streams(#interleave_state{streams = Streams} = State0) ->
    %% Not the most efficient implementation (lots of avoidable list
    %% traversals), but we don't expect the number of streams to be the
    %% bottleneck
    Pick = rand:uniform(length(Streams)),
    {Before, [{Tag, Stream} | After]} = lists:split(Pick - 1, Streams),
    case Stream of
        ?end_of_stream ->
            %% Nothing left in the picked stream: forget it and retry.
            do_interleave_streams(State0#interleave_state{streams = Before ++ After});
        [Payload | Cont] ->
            Remaining = Before ++ [{Tag, next(Cont)} | After],
            State = State0#interleave_state{streams = Remaining},
            [{Tag, Payload} | fun() -> do_interleave_streams(State) end]
    end.
%% @doc Continue generating chunks from the given state. Returns the end of
%% stream once the offset has reached the payload size; otherwise returns
%% the `{Data, ChunkNum, ChunkCnt}' tuple for the chunk at the current
%% offset, followed by a continuation that produces the next chunk.
-spec generate_chunk(chunk_state()) -> stream(binary_payload()).
generate_chunk(#chunk_state{offset = Offset, payload_size = Size}) when
    Offset >= Size
->
    ?end_of_stream;
generate_chunk(State0 = #chunk_state{offset = Offset, chunk_size = ChunkSize}) ->
    %% State for the continuation: same payload, offset moved one chunk on.
    State = State0#chunk_state{offset = Offset + ChunkSize},
    Payload = generate_chunk(
        State#chunk_state.seed,
        Offset,
        ChunkSize,
        State#chunk_state.payload_size
    ),
    [Payload | fun() -> generate_chunk(State) end].
%% @private Build the stream element for the chunk that starts at byte
%% `Offset' of the payload `{Seed, Size}'. Returns
%% `{Data, ChunkNum, ChunkCnt}' where `ChunkNum' is 1-based and `ChunkCnt'
%% is the total number of chunks in the payload.
%%
%% Expects `Offset < Size' and `Offset' to be a multiple of `?hash_size',
%% which holds for the calls made by `generate_chunk/1'.
generate_chunk(Seed, Offset, ChunkSize, Size) ->
    SeedHash = seed_hash(Seed),
    %% Offset of the last byte that belongs to this chunk
    To = min(Offset + ChunkSize, Size) - 1,
    Blocks = iolist_to_binary([
        generator_fun(I, SeedHash)
     || I <- lists:seq(Offset div ?hash_size, To div ?hash_size)
    ]),
    %% Data is generated in whole blocks of `?hash_size' bytes. When `Size'
    %% is not a multiple of the block size the last block sticks out past
    %% the end of the payload, so cut the chunk at the payload boundary.
    %% This keeps the stream consistent with `check_consistency/3', which
    %% expects no byte to exist at offset `Size'.
    Payload = binary:part(Blocks, 0, To - Offset + 1),
    ChunkNum = Offset div ChunkSize + 1,
    ChunkCnt = ceil(Size / ChunkSize),
    {Payload, ChunkNum, ChunkCnt}.
%% @doc Hash any term: the MD5 digest of its external term format.
-spec seed_hash(term()) -> binary().
seed_hash(Seed) ->
    Encoded = term_to_binary(Seed),
    crypto:hash(md5, Encoded).
%% @private Get byte at offset `N'. `Seed' is the already hashed seed: the
%% block containing the offset is regenerated with `generator_fun/2' and
%% the byte is read from it.
-spec do_get_byte(integer(), binary()) -> byte().
do_get_byte(N, Seed) ->
    BlockIndex = N div ?hash_size,
    ByteIndex = N rem ?hash_size,
    binary:at(generator_fun(BlockIndex, Seed), ByteIndex).