test(ds): Proptest that iteration is exhaustive

Compare iteration results against what an extremely simplified model
produces.
This commit is contained in:
Andrew Mayorov 2023-01-06 13:54:59 +03:00 committed by ieQu1
parent 60e3070328
commit ac0935ef91
2 changed files with 117 additions and 0 deletions

View File

@ -0,0 +1,58 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_replay_message_storage_shim).
-export([open/0]).
-export([close/1]).
-export([store/5]).
-export([iterate/3]).
-type topic() :: list(binary()).
-type time() :: integer().
-opaque t() :: ets:tid().
-spec open() -> t().
open() ->
ets:new(?MODULE, [ordered_set, {keypos, 1}]).
-spec close(t()) -> ok.
close(Tab) ->
true = ets:delete(Tab),
ok.
-spec store(t(), emqx_guid:guid(), time(), topic(), binary()) ->
ok | {error, _TODO}.
store(Tab, MessageID, PublishedAt, Topic, Payload) ->
true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}),
ok.
-spec iterate(t(), emqx_topic:words(), time()) ->
[binary()].
iterate(Tab, TopicFilter, StartTime) ->
ets:foldr(
fun({{PublishedAt, _}, Topic, Payload}, Acc) ->
case emqx_topic:match(Topic, TopicFilter) of
true when PublishedAt >= StartTime ->
[Payload | Acc];
_ ->
Acc
end
end,
[],
Tab
).

View File

@ -70,6 +70,50 @@ prop_next_seek_eq_initial_seek() ->
emqx_replay_message_storage:compute_next_seek(0, Filter)
).
prop_iterate_messages() ->
TBPL = [4, 8, 12],
Options = #{
timestamp_bits => 32,
topic_bits_per_level => TBPL,
epoch => 200
},
% TODO
% Shrinking is too unpredictable and leaves a LOT of garbage in the scratch dit.
?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin
Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)),
{DB, Handle} = open_db(Filepath, Options),
Shim = emqx_replay_message_storage_shim:open(),
ok = store_db(DB, Stream),
ok = store_shim(Shim, Stream),
?FORALL(
{
{Topic, _},
Pattern,
StartTime
},
{
nth(Stream),
topic_filter_pattern(),
start_time()
},
begin
TopicFilter = make_topic_filter(Pattern, Topic),
Messages = iterate_db(DB, TopicFilter, StartTime),
Reference = iterate_shim(Shim, TopicFilter, StartTime),
ok = close_db(Handle),
ok = emqx_replay_message_storage_shim:close(Shim),
?WHENFAIL(
begin
io:format(user, " *** Filepath = ~s~n", [Filepath]),
io:format(user, " *** TopicFilter = ~p~n", [TopicFilter]),
io:format(user, " *** StartTime = ~p~n", [StartTime])
end,
is_list(Messages) andalso equals(Messages -- Reference, Reference -- Messages)
)
end
)
end).
prop_iterate_eq_iterate_with_preserve_restore() ->
TBPL = [4, 8, 16, 12],
Options = #{
@ -154,6 +198,21 @@ run_iterator_commands([{preserve, restore} | Rest], It, DB) ->
run_iterator_commands([], It, _DB) ->
iterate_db(It).
store_shim(Shim, Messages) ->
lists:foreach(
fun({Topic, Payload = {MessageID, Timestamp, _}}) ->
Bin = term_to_binary(Payload),
emqx_replay_message_storage_shim:store(Shim, MessageID, Timestamp, Topic, Bin)
end,
Messages
).
iterate_shim(Shim, TopicFilter, StartTime) ->
lists:map(
fun binary_to_term/1,
emqx_replay_message_storage_shim:iterate(Shim, TopicFilter, StartTime)
).
%%--------------------------------------------------------------------
%% Setup / teardown
%%--------------------------------------------------------------------