From ac0935ef912f54407c50fb403573e7290e80afd1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 6 Jan 2023 13:54:59 +0300 Subject: [PATCH] test(ds): Proptest that iteration is exhaustive Compare iteration results against what an extremely simplified model produces. --- .../emqx_replay_message_storage_shim.erl | 58 ++++++++++++++++++ .../props/prop_replay_message_storage.erl | 59 +++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl diff --git a/apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl b/apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl new file mode 100644 index 000000000..125c9a9fc --- /dev/null +++ b/apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl @@ -0,0 +1,58 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_replay_message_storage_shim). + +-export([open/0]). +-export([close/1]). +-export([store/5]). +-export([iterate/3]). + +-type topic() :: list(binary()). +-type time() :: integer(). + +-opaque t() :: ets:tid(). + +-spec open() -> t(). +open() -> + ets:new(?MODULE, [ordered_set, {keypos, 1}]). + +-spec close(t()) -> ok. +close(Tab) -> + true = ets:delete(Tab), + ok. + +-spec store(t(), emqx_guid:guid(), time(), topic(), binary()) -> + ok | {error, _TODO}. +store(Tab, MessageID, PublishedAt, Topic, Payload) -> + true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}), + ok. + +-spec iterate(t(), emqx_topic:words(), time()) -> + [binary()]. +iterate(Tab, TopicFilter, StartTime) -> + ets:foldr( + fun({{PublishedAt, _}, Topic, Payload}, Acc) -> + case emqx_topic:match(Topic, TopicFilter) of + true when PublishedAt >= StartTime -> + [Payload | Acc]; + _ -> + Acc + end + end, + [], + Tab + ). diff --git a/apps/emqx_replay/test/props/prop_replay_message_storage.erl b/apps/emqx_replay/test/props/prop_replay_message_storage.erl index 8be5a5edb..9619c4f05 100644 --- a/apps/emqx_replay/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_replay/test/props/prop_replay_message_storage.erl @@ -70,6 +70,50 @@ prop_next_seek_eq_initial_seek() -> emqx_replay_message_storage:compute_next_seek(0, Filter) ). +prop_iterate_messages() -> + TBPL = [4, 8, 12], + Options = #{ + timestamp_bits => 32, + topic_bits_per_level => TBPL, + epoch => 200 + }, + % TODO + % Shrinking is too unpredictable and leaves a LOT of garbage in the scratch dit. + ?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin + Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)), + {DB, Handle} = open_db(Filepath, Options), + Shim = emqx_replay_message_storage_shim:open(), + ok = store_db(DB, Stream), + ok = store_shim(Shim, Stream), + ?FORALL( + { + {Topic, _}, + Pattern, + StartTime + }, + { + nth(Stream), + topic_filter_pattern(), + start_time() + }, + begin + TopicFilter = make_topic_filter(Pattern, Topic), + Messages = iterate_db(DB, TopicFilter, StartTime), + Reference = iterate_shim(Shim, TopicFilter, StartTime), + ok = close_db(Handle), + ok = emqx_replay_message_storage_shim:close(Shim), + ?WHENFAIL( + begin + io:format(user, " *** Filepath = ~s~n", [Filepath]), + io:format(user, " *** TopicFilter = ~p~n", [TopicFilter]), + io:format(user, " *** StartTime = ~p~n", [StartTime]) + end, + is_list(Messages) andalso equals(Messages -- Reference, Reference -- Messages) + ) + end + ) + end). + prop_iterate_eq_iterate_with_preserve_restore() -> TBPL = [4, 8, 16, 12], Options = #{ @@ -154,6 +198,21 @@ run_iterator_commands([{preserve, restore} | Rest], It, DB) -> run_iterator_commands([], It, _DB) -> iterate_db(It). +store_shim(Shim, Messages) -> + lists:foreach( + fun({Topic, Payload = {MessageID, Timestamp, _}}) -> + Bin = term_to_binary(Payload), + emqx_replay_message_storage_shim:store(Shim, MessageID, Timestamp, Topic, Bin) + end, + Messages + ). + +iterate_shim(Shim, TopicFilter, StartTime) -> + lists:map( + fun binary_to_term/1, + emqx_replay_message_storage_shim:iterate(Shim, TopicFilter, StartTime) + ). + %%-------------------------------------------------------------------- %% Setup / teardown %%--------------------------------------------------------------------