From 4b8dbca232800b99f08660abeac4565b42219d70 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 4 Jan 2023 22:02:53 +0300 Subject: [PATCH 1/9] refactor: introduce keyspace filter concept So we could conveniently test it separately. --- .../src/emqx_replay_message_storage.erl | 185 ++++++++++++------ 1 file changed, 125 insertions(+), 60 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index dafaf8c68..3988e97dc 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -98,16 +98,24 @@ -export([next/1]). %% Debug/troubleshooting: +%% Keymappers -export([ - make_message_key/4, + bitsize/1, compute_bitstring/3, compute_topic_bitmask/2, - compute_next_seek/4, - compute_time_seek/3, - compute_topic_seek/4, + compute_time_bitmask/1, hash/2 ]). +%% Keyspace filters +-export([ + make_keyspace_filter/3, + compute_initial_seek/1, + compute_next_seek/2, + compute_time_seek/3, + compute_topic_seek/4 +]). + -export_type([db/0, iterator/0, schema/0]). -compile({inline, [ones/1, bitwise_concat/3]}). @@ -159,8 +167,12 @@ -record(it, { handle :: rocksdb:itr_handle(), + filter :: keyspace_filter(), + next_action :: {seek, binary()} | next +}). + +-record(filter, { keymapper :: keymapper(), - next_action :: {seek, binary()} | next, topic_filter :: emqx_topic:words(), hash_bitfilter :: integer(), hash_bitmask :: integer(), @@ -186,6 +198,7 @@ -opaque db() :: #db{}. -opaque iterator() :: #it{}. -type keymapper() :: #keymapper{}. +-type keyspace_filter() :: #filter{}. %%================================================================================ %% API funcions @@ -254,33 +267,35 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) -> case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of {ok, ITHandle} -> - Bitstring = compute_bitstring(TopicFilter, StartTime, DB#db.keymapper), - HashBitmask = compute_topic_bitmask(TopicFilter, DB#db.keymapper), - TimeBitmask = compute_time_bitmask(DB#db.keymapper), - HashBitfilter = Bitstring band HashBitmask, - TimeBitfilter = Bitstring band TimeBitmask, - InitialSeek = combine(HashBitfilter bor TimeBitfilter, <<>>, DB#db.keymapper), + % TODO earliest + Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper), + InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper), {ok, #it{ handle = ITHandle, - keymapper = DB#db.keymapper, - next_action = {seek, InitialSeek}, - topic_filter = TopicFilter, - hash_bitfilter = HashBitfilter, - hash_bitmask = HashBitmask, - time_bitfilter = TimeBitfilter, - time_bitmask = TimeBitmask + filter = Filter, + next_action = {seek, InitialSeek} }}; Err -> Err end. -spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. -next(It = #it{next_action = Action}) -> - case rocksdb:iterator_move(It#it.handle, Action) of +next(It = #it{filter = #filter{keymapper = Keymapper}}) -> + case rocksdb:iterator_move(It#it.handle, It#it.next_action) of % spec says `{ok, Key}` is also possible but the implementation says it's not {ok, Key, Value} -> - Bitstring = extract(Key, It#it.keymapper), - match_next(It, Bitstring, Value); + Bitstring = extract(Key, Keymapper), + case match_next(Bitstring, Value, It#it.filter) of + {_Topic, Payload} -> + {value, Payload, It#it{next_action = next}}; + next -> + next(It#it{next_action = next}); + NextBitstring when is_integer(NextBitstring) -> + NextSeek = combine(NextBitstring, <<>>, Keymapper), + next(It#it{next_action = {seek, NextSeek}}); + none -> + stop_iteration(It) + end; {error, invalid_iterator} -> stop_iteration(It); {error, iterator_closed} -> @@ -291,6 +306,18 @@ next(It = #it{next_action = Action}) -> %% Internal exports %%================================================================================ +-define(topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), + (Bitstring band HashBitmask) == HashBitfilter +). + +-define(time_matches(Bitstring, TimeBitfilter, TimeBitmask), + (Bitstring band TimeBitmask) >= TimeBitfilter +). + +-spec bitsize(keymapper()) -> bits(). +bitsize(#keymapper{bitsize = Bitsize}) -> + Bitsize. + make_message_key(Topic, PublishedAt, MessageID, Keymapper) -> combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper). @@ -323,10 +350,46 @@ compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) -> compute_time_bitmask(#keymapper{source = Source}) -> compute_time_bitmask(Source, 0). +-spec hash(term(), bits()) -> integer(). hash(Input, Bits) -> % at most 32 bits erlang:phash2(Input, 1 bsl Bits). +-spec make_keyspace_filter(emqx_topic:words(), time(), keymapper()) -> keyspace_filter(). +make_keyspace_filter(TopicFilter, StartTime, Keymapper) -> + Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper), + HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper), + TimeBitmask = compute_time_bitmask(Keymapper), + HashBitfilter = Bitstring band HashBitmask, + TimeBitfilter = Bitstring band TimeBitmask, + #filter{ + keymapper = Keymapper, + topic_filter = TopicFilter, + hash_bitfilter = HashBitfilter, + hash_bitmask = HashBitmask, + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask + }. + +-spec compute_initial_seek(keyspace_filter()) -> integer(). +compute_initial_seek(#filter{hash_bitfilter = HashBitfilter, time_bitfilter = TimeBitfilter}) -> + % Should be the same as `compute_initial_seek(0, Filter)`. + HashBitfilter bor TimeBitfilter. + +-spec compute_next_seek(integer(), keyspace_filter()) -> integer(). +compute_next_seek( + Bitstring, + Filter = #filter{ + hash_bitfilter = HashBitfilter, + hash_bitmask = HashBitmask, + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask + } +) -> + HashMatches = ?topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), + TimeMatches = ?time_matches(Bitstring, TimeBitfilter, TimeBitmask), + compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter). + %%================================================================================ %% Internal functions %%================================================================================ @@ -388,65 +451,63 @@ ones(Bits) -> %% |123|056|678| & |fff|000|fff| = |123|000|678|. match_next( - It = #it{ - keymapper = Keymapper, + Bitstring, + Value, + Filter = #filter{ topic_filter = TopicFilter, hash_bitfilter = HashBitfilter, hash_bitmask = HashBitmask, time_bitfilter = TimeBitfilter, time_bitmask = TimeBitmask - }, - Bitstring, - Value + } ) -> - HashMatches = (Bitstring band HashBitmask) == HashBitfilter, - TimeMatches = (Bitstring band TimeBitmask) >= TimeBitfilter, + HashMatches = ?topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), + TimeMatches = ?time_matches(Bitstring, TimeBitfilter, TimeBitmask), case HashMatches and TimeMatches of true -> - {Topic, MessagePayload} = unwrap_message_value(Value), + Message = {Topic, _Payload} = unwrap_message_value(Value), case emqx_topic:match(Topic, TopicFilter) of true -> - {value, MessagePayload, It#it{next_action = next}}; + Message; false -> - next(It#it{next_action = next}) + next end; false -> - case compute_next_seek(HashMatches, TimeMatches, Bitstring, It) of - NextBitstring when is_integer(NextBitstring) -> - % ct:pal("Bitstring = ~32.16.0B", [Bitstring]), - % ct:pal("Bitfilter = ~32.16.0B", [Bitfilter]), - % ct:pal("HBitmask = ~32.16.0B", [HashBitmask]), - % ct:pal("TBitmask = ~32.16.0B", [TimeBitmask]), - % ct:pal("NextBitstring = ~32.16.0B", [NextBitstring]), - NextSeek = combine(NextBitstring, <<>>, Keymapper), - next(It#it{next_action = {seek, NextSeek}}); - none -> - stop_iteration(It) - end + compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter) end. -stop_iteration(It) -> - ok = rocksdb:iterator_close(It#it.handle), - none. - %% `Bitstring` is out of the hash space defined by `HashBitfilter`. -compute_next_seek(_HashMatches = false, _, Bitstring, It) -> - NextBitstring = compute_topic_seek( - Bitstring, - It#it.hash_bitfilter, - It#it.hash_bitmask, - It#it.keymapper - ), +compute_next_seek( + _HashMatches = false, + _TimeMatches, + Bitstring, + Filter = #filter{ + keymapper = Keymapper, + hash_bitfilter = HashBitfilter, + hash_bitmask = HashBitmask, + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask + } +) -> + NextBitstring = compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper), case NextBitstring of none -> none; _ -> - TimeMatches = (NextBitstring band It#it.time_bitmask) >= It#it.time_bitfilter, - compute_next_seek(true, TimeMatches, NextBitstring, It) + TimeMatches = ?time_matches(NextBitstring, TimeBitfilter, TimeBitmask), + compute_next_seek(true, TimeMatches, NextBitstring, Filter) end; %% `Bitstring` is out of the time range defined by `TimeBitfilter`. -compute_next_seek(_HashMatches = true, _TimeMatches = false, Bitstring, It) -> - compute_time_seek(Bitstring, It#it.time_bitfilter, It#it.time_bitmask); +compute_next_seek( + _HashMatches = true, + _TimeMatches = false, + Bitstring, + #filter{ + time_bitfilter = TimeBitfilter, + time_bitmask = TimeBitmask + } +) -> + compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask); compute_next_seek(true, true, Bitstring, _It) -> Bitstring. @@ -466,7 +527,7 @@ compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) -> compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) -> % NOTE % We're iterating through `Substring` here, in lockstep with `HashBitfilter` - % and`HashBitmask`, starting from least signigicant bits. Each bitsource in + % and `HashBitmask`, starting from least signigicant bits. Each bitsource in % `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S` % bits long which we interpret as a "digit". There are 2 flavors of those % "digits": @@ -573,6 +634,10 @@ substring(I, Offset, Size) -> data_cf(GenId) -> ?MODULE_STRING ++ integer_to_list(GenId). +stop_iteration(It) -> + ok = rocksdb:iterator_close(It#it.handle), + none. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). From d6ee23e5b340d32c0a1a430d8bcd82344a6d6dbb Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 4 Jan 2023 22:05:09 +0300 Subject: [PATCH 2/9] test: move proptests into a separate module Following conventions. Also add few proptests on keyspace filters. --- .../test/emqx_replay_storage_SUITE.erl | 111 ---------- .../test/{ => props}/payload_gen.erl | 0 .../test/props/prop_replay_storage.erl | 189 ++++++++++++++++++ 3 files changed, 189 insertions(+), 111 deletions(-) rename apps/emqx_replay/test/{ => props}/payload_gen.erl (100%) create mode 100644 apps/emqx_replay/test/props/prop_replay_storage.erl diff --git a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl index 30850927b..c99063350 100644 --- a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl @@ -20,7 +20,6 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). --include_lib("proper/include/proper.hrl"). -define(ZONE, zone(?FUNCTION_NAME)). @@ -137,116 +136,6 @@ parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> parse_topic(Topic) -> emqx_topic:words(iolist_to_binary(Topic)). -%% - -t_prop_topic_hash_computes(_) -> - Keymapper = emqx_replay_message_storage:make_keymapper(#{ - timestamp_bits => 32, - topic_bits_per_level => [8, 12, 16, 24], - epoch => 10000 - }), - ?assert( - proper:quickcheck( - ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin - BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper), - is_integer(BS) andalso (BS < (1 bsl 92)) - end) - ) - ). - -t_prop_topic_bitmask_computes(_) -> - Keymapper = emqx_replay_message_storage:make_keymapper(#{ - timestamp_bits => 16, - topic_bits_per_level => [8, 12, 16], - epoch => 100 - }), - ?assert( - proper:quickcheck( - ?FORALL(TopicFilter, topic_filter(), begin - Mask = emqx_replay_message_storage:compute_topic_bitmask(TopicFilter, Keymapper), - is_integer(Mask) andalso (Mask < (1 bsl (36 + 6))) - end) - ) - ). - -t_prop_iterate_stored_messages(_) -> - ?assertEqual( - true, - proper:quickcheck( - ?FORALL( - Streams, - messages(), - begin - Stream = payload_gen:interleave_streams(Streams), - ok = store_message_stream(?ZONE, Stream), - % TODO actually verify some property - true - end - ) - ) - ). - -store_message_stream(Zone, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) -> - MessageID = <>, - PublishedAt = rand:uniform(ChunkNum), - ok = emqx_replay_local_store:store(Zone, MessageID, PublishedAt, Topic, Payload), - store_message_stream(Zone, payload_gen:next(Rest)); -store_message_stream(_Zone, []) -> - ok. - -messages() -> - ?LET(Topics, list(topic()), begin - [{Topic, payload_gen:binary_stream_gen(64)} || Topic <- Topics] - end). - -topic() -> - % TODO - % Somehow generate topic levels with variance according to the entropy distribution? - non_empty(list(topic_level())). - -topic(EntropyWeights) -> - ?LET( - L, - list(1), - ?SIZED(S, [topic_level(S * EW) || EW <- lists:sublist(EntropyWeights ++ L, length(L))]) - ). - -topic_filter() -> - ?SUCHTHAT( - L, - non_empty( - list( - frequency([ - {5, topic_level()}, - {2, '+'}, - {1, '#'} - ]) - ) - ), - not lists:member('#', L) orelse lists:last(L) == '#' - ). - -% topic() -> -% ?LAZY(?SIZED(S, frequency([ -% {S, [topic_level() | topic()]}, -% {1, []} -% ]))). - -% topic_filter() -> -% ?LAZY(?SIZED(S, frequency([ -% {round(S / 3 * 2), [topic_level() | topic_filter()]}, -% {round(S / 3 * 1), ['+' | topic_filter()]}, -% {1, []}, -% {1, ['#']} -% ]))). - -topic_level() -> - ?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)). - -topic_level(Entropy) -> - S = floor(1 + math:log2(Entropy) / 4), - ?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))). - %% CT callbacks all() -> emqx_common_test_helpers:all(?MODULE). diff --git a/apps/emqx_replay/test/payload_gen.erl b/apps/emqx_replay/test/props/payload_gen.erl similarity index 100% rename from apps/emqx_replay/test/payload_gen.erl rename to apps/emqx_replay/test/props/payload_gen.erl diff --git a/apps/emqx_replay/test/props/prop_replay_storage.erl b/apps/emqx_replay/test/props/prop_replay_storage.erl new file mode 100644 index 000000000..c2d63a3d7 --- /dev/null +++ b/apps/emqx_replay/test/props/prop_replay_storage.erl @@ -0,0 +1,189 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_replay_storage). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(ZONE, mk_zone_name(?FUNCTION_NAME)). +-define(SETUP(Test), ?SETUP(fun() -> setup(?ZONE) end, Test)). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_bitstring_computes() -> + ?FORALL(Keymapper, keymapper(), begin + Bitsize = emqx_replay_message_storage:bitsize(Keymapper), + ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin + BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper), + is_integer(BS) andalso (BS < (1 bsl Bitsize)) + end) + end). + +prop_topic_bitmask_computes() -> + Keymapper = make_keymapper(16, [8, 12, 16], 100), + ?FORALL(TopicFilter, topic_filter(), begin + Mask = emqx_replay_message_storage:compute_topic_bitmask(TopicFilter, Keymapper), + % topic bits + timestamp LSBs + is_integer(Mask) andalso (Mask < (1 bsl (36 + 6))) + end). + +prop_next_seek_monotonic() -> + ?FORALL( + {TopicFilter, StartTime, Keymapper}, + {topic_filter(), pos_integer(), keymapper()}, + begin + Filter = emqx_replay_message_storage:make_keyspace_filter( + TopicFilter, StartTime, Keymapper + ), + ?FORALL( + Bitstring, + bitstr(emqx_replay_message_storage:bitsize(Keymapper)), + emqx_replay_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring + ) + end + ). + +prop_next_seek_eq_initial_seek() -> + ?FORALL( + Filter, + keyspace_filter(), + emqx_replay_message_storage:compute_initial_seek(Filter) =:= + emqx_replay_message_storage:compute_next_seek(0, Filter) + ). + +prop_iterate_stored_messages() -> + ?SETUP( + ?FORALL(Streams, message_streams(), begin + Stream = payload_gen:interleave_streams(Streams), + ok = store_message_stream(?ZONE, Stream), + % TODO actually verify some property + true + end) + ). + +store_message_stream(Zone, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) -> + MessageID = <>, + PublishedAt = rand:uniform(ChunkNum), + ok = emqx_replay_local_store:store(Zone, MessageID, PublishedAt, Topic, Payload), + store_message_stream(Zone, payload_gen:next(Rest)); +store_message_stream(_Zone, []) -> + ok. + +%%-------------------------------------------------------------------- +%% Setup / teardown +%%-------------------------------------------------------------------- + +setup(Zone) -> + {ok, _} = application:ensure_all_started(emqx_replay), + {ok, _} = emqx_replay_local_store_sup:start_zone(Zone), + fun() -> + application:stop(emqx_replay) + end. + +%%-------------------------------------------------------------------- +%% Type generators +%%-------------------------------------------------------------------- + +topic() -> + % TODO + % Somehow generate topic levels with variance according to the entropy distribution? + non_empty(list(topic_level())). + +topic(EntropyWeights) -> + ?LET( + L, + list(1), + ?SIZED(S, [topic_level(S * EW) || EW <- lists:sublist(EntropyWeights ++ L, length(L))]) + ). + +% entropy_weights() -> + +topic_filter() -> + ?SUCHTHAT( + L, + non_empty( + list( + frequency([ + {5, topic_level()}, + {2, '+'}, + {1, '#'} + ]) + ) + ), + not lists:member('#', L) orelse lists:last(L) == '#' + ). + +% topic() -> +% ?LAZY(?SIZED(S, frequency([ +% {S, [topic_level() | topic()]}, +% {1, []} +% ]))). + +% topic_filter() -> +% ?LAZY(?SIZED(S, frequency([ +% {round(S / 3 * 2), [topic_level() | topic_filter()]}, +% {round(S / 3 * 1), ['+' | topic_filter()]}, +% {1, []}, +% {1, ['#']} +% ]))). + +topic_level() -> + ?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)). + +topic_level(Entropy) -> + S = floor(1 + math:log2(Entropy) / 4), + ?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))). + +keymapper() -> + ?LET( + {TimestampBits, TopicBits, Epoch}, + { + range(0, 128), + non_empty(list(range(1, 32))), + pos_integer() + }, + make_keymapper(TimestampBits, TopicBits, Epoch * 100) + ). + +keyspace_filter() -> + ?LET( + {TopicFilter, StartTime, Keymapper}, + {topic_filter(), pos_integer(), keymapper()}, + emqx_replay_message_storage:make_keyspace_filter(TopicFilter, StartTime, Keymapper) + ). + +bitstr(Size) -> + ?LET(B, binary(1 + (Size div 8)), binary:decode_unsigned(B) band (1 bsl Size - 1)). + +message_streams() -> + ?LET(Topics, list(topic()), begin + [{Topic, payload_gen:binary_stream_gen(64)} || Topic <- Topics] + end). + +%% + +make_keymapper(TimestampBits, TopicBits, MaxEpoch) -> + emqx_replay_message_storage:make_keymapper(#{ + timestamp_bits => TimestampBits, + topic_bits_per_level => TopicBits, + epoch => MaxEpoch + }). + +mk_zone_name(TC) -> + list_to_atom(?MODULE_STRING ++ "_" ++ atom_to_list(TC)). From 7fd14fb404ea7c83385631c700128f07277904eb Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 5 Jan 2023 22:48:10 +0300 Subject: [PATCH 3/9] feat: add an ability to preserve and restore iterators This will allow to persist iteration state and to periodically recreate iterators during long replays. --- .../src/emqx_replay_message_storage.erl | 44 ++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index 3988e97dc..1c91066cf 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -97,6 +97,9 @@ -export([make_iterator/3]). -export([next/1]). +-export([preserve_iterator/1]). +-export([restore_iterator/2]). + %% Debug/troubleshooting: %% Keymappers -export([ @@ -168,12 +171,14 @@ -record(it, { handle :: rocksdb:itr_handle(), filter :: keyspace_filter(), + cursor :: binary() | undefined, next_action :: {seek, binary()} | next }). -record(filter, { keymapper :: keymapper(), topic_filter :: emqx_topic:words(), + start_time :: integer(), hash_bitfilter :: integer(), hash_bitmask :: integer(), time_bitfilter :: integer(), @@ -287,7 +292,8 @@ next(It = #it{filter = #filter{keymapper = Keymapper}}) -> Bitstring = extract(Key, Keymapper), case match_next(Bitstring, Value, It#it.filter) of {_Topic, Payload} -> - {value, Payload, It#it{next_action = next}}; + % Preserve last seen key in the iterator so it could be restored later. + {value, Payload, It#it{cursor = Key, next_action = next}}; next -> next(It#it{next_action = next}); NextBitstring when is_integer(NextBitstring) -> @@ -302,6 +308,37 @@ next(It = #it{filter = #filter{keymapper = Keymapper}}) -> {error, closed} end. +-spec preserve_iterator(iterator()) -> binary(). +preserve_iterator(#it{cursor = Cursor, filter = Filter}) -> + State = #{ + v => 1, + cursor => Cursor, + filter => Filter#filter.topic_filter, + stime => Filter#filter.start_time + }, + term_to_binary(State). + +-spec restore_iterator(db(), binary()) -> {ok, iterator()} | {error, _TODO}. +restore_iterator(DB, Serial) when is_binary(Serial) -> + State = binary_to_term(Serial), + restore_iterator(DB, State); +restore_iterator(DB, #{ + v := 1, + cursor := Cursor, + filter := TopicFilter, + stime := StartTime +}) -> + case make_iterator(DB, TopicFilter, StartTime) of + {ok, It} when Cursor == undefined -> + % Iterator was preserved right after it has been made. + {ok, It}; + {ok, It} -> + % Iterator was preserved mid-replay, seek right past the last seen key. + {ok, It#it{cursor = Cursor, next_action = {seek, successor(Cursor)}}}; + Err -> + Err + end. + %%================================================================================ %% Internal exports %%================================================================================ @@ -365,6 +402,7 @@ make_keyspace_filter(TopicFilter, StartTime, Keymapper) -> #filter{ keymapper = Keymapper, topic_filter = TopicFilter, + start_time = StartTime, hash_bitfilter = HashBitfilter, hash_bitmask = HashBitmask, time_bitfilter = TimeBitfilter, @@ -437,6 +475,10 @@ bitwise_concat(Acc, Item, ItemSize) -> ones(Bits) -> 1 bsl Bits - 1. +-spec successor(key()) -> key(). +successor(Key) -> + <>. + %% |123|345|678| %% foo bar baz From aba48c488ea6935144a66b9910d1bf55991b4aff Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 5 Jan 2023 22:52:08 +0300 Subject: [PATCH 4/9] test: add a proptest on iterator preserve / restore Which verifies that preservation and restoration of iterators does not affect the outcome of an iteration (under the precondition that the state of database is constant during an iteration). --- .../props/prop_replay_message_storage.erl | 340 ++++++++++++++++++ .../test/props/prop_replay_storage.erl | 189 ---------- 2 files changed, 340 insertions(+), 189 deletions(-) create mode 100644 apps/emqx_replay/test/props/prop_replay_message_storage.erl delete mode 100644 apps/emqx_replay/test/props/prop_replay_storage.erl diff --git a/apps/emqx_replay/test/props/prop_replay_message_storage.erl b/apps/emqx_replay/test/props/prop_replay_message_storage.erl new file mode 100644 index 000000000..222914680 --- /dev/null +++ b/apps/emqx_replay/test/props/prop_replay_message_storage.erl @@ -0,0 +1,340 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_replay_message_storage). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(GEN_ID, 42). + +-define(PROP_FULLNAME, ?MODULE_STRING ++ "." ++ atom_to_list(?FUNCTION_NAME)). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_bitstring_computes() -> + ?FORALL(Keymapper, keymapper(), begin + Bitsize = emqx_replay_message_storage:bitsize(Keymapper), + ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin + BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper), + is_integer(BS) andalso (BS < (1 bsl Bitsize)) + end) + end). + +prop_topic_bitmask_computes() -> + Keymapper = make_keymapper(16, [8, 12, 16], 100), + ?FORALL(TopicFilter, topic_filter(), begin + Mask = emqx_replay_message_storage:compute_topic_bitmask(TopicFilter, Keymapper), + % topic bits + timestamp LSBs + is_integer(Mask) andalso (Mask < (1 bsl (36 + 6))) + end). + +prop_next_seek_monotonic() -> + ?FORALL( + {TopicFilter, StartTime, Keymapper}, + {topic_filter(), pos_integer(), keymapper()}, + begin + Filter = emqx_replay_message_storage:make_keyspace_filter( + TopicFilter, + StartTime, + Keymapper + ), + ?FORALL( + Bitstring, + bitstr(emqx_replay_message_storage:bitsize(Keymapper)), + emqx_replay_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring + ) + end + ). + +prop_next_seek_eq_initial_seek() -> + ?FORALL( + Filter, + keyspace_filter(), + emqx_replay_message_storage:compute_initial_seek(Filter) =:= + emqx_replay_message_storage:compute_next_seek(0, Filter) + ). + +prop_iterate_eq_iterate_with_preserve_restore() -> + TBPL = [4, 8, 16, 12], + DB = open(?PROP_FULLNAME, #{ + timestamp_bits => 32, + topic_bits_per_level => TBPL, + epoch => 500 + }), + ?FORALL(Stream, non_empty(messages(topic(TBPL))), begin + % TODO + % This proptest is impure because messages from testruns assumed to be + % independent of each other are accumulated in the same storage. This + % would probably confuse shrinker in the event a testrun fails. + ok = store(DB, Stream), + ?FORALL( + { + {Topic, _}, + Pat, + StartTime, + Commands + }, + { + nth(Stream), + topic_filter_pattern(), + start_time(), + shuffled(flat([non_empty(list({preserve, restore})), list(iterate)])) + }, + begin + TopicFilter = make_topic_filter(Pat, Topic), + Iterator = make_iterator(DB, TopicFilter, StartTime), + Messages = run_iterator_commands(Commands, Iterator, DB), + Messages =:= iterate(DB, TopicFilter, StartTime) + end + ) + end). + +% store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) -> +% MessageID = emqx_guid:gen(), +% PublishedAt = ChunkNum, +% MessageID, PublishedAt, Topic +% ]), +% ok = emqx_replay_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload), +% store_message_stream(DB, payload_gen:next(Rest)); +% store_message_stream(_Zone, []) -> +% ok. + +store(DB, Messages) -> + lists:foreach( + fun({Topic, Payload = {MessageID, Timestamp, _}}) -> + Bin = term_to_binary(Payload), + emqx_replay_message_storage:store(DB, MessageID, Timestamp, Topic, Bin) + end, + Messages + ). + +iterate(DB, TopicFilter, StartTime) -> + iterate(make_iterator(DB, TopicFilter, StartTime)). + +iterate(It) -> + case emqx_replay_message_storage:next(It) of + {value, Payload, ItNext} -> + [binary_to_term(Payload) | iterate(ItNext)]; + none -> + [] + end. + +make_iterator(DB, TopicFilter, StartTime) -> + {ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime), + It. + +run_iterator_commands([iterate | Rest], It, DB) -> + case emqx_replay_message_storage:next(It) of + {value, Payload, ItNext} -> + [binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, DB)]; + none -> + [] + end; +run_iterator_commands([{preserve, restore} | Rest], It, DB) -> + Serial = emqx_replay_message_storage:preserve_iterator(It), + {ok, ItNext} = emqx_replay_message_storage:restore_iterator(DB, Serial), + run_iterator_commands(Rest, ItNext, DB); +run_iterator_commands([], It, _DB) -> + iterate(It). + +%%-------------------------------------------------------------------- +%% Setup / teardown +%%-------------------------------------------------------------------- + +open(Filename, Options) -> + {ok, DBHandle} = rocksdb:open(Filename, [{create_if_missing, true}]), + {Schema, CFRefs} = emqx_replay_message_storage:create_new(DBHandle, ?GEN_ID, Options), + emqx_replay_message_storage:open(DBHandle, ?GEN_ID, CFRefs, Schema). + +%%-------------------------------------------------------------------- +%% Type generators +%%-------------------------------------------------------------------- + +topic() -> + non_empty(list(topic_level())). + +topic(EntropyWeights) -> + ?LET(L, scaled(1 / 4, list(1)), begin + EWs = lists:sublist(EntropyWeights ++ L, length(L)), + ?SIZED(S, [oneof([topic_level(S * EW), topic_level_fixed()]) || EW <- EWs]) + end). + +topic_filter() -> + ?SUCHTHAT( + L, + non_empty( + list( + frequency([ + {5, topic_level()}, + {2, '+'}, + {1, '#'} + ]) + ) + ), + not lists:member('#', L) orelse lists:last(L) == '#' + ). + +topic_level_pattern() -> + frequency([ + {5, level}, + {2, '+'}, + {1, '#'} + ]). + +topic_filter_pattern() -> + list(topic_level_pattern()). + +topic_filter(Topic) -> + ?LET({T, Pat}, {Topic, topic_filter_pattern()}, make_topic_filter(Pat, T)). + +make_topic_filter([], _) -> + []; +make_topic_filter(_, []) -> + []; +make_topic_filter(['#' | _], _) -> + ['#']; +make_topic_filter(['+' | Rest], [_ | Levels]) -> + ['+' | make_topic_filter(Rest, Levels)]; +make_topic_filter([level | Rest], [L | Levels]) -> + [L | make_topic_filter(Rest, Levels)]. + +% topic() -> +% ?LAZY(?SIZED(S, frequency([ +% {S, [topic_level() | topic()]}, +% {1, []} +% ]))). + +% topic_filter() -> +% ?LAZY(?SIZED(S, frequency([ +% {round(S / 3 * 2), [topic_level() | topic_filter()]}, +% {round(S / 3 * 1), ['+' | topic_filter()]}, +% {1, []}, +% {1, ['#']} +% ]))). + +topic_level() -> + ?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)). + +topic_level(Entropy) -> + S = floor(1 + math:log2(Entropy) / 4), + ?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))). + +topic_level_fixed() -> + oneof([ + <<"foo">>, + <<"bar">>, + <<"baz">>, + <<"xyzzy">> + ]). + +keymapper() -> + ?LET( + {TimestampBits, TopicBits, Epoch}, + { + range(0, 128), + non_empty(list(range(1, 32))), + pos_integer() + }, + make_keymapper(TimestampBits, TopicBits, Epoch * 100) + ). + +keyspace_filter() -> + ?LET( + {TopicFilter, StartTime, Keymapper}, + {topic_filter(), pos_integer(), keymapper()}, + emqx_replay_message_storage:make_keyspace_filter(TopicFilter, StartTime, Keymapper) + ). + +messages(Topic) -> + ?LET( + Ts, + list(Topic), + interleaved( + ?LET(Messages, vector(length(Ts), list(message())), lists:zip(Ts, Messages)) + ) + ). + +message() -> + ?LET({Timestamp, Payload}, {timestamp(), binary()}, {emqx_guid:gen(), Timestamp, Payload}). + +message_streams(Topic) -> + ?LET(Topics, list(Topic), [{T, payload_gen:binary_stream_gen(64)} || T <- Topics]). + +timestamp() -> + scaled(20, pos_integer()). + +start_time() -> + scaled(10, pos_integer()). + +bitstr(Size) -> + ?LET(B, binary(1 + (Size div 8)), binary:decode_unsigned(B) band (1 bsl Size - 1)). + +nth(L) -> + ?LET(I, range(1, length(L)), lists:nth(I, L)). + +scaled(Factor, T) -> + ?SIZED(S, resize(ceil(S * Factor), T)). + +interleaved(T) -> + ?LET({L, Seed}, {T, integer()}, interleave(L, rand:seed_s(exsss, Seed))). + +shuffled(T) -> + ?LET({L, Seed}, {T, integer()}, shuffle(L, rand:seed_s(exsss, Seed))). + +flat(T) -> + ?LET(L, T, lists:flatten(L)). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +make_keymapper(TimestampBits, TopicBits, MaxEpoch) -> + emqx_replay_message_storage:make_keymapper(#{ + timestamp_bits => TimestampBits, + topic_bits_per_level => TopicBits, + epoch => MaxEpoch + }). + +-spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}). +interleave(Seqs, Rng) -> + interleave(Seqs, length(Seqs), Rng). + +interleave(Seqs, L, Rng) when L > 0 -> + {N, RngNext} = rand:uniform_s(L, Rng), + {SeqHead, SeqTail} = lists:split(N - 1, Seqs), + case SeqTail of + [{Tag, [M | Rest]} | SeqRest] -> + [{Tag, M} | interleave(SeqHead ++ [{Tag, Rest} | SeqRest], L, RngNext)]; + [{_, []} | SeqRest] -> + interleave(SeqHead ++ SeqRest, L - 1, RngNext) + end; +interleave([], 0, _) -> + []. + +-spec shuffle(list(E), rand:state()) -> list(E). +shuffle(L, Rng) -> + {Rands, _} = randoms(length(L), Rng), + [E || {_, E} <- lists:sort(lists:zip(Rands, L))]. + +randoms(N, Rng) when N > 0 -> + {Rand, RngNext} = rand:uniform_s(Rng), + {Tail, RngFinal} = randoms(N - 1, RngNext), + {[Rand | Tail], RngFinal}; +randoms(_, Rng) -> + {[], Rng}. diff --git a/apps/emqx_replay/test/props/prop_replay_storage.erl b/apps/emqx_replay/test/props/prop_replay_storage.erl deleted file mode 100644 index c2d63a3d7..000000000 --- a/apps/emqx_replay/test/props/prop_replay_storage.erl +++ /dev/null @@ -1,189 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(prop_replay_storage). - --include_lib("proper/include/proper.hrl"). --include_lib("eunit/include/eunit.hrl"). - --define(ZONE, mk_zone_name(?FUNCTION_NAME)). --define(SETUP(Test), ?SETUP(fun() -> setup(?ZONE) end, Test)). - -%%-------------------------------------------------------------------- -%% Properties -%%-------------------------------------------------------------------- - -prop_bitstring_computes() -> - ?FORALL(Keymapper, keymapper(), begin - Bitsize = emqx_replay_message_storage:bitsize(Keymapper), - ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin - BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper), - is_integer(BS) andalso (BS < (1 bsl Bitsize)) - end) - end). - -prop_topic_bitmask_computes() -> - Keymapper = make_keymapper(16, [8, 12, 16], 100), - ?FORALL(TopicFilter, topic_filter(), begin - Mask = emqx_replay_message_storage:compute_topic_bitmask(TopicFilter, Keymapper), - % topic bits + timestamp LSBs - is_integer(Mask) andalso (Mask < (1 bsl (36 + 6))) - end). - -prop_next_seek_monotonic() -> - ?FORALL( - {TopicFilter, StartTime, Keymapper}, - {topic_filter(), pos_integer(), keymapper()}, - begin - Filter = emqx_replay_message_storage:make_keyspace_filter( - TopicFilter, StartTime, Keymapper - ), - ?FORALL( - Bitstring, - bitstr(emqx_replay_message_storage:bitsize(Keymapper)), - emqx_replay_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring - ) - end - ). - -prop_next_seek_eq_initial_seek() -> - ?FORALL( - Filter, - keyspace_filter(), - emqx_replay_message_storage:compute_initial_seek(Filter) =:= - emqx_replay_message_storage:compute_next_seek(0, Filter) - ). - -prop_iterate_stored_messages() -> - ?SETUP( - ?FORALL(Streams, message_streams(), begin - Stream = payload_gen:interleave_streams(Streams), - ok = store_message_stream(?ZONE, Stream), - % TODO actually verify some property - true - end) - ). - -store_message_stream(Zone, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) -> - MessageID = <>, - PublishedAt = rand:uniform(ChunkNum), - ok = emqx_replay_local_store:store(Zone, MessageID, PublishedAt, Topic, Payload), - store_message_stream(Zone, payload_gen:next(Rest)); -store_message_stream(_Zone, []) -> - ok. - -%%-------------------------------------------------------------------- -%% Setup / teardown -%%-------------------------------------------------------------------- - -setup(Zone) -> - {ok, _} = application:ensure_all_started(emqx_replay), - {ok, _} = emqx_replay_local_store_sup:start_zone(Zone), - fun() -> - application:stop(emqx_replay) - end. - -%%-------------------------------------------------------------------- -%% Type generators -%%-------------------------------------------------------------------- - -topic() -> - % TODO - % Somehow generate topic levels with variance according to the entropy distribution? - non_empty(list(topic_level())). - -topic(EntropyWeights) -> - ?LET( - L, - list(1), - ?SIZED(S, [topic_level(S * EW) || EW <- lists:sublist(EntropyWeights ++ L, length(L))]) - ). - -% entropy_weights() -> - -topic_filter() -> - ?SUCHTHAT( - L, - non_empty( - list( - frequency([ - {5, topic_level()}, - {2, '+'}, - {1, '#'} - ]) - ) - ), - not lists:member('#', L) orelse lists:last(L) == '#' - ). - -% topic() -> -% ?LAZY(?SIZED(S, frequency([ -% {S, [topic_level() | topic()]}, -% {1, []} -% ]))). - -% topic_filter() -> -% ?LAZY(?SIZED(S, frequency([ -% {round(S / 3 * 2), [topic_level() | topic_filter()]}, -% {round(S / 3 * 1), ['+' | topic_filter()]}, -% {1, []}, -% {1, ['#']} -% ]))). - -topic_level() -> - ?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)). - -topic_level(Entropy) -> - S = floor(1 + math:log2(Entropy) / 4), - ?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))). - -keymapper() -> - ?LET( - {TimestampBits, TopicBits, Epoch}, - { - range(0, 128), - non_empty(list(range(1, 32))), - pos_integer() - }, - make_keymapper(TimestampBits, TopicBits, Epoch * 100) - ). - -keyspace_filter() -> - ?LET( - {TopicFilter, StartTime, Keymapper}, - {topic_filter(), pos_integer(), keymapper()}, - emqx_replay_message_storage:make_keyspace_filter(TopicFilter, StartTime, Keymapper) - ). - -bitstr(Size) -> - ?LET(B, binary(1 + (Size div 8)), binary:decode_unsigned(B) band (1 bsl Size - 1)). - -message_streams() -> - ?LET(Topics, list(topic()), begin - [{Topic, payload_gen:binary_stream_gen(64)} || Topic <- Topics] - end). - -%% - -make_keymapper(TimestampBits, TopicBits, MaxEpoch) -> - emqx_replay_message_storage:make_keymapper(#{ - timestamp_bits => TimestampBits, - topic_bits_per_level => TopicBits, - epoch => MaxEpoch - }). - -mk_zone_name(TC) -> - list_to_atom(?MODULE_STRING ++ "_" ++ atom_to_list(TC)). From 43225d20a69d0110552745bbb4a10a90c2b72a9b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 6 Jan 2023 13:51:50 +0300 Subject: [PATCH 5/9] test: use `_build/test/proper` as a scratch dir for testruns --- .../props/prop_replay_message_storage.erl | 59 ++++++++++++++----- 1 file changed, 43 insertions(+), 16 deletions(-) diff --git a/apps/emqx_replay/test/props/prop_replay_message_storage.erl b/apps/emqx_replay/test/props/prop_replay_message_storage.erl index 222914680..baab164f0 100644 --- a/apps/emqx_replay/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_replay/test/props/prop_replay_message_storage.erl @@ -19,10 +19,10 @@ -include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). +-define(WORK_DIR, ["_build", "test"]). +-define(RUN_ID, {?MODULE, testrun_id}). -define(GEN_ID, 42). --define(PROP_FULLNAME, ?MODULE_STRING ++ "." ++ atom_to_list(?FUNCTION_NAME)). - %%-------------------------------------------------------------------- %% Properties %%-------------------------------------------------------------------- @@ -72,17 +72,18 @@ prop_next_seek_eq_initial_seek() -> prop_iterate_eq_iterate_with_preserve_restore() -> TBPL = [4, 8, 16, 12], - DB = open(?PROP_FULLNAME, #{ + Options = #{ timestamp_bits => 32, topic_bits_per_level => TBPL, epoch => 500 - }), + }, + {DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options), ?FORALL(Stream, non_empty(messages(topic(TBPL))), begin % TODO % This proptest is impure because messages from testruns assumed to be % independent of each other are accumulated in the same storage. This % would probably confuse shrinker in the event a testrun fails. - ok = store(DB, Stream), + ok = store_db(DB, Stream), ?FORALL( { {Topic, _}, @@ -100,7 +101,7 @@ prop_iterate_eq_iterate_with_preserve_restore() -> TopicFilter = make_topic_filter(Pat, Topic), Iterator = make_iterator(DB, TopicFilter, StartTime), Messages = run_iterator_commands(Commands, Iterator, DB), - Messages =:= iterate(DB, TopicFilter, StartTime) + equals(Messages, iterate_db(DB, TopicFilter, StartTime)) end ) end). @@ -115,7 +116,7 @@ prop_iterate_eq_iterate_with_preserve_restore() -> % store_message_stream(_Zone, []) -> % ok. -store(DB, Messages) -> +store_db(DB, Messages) -> lists:foreach( fun({Topic, Payload = {MessageID, Timestamp, _}}) -> Bin = term_to_binary(Payload), @@ -124,13 +125,13 @@ store(DB, Messages) -> Messages ). -iterate(DB, TopicFilter, StartTime) -> - iterate(make_iterator(DB, TopicFilter, StartTime)). +iterate_db(DB, TopicFilter, StartTime) -> + iterate_db(make_iterator(DB, TopicFilter, StartTime)). -iterate(It) -> +iterate_db(It) -> case emqx_replay_message_storage:next(It) of {value, Payload, ItNext} -> - [binary_to_term(Payload) | iterate(ItNext)]; + [binary_to_term(Payload) | iterate_db(ItNext)]; none -> [] end. @@ -151,16 +152,42 @@ run_iterator_commands([{preserve, restore} | Rest], It, DB) -> {ok, ItNext} = emqx_replay_message_storage:restore_iterator(DB, Serial), run_iterator_commands(Rest, ItNext, DB); run_iterator_commands([], It, _DB) -> - iterate(It). + iterate_db(It). %%-------------------------------------------------------------------- %% Setup / teardown %%-------------------------------------------------------------------- -open(Filename, Options) -> - {ok, DBHandle} = rocksdb:open(Filename, [{create_if_missing, true}]), - {Schema, CFRefs} = emqx_replay_message_storage:create_new(DBHandle, ?GEN_ID, Options), - emqx_replay_message_storage:open(DBHandle, ?GEN_ID, CFRefs, Schema). +open_db(Filepath, Options) -> + {ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]), + {Schema, CFRefs} = emqx_replay_message_storage:create_new(Handle, ?GEN_ID, Options), + DB = emqx_replay_message_storage:open(Handle, ?GEN_ID, CFRefs, Schema), + {DB, Handle}. + +close_db(Handle) -> + rocksdb:close(Handle). + +make_filepath(TC) -> + make_filepath(TC, 0). + +make_filepath(TC, InstID) -> + Name = io_lib:format("~0p.~0p", [TC, InstID]), + Path = filename:join(?WORK_DIR ++ ["proper", "runs", get_run_id(), ?MODULE_STRING, Name]), + ok = filelib:ensure_dir(Path), + Path. + +get_run_id() -> + case persistent_term:get(?RUN_ID, undefined) of + RunID when RunID /= undefined -> + RunID; + undefined -> + RunID = make_run_id(), + ok = persistent_term:put(?RUN_ID, RunID), + RunID + end. + +make_run_id() -> + calendar:system_time_to_rfc3339(erlang:system_time(second), [{offset, "Z"}]). %%-------------------------------------------------------------------- %% Type generators From 5e633321db35e3c8ce43c823cf3aac50085c8d0f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 6 Jan 2023 13:52:53 +0300 Subject: [PATCH 6/9] test: scale up number of messages per topic in proptests --- apps/emqx_replay/test/props/prop_replay_message_storage.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_replay/test/props/prop_replay_message_storage.erl b/apps/emqx_replay/test/props/prop_replay_message_storage.erl index baab164f0..8be5a5edb 100644 --- a/apps/emqx_replay/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_replay/test/props/prop_replay_message_storage.erl @@ -293,7 +293,7 @@ messages(Topic) -> Ts, list(Topic), interleaved( - ?LET(Messages, vector(length(Ts), list(message())), lists:zip(Ts, Messages)) + ?LET(Messages, vector(length(Ts), scaled(4, list(message()))), lists:zip(Ts, Messages)) ) ). From 41bfebf9e0df83020ac8d30678ad522a3660d599 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 6 Jan 2023 13:54:59 +0300 Subject: [PATCH 7/9] test: proptest that iteration is exhaustive Compare iteration results against what an extremely simplified model produces. --- .../emqx_replay_message_storage_shim.erl | 58 ++++++++++++++++++ .../props/prop_replay_message_storage.erl | 59 +++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl diff --git a/apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl b/apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl new file mode 100644 index 000000000..125c9a9fc --- /dev/null +++ b/apps/emqx_replay/test/props/emqx_replay_message_storage_shim.erl @@ -0,0 +1,58 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_replay_message_storage_shim). + +-export([open/0]). +-export([close/1]). +-export([store/5]). +-export([iterate/3]). + +-type topic() :: list(binary()). +-type time() :: integer(). + +-opaque t() :: ets:tid(). + +-spec open() -> t(). +open() -> + ets:new(?MODULE, [ordered_set, {keypos, 1}]). + +-spec close(t()) -> ok. +close(Tab) -> + true = ets:delete(Tab), + ok. + +-spec store(t(), emqx_guid:guid(), time(), topic(), binary()) -> + ok | {error, _TODO}. +store(Tab, MessageID, PublishedAt, Topic, Payload) -> + true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}), + ok. + +-spec iterate(t(), emqx_topic:words(), time()) -> + [binary()]. +iterate(Tab, TopicFilter, StartTime) -> + ets:foldr( + fun({{PublishedAt, _}, Topic, Payload}, Acc) -> + case emqx_topic:match(Topic, TopicFilter) of + true when PublishedAt >= StartTime -> + [Payload | Acc]; + _ -> + Acc + end + end, + [], + Tab + ). diff --git a/apps/emqx_replay/test/props/prop_replay_message_storage.erl b/apps/emqx_replay/test/props/prop_replay_message_storage.erl index 8be5a5edb..9619c4f05 100644 --- a/apps/emqx_replay/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_replay/test/props/prop_replay_message_storage.erl @@ -70,6 +70,50 @@ prop_next_seek_eq_initial_seek() -> emqx_replay_message_storage:compute_next_seek(0, Filter) ). +prop_iterate_messages() -> + TBPL = [4, 8, 12], + Options = #{ + timestamp_bits => 32, + topic_bits_per_level => TBPL, + epoch => 200 + }, + % TODO + % Shrinking is too unpredictable and leaves a LOT of garbage in the scratch dit. + ?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin + Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)), + {DB, Handle} = open_db(Filepath, Options), + Shim = emqx_replay_message_storage_shim:open(), + ok = store_db(DB, Stream), + ok = store_shim(Shim, Stream), + ?FORALL( + { + {Topic, _}, + Pattern, + StartTime + }, + { + nth(Stream), + topic_filter_pattern(), + start_time() + }, + begin + TopicFilter = make_topic_filter(Pattern, Topic), + Messages = iterate_db(DB, TopicFilter, StartTime), + Reference = iterate_shim(Shim, TopicFilter, StartTime), + ok = close_db(Handle), + ok = emqx_replay_message_storage_shim:close(Shim), + ?WHENFAIL( + begin + io:format(user, " *** Filepath = ~s~n", [Filepath]), + io:format(user, " *** TopicFilter = ~p~n", [TopicFilter]), + io:format(user, " *** StartTime = ~p~n", [StartTime]) + end, + is_list(Messages) andalso equals(Messages -- Reference, Reference -- Messages) + ) + end + ) + end). + prop_iterate_eq_iterate_with_preserve_restore() -> TBPL = [4, 8, 16, 12], Options = #{ @@ -154,6 +198,21 @@ run_iterator_commands([{preserve, restore} | Rest], It, DB) -> run_iterator_commands([], It, _DB) -> iterate_db(It). +store_shim(Shim, Messages) -> + lists:foreach( + fun({Topic, Payload = {MessageID, Timestamp, _}}) -> + Bin = term_to_binary(Payload), + emqx_replay_message_storage_shim:store(Shim, MessageID, Timestamp, Topic, Bin) + end, + Messages + ). + +iterate_shim(Shim, TopicFilter, StartTime) -> + lists:map( + fun binary_to_term/1, + emqx_replay_message_storage_shim:iterate(Shim, TopicFilter, StartTime) + ). + %%-------------------------------------------------------------------- %% Setup / teardown %%-------------------------------------------------------------------- From d65112eeacac967da01f927e957cbdb13722164e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 6 Jan 2023 13:42:28 +0300 Subject: [PATCH 8/9] fix: clear bitmask of topic filter tail containing wildcards --- .../src/emqx_replay_message_storage.erl | 9 +++-- .../test/emqx_replay_storage_SUITE.erl | 35 +++++++++++++++++-- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index 1c91066cf..759ddf559 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -457,8 +457,13 @@ compute_topic_bitmask([], [{hash, level, Size} | Rest], Acc) -> compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size)); compute_topic_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) -> compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size)); -compute_topic_bitmask(_, [{hash, levels, Size} | Rest], Acc) -> - compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size)); +compute_topic_bitmask(Tail, [{hash, levels, Size} | Rest], Acc) -> + Mask = + case lists:member('+', Tail) orelse lists:member('#', Tail) of + true -> 0; + false -> ones(Size) + end, + compute_topic_bitmask([], Rest, bitwise_concat(Acc, Mask, Size)); compute_topic_bitmask(_, [], Acc) -> Acc. diff --git a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl index c99063350..3d7e7cb41 100644 --- a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl @@ -115,6 +115,19 @@ t_iterate_wildcard(_Config) -> ), ok. +t_iterate_long_tail_wildcard(_Config) -> + Topic = "b/c/d/e/f/g", + TopicFilter = "b/c/d/e/+/+", + Timestamps = lists:seq(1, 100), + _ = [ + store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) + || PublishedAt <- Timestamps + ], + ?assertEqual( + lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]), + lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, TopicFilter, 50)]) + ). + store(Zone, PublishedAt, Topic, Payload) -> ID = emqx_guid:gen(), emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload). @@ -140,13 +153,29 @@ parse_topic(Topic) -> all() -> emqx_common_test_helpers:all(?MODULE). -init_per_testcase(TC, Config) -> +init_per_suite(Config) -> {ok, _} = application:ensure_all_started(emqx_replay), + Config. + +end_per_suite(_Config) -> + ok = application:stop(emqx_replay). + +init_per_testcase(TC, Config) -> + ok = set_zone_config(zone(TC), #{ + timestamp_bits => 64, + topic_bits_per_level => [8, 8, 32, 16], + epoch => 5 + }), {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)), Config. -end_per_testcase(_TC, _Config) -> - ok = application:stop(emqx_replay). +end_per_testcase(TC, _Config) -> + ok = emqx_replay_local_store_sup:stop_zone(zone(TC)). zone(TC) -> list_to_atom(?MODULE_STRING ++ atom_to_list(TC)). + +set_zone_config(Zone, Options) -> + ok = application:set_env(emqx_replay, zone_config, #{ + Zone => {emqx_replay_message_storage, Options} + }). From f338aeb3f290ed39ef3615664d9b058ad245cb3b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 11 Jan 2023 18:43:31 +0300 Subject: [PATCH 9/9] refactor: use inline functions instead of macros where applicable --- .../src/emqx_replay_message_storage.erl | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index 759ddf559..fe0a0e08a 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -121,7 +121,15 @@ -export_type([db/0, iterator/0, schema/0]). --compile({inline, [ones/1, bitwise_concat/3]}). +-compile( + {inline, [ + bitwise_concat/3, + ones/1, + successor/1, + topic_hash_matches/3, + time_matches/3 + ]} +). %%================================================================================ %% Type declarations @@ -343,14 +351,6 @@ restore_iterator(DB, #{ %% Internal exports %%================================================================================ --define(topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), - (Bitstring band HashBitmask) == HashBitfilter -). - --define(time_matches(Bitstring, TimeBitfilter, TimeBitmask), - (Bitstring band TimeBitmask) >= TimeBitfilter -). - -spec bitsize(keymapper()) -> bits(). bitsize(#keymapper{bitsize = Bitsize}) -> Bitsize. @@ -424,8 +424,8 @@ compute_next_seek( time_bitmask = TimeBitmask } ) -> - HashMatches = ?topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), - TimeMatches = ?time_matches(Bitstring, TimeBitfilter, TimeBitmask), + HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), + TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask), compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter). %%================================================================================ @@ -508,8 +508,8 @@ match_next( time_bitmask = TimeBitmask } ) -> - HashMatches = ?topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), - TimeMatches = ?time_matches(Bitstring, TimeBitfilter, TimeBitmask), + HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), + TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask), case HashMatches and TimeMatches of true -> Message = {Topic, _Payload} = unwrap_message_value(Value), @@ -541,7 +541,7 @@ compute_next_seek( none -> none; _ -> - TimeMatches = ?time_matches(NextBitstring, TimeBitfilter, TimeBitmask), + TimeMatches = time_matches(NextBitstring, TimeBitfilter, TimeBitmask), compute_next_seek(true, TimeMatches, NextBitstring, Filter) end; %% `Bitstring` is out of the time range defined by `TimeBitfilter`. @@ -558,6 +558,12 @@ compute_next_seek( compute_next_seek(true, true, Bitstring, _It) -> Bitstring. +topic_hash_matches(Bitstring, HashBitfilter, HashBitmask) -> + (Bitstring band HashBitmask) == HashBitfilter. + +time_matches(Bitstring, TimeBitfilter, TimeBitmask) -> + (Bitstring band TimeBitmask) >= TimeBitfilter. + compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask) -> % Replace the bits of the timestamp in `Bistring` with bits from `Timebitfilter`. (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter.