From 7e13753ea5cfa97b983ad36e57d8c46c1f7a8f1f Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Tue, 27 Dec 2022 13:47:38 +0300
Subject: [PATCH] test: add basic property tests

---
 .../test/emqx_replay_storage_SUITE.erl       | 106 +++++
 apps/emqx_replay/test/payload_gen.erl        | 377 ++++++++++++++++++
 2 files changed, 483 insertions(+)
 create mode 100644 apps/emqx_replay/test/payload_gen.erl

diff --git a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl
index e565df455..761ac041a 100644
--- a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl
+++ b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl
@@ -106,6 +106,10 @@ t_iterate_wildcard(Config) ->
         lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]),
         lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "a/#", 0)])
     ),
+    ?assertEqual(
+        [],
+        lists:sort([binary_to_term(Payload) || Payload <- iterate(DB, "a/+/+", 0)])
+    ),
     ok.
 
 store(DB, PublishedAt, Topic, Payload) ->
@@ -129,6 +133,108 @@ parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
 parse_topic(Topic) ->
     emqx_topic:words(iolist_to_binary(Topic)).
 
+%%
+
+t_prop_topic_hash_computes(_) ->
+    ?assert(
+        proper:quickcheck(
+            ?FORALL(Topic, topic(), begin
+                Hash = emqx_replay_message_storage:compute_topic_hash(Topic),
+                is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8)
+            end)
+        )
+    ).
+
+t_prop_hash_bitmask_computes(_) ->
+    ?assert(
+        proper:quickcheck(
+            ?FORALL(TopicFilter, topic_filter(), begin
+                Hash = emqx_replay_message_storage:compute_hash_bitmask(TopicFilter),
+                is_integer(Hash) andalso (byte_size(binary:encode_unsigned(Hash)) =< 8)
+            end)
+        )
+    ).
+
+t_prop_iterate_stored_messages(Config) ->
+    DB = ?config(handle, Config),
+    ?assertEqual(
+        true,
+        proper:quickcheck(
+            ?FORALL(
+                Streams,
+                messages(),
+                begin
+                    Stream = payload_gen:interleave_streams(Streams),
+                    ok =:= store_message_stream(DB, Stream)
+                    % TODO actually verify some property
+                end
+            )
+        )
+    ).
+
+store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) ->
+    MessageID = <<ChunkNum:32>>,
+    PublishedAt = rand:uniform(ChunkNum),
+    ok = emqx_replay_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload),
+    store_message_stream(DB, payload_gen:next(Rest));
+store_message_stream(_DB, []) ->
+    ok.
+
+messages() ->
+    ?LET(Topics, list(topic()), begin
+        [{Topic, payload_gen:binary_stream_gen(64)} || Topic <- Topics]
+    end).
+
+topic() ->
+    % TODO
+    % Somehow generate topic levels with variance according to the entropy distribution?
+    non_empty(list(topic_level())).
+
+topic(EntropyWeights) ->
+    ?LET(
+        L,
+        list(1),
+        % ?SIZED(S, [topic(S * nth(I, EntropyWeights, 1)) || I <- lists:seq(1, Len)])
+        % [topic(10 * nth(I, EntropyWeights, 1)) || I <- lists:seq(1, Len)]
+        ?SIZED(S, [topic_level(S * EW) || EW <- lists:sublist(EntropyWeights ++ L, length(L))])
+    ).
+
+topic_filter() ->
+    ?SUCHTHAT(
+        L,
+        non_empty(
+            list(
+                frequency([
+                    {5, topic_level()},
+                    {2, '+'},
+                    {1, '#'}
+                ])
+            )
+        ),
+        not lists:member('#', L) orelse lists:last(L) == '#'
+    ).
+
+% topic() ->
+%     ?LAZY(?SIZED(S, frequency([
+%         {S, [topic_level() | topic()]},
+%         {1, []}
+%     ]))).
+
+% topic_filter() ->
+%     ?LAZY(?SIZED(S, frequency([
+%         {round(S / 3 * 2), [topic_level() | topic_filter()]},
+%         {round(S / 3 * 1), ['+' | topic_filter()]},
+%         {1, []},
+%         {1, ['#']}
+%     ]))).
+
+topic_level() ->
+    ?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)).
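
The ?SUCHTHAT constraint in topic_filter/0 above encodes the MQTT rule that a
multi-level wildcard may only appear as the last level of a filter. A quick way
to sanity-check generators like these is proper_gen:pick/1 (standard PropEr
API); a minimal sketch, assuming it is evaluated where the generator and the
PropEr macros are in scope:

    %% Draw one sample from the generator and re-check the invariant by hand:
    {ok, Filter} = proper_gen:pick(topic_filter()),
    true = not lists:member('#', Filter) orelse lists:last(Filter) == '#'.
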
+
+topic_level(Entropy) ->
+    S = floor(1 + math:log2(Entropy) / 4),
+    ?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))).
+
 %% CT callbacks
 
 all() -> emqx_common_test_helpers:all(?MODULE).
diff --git a/apps/emqx_replay/test/payload_gen.erl b/apps/emqx_replay/test/payload_gen.erl
new file mode 100644
index 000000000..17e68f8d5
--- /dev/null
+++ b/apps/emqx_replay/test/payload_gen.erl
@@ -0,0 +1,377 @@
+%% @doc This module provides lazy, composable producer streams that
+%% can be considered counterparts to Archiver's consumer pipes and
+%% therefore can facilitate testing.
+%%
+%% It also comes with an implementation of a binary data stream that
+%% is able to produce sufficiently large amounts of plausibly
+%% pseudorandom binary payload in a deterministic way, and contains
+%% routines to check binary blobs via sampling.
+-module(payload_gen).
+
+-define(end_of_stream, []).
+
+-dialyzer(no_improper_lists).
+
+%% Generic stream API:
+-export([
+    interleave_streams/1,
+    retransmits/2,
+    next/1,
+    consume/2,
+    consume/1
+]).
+
+%% Binary payload generator API:
+-export([
+    interleave_chunks/2,
+    interleave_chunks/1,
+
+    mb/1,
+
+    generator_fun/2,
+    generate_chunks/3,
+    generate_chunk/2,
+    check_consistency/3,
+    check_file_consistency/3,
+    get_byte/2
+]).
+
+%% List to stream generator API:
+-export([list_to_stream/1]).
+
+%% Proper generators:
+-export([
+    binary_stream_gen/1,
+    interleaved_streams_gen/1,
+    interleaved_binary_gen/1,
+    interleaved_list_gen/1
+]).
+
+-export_type([payload/0, binary_payload/0]).
+
+-define(hash_size, 16).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-type payload() :: {Seed :: term(), Size :: integer()}.
+
+-type binary_payload() :: {
+    binary(), _ChunkNum :: non_neg_integer(), _ChunkCnt :: non_neg_integer()
+}.
+
+%% For performance reasons we treat regular lists as streams, see `next/1'
+-opaque cont(Data) ::
+    fun(() -> stream(Data))
+    | stream(Data).
+
+-type stream(Data) ::
+    maybe_improper_list(Data, cont(Data))
+    | ?end_of_stream.
+
+-type tagged_binstream() :: stream({Tag :: term(), Payload :: binary_payload()}).
+
+-record(chunk_state, {
+    seed :: term(),
+    payload_size :: non_neg_integer(),
+    offset :: non_neg_integer(),
+    chunk_size :: non_neg_integer()
+}).
+
+-opaque chunk_state() :: #chunk_state{}.
+
+-record(interleave_state, {streams :: [{Tag :: term(), Stream :: term()}]}).
+
+-opaque interleave_state() :: #interleave_state{}.
+
+%% =============================================================================
+%% API functions
+%% =============================================================================
+
+%% -----------------------------------------------------------------------------
+%% Proper generators
+%% -----------------------------------------------------------------------------
+
+%% @doc Proper generator that creates a binary stream
+-spec binary_stream_gen(_ChunkSize :: non_neg_integer()) -> proper_types:type().
+binary_stream_gen(ChunkSize) when ChunkSize rem ?hash_size =:= 0 ->
+    ?LET(
+        {Seed, Size},
+        {nat(), range(1, 16#100000)},
+        generate_chunk({Seed, Size}, ChunkSize)
+    ).
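
To make the generated stream's shape concrete: each element of the binary
stream is a {Chunk, ChunkNum, ChunkCnt} tuple, and the tail of the stream is a
continuation fun. A minimal sketch of forcing such a stream with consume/1
(both generate_chunk/2 and consume/1 are defined further down in this file; the
seed, size and chunk size are arbitrary):

    %% A 256-byte payload cut into 64-byte chunks yields 4 chunk tuples:
    Stream = payload_gen:generate_chunk({_Seed = 42, _Size = 256}, _ChunkSize = 64),
    Chunks = payload_gen:consume(Stream),
    4 = length(Chunks),
    {_Bin, 1, 4} = hd(Chunks).
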
+%% @equiv interleaved_streams_gen(10, Type)
+-spec interleaved_streams_gen(proper_types:type()) -> proper_types:type().
+interleaved_streams_gen(Type) ->
+    interleaved_streams_gen(10, Type).
+
+%% @doc Proper generator that creates a term of type
+%% ```[{_Tag :: binary(), stream()}]''' that is ready to be fed
+%% into the `interleave_streams/1' function
+-spec interleaved_streams_gen(non_neg_integer(), proper_types:type()) ->
+    proper_types:type().
+interleaved_streams_gen(MaxNStreams, StreamType) ->
+    ?LET(
+        NStreams,
+        range(1, MaxNStreams),
+        ?LET(
+            Streams,
+            vector(NStreams, StreamType),
+            begin
+                Tags = [<<I:16>> || I <- lists:seq(1, length(Streams))],
+                lists:zip(Tags, Streams)
+            end
+        )
+    ).
+
+-spec interleaved_binary_gen(non_neg_integer()) -> proper_types:type().
+interleaved_binary_gen(ChunkSize) ->
+    interleaved_streams_gen(binary_stream_gen(ChunkSize)).
+
+-spec interleaved_list_gen(proper_types:type()) -> proper_types:type().
+interleaved_list_gen(Type) ->
+    interleaved_streams_gen(non_empty(list(Type))).
+
+%% -----------------------------------------------------------------------------
+%% Generic streams
+%% -----------------------------------------------------------------------------
+
+%% @doc Consume one element from the stream.
+-spec next(cont(A)) -> stream(A).
+next(Fun) when is_function(Fun, 0) ->
+    Fun();
+next(L) ->
+    L.
+
+%% @doc Take a list of tagged streams and return a stream where
+%% elements of the streams are tagged and randomly interleaved.
+%%
+%% Note: this function is more or less generic, and it's compatible
+%% with this module's `generate_chunks' function family, as well as
+%% `ets:next', lists and what not.
+%%
+%% Consider using the simplified versions of this function
+-spec interleave_streams([{Tag, stream(Data)}]) -> stream({Tag, Data}).
+interleave_streams(Streams) ->
+    do_interleave_streams(
+        #interleave_state{streams = Streams}
+    ).
+
+%% @doc Take an arbitrary stream and add repetitions of the elements
+%% TODO: Make retransmissions of arbitrary length
+-spec retransmits(stream(Data), float()) -> stream(Data).
+retransmits(Stream, Probability) ->
+    case Stream of
+        [Data | Cont0] ->
+            Cont = fun() -> retransmits(next(Cont0), Probability) end,
+            case rand:uniform() < Probability of
+                true -> [Data, Data | Cont];
+                false -> [Data | Cont]
+            end;
+        ?end_of_stream ->
+            ?end_of_stream
+    end.
+
+%% @doc Consume all elements of the stream and feed them into a
+%% callback (e.g. brod:produce)
+-spec consume(
+    stream(A),
+    fun((A) -> Ret)
+) -> [Ret].
+consume(Stream, Callback) ->
+    case Stream of
+        [Data | Cont] -> [Callback(Data) | consume(next(Cont), Callback)];
+        ?end_of_stream -> []
+    end.
+
+%% @equiv consume(Stream, fun(A) -> A end)
+-spec consume(stream(A)) -> [A].
+consume(Stream) ->
+    consume(Stream, fun(A) -> A end).
+
+%% -----------------------------------------------------------------------------
+%% Misc functions
+%% -----------------------------------------------------------------------------
+
+%% @doc Return the number of bytes in `N' megabytes
+-spec mb(integer()) -> integer().
+mb(N) ->
+    N * 1048576.
+
+%% -----------------------------------------------------------------------------
+%% List streams
+%% -----------------------------------------------------------------------------
+-spec list_to_stream([A]) -> stream(A).
+list_to_stream(L) -> L.
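
Since plain lists are already valid streams (see the note on `next/1' above),
the generic combinators can be exercised without the binary machinery at all; a
minimal sketch:

    %% Tag two list streams, interleave them randomly, then force the result.
    %% Interleaving preserves the relative order within each stream:
    Interleaved = payload_gen:interleave_streams([
        {a, payload_gen:list_to_stream([1, 2, 3])},
        {b, payload_gen:list_to_stream([x, y])}
    ]),
    Tagged = payload_gen:consume(Interleaved),
    [1, 2, 3] = [V || {a, V} <- Tagged],
    [x, y] = [V || {b, V} <- Tagged].
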
+
+%% -----------------------------------------------------------------------------
+%% Binary streams
+%% -----------------------------------------------------------------------------
+
+%% @doc First argument is a chunk number, the second one is a seed.
+%% This implementation is hardly efficient, but it was chosen for
+%% clarity reasons
+-spec generator_fun(integer(), binary()) -> binary().
+generator_fun(N, Seed) ->
+    crypto:hash(md5, <<N:32, Seed/binary>>).
+
+%% @doc Get byte at offset `N'
+-spec get_byte(integer(), term()) -> byte().
+get_byte(N, Seed) ->
+    do_get_byte(N, seed_hash(Seed)).
+
+%% @doc Stream of binary chunks. Limitation: both payload size and
+%% `ChunkSize' should be divisible by `?hash_size'
+-spec generate_chunk(payload(), integer()) -> stream(binary_payload()).
+generate_chunk({Seed, Size}, ChunkSize) when
+    ChunkSize rem ?hash_size =:= 0
+->
+    State = #chunk_state{
+        seed = Seed,
+        payload_size = Size,
+        chunk_size = ChunkSize,
+        offset = 0
+    },
+    generate_chunk(State).
+
+%% @doc Take a list of `payload()'s with their chunk sizes and start
+%% producing the payloads in random order. Seed is used as a tag
+%% @see interleave_streams/1
+-spec interleave_chunks([{payload(), ChunkSize :: non_neg_integer()}]) ->
+    tagged_binstream().
+interleave_chunks(Streams0) ->
+    Streams = [
+        {Tag, generate_chunk(Payload, ChunkSize)}
+     || {Payload = {Tag, _}, ChunkSize} <- Streams0
+    ],
+    interleave_streams(Streams).
+
+%% @doc Take a list of `payload()'s and start producing the payloads
+%% in a random order. Seed is used as a tag. All streams use the same
+%% chunk size
+%% @see interleave_streams/1
+-spec interleave_chunks(
+    [payload()],
+    non_neg_integer()
+) -> tagged_binstream().
+interleave_chunks(Streams0, ChunkSize) ->
+    Streams = [
+        {Seed, generate_chunk({Seed, Size}, ChunkSize)}
+     || {Seed, Size} <- Streams0
+    ],
+    interleave_streams(Streams).
+
+%% @doc Generate chunks of data and feed them into
+%% `Callback'
+-spec generate_chunks(
+    payload(),
+    integer(),
+    fun((binary_payload()) -> A)
+) -> [A].
+generate_chunks(Payload, ChunkSize, Callback) ->
+    consume(generate_chunk(Payload, ChunkSize), Callback).
+
+-spec check_consistency(
+    payload(),
+    integer(),
+    fun((integer()) -> {ok, byte()} | undefined)
+) -> ok.
+check_consistency({Seed, Size}, SampleSize, Callback) ->
+    SeedHash = seed_hash(Seed),
+    Random = [rand:uniform(Size) - 1 || _ <- lists:seq(1, SampleSize)],
+    %% Always check the first and last bytes, and one that should not exist:
+    Samples = [0, Size - 1, Size | Random],
+    lists:foreach(
+        fun
+            (N) when N < Size ->
+                Expected = do_get_byte(N, SeedHash),
+                ?assertEqual(
+                    {N, {ok, Expected}},
+                    {N, Callback(N)}
+                );
+            (N) ->
+                ?assertMatch(undefined, Callback(N))
+        end,
+        Samples
+    ).
+
+-spec check_file_consistency(
+    payload(),
+    integer(),
+    file:filename()
+) -> ok.
+check_file_consistency(Payload, SampleSize, FileName) ->
+    {ok, FD} = file:open(FileName, [read, raw]),
+    try
+        Fun = fun(N) ->
+            case file:pread(FD, [{N, 1}]) of
+                {ok, [[X]]} -> {ok, X};
+                {ok, [eof]} -> undefined
+            end
+        end,
+        check_consistency(Payload, SampleSize, Fun)
+    after
+        file:close(FD)
+    end.
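
Putting the pieces together, a typical round trip in a test could stream a
generated payload into a file and then sample it back with
check_file_consistency/3. A sketch, assuming a writable directory Dir:

    %% Write a 1 MB payload to disk chunk by chunk, then spot-check 100 offsets:
    Payload = {_Seed = 1, _Size = payload_gen:mb(1)},
    FileName = filename:join(Dir, "payload.bin"),
    {ok, FD} = file:open(FileName, [write, raw]),
    payload_gen:generate_chunks(Payload, _ChunkSize = 1024, fun({Bin, _Num, _Cnt}) ->
        ok = file:write(FD, Bin)
    end),
    ok = file:close(FD),
    ok = payload_gen:check_file_consistency(Payload, 100, FileName).
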
+
+%% =============================================================================
+%% Internal functions
+%% =============================================================================
+
+-spec do_interleave_streams(interleave_state()) -> stream(_Data).
+do_interleave_streams(#interleave_state{streams = []}) ->
+    ?end_of_stream;
+do_interleave_streams(#interleave_state{streams = Streams} = State0) ->
+    %% Not the most efficient implementation (lots of avoidable list
+    %% traversals), but we don't expect the number of streams to be the
+    %% bottleneck
+    N = rand:uniform(length(Streams)),
+    {Hd, [{Tag, SC} | Tl]} = lists:split(N - 1, Streams),
+    case SC of
+        [Payload | SC1] ->
+            State = State0#interleave_state{streams = Hd ++ [{Tag, next(SC1)} | Tl]},
+            Cont = fun() -> do_interleave_streams(State) end,
+            [{Tag, Payload} | Cont];
+        ?end_of_stream ->
+            State = State0#interleave_state{streams = Hd ++ Tl},
+            do_interleave_streams(State)
+    end.
+
+%% @doc Continue generating chunks
+-spec generate_chunk(chunk_state()) -> stream(binary_payload()).
+generate_chunk(#chunk_state{offset = Offset, payload_size = Size}) when
+    Offset >= Size
+->
+    ?end_of_stream;
+generate_chunk(State0 = #chunk_state{offset = Offset, chunk_size = ChunkSize}) ->
+    State = State0#chunk_state{offset = Offset + ChunkSize},
+    Payload = generate_chunk(
+        State#chunk_state.seed,
+        Offset,
+        ChunkSize,
+        State#chunk_state.payload_size
+    ),
+    [Payload | fun() -> generate_chunk(State) end].
+
+generate_chunk(Seed, Offset, ChunkSize, Size) ->
+    SeedHash = seed_hash(Seed),
+    To = min(Offset + ChunkSize, Size) - 1,
+    Payload = iolist_to_binary([
+        generator_fun(I, SeedHash)
+     || I <- lists:seq(Offset div ?hash_size, To div ?hash_size)
+    ]),
+    ChunkNum = Offset div ChunkSize + 1,
+    ChunkCnt = ceil(Size / ChunkSize),
+    {Payload, ChunkNum, ChunkCnt}.
+
+%% @doc Hash any term
+-spec seed_hash(term()) -> binary().
+seed_hash(Seed) ->
+    crypto:hash(md5, term_to_binary(Seed)).
+
+%% @private Get byte at offset `N'
+-spec do_get_byte(integer(), binary()) -> byte().
+do_get_byte(N, Seed) ->
+    Chunk = generator_fun(N div ?hash_size, Seed),
+    binary:at(Chunk, N rem ?hash_size).
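
The sampling checks work because the whole byte stream is a pure function of
the seed: get_byte/2 addresses exactly the bytes that generate_chunk/2 emits,
which is the invariant check_consistency/3 relies on. A self-contained sketch:

    %% Every byte of a generated chunk agrees with get_byte/2 for the same seed:
    {Seed, Size} = {7, 160},
    [{Chunk, 1, 1} | _] = payload_gen:generate_chunk({Seed, Size}, _ChunkSize = 160),
    true = lists:all(
        fun(N) -> payload_gen:get_byte(N, Seed) =:= binary:at(Chunk, N) end,
        lists:seq(0, Size - 1)
    ).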