diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index dafaf8c68..fe0a0e08a 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -97,20 +97,39 @@ -export([make_iterator/3]). -export([next/1]). +-export([preserve_iterator/1]). +-export([restore_iterator/2]). + %% Debug/troubleshooting: +%% Keymappers -export([ - make_message_key/4, + bitsize/1, compute_bitstring/3, compute_topic_bitmask/2, - compute_next_seek/4, - compute_time_seek/3, - compute_topic_seek/4, + compute_time_bitmask/1, hash/2 ]). +%% Keyspace filters +-export([ + make_keyspace_filter/3, + compute_initial_seek/1, + compute_next_seek/2, + compute_time_seek/3, + compute_topic_seek/4 +]). + -export_type([db/0, iterator/0, schema/0]). --compile({inline, [ones/1, bitwise_concat/3]}). +-compile( + {inline, [ + bitwise_concat/3, + ones/1, + successor/1, + topic_hash_matches/3, + time_matches/3 + ]} +). %%================================================================================ %% Type declarations @@ -159,9 +178,15 @@ -record(it, { handle :: rocksdb:itr_handle(), + filter :: keyspace_filter(), + cursor :: binary() | undefined, + next_action :: {seek, binary()} | next +}). + +-record(filter, { keymapper :: keymapper(), - next_action :: {seek, binary()} | next, topic_filter :: emqx_topic:words(), + start_time :: integer(), hash_bitfilter :: integer(), hash_bitmask :: integer(), time_bitfilter :: integer(), @@ -186,6 +211,7 @@ -opaque db() :: #db{}. -opaque iterator() :: #it{}. -type keymapper() :: #keymapper{}. +-type keyspace_filter() :: #filter{}. %%================================================================================ %% API funcions @@ -254,43 +280,81 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) -> case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of {ok, ITHandle} -> - Bitstring = compute_bitstring(TopicFilter, StartTime, DB#db.keymapper), - HashBitmask = compute_topic_bitmask(TopicFilter, DB#db.keymapper), - TimeBitmask = compute_time_bitmask(DB#db.keymapper), - HashBitfilter = Bitstring band HashBitmask, - TimeBitfilter = Bitstring band TimeBitmask, - InitialSeek = combine(HashBitfilter bor TimeBitfilter, <<>>, DB#db.keymapper), + % TODO earliest + Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper), + InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper), {ok, #it{ handle = ITHandle, - keymapper = DB#db.keymapper, - next_action = {seek, InitialSeek}, - topic_filter = TopicFilter, - hash_bitfilter = HashBitfilter, - hash_bitmask = HashBitmask, - time_bitfilter = TimeBitfilter, - time_bitmask = TimeBitmask + filter = Filter, + next_action = {seek, InitialSeek} }}; Err -> Err end. -spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. -next(It = #it{next_action = Action}) -> - case rocksdb:iterator_move(It#it.handle, Action) of +next(It = #it{filter = #filter{keymapper = Keymapper}}) -> + case rocksdb:iterator_move(It#it.handle, It#it.next_action) of % spec says `{ok, Key}` is also possible but the implementation says it's not {ok, Key, Value} -> - Bitstring = extract(Key, It#it.keymapper), - match_next(It, Bitstring, Value); + Bitstring = extract(Key, Keymapper), + case match_next(Bitstring, Value, It#it.filter) of + {_Topic, Payload} -> + % Preserve last seen key in the iterator so it could be restored later. + {value, Payload, It#it{cursor = Key, next_action = next}}; + next -> + next(It#it{next_action = next}); + NextBitstring when is_integer(NextBitstring) -> + NextSeek = combine(NextBitstring, <<>>, Keymapper), + next(It#it{next_action = {seek, NextSeek}}); + none -> + stop_iteration(It) + end; {error, invalid_iterator} -> stop_iteration(It); {error, iterator_closed} -> {error, closed} end. +-spec preserve_iterator(iterator()) -> binary(). +preserve_iterator(#it{cursor = Cursor, filter = Filter}) -> + State = #{ + v => 1, + cursor => Cursor, + filter => Filter#filter.topic_filter, + stime => Filter#filter.start_time + }, + term_to_binary(State). + +-spec restore_iterator(db(), binary()) -> {ok, iterator()} | {error, _TODO}. +restore_iterator(DB, Serial) when is_binary(Serial) -> + State = binary_to_term(Serial), + restore_iterator(DB, State); +restore_iterator(DB, #{ + v := 1, + cursor := Cursor, + filter := TopicFilter, + stime := StartTime +}) -> + case make_iterator(DB, TopicFilter, StartTime) of + {ok, It} when Cursor == undefined -> + % Iterator was preserved right after it has been made. + {ok, It}; + {ok, It} -> + % Iterator was preserved mid-replay, seek right past the last seen key. + {ok, It#it{cursor = Cursor, next_action = {seek, successor(Cursor)}}}; + Err -> + Err + end. + %%================================================================================ %% Internal exports %%================================================================================ +-spec bitsize(keymapper()) -> bits(). +bitsize(#keymapper{bitsize = Bitsize}) -> + Bitsize. + make_message_key(Topic, PublishedAt, MessageID, Keymapper) -> combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper). @@ -323,10 +387,47 @@ compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) -> compute_time_bitmask(#keymapper{source = Source}) -> compute_time_bitmask(Source, 0). +-spec hash(term(), bits()) -> integer(). hash(Input, Bits) -> % at most 32 bits erlang:phash2(Input, 1 bsl Bits). +-spec make_keyspace_filter(emqx_topic:words(), time(), keymapper()) -> keyspace_filter(). +make_keyspace_filter(TopicFilter, StartTime, Keymapper) -> + Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper), + HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper), + TimeBitmask = compute_time_bitmask(Keymapper), + HashBitfilter = Bitstring band HashBitmask, + TimeBitfilter = Bitstring band TimeBitmask, + #filter{ + keymapper = Keymapper, + topic_filter = TopicFilter, + start_time = StartTime, + hash_bitfilter = HashBitfilter, + hash_bitmask = HashBitmask, + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask + }. + +-spec compute_initial_seek(keyspace_filter()) -> integer(). +compute_initial_seek(#filter{hash_bitfilter = HashBitfilter, time_bitfilter = TimeBitfilter}) -> + % Should be the same as `compute_initial_seek(0, Filter)`. + HashBitfilter bor TimeBitfilter. + +-spec compute_next_seek(integer(), keyspace_filter()) -> integer(). +compute_next_seek( + Bitstring, + Filter = #filter{ + hash_bitfilter = HashBitfilter, + hash_bitmask = HashBitmask, + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask + } +) -> + HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), + TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask), + compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter). + %%================================================================================ %% Internal functions %%================================================================================ @@ -356,8 +457,13 @@ compute_topic_bitmask([], [{hash, level, Size} | Rest], Acc) -> compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size)); compute_topic_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) -> compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size)); -compute_topic_bitmask(_, [{hash, levels, Size} | Rest], Acc) -> - compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size)); +compute_topic_bitmask(Tail, [{hash, levels, Size} | Rest], Acc) -> + Mask = + case lists:member('+', Tail) orelse lists:member('#', Tail) of + true -> 0; + false -> ones(Size) + end, + compute_topic_bitmask([], Rest, bitwise_concat(Acc, Mask, Size)); compute_topic_bitmask(_, [], Acc) -> Acc. @@ -374,6 +480,10 @@ bitwise_concat(Acc, Item, ItemSize) -> ones(Bits) -> 1 bsl Bits - 1. +-spec successor(key()) -> key(). +successor(Key) -> + <>. + %% |123|345|678| %% foo bar baz @@ -388,68 +498,72 @@ ones(Bits) -> %% |123|056|678| & |fff|000|fff| = |123|000|678|. match_next( - It = #it{ - keymapper = Keymapper, + Bitstring, + Value, + Filter = #filter{ topic_filter = TopicFilter, hash_bitfilter = HashBitfilter, hash_bitmask = HashBitmask, time_bitfilter = TimeBitfilter, time_bitmask = TimeBitmask - }, - Bitstring, - Value + } ) -> - HashMatches = (Bitstring band HashBitmask) == HashBitfilter, - TimeMatches = (Bitstring band TimeBitmask) >= TimeBitfilter, + HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), + TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask), case HashMatches and TimeMatches of true -> - {Topic, MessagePayload} = unwrap_message_value(Value), + Message = {Topic, _Payload} = unwrap_message_value(Value), case emqx_topic:match(Topic, TopicFilter) of true -> - {value, MessagePayload, It#it{next_action = next}}; + Message; false -> - next(It#it{next_action = next}) + next end; false -> - case compute_next_seek(HashMatches, TimeMatches, Bitstring, It) of - NextBitstring when is_integer(NextBitstring) -> - % ct:pal("Bitstring = ~32.16.0B", [Bitstring]), - % ct:pal("Bitfilter = ~32.16.0B", [Bitfilter]), - % ct:pal("HBitmask = ~32.16.0B", [HashBitmask]), - % ct:pal("TBitmask = ~32.16.0B", [TimeBitmask]), - % ct:pal("NextBitstring = ~32.16.0B", [NextBitstring]), - NextSeek = combine(NextBitstring, <<>>, Keymapper), - next(It#it{next_action = {seek, NextSeek}}); - none -> - stop_iteration(It) - end + compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter) end. -stop_iteration(It) -> - ok = rocksdb:iterator_close(It#it.handle), - none. - %% `Bitstring` is out of the hash space defined by `HashBitfilter`. -compute_next_seek(_HashMatches = false, _, Bitstring, It) -> - NextBitstring = compute_topic_seek( - Bitstring, - It#it.hash_bitfilter, - It#it.hash_bitmask, - It#it.keymapper - ), +compute_next_seek( + _HashMatches = false, + _TimeMatches, + Bitstring, + Filter = #filter{ + keymapper = Keymapper, + hash_bitfilter = HashBitfilter, + hash_bitmask = HashBitmask, + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask + } +) -> + NextBitstring = compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper), case NextBitstring of none -> none; _ -> - TimeMatches = (NextBitstring band It#it.time_bitmask) >= It#it.time_bitfilter, - compute_next_seek(true, TimeMatches, NextBitstring, It) + TimeMatches = time_matches(NextBitstring, TimeBitfilter, TimeBitmask), + compute_next_seek(true, TimeMatches, NextBitstring, Filter) end; %% `Bitstring` is out of the time range defined by `TimeBitfilter`. -compute_next_seek(_HashMatches = true, _TimeMatches = false, Bitstring, It) -> - compute_time_seek(Bitstring, It#it.time_bitfilter, It#it.time_bitmask); +compute_next_seek( + _HashMatches = true, + _TimeMatches = false, + Bitstring, + #filter{ + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask + } +) -> + compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask); compute_next_seek(true, true, Bitstring, _It) -> Bitstring. +topic_hash_matches(Bitstring, HashBitfilter, HashBitmask) -> + (Bitstring band HashBitmask) == HashBitfilter. + +time_matches(Bitstring, TimeBitfilter, TimeBitmask) -> + (Bitstring band TimeBitmask) >= TimeBitfilter. + compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask) -> % Replace the bits of the timestamp in `Bistring` with bits from `Timebitfilter`. (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter. @@ -466,7 +580,7 @@ compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) -> compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) -> % NOTE % We're iterating through `Substring` here, in lockstep with `HashBitfilter` - % and`HashBitmask`, starting from least signigicant bits. Each bitsource in + % and `HashBitmask`, starting from least signigicant bits. Each bitsource in % `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S` % bits long which we interpret as a "digit". There are 2 flavors of those % "digits": @@ -573,6 +687,10 @@ substring(I, Offset, Size) -> data_cf(GenId) -> ?MODULE_STRING ++ integer_to_list(GenId). +stop_iteration(It) -> + ok = rocksdb:iterator_close(It#it.handle), + none. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl index 30850927b..3d7e7cb41 100644 --- a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl @@ -20,7 +20,6 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). --include_lib("proper/include/proper.hrl"). -define(ZONE, zone(?FUNCTION_NAME)). @@ -116,6 +115,19 @@ t_iterate_wildcard(_Config) -> ), ok. +t_iterate_long_tail_wildcard(_Config) -> + Topic = "b/c/d/e/f/g", + TopicFilter = "b/c/d/e/+/+", + Timestamps = lists:seq(1, 100), + _ = [ + store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + || PublishedAt <- Timestamps + ], + ?assertEqual( + lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, TopicFilter, 50)]) + ). + store(Zone, PublishedAt, Topic, Payload) -> ID = emqx_guid:gen(), emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload). @@ -137,127 +149,33 @@ parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> parse_topic(Topic) -> emqx_topic:words(iolist_to_binary(Topic)). -%% - -t_prop_topic_hash_computes(_) -> - Keymapper = emqx_replay_message_storage:make_keymapper(#{ - timestamp_bits => 32, - topic_bits_per_level => [8, 12, 16, 24], - epoch => 10000 - }), - ?assert( - proper:quickcheck( - ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin - BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper), - is_integer(BS) andalso (BS < (1 bsl 92)) - end) - ) - ). - -t_prop_topic_bitmask_computes(_) -> - Keymapper = emqx_replay_message_storage:make_keymapper(#{ - timestamp_bits => 16, - topic_bits_per_level => [8, 12, 16], - epoch => 100 - }), - ?assert( - proper:quickcheck( - ?FORALL(TopicFilter, topic_filter(), begin - Mask = emqx_replay_message_storage:compute_topic_bitmask(TopicFilter, Keymapper), - is_integer(Mask) andalso (Mask < (1 bsl (36 + 6))) - end) - ) - ). - -t_prop_iterate_stored_messages(_) -> - ?assertEqual( - true, - proper:quickcheck( - ?FORALL( - Streams, - messages(), - begin - Stream = payload_gen:interleave_streams(Streams), - ok = store_message_stream(?ZONE, Stream), - % TODO actually verify some property - true - end - ) - ) - ). - -store_message_stream(Zone, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) -> - MessageID = <>, - PublishedAt = rand:uniform(ChunkNum), - ok = emqx_replay_local_store:store(Zone, MessageID, PublishedAt, Topic, Payload), - store_message_stream(Zone, payload_gen:next(Rest)); -store_message_stream(_Zone, []) -> - ok. - -messages() -> - ?LET(Topics, list(topic()), begin - [{Topic, payload_gen:binary_stream_gen(64)} || Topic <- Topics] - end). - -topic() -> - % TODO - % Somehow generate topic levels with variance according to the entropy distribution? - non_empty(list(topic_level())). - -topic(EntropyWeights) -> - ?LET( - L, - list(1), - ?SIZED(S, [topic_level(S * EW) || EW <- lists:sublist(EntropyWeights ++ L, length(L))]) - ). - -topic_filter() -> - ?SUCHTHAT( - L, - non_empty( - list( - frequency([ - {5, topic_level()}, - {2, '+'}, - {1, '#'} - ]) - ) - ), - not lists:member('#', L) orelse lists:last(L) == '#' - ). - -% topic() -> -% ?LAZY(?SIZED(S, frequency([ -% {S, [topic_level() | topic()]}, -% {1, []} -% ]))). - -% topic_filter() -> -% ?LAZY(?SIZED(S, frequency([ -% {round(S / 3 * 2), [topic_level() | topic_filter()]}, -% {round(S / 3 * 1), ['+' | topic_filter()]}, -% {1, []}, -% {1, ['#']} -% ]))). - -topic_level() -> - ?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)). - -topic_level(Entropy) -> - S = floor(1 + math:log2(Entropy) / 4), - ?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))). - %% CT callbacks all() -> emqx_common_test_helpers:all(?MODULE). -init_per_testcase(TC, Config) -> +init_per_suite(Config) -> {ok, _} = application:ensure_all_started(emqx_replay), + Config. + +end_per_suite(_Config) -> + ok = application:stop(emqx_replay). + +init_per_testcase(TC, Config) -> + ok = set_zone_config(zone(TC), #{ + timestamp_bits => 64, + topic_bits_per_level => [8, 8, 32, 16], + epoch => 5 + }), {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)), Config. -end_per_testcase(_TC, _Config) -> - ok = application:stop(emqx_replay). +end_per_testcase(TC, _Config) -> + ok = emqx_replay_local_store_sup:stop_zone(zone(TC)). zone(TC) -> list_to_atom(?MODULE_STRING ++ atom_to_list(TC)). + +set_zone_config(Zone, Options) -> + ok = application:set_env(emqx_replay, zone_config, #{ + Zone => {emqx_replay_message_storage, Options} + }). diff --git a/apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl b/apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl new file mode 100644 index 000000000..125c9a9fc --- /dev/null +++ b/apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl @@ -0,0 +1,58 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_replay_message_storage_shim). + +-export([open/0]). +-export([close/1]). +-export([store/5]). +-export([iterate/3]). + +-type topic() :: list(binary()). +-type time() :: integer(). + +-opaque t() :: ets:tid(). + +-spec open() -> t(). +open() -> + ets:new(?MODULE, [ordered_set, {keypos, 1}]). + +-spec close(t()) -> ok. +close(Tab) -> + true = ets:delete(Tab), + ok. + +-spec store(t(), emqx_guid:guid(), time(), topic(), binary()) -> + ok | {error, _TODO}. +store(Tab, MessageID, PublishedAt, Topic, Payload) -> + true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}), + ok. + +-spec iterate(t(), emqx_topic:words(), time()) -> + [binary()]. +iterate(Tab, TopicFilter, StartTime) -> + ets:foldr( + fun({{PublishedAt, _}, Topic, Payload}, Acc) -> + case emqx_topic:match(Topic, TopicFilter) of + true when PublishedAt >= StartTime -> + [Payload | Acc]; + _ -> + Acc + end + end, + [], + Tab + ). diff --git a/apps/emqx_replay/test/payload_gen.erl b/apps/emqx_replay/test/props/payload_gen.erl similarity index 100% rename from apps/emqx_replay/test/payload_gen.erl rename to apps/emqx_replay/test/props/payload_gen.erl diff --git a/apps/emqx_replay/test/props/prop_replay_message_storage.erl b/apps/emqx_replay/test/props/prop_replay_message_storage.erl new file mode 100644 index 000000000..9619c4f05 --- /dev/null +++ b/apps/emqx_replay/test/props/prop_replay_message_storage.erl @@ -0,0 +1,426 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_replay_message_storage). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(WORK_DIR, ["_build", "test"]). +-define(RUN_ID, {?MODULE, testrun_id}). +-define(GEN_ID, 42). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_bitstring_computes() -> + ?FORALL(Keymapper, keymapper(), begin + Bitsize = emqx_replay_message_storage:bitsize(Keymapper), + ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin + BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper), + is_integer(BS) andalso (BS < (1 bsl Bitsize)) + end) + end). + +prop_topic_bitmask_computes() -> + Keymapper = make_keymapper(16, [8, 12, 16], 100), + ?FORALL(TopicFilter, topic_filter(), begin + Mask = emqx_replay_message_storage:compute_topic_bitmask(TopicFilter, Keymapper), + % topic bits + timestamp LSBs + is_integer(Mask) andalso (Mask < (1 bsl (36 + 6))) + end). + +prop_next_seek_monotonic() -> + ?FORALL( + {TopicFilter, StartTime, Keymapper}, + {topic_filter(), pos_integer(), keymapper()}, + begin + Filter = emqx_replay_message_storage:make_keyspace_filter( + TopicFilter, + StartTime, + Keymapper + ), + ?FORALL( + Bitstring, + bitstr(emqx_replay_message_storage:bitsize(Keymapper)), + emqx_replay_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring + ) + end + ). + +prop_next_seek_eq_initial_seek() -> + ?FORALL( + Filter, + keyspace_filter(), + emqx_replay_message_storage:compute_initial_seek(Filter) =:= + emqx_replay_message_storage:compute_next_seek(0, Filter) + ). + +prop_iterate_messages() -> + TBPL = [4, 8, 12], + Options = #{ + timestamp_bits => 32, + topic_bits_per_level => TBPL, + epoch => 200 + }, + % TODO + % Shrinking is too unpredictable and leaves a LOT of garbage in the scratch dit. + ?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin + Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)), + {DB, Handle} = open_db(Filepath, Options), + Shim = emqx_replay_message_storage_shim:open(), + ok = store_db(DB, Stream), + ok = store_shim(Shim, Stream), + ?FORALL( + { + {Topic, _}, + Pattern, + StartTime + }, + { + nth(Stream), + topic_filter_pattern(), + start_time() + }, + begin + TopicFilter = make_topic_filter(Pattern, Topic), + Messages = iterate_db(DB, TopicFilter, StartTime), + Reference = iterate_shim(Shim, TopicFilter, StartTime), + ok = close_db(Handle), + ok = emqx_replay_message_storage_shim:close(Shim), + ?WHENFAIL( + begin + io:format(user, " *** Filepath = ~s~n", [Filepath]), + io:format(user, " *** TopicFilter = ~p~n", [TopicFilter]), + io:format(user, " *** StartTime = ~p~n", [StartTime]) + end, + is_list(Messages) andalso equals(Messages -- Reference, Reference -- Messages) + ) + end + ) + end). + +prop_iterate_eq_iterate_with_preserve_restore() -> + TBPL = [4, 8, 16, 12], + Options = #{ + timestamp_bits => 32, + topic_bits_per_level => TBPL, + epoch => 500 + }, + {DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options), + ?FORALL(Stream, non_empty(messages(topic(TBPL))), begin + % TODO + % This proptest is impure because messages from testruns assumed to be + % independent of each other are accumulated in the same storage. This + % would probably confuse shrinker in the event a testrun fails. + ok = store_db(DB, Stream), + ?FORALL( + { + {Topic, _}, + Pat, + StartTime, + Commands + }, + { + nth(Stream), + topic_filter_pattern(), + start_time(), + shuffled(flat([non_empty(list({preserve, restore})), list(iterate)])) + }, + begin + TopicFilter = make_topic_filter(Pat, Topic), + Iterator = make_iterator(DB, TopicFilter, StartTime), + Messages = run_iterator_commands(Commands, Iterator, DB), + equals(Messages, iterate_db(DB, TopicFilter, StartTime)) + end + ) + end). + +% store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) -> +% MessageID = emqx_guid:gen(), +% PublishedAt = ChunkNum, +% MessageID, PublishedAt, Topic +% ]), +% ok = emqx_replay_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload), +% store_message_stream(DB, payload_gen:next(Rest)); +% store_message_stream(_Zone, []) -> +% ok. + +store_db(DB, Messages) -> + lists:foreach( + fun({Topic, Payload = {MessageID, Timestamp, _}}) -> + Bin = term_to_binary(Payload), + emqx_replay_message_storage:store(DB, MessageID, Timestamp, Topic, Bin) + end, + Messages + ). + +iterate_db(DB, TopicFilter, StartTime) -> + iterate_db(make_iterator(DB, TopicFilter, StartTime)). + +iterate_db(It) -> + case emqx_replay_message_storage:next(It) of + {value, Payload, ItNext} -> + [binary_to_term(Payload) | iterate_db(ItNext)]; + none -> + [] + end. + +make_iterator(DB, TopicFilter, StartTime) -> + {ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime), + It. + +run_iterator_commands([iterate | Rest], It, DB) -> + case emqx_replay_message_storage:next(It) of + {value, Payload, ItNext} -> + [binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, DB)]; + none -> + [] + end; +run_iterator_commands([{preserve, restore} | Rest], It, DB) -> + Serial = emqx_replay_message_storage:preserve_iterator(It), + {ok, ItNext} = emqx_replay_message_storage:restore_iterator(DB, Serial), + run_iterator_commands(Rest, ItNext, DB); +run_iterator_commands([], It, _DB) -> + iterate_db(It). + +store_shim(Shim, Messages) -> + lists:foreach( + fun({Topic, Payload = {MessageID, Timestamp, _}}) -> + Bin = term_to_binary(Payload), + emqx_replay_message_storage_shim:store(Shim, MessageID, Timestamp, Topic, Bin) + end, + Messages + ). + +iterate_shim(Shim, TopicFilter, StartTime) -> + lists:map( + fun binary_to_term/1, + emqx_replay_message_storage_shim:iterate(Shim, TopicFilter, StartTime) + ). + +%%-------------------------------------------------------------------- +%% Setup / teardown +%%-------------------------------------------------------------------- + +open_db(Filepath, Options) -> + {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]), + {Schema, CFRefs} = emqx_replay_message_storage:create_new(Handle, ?GEN_ID, Options), + DB = emqx_replay_message_storage:open(Handle, ?GEN_ID, CFRefs, Schema), + {DB, Handle}. + +close_db(Handle) -> + rocksdb:close(Handle). + +make_filepath(TC) -> + make_filepath(TC, 0). + +make_filepath(TC, InstID) -> + Name = io_lib:format("~0p.~0p", [TC, InstID]), + Path = filename:join(?WORK_DIR ++ ["proper", "runs", get_run_id(), ?MODULE_STRING, Name]), + ok = filelib:ensure_dir(Path), + Path. + +get_run_id() -> + case persistent_term:get(?RUN_ID, undefined) of + RunID when RunID /= undefined -> + RunID; + undefined -> + RunID = make_run_id(), + ok = persistent_term:put(?RUN_ID, RunID), + RunID + end. + +make_run_id() -> + calendar:system_time_to_rfc3339(erlang:system_time(second), [{offset, "Z"}]). + +%%-------------------------------------------------------------------- +%% Type generators +%%-------------------------------------------------------------------- + +topic() -> + non_empty(list(topic_level())). + +topic(EntropyWeights) -> + ?LET(L, scaled(1 / 4, list(1)), begin + EWs = lists:sublist(EntropyWeights ++ L, length(L)), + ?SIZED(S, [oneof([topic_level(S * EW), topic_level_fixed()]) || EW <- EWs]) + end). + +topic_filter() -> + ?SUCHTHAT( + L, + non_empty( + list( + frequency([ + {5, topic_level()}, + {2, '+'}, + {1, '#'} + ]) + ) + ), + not lists:member('#', L) orelse lists:last(L) == '#' + ). + +topic_level_pattern() -> + frequency([ + {5, level}, + {2, '+'}, + {1, '#'} + ]). + +topic_filter_pattern() -> + list(topic_level_pattern()). + +topic_filter(Topic) -> + ?LET({T, Pat}, {Topic, topic_filter_pattern()}, make_topic_filter(Pat, T)). + +make_topic_filter([], _) -> + []; +make_topic_filter(_, []) -> + []; +make_topic_filter(['#' | _], _) -> + ['#']; +make_topic_filter(['+' | Rest], [_ | Levels]) -> + ['+' | make_topic_filter(Rest, Levels)]; +make_topic_filter([level | Rest], [L | Levels]) -> + [L | make_topic_filter(Rest, Levels)]. + +% topic() -> +% ?LAZY(?SIZED(S, frequency([ +% {S, [topic_level() | topic()]}, +% {1, []} +% ]))). + +% topic_filter() -> +% ?LAZY(?SIZED(S, frequency([ +% {round(S / 3 * 2), [topic_level() | topic_filter()]}, +% {round(S / 3 * 1), ['+' | topic_filter()]}, +% {1, []}, +% {1, ['#']} +% ]))). + +topic_level() -> + ?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)). + +topic_level(Entropy) -> + S = floor(1 + math:log2(Entropy) / 4), + ?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))). + +topic_level_fixed() -> + oneof([ + <<"foo">>, + <<"bar">>, + <<"baz">>, + <<"xyzzy">> + ]). + +keymapper() -> + ?LET( + {TimestampBits, TopicBits, Epoch}, + { + range(0, 128), + non_empty(list(range(1, 32))), + pos_integer() + }, + make_keymapper(TimestampBits, TopicBits, Epoch * 100) + ). + +keyspace_filter() -> + ?LET( + {TopicFilter, StartTime, Keymapper}, + {topic_filter(), pos_integer(), keymapper()}, + emqx_replay_message_storage:make_keyspace_filter(TopicFilter, StartTime, Keymapper) + ). + +messages(Topic) -> + ?LET( + Ts, + list(Topic), + interleaved( + ?LET(Messages, vector(length(Ts), scaled(4, list(message()))), lists:zip(Ts, Messages)) + ) + ). + +message() -> + ?LET({Timestamp, Payload}, {timestamp(), binary()}, {emqx_guid:gen(), Timestamp, Payload}). + +message_streams(Topic) -> + ?LET(Topics, list(Topic), [{T, payload_gen:binary_stream_gen(64)} || T <- Topics]). + +timestamp() -> + scaled(20, pos_integer()). + +start_time() -> + scaled(10, pos_integer()). + +bitstr(Size) -> + ?LET(B, binary(1 + (Size div 8)), binary:decode_unsigned(B) band (1 bsl Size - 1)). + +nth(L) -> + ?LET(I, range(1, length(L)), lists:nth(I, L)). + +scaled(Factor, T) -> + ?SIZED(S, resize(ceil(S * Factor), T)). + +interleaved(T) -> + ?LET({L, Seed}, {T, integer()}, interleave(L, rand:seed_s(exsss, Seed))). + +shuffled(T) -> + ?LET({L, Seed}, {T, integer()}, shuffle(L, rand:seed_s(exsss, Seed))). + +flat(T) -> + ?LET(L, T, lists:flatten(L)). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +make_keymapper(TimestampBits, TopicBits, MaxEpoch) -> + emqx_replay_message_storage:make_keymapper(#{ + timestamp_bits => TimestampBits, + topic_bits_per_level => TopicBits, + epoch => MaxEpoch + }). + +-spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}). +interleave(Seqs, Rng) -> + interleave(Seqs, length(Seqs), Rng). + +interleave(Seqs, L, Rng) when L > 0 -> + {N, RngNext} = rand:uniform_s(L, Rng), + {SeqHead, SeqTail} = lists:split(N - 1, Seqs), + case SeqTail of + [{Tag, [M | Rest]} | SeqRest] -> + [{Tag, M} | interleave(SeqHead ++ [{Tag, Rest} | SeqRest], L, RngNext)]; + [{_, []} | SeqRest] -> + interleave(SeqHead ++ SeqRest, L - 1, RngNext) + end; +interleave([], 0, _) -> + []. + +-spec shuffle(list(E), rand:state()) -> list(E). +shuffle(L, Rng) -> + {Rands, _} = randoms(length(L), Rng), + [E || {_, E} <- lists:sort(lists:zip(Rands, L))]. + +randoms(N, Rng) when N > 0 -> + {Rand, RngNext} = rand:uniform_s(Rng), + {Tail, RngFinal} = randoms(N - 1, RngNext), + {[Rand | Tail], RngFinal}; +randoms(_, Rng) -> + {[], Rng}.