Merge pull request #9695 from keynslug/feat/rocksdb-replay-queue/serializable-iterators
feat: add an ability to preserve and restore iterators
This commit is contained in:
commit
5f5cc27697
|
@ -97,20 +97,39 @@
|
||||||
-export([make_iterator/3]).
|
-export([make_iterator/3]).
|
||||||
-export([next/1]).
|
-export([next/1]).
|
||||||
|
|
||||||
|
-export([preserve_iterator/1]).
|
||||||
|
-export([restore_iterator/2]).
|
||||||
|
|
||||||
%% Debug/troubleshooting:
|
%% Debug/troubleshooting:
|
||||||
|
%% Keymappers
|
||||||
-export([
|
-export([
|
||||||
make_message_key/4,
|
bitsize/1,
|
||||||
compute_bitstring/3,
|
compute_bitstring/3,
|
||||||
compute_topic_bitmask/2,
|
compute_topic_bitmask/2,
|
||||||
compute_next_seek/4,
|
compute_time_bitmask/1,
|
||||||
compute_time_seek/3,
|
|
||||||
compute_topic_seek/4,
|
|
||||||
hash/2
|
hash/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% Keyspace filters
|
||||||
|
-export([
|
||||||
|
make_keyspace_filter/3,
|
||||||
|
compute_initial_seek/1,
|
||||||
|
compute_next_seek/2,
|
||||||
|
compute_time_seek/3,
|
||||||
|
compute_topic_seek/4
|
||||||
|
]).
|
||||||
|
|
||||||
-export_type([db/0, iterator/0, schema/0]).
|
-export_type([db/0, iterator/0, schema/0]).
|
||||||
|
|
||||||
-compile({inline, [ones/1, bitwise_concat/3]}).
|
-compile(
|
||||||
|
{inline, [
|
||||||
|
bitwise_concat/3,
|
||||||
|
ones/1,
|
||||||
|
successor/1,
|
||||||
|
topic_hash_matches/3,
|
||||||
|
time_matches/3
|
||||||
|
]}
|
||||||
|
).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Type declarations
|
%% Type declarations
|
||||||
|
@ -159,9 +178,15 @@
|
||||||
|
|
||||||
-record(it, {
|
-record(it, {
|
||||||
handle :: rocksdb:itr_handle(),
|
handle :: rocksdb:itr_handle(),
|
||||||
|
filter :: keyspace_filter(),
|
||||||
|
cursor :: binary() | undefined,
|
||||||
|
next_action :: {seek, binary()} | next
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(filter, {
|
||||||
keymapper :: keymapper(),
|
keymapper :: keymapper(),
|
||||||
next_action :: {seek, binary()} | next,
|
|
||||||
topic_filter :: emqx_topic:words(),
|
topic_filter :: emqx_topic:words(),
|
||||||
|
start_time :: integer(),
|
||||||
hash_bitfilter :: integer(),
|
hash_bitfilter :: integer(),
|
||||||
hash_bitmask :: integer(),
|
hash_bitmask :: integer(),
|
||||||
time_bitfilter :: integer(),
|
time_bitfilter :: integer(),
|
||||||
|
@ -186,6 +211,7 @@
|
||||||
-opaque db() :: #db{}.
|
-opaque db() :: #db{}.
|
||||||
-opaque iterator() :: #it{}.
|
-opaque iterator() :: #it{}.
|
||||||
-type keymapper() :: #keymapper{}.
|
-type keymapper() :: #keymapper{}.
|
||||||
|
-type keyspace_filter() :: #filter{}.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API funcions
|
%% API funcions
|
||||||
|
@ -254,43 +280,81 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
|
||||||
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) ->
|
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) ->
|
||||||
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
|
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
|
||||||
{ok, ITHandle} ->
|
{ok, ITHandle} ->
|
||||||
Bitstring = compute_bitstring(TopicFilter, StartTime, DB#db.keymapper),
|
% TODO earliest
|
||||||
HashBitmask = compute_topic_bitmask(TopicFilter, DB#db.keymapper),
|
Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper),
|
||||||
TimeBitmask = compute_time_bitmask(DB#db.keymapper),
|
InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper),
|
||||||
HashBitfilter = Bitstring band HashBitmask,
|
|
||||||
TimeBitfilter = Bitstring band TimeBitmask,
|
|
||||||
InitialSeek = combine(HashBitfilter bor TimeBitfilter, <<>>, DB#db.keymapper),
|
|
||||||
{ok, #it{
|
{ok, #it{
|
||||||
handle = ITHandle,
|
handle = ITHandle,
|
||||||
keymapper = DB#db.keymapper,
|
filter = Filter,
|
||||||
next_action = {seek, InitialSeek},
|
next_action = {seek, InitialSeek}
|
||||||
topic_filter = TopicFilter,
|
|
||||||
hash_bitfilter = HashBitfilter,
|
|
||||||
hash_bitmask = HashBitmask,
|
|
||||||
time_bitfilter = TimeBitfilter,
|
|
||||||
time_bitmask = TimeBitmask
|
|
||||||
}};
|
}};
|
||||||
Err ->
|
Err ->
|
||||||
Err
|
Err
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
|
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
|
||||||
next(It = #it{next_action = Action}) ->
|
next(It = #it{filter = #filter{keymapper = Keymapper}}) ->
|
||||||
case rocksdb:iterator_move(It#it.handle, Action) of
|
case rocksdb:iterator_move(It#it.handle, It#it.next_action) of
|
||||||
% spec says `{ok, Key}` is also possible but the implementation says it's not
|
% spec says `{ok, Key}` is also possible but the implementation says it's not
|
||||||
{ok, Key, Value} ->
|
{ok, Key, Value} ->
|
||||||
Bitstring = extract(Key, It#it.keymapper),
|
Bitstring = extract(Key, Keymapper),
|
||||||
match_next(It, Bitstring, Value);
|
case match_next(Bitstring, Value, It#it.filter) of
|
||||||
|
{_Topic, Payload} ->
|
||||||
|
% Preserve last seen key in the iterator so it could be restored later.
|
||||||
|
{value, Payload, It#it{cursor = Key, next_action = next}};
|
||||||
|
next ->
|
||||||
|
next(It#it{next_action = next});
|
||||||
|
NextBitstring when is_integer(NextBitstring) ->
|
||||||
|
NextSeek = combine(NextBitstring, <<>>, Keymapper),
|
||||||
|
next(It#it{next_action = {seek, NextSeek}});
|
||||||
|
none ->
|
||||||
|
stop_iteration(It)
|
||||||
|
end;
|
||||||
{error, invalid_iterator} ->
|
{error, invalid_iterator} ->
|
||||||
stop_iteration(It);
|
stop_iteration(It);
|
||||||
{error, iterator_closed} ->
|
{error, iterator_closed} ->
|
||||||
{error, closed}
|
{error, closed}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec preserve_iterator(iterator()) -> binary().
|
||||||
|
preserve_iterator(#it{cursor = Cursor, filter = Filter}) ->
|
||||||
|
State = #{
|
||||||
|
v => 1,
|
||||||
|
cursor => Cursor,
|
||||||
|
filter => Filter#filter.topic_filter,
|
||||||
|
stime => Filter#filter.start_time
|
||||||
|
},
|
||||||
|
term_to_binary(State).
|
||||||
|
|
||||||
|
-spec restore_iterator(db(), binary()) -> {ok, iterator()} | {error, _TODO}.
|
||||||
|
restore_iterator(DB, Serial) when is_binary(Serial) ->
|
||||||
|
State = binary_to_term(Serial),
|
||||||
|
restore_iterator(DB, State);
|
||||||
|
restore_iterator(DB, #{
|
||||||
|
v := 1,
|
||||||
|
cursor := Cursor,
|
||||||
|
filter := TopicFilter,
|
||||||
|
stime := StartTime
|
||||||
|
}) ->
|
||||||
|
case make_iterator(DB, TopicFilter, StartTime) of
|
||||||
|
{ok, It} when Cursor == undefined ->
|
||||||
|
% Iterator was preserved right after it has been made.
|
||||||
|
{ok, It};
|
||||||
|
{ok, It} ->
|
||||||
|
% Iterator was preserved mid-replay, seek right past the last seen key.
|
||||||
|
{ok, It#it{cursor = Cursor, next_action = {seek, successor(Cursor)}}};
|
||||||
|
Err ->
|
||||||
|
Err
|
||||||
|
end.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Internal exports
|
%% Internal exports
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
|
-spec bitsize(keymapper()) -> bits().
|
||||||
|
bitsize(#keymapper{bitsize = Bitsize}) ->
|
||||||
|
Bitsize.
|
||||||
|
|
||||||
make_message_key(Topic, PublishedAt, MessageID, Keymapper) ->
|
make_message_key(Topic, PublishedAt, MessageID, Keymapper) ->
|
||||||
combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper).
|
combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper).
|
||||||
|
|
||||||
|
@ -323,10 +387,47 @@ compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) ->
|
||||||
compute_time_bitmask(#keymapper{source = Source}) ->
|
compute_time_bitmask(#keymapper{source = Source}) ->
|
||||||
compute_time_bitmask(Source, 0).
|
compute_time_bitmask(Source, 0).
|
||||||
|
|
||||||
|
-spec hash(term(), bits()) -> integer().
|
||||||
hash(Input, Bits) ->
|
hash(Input, Bits) ->
|
||||||
% at most 32 bits
|
% at most 32 bits
|
||||||
erlang:phash2(Input, 1 bsl Bits).
|
erlang:phash2(Input, 1 bsl Bits).
|
||||||
|
|
||||||
|
-spec make_keyspace_filter(emqx_topic:words(), time(), keymapper()) -> keyspace_filter().
|
||||||
|
make_keyspace_filter(TopicFilter, StartTime, Keymapper) ->
|
||||||
|
Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
|
||||||
|
HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
|
||||||
|
TimeBitmask = compute_time_bitmask(Keymapper),
|
||||||
|
HashBitfilter = Bitstring band HashBitmask,
|
||||||
|
TimeBitfilter = Bitstring band TimeBitmask,
|
||||||
|
#filter{
|
||||||
|
keymapper = Keymapper,
|
||||||
|
topic_filter = TopicFilter,
|
||||||
|
start_time = StartTime,
|
||||||
|
hash_bitfilter = HashBitfilter,
|
||||||
|
hash_bitmask = HashBitmask,
|
||||||
|
time_bitfilter = TimeBitfilter,
|
||||||
|
time_bitmask = TimeBitmask
|
||||||
|
}.
|
||||||
|
|
||||||
|
-spec compute_initial_seek(keyspace_filter()) -> integer().
|
||||||
|
compute_initial_seek(#filter{hash_bitfilter = HashBitfilter, time_bitfilter = TimeBitfilter}) ->
|
||||||
|
% Should be the same as `compute_initial_seek(0, Filter)`.
|
||||||
|
HashBitfilter bor TimeBitfilter.
|
||||||
|
|
||||||
|
-spec compute_next_seek(integer(), keyspace_filter()) -> integer().
|
||||||
|
compute_next_seek(
|
||||||
|
Bitstring,
|
||||||
|
Filter = #filter{
|
||||||
|
hash_bitfilter = HashBitfilter,
|
||||||
|
hash_bitmask = HashBitmask,
|
||||||
|
time_bitfilter = TimeBitfilter,
|
||||||
|
time_bitmask = TimeBitmask
|
||||||
|
}
|
||||||
|
) ->
|
||||||
|
HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask),
|
||||||
|
TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask),
|
||||||
|
compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -356,8 +457,13 @@ compute_topic_bitmask([], [{hash, level, Size} | Rest], Acc) ->
|
||||||
compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size));
|
compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size));
|
||||||
compute_topic_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) ->
|
compute_topic_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) ->
|
||||||
compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size));
|
compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size));
|
||||||
compute_topic_bitmask(_, [{hash, levels, Size} | Rest], Acc) ->
|
compute_topic_bitmask(Tail, [{hash, levels, Size} | Rest], Acc) ->
|
||||||
compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size));
|
Mask =
|
||||||
|
case lists:member('+', Tail) orelse lists:member('#', Tail) of
|
||||||
|
true -> 0;
|
||||||
|
false -> ones(Size)
|
||||||
|
end,
|
||||||
|
compute_topic_bitmask([], Rest, bitwise_concat(Acc, Mask, Size));
|
||||||
compute_topic_bitmask(_, [], Acc) ->
|
compute_topic_bitmask(_, [], Acc) ->
|
||||||
Acc.
|
Acc.
|
||||||
|
|
||||||
|
@ -374,6 +480,10 @@ bitwise_concat(Acc, Item, ItemSize) ->
|
||||||
ones(Bits) ->
|
ones(Bits) ->
|
||||||
1 bsl Bits - 1.
|
1 bsl Bits - 1.
|
||||||
|
|
||||||
|
-spec successor(key()) -> key().
|
||||||
|
successor(Key) ->
|
||||||
|
<<Key/binary, 0:8>>.
|
||||||
|
|
||||||
%% |123|345|678|
|
%% |123|345|678|
|
||||||
%% foo bar baz
|
%% foo bar baz
|
||||||
|
|
||||||
|
@ -388,68 +498,72 @@ ones(Bits) ->
|
||||||
%% |123|056|678| & |fff|000|fff| = |123|000|678|.
|
%% |123|056|678| & |fff|000|fff| = |123|000|678|.
|
||||||
|
|
||||||
match_next(
|
match_next(
|
||||||
It = #it{
|
Bitstring,
|
||||||
keymapper = Keymapper,
|
Value,
|
||||||
|
Filter = #filter{
|
||||||
topic_filter = TopicFilter,
|
topic_filter = TopicFilter,
|
||||||
hash_bitfilter = HashBitfilter,
|
hash_bitfilter = HashBitfilter,
|
||||||
hash_bitmask = HashBitmask,
|
hash_bitmask = HashBitmask,
|
||||||
time_bitfilter = TimeBitfilter,
|
time_bitfilter = TimeBitfilter,
|
||||||
time_bitmask = TimeBitmask
|
time_bitmask = TimeBitmask
|
||||||
},
|
}
|
||||||
Bitstring,
|
|
||||||
Value
|
|
||||||
) ->
|
) ->
|
||||||
HashMatches = (Bitstring band HashBitmask) == HashBitfilter,
|
HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask),
|
||||||
TimeMatches = (Bitstring band TimeBitmask) >= TimeBitfilter,
|
TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask),
|
||||||
case HashMatches and TimeMatches of
|
case HashMatches and TimeMatches of
|
||||||
true ->
|
true ->
|
||||||
{Topic, MessagePayload} = unwrap_message_value(Value),
|
Message = {Topic, _Payload} = unwrap_message_value(Value),
|
||||||
case emqx_topic:match(Topic, TopicFilter) of
|
case emqx_topic:match(Topic, TopicFilter) of
|
||||||
true ->
|
true ->
|
||||||
{value, MessagePayload, It#it{next_action = next}};
|
Message;
|
||||||
false ->
|
false ->
|
||||||
next(It#it{next_action = next})
|
next
|
||||||
end;
|
end;
|
||||||
false ->
|
false ->
|
||||||
case compute_next_seek(HashMatches, TimeMatches, Bitstring, It) of
|
compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter)
|
||||||
NextBitstring when is_integer(NextBitstring) ->
|
|
||||||
% ct:pal("Bitstring = ~32.16.0B", [Bitstring]),
|
|
||||||
% ct:pal("Bitfilter = ~32.16.0B", [Bitfilter]),
|
|
||||||
% ct:pal("HBitmask = ~32.16.0B", [HashBitmask]),
|
|
||||||
% ct:pal("TBitmask = ~32.16.0B", [TimeBitmask]),
|
|
||||||
% ct:pal("NextBitstring = ~32.16.0B", [NextBitstring]),
|
|
||||||
NextSeek = combine(NextBitstring, <<>>, Keymapper),
|
|
||||||
next(It#it{next_action = {seek, NextSeek}});
|
|
||||||
none ->
|
|
||||||
stop_iteration(It)
|
|
||||||
end
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
stop_iteration(It) ->
|
|
||||||
ok = rocksdb:iterator_close(It#it.handle),
|
|
||||||
none.
|
|
||||||
|
|
||||||
%% `Bitstring` is out of the hash space defined by `HashBitfilter`.
|
%% `Bitstring` is out of the hash space defined by `HashBitfilter`.
|
||||||
compute_next_seek(_HashMatches = false, _, Bitstring, It) ->
|
compute_next_seek(
|
||||||
NextBitstring = compute_topic_seek(
|
_HashMatches = false,
|
||||||
Bitstring,
|
_TimeMatches,
|
||||||
It#it.hash_bitfilter,
|
Bitstring,
|
||||||
It#it.hash_bitmask,
|
Filter = #filter{
|
||||||
It#it.keymapper
|
keymapper = Keymapper,
|
||||||
),
|
hash_bitfilter = HashBitfilter,
|
||||||
|
hash_bitmask = HashBitmask,
|
||||||
|
time_bitfilter = TimeBitfilter,
|
||||||
|
time_bitmask = TimeBitmask
|
||||||
|
}
|
||||||
|
) ->
|
||||||
|
NextBitstring = compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper),
|
||||||
case NextBitstring of
|
case NextBitstring of
|
||||||
none ->
|
none ->
|
||||||
none;
|
none;
|
||||||
_ ->
|
_ ->
|
||||||
TimeMatches = (NextBitstring band It#it.time_bitmask) >= It#it.time_bitfilter,
|
TimeMatches = time_matches(NextBitstring, TimeBitfilter, TimeBitmask),
|
||||||
compute_next_seek(true, TimeMatches, NextBitstring, It)
|
compute_next_seek(true, TimeMatches, NextBitstring, Filter)
|
||||||
end;
|
end;
|
||||||
%% `Bitstring` is out of the time range defined by `TimeBitfilter`.
|
%% `Bitstring` is out of the time range defined by `TimeBitfilter`.
|
||||||
compute_next_seek(_HashMatches = true, _TimeMatches = false, Bitstring, It) ->
|
compute_next_seek(
|
||||||
compute_time_seek(Bitstring, It#it.time_bitfilter, It#it.time_bitmask);
|
_HashMatches = true,
|
||||||
|
_TimeMatches = false,
|
||||||
|
Bitstring,
|
||||||
|
#filter{
|
||||||
|
time_bitfilter = TimeBitfilter,
|
||||||
|
time_bitmask = TimeBitmask
|
||||||
|
}
|
||||||
|
) ->
|
||||||
|
compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask);
|
||||||
compute_next_seek(true, true, Bitstring, _It) ->
|
compute_next_seek(true, true, Bitstring, _It) ->
|
||||||
Bitstring.
|
Bitstring.
|
||||||
|
|
||||||
|
topic_hash_matches(Bitstring, HashBitfilter, HashBitmask) ->
|
||||||
|
(Bitstring band HashBitmask) == HashBitfilter.
|
||||||
|
|
||||||
|
time_matches(Bitstring, TimeBitfilter, TimeBitmask) ->
|
||||||
|
(Bitstring band TimeBitmask) >= TimeBitfilter.
|
||||||
|
|
||||||
compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask) ->
|
compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask) ->
|
||||||
% Replace the bits of the timestamp in `Bistring` with bits from `Timebitfilter`.
|
% Replace the bits of the timestamp in `Bistring` with bits from `Timebitfilter`.
|
||||||
(Bitstring band (bnot TimeBitmask)) bor TimeBitfilter.
|
(Bitstring band (bnot TimeBitmask)) bor TimeBitfilter.
|
||||||
|
@ -466,7 +580,7 @@ compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) ->
|
||||||
compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
|
compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) ->
|
||||||
% NOTE
|
% NOTE
|
||||||
% We're iterating through `Substring` here, in lockstep with `HashBitfilter`
|
% We're iterating through `Substring` here, in lockstep with `HashBitfilter`
|
||||||
% and`HashBitmask`, starting from least signigicant bits. Each bitsource in
|
% and `HashBitmask`, starting from least signigicant bits. Each bitsource in
|
||||||
% `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S`
|
% `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S`
|
||||||
% bits long which we interpret as a "digit". There are 2 flavors of those
|
% bits long which we interpret as a "digit". There are 2 flavors of those
|
||||||
% "digits":
|
% "digits":
|
||||||
|
@ -573,6 +687,10 @@ substring(I, Offset, Size) ->
|
||||||
data_cf(GenId) ->
|
data_cf(GenId) ->
|
||||||
?MODULE_STRING ++ integer_to_list(GenId).
|
?MODULE_STRING ++ integer_to_list(GenId).
|
||||||
|
|
||||||
|
stop_iteration(It) ->
|
||||||
|
ok = rocksdb:iterator_close(It#it.handle),
|
||||||
|
none.
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
|
@ -20,7 +20,6 @@
|
||||||
|
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
-include_lib("proper/include/proper.hrl").
|
|
||||||
|
|
||||||
-define(ZONE, zone(?FUNCTION_NAME)).
|
-define(ZONE, zone(?FUNCTION_NAME)).
|
||||||
|
|
||||||
|
@ -116,6 +115,19 @@ t_iterate_wildcard(_Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_iterate_long_tail_wildcard(_Config) ->
|
||||||
|
Topic = "b/c/d/e/f/g",
|
||||||
|
TopicFilter = "b/c/d/e/+/+",
|
||||||
|
Timestamps = lists:seq(1, 100),
|
||||||
|
_ = [
|
||||||
|
store(?ZONE, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|
||||||
|
|| PublishedAt <- Timestamps
|
||||||
|
],
|
||||||
|
?assertEqual(
|
||||||
|
lists:sort([{"b/c/d/e/f/g", PublishedAt} || PublishedAt <- lists:seq(50, 100)]),
|
||||||
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, TopicFilter, 50)])
|
||||||
|
).
|
||||||
|
|
||||||
store(Zone, PublishedAt, Topic, Payload) ->
|
store(Zone, PublishedAt, Topic, Payload) ->
|
||||||
ID = emqx_guid:gen(),
|
ID = emqx_guid:gen(),
|
||||||
emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload).
|
emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload).
|
||||||
|
@ -137,127 +149,33 @@ parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
|
||||||
parse_topic(Topic) ->
|
parse_topic(Topic) ->
|
||||||
emqx_topic:words(iolist_to_binary(Topic)).
|
emqx_topic:words(iolist_to_binary(Topic)).
|
||||||
|
|
||||||
%%
|
|
||||||
|
|
||||||
t_prop_topic_hash_computes(_) ->
|
|
||||||
Keymapper = emqx_replay_message_storage:make_keymapper(#{
|
|
||||||
timestamp_bits => 32,
|
|
||||||
topic_bits_per_level => [8, 12, 16, 24],
|
|
||||||
epoch => 10000
|
|
||||||
}),
|
|
||||||
?assert(
|
|
||||||
proper:quickcheck(
|
|
||||||
?FORALL({Topic, Timestamp}, {topic(), integer()}, begin
|
|
||||||
BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper),
|
|
||||||
is_integer(BS) andalso (BS < (1 bsl 92))
|
|
||||||
end)
|
|
||||||
)
|
|
||||||
).
|
|
||||||
|
|
||||||
t_prop_topic_bitmask_computes(_) ->
|
|
||||||
Keymapper = emqx_replay_message_storage:make_keymapper(#{
|
|
||||||
timestamp_bits => 16,
|
|
||||||
topic_bits_per_level => [8, 12, 16],
|
|
||||||
epoch => 100
|
|
||||||
}),
|
|
||||||
?assert(
|
|
||||||
proper:quickcheck(
|
|
||||||
?FORALL(TopicFilter, topic_filter(), begin
|
|
||||||
Mask = emqx_replay_message_storage:compute_topic_bitmask(TopicFilter, Keymapper),
|
|
||||||
is_integer(Mask) andalso (Mask < (1 bsl (36 + 6)))
|
|
||||||
end)
|
|
||||||
)
|
|
||||||
).
|
|
||||||
|
|
||||||
t_prop_iterate_stored_messages(_) ->
|
|
||||||
?assertEqual(
|
|
||||||
true,
|
|
||||||
proper:quickcheck(
|
|
||||||
?FORALL(
|
|
||||||
Streams,
|
|
||||||
messages(),
|
|
||||||
begin
|
|
||||||
Stream = payload_gen:interleave_streams(Streams),
|
|
||||||
ok = store_message_stream(?ZONE, Stream),
|
|
||||||
% TODO actually verify some property
|
|
||||||
true
|
|
||||||
end
|
|
||||||
)
|
|
||||||
)
|
|
||||||
).
|
|
||||||
|
|
||||||
store_message_stream(Zone, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) ->
|
|
||||||
MessageID = <<ChunkNum:32>>,
|
|
||||||
PublishedAt = rand:uniform(ChunkNum),
|
|
||||||
ok = emqx_replay_local_store:store(Zone, MessageID, PublishedAt, Topic, Payload),
|
|
||||||
store_message_stream(Zone, payload_gen:next(Rest));
|
|
||||||
store_message_stream(_Zone, []) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
messages() ->
|
|
||||||
?LET(Topics, list(topic()), begin
|
|
||||||
[{Topic, payload_gen:binary_stream_gen(64)} || Topic <- Topics]
|
|
||||||
end).
|
|
||||||
|
|
||||||
topic() ->
|
|
||||||
% TODO
|
|
||||||
% Somehow generate topic levels with variance according to the entropy distribution?
|
|
||||||
non_empty(list(topic_level())).
|
|
||||||
|
|
||||||
topic(EntropyWeights) ->
|
|
||||||
?LET(
|
|
||||||
L,
|
|
||||||
list(1),
|
|
||||||
?SIZED(S, [topic_level(S * EW) || EW <- lists:sublist(EntropyWeights ++ L, length(L))])
|
|
||||||
).
|
|
||||||
|
|
||||||
topic_filter() ->
|
|
||||||
?SUCHTHAT(
|
|
||||||
L,
|
|
||||||
non_empty(
|
|
||||||
list(
|
|
||||||
frequency([
|
|
||||||
{5, topic_level()},
|
|
||||||
{2, '+'},
|
|
||||||
{1, '#'}
|
|
||||||
])
|
|
||||||
)
|
|
||||||
),
|
|
||||||
not lists:member('#', L) orelse lists:last(L) == '#'
|
|
||||||
).
|
|
||||||
|
|
||||||
% topic() ->
|
|
||||||
% ?LAZY(?SIZED(S, frequency([
|
|
||||||
% {S, [topic_level() | topic()]},
|
|
||||||
% {1, []}
|
|
||||||
% ]))).
|
|
||||||
|
|
||||||
% topic_filter() ->
|
|
||||||
% ?LAZY(?SIZED(S, frequency([
|
|
||||||
% {round(S / 3 * 2), [topic_level() | topic_filter()]},
|
|
||||||
% {round(S / 3 * 1), ['+' | topic_filter()]},
|
|
||||||
% {1, []},
|
|
||||||
% {1, ['#']}
|
|
||||||
% ]))).
|
|
||||||
|
|
||||||
topic_level() ->
|
|
||||||
?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)).
|
|
||||||
|
|
||||||
topic_level(Entropy) ->
|
|
||||||
S = floor(1 + math:log2(Entropy) / 4),
|
|
||||||
?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))).
|
|
||||||
|
|
||||||
%% CT callbacks
|
%% CT callbacks
|
||||||
|
|
||||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_testcase(TC, Config) ->
|
init_per_suite(Config) ->
|
||||||
{ok, _} = application:ensure_all_started(emqx_replay),
|
{ok, _} = application:ensure_all_started(emqx_replay),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
ok = application:stop(emqx_replay).
|
||||||
|
|
||||||
|
init_per_testcase(TC, Config) ->
|
||||||
|
ok = set_zone_config(zone(TC), #{
|
||||||
|
timestamp_bits => 64,
|
||||||
|
topic_bits_per_level => [8, 8, 32, 16],
|
||||||
|
epoch => 5
|
||||||
|
}),
|
||||||
{ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)),
|
{ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(_TC, _Config) ->
|
end_per_testcase(TC, _Config) ->
|
||||||
ok = application:stop(emqx_replay).
|
ok = emqx_replay_local_store_sup:stop_zone(zone(TC)).
|
||||||
|
|
||||||
zone(TC) ->
|
zone(TC) ->
|
||||||
list_to_atom(?MODULE_STRING ++ atom_to_list(TC)).
|
list_to_atom(?MODULE_STRING ++ atom_to_list(TC)).
|
||||||
|
|
||||||
|
set_zone_config(Zone, Options) ->
|
||||||
|
ok = application:set_env(emqx_replay, zone_config, #{
|
||||||
|
Zone => {emqx_replay_message_storage, Options}
|
||||||
|
}).
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_replay_message_storage_shim).
|
||||||
|
|
||||||
|
-export([open/0]).
|
||||||
|
-export([close/1]).
|
||||||
|
-export([store/5]).
|
||||||
|
-export([iterate/3]).
|
||||||
|
|
||||||
|
-type topic() :: list(binary()).
|
||||||
|
-type time() :: integer().
|
||||||
|
|
||||||
|
-opaque t() :: ets:tid().
|
||||||
|
|
||||||
|
-spec open() -> t().
|
||||||
|
open() ->
|
||||||
|
ets:new(?MODULE, [ordered_set, {keypos, 1}]).
|
||||||
|
|
||||||
|
-spec close(t()) -> ok.
|
||||||
|
close(Tab) ->
|
||||||
|
true = ets:delete(Tab),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-spec store(t(), emqx_guid:guid(), time(), topic(), binary()) ->
|
||||||
|
ok | {error, _TODO}.
|
||||||
|
store(Tab, MessageID, PublishedAt, Topic, Payload) ->
|
||||||
|
true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-spec iterate(t(), emqx_topic:words(), time()) ->
|
||||||
|
[binary()].
|
||||||
|
iterate(Tab, TopicFilter, StartTime) ->
|
||||||
|
ets:foldr(
|
||||||
|
fun({{PublishedAt, _}, Topic, Payload}, Acc) ->
|
||||||
|
case emqx_topic:match(Topic, TopicFilter) of
|
||||||
|
true when PublishedAt >= StartTime ->
|
||||||
|
[Payload | Acc];
|
||||||
|
_ ->
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
[],
|
||||||
|
Tab
|
||||||
|
).
|
|
@ -0,0 +1,426 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(prop_replay_message_storage).
|
||||||
|
|
||||||
|
-include_lib("proper/include/proper.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-define(WORK_DIR, ["_build", "test"]).
|
||||||
|
-define(RUN_ID, {?MODULE, testrun_id}).
|
||||||
|
-define(GEN_ID, 42).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Properties
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
prop_bitstring_computes() ->
|
||||||
|
?FORALL(Keymapper, keymapper(), begin
|
||||||
|
Bitsize = emqx_replay_message_storage:bitsize(Keymapper),
|
||||||
|
?FORALL({Topic, Timestamp}, {topic(), integer()}, begin
|
||||||
|
BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper),
|
||||||
|
is_integer(BS) andalso (BS < (1 bsl Bitsize))
|
||||||
|
end)
|
||||||
|
end).
|
||||||
|
|
||||||
|
prop_topic_bitmask_computes() ->
|
||||||
|
Keymapper = make_keymapper(16, [8, 12, 16], 100),
|
||||||
|
?FORALL(TopicFilter, topic_filter(), begin
|
||||||
|
Mask = emqx_replay_message_storage:compute_topic_bitmask(TopicFilter, Keymapper),
|
||||||
|
% topic bits + timestamp LSBs
|
||||||
|
is_integer(Mask) andalso (Mask < (1 bsl (36 + 6)))
|
||||||
|
end).
|
||||||
|
|
||||||
|
prop_next_seek_monotonic() ->
|
||||||
|
?FORALL(
|
||||||
|
{TopicFilter, StartTime, Keymapper},
|
||||||
|
{topic_filter(), pos_integer(), keymapper()},
|
||||||
|
begin
|
||||||
|
Filter = emqx_replay_message_storage:make_keyspace_filter(
|
||||||
|
TopicFilter,
|
||||||
|
StartTime,
|
||||||
|
Keymapper
|
||||||
|
),
|
||||||
|
?FORALL(
|
||||||
|
Bitstring,
|
||||||
|
bitstr(emqx_replay_message_storage:bitsize(Keymapper)),
|
||||||
|
emqx_replay_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring
|
||||||
|
)
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
|
prop_next_seek_eq_initial_seek() ->
|
||||||
|
?FORALL(
|
||||||
|
Filter,
|
||||||
|
keyspace_filter(),
|
||||||
|
emqx_replay_message_storage:compute_initial_seek(Filter) =:=
|
||||||
|
emqx_replay_message_storage:compute_next_seek(0, Filter)
|
||||||
|
).
|
||||||
|
|
||||||
|
prop_iterate_messages() ->
|
||||||
|
TBPL = [4, 8, 12],
|
||||||
|
Options = #{
|
||||||
|
timestamp_bits => 32,
|
||||||
|
topic_bits_per_level => TBPL,
|
||||||
|
epoch => 200
|
||||||
|
},
|
||||||
|
% TODO
|
||||||
|
% Shrinking is too unpredictable and leaves a LOT of garbage in the scratch dit.
|
||||||
|
?FORALL(Stream, noshrink(non_empty(messages(topic(TBPL)))), begin
|
||||||
|
Filepath = make_filepath(?FUNCTION_NAME, erlang:system_time(microsecond)),
|
||||||
|
{DB, Handle} = open_db(Filepath, Options),
|
||||||
|
Shim = emqx_replay_message_storage_shim:open(),
|
||||||
|
ok = store_db(DB, Stream),
|
||||||
|
ok = store_shim(Shim, Stream),
|
||||||
|
?FORALL(
|
||||||
|
{
|
||||||
|
{Topic, _},
|
||||||
|
Pattern,
|
||||||
|
StartTime
|
||||||
|
},
|
||||||
|
{
|
||||||
|
nth(Stream),
|
||||||
|
topic_filter_pattern(),
|
||||||
|
start_time()
|
||||||
|
},
|
||||||
|
begin
|
||||||
|
TopicFilter = make_topic_filter(Pattern, Topic),
|
||||||
|
Messages = iterate_db(DB, TopicFilter, StartTime),
|
||||||
|
Reference = iterate_shim(Shim, TopicFilter, StartTime),
|
||||||
|
ok = close_db(Handle),
|
||||||
|
ok = emqx_replay_message_storage_shim:close(Shim),
|
||||||
|
?WHENFAIL(
|
||||||
|
begin
|
||||||
|
io:format(user, " *** Filepath = ~s~n", [Filepath]),
|
||||||
|
io:format(user, " *** TopicFilter = ~p~n", [TopicFilter]),
|
||||||
|
io:format(user, " *** StartTime = ~p~n", [StartTime])
|
||||||
|
end,
|
||||||
|
is_list(Messages) andalso equals(Messages -- Reference, Reference -- Messages)
|
||||||
|
)
|
||||||
|
end
|
||||||
|
)
|
||||||
|
end).
|
||||||
|
|
||||||
|
prop_iterate_eq_iterate_with_preserve_restore() ->
|
||||||
|
TBPL = [4, 8, 16, 12],
|
||||||
|
Options = #{
|
||||||
|
timestamp_bits => 32,
|
||||||
|
topic_bits_per_level => TBPL,
|
||||||
|
epoch => 500
|
||||||
|
},
|
||||||
|
{DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options),
|
||||||
|
?FORALL(Stream, non_empty(messages(topic(TBPL))), begin
|
||||||
|
% TODO
|
||||||
|
% This proptest is impure because messages from testruns assumed to be
|
||||||
|
% independent of each other are accumulated in the same storage. This
|
||||||
|
% would probably confuse shrinker in the event a testrun fails.
|
||||||
|
ok = store_db(DB, Stream),
|
||||||
|
?FORALL(
|
||||||
|
{
|
||||||
|
{Topic, _},
|
||||||
|
Pat,
|
||||||
|
StartTime,
|
||||||
|
Commands
|
||||||
|
},
|
||||||
|
{
|
||||||
|
nth(Stream),
|
||||||
|
topic_filter_pattern(),
|
||||||
|
start_time(),
|
||||||
|
shuffled(flat([non_empty(list({preserve, restore})), list(iterate)]))
|
||||||
|
},
|
||||||
|
begin
|
||||||
|
TopicFilter = make_topic_filter(Pat, Topic),
|
||||||
|
Iterator = make_iterator(DB, TopicFilter, StartTime),
|
||||||
|
Messages = run_iterator_commands(Commands, Iterator, DB),
|
||||||
|
equals(Messages, iterate_db(DB, TopicFilter, StartTime))
|
||||||
|
end
|
||||||
|
)
|
||||||
|
end).
|
||||||
|
|
||||||
|
% store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) ->
|
||||||
|
% MessageID = emqx_guid:gen(),
|
||||||
|
% PublishedAt = ChunkNum,
|
||||||
|
% MessageID, PublishedAt, Topic
|
||||||
|
% ]),
|
||||||
|
% ok = emqx_replay_message_storage:store(DB, MessageID, PublishedAt, Topic, Payload),
|
||||||
|
% store_message_stream(DB, payload_gen:next(Rest));
|
||||||
|
% store_message_stream(_Zone, []) ->
|
||||||
|
% ok.
|
||||||
|
|
||||||
|
store_db(DB, Messages) ->
|
||||||
|
lists:foreach(
|
||||||
|
fun({Topic, Payload = {MessageID, Timestamp, _}}) ->
|
||||||
|
Bin = term_to_binary(Payload),
|
||||||
|
emqx_replay_message_storage:store(DB, MessageID, Timestamp, Topic, Bin)
|
||||||
|
end,
|
||||||
|
Messages
|
||||||
|
).
|
||||||
|
|
||||||
|
iterate_db(DB, TopicFilter, StartTime) ->
|
||||||
|
iterate_db(make_iterator(DB, TopicFilter, StartTime)).
|
||||||
|
|
||||||
|
iterate_db(It) ->
|
||||||
|
case emqx_replay_message_storage:next(It) of
|
||||||
|
{value, Payload, ItNext} ->
|
||||||
|
[binary_to_term(Payload) | iterate_db(ItNext)];
|
||||||
|
none ->
|
||||||
|
[]
|
||||||
|
end.
|
||||||
|
|
||||||
|
make_iterator(DB, TopicFilter, StartTime) ->
|
||||||
|
{ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime),
|
||||||
|
It.
|
||||||
|
|
||||||
|
run_iterator_commands([iterate | Rest], It, DB) ->
|
||||||
|
case emqx_replay_message_storage:next(It) of
|
||||||
|
{value, Payload, ItNext} ->
|
||||||
|
[binary_to_term(Payload) | run_iterator_commands(Rest, ItNext, DB)];
|
||||||
|
none ->
|
||||||
|
[]
|
||||||
|
end;
|
||||||
|
run_iterator_commands([{preserve, restore} | Rest], It, DB) ->
|
||||||
|
Serial = emqx_replay_message_storage:preserve_iterator(It),
|
||||||
|
{ok, ItNext} = emqx_replay_message_storage:restore_iterator(DB, Serial),
|
||||||
|
run_iterator_commands(Rest, ItNext, DB);
|
||||||
|
run_iterator_commands([], It, _DB) ->
|
||||||
|
iterate_db(It).
|
||||||
|
|
||||||
|
store_shim(Shim, Messages) ->
|
||||||
|
lists:foreach(
|
||||||
|
fun({Topic, Payload = {MessageID, Timestamp, _}}) ->
|
||||||
|
Bin = term_to_binary(Payload),
|
||||||
|
emqx_replay_message_storage_shim:store(Shim, MessageID, Timestamp, Topic, Bin)
|
||||||
|
end,
|
||||||
|
Messages
|
||||||
|
).
|
||||||
|
|
||||||
|
iterate_shim(Shim, TopicFilter, StartTime) ->
|
||||||
|
lists:map(
|
||||||
|
fun binary_to_term/1,
|
||||||
|
emqx_replay_message_storage_shim:iterate(Shim, TopicFilter, StartTime)
|
||||||
|
).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Setup / teardown
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
open_db(Filepath, Options) ->
|
||||||
|
{ok, Handle} = rocksdb:open(Filepath, [{create_if_missing, true}]),
|
||||||
|
{Schema, CFRefs} = emqx_replay_message_storage:create_new(Handle, ?GEN_ID, Options),
|
||||||
|
DB = emqx_replay_message_storage:open(Handle, ?GEN_ID, CFRefs, Schema),
|
||||||
|
{DB, Handle}.
|
||||||
|
|
||||||
|
close_db(Handle) ->
|
||||||
|
rocksdb:close(Handle).
|
||||||
|
|
||||||
|
make_filepath(TC) ->
|
||||||
|
make_filepath(TC, 0).
|
||||||
|
|
||||||
|
make_filepath(TC, InstID) ->
|
||||||
|
Name = io_lib:format("~0p.~0p", [TC, InstID]),
|
||||||
|
Path = filename:join(?WORK_DIR ++ ["proper", "runs", get_run_id(), ?MODULE_STRING, Name]),
|
||||||
|
ok = filelib:ensure_dir(Path),
|
||||||
|
Path.
|
||||||
|
|
||||||
|
get_run_id() ->
|
||||||
|
case persistent_term:get(?RUN_ID, undefined) of
|
||||||
|
RunID when RunID /= undefined ->
|
||||||
|
RunID;
|
||||||
|
undefined ->
|
||||||
|
RunID = make_run_id(),
|
||||||
|
ok = persistent_term:put(?RUN_ID, RunID),
|
||||||
|
RunID
|
||||||
|
end.
|
||||||
|
|
||||||
|
make_run_id() ->
|
||||||
|
calendar:system_time_to_rfc3339(erlang:system_time(second), [{offset, "Z"}]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Type generators
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
topic() ->
|
||||||
|
non_empty(list(topic_level())).
|
||||||
|
|
||||||
|
topic(EntropyWeights) ->
|
||||||
|
?LET(L, scaled(1 / 4, list(1)), begin
|
||||||
|
EWs = lists:sublist(EntropyWeights ++ L, length(L)),
|
||||||
|
?SIZED(S, [oneof([topic_level(S * EW), topic_level_fixed()]) || EW <- EWs])
|
||||||
|
end).
|
||||||
|
|
||||||
|
topic_filter() ->
|
||||||
|
?SUCHTHAT(
|
||||||
|
L,
|
||||||
|
non_empty(
|
||||||
|
list(
|
||||||
|
frequency([
|
||||||
|
{5, topic_level()},
|
||||||
|
{2, '+'},
|
||||||
|
{1, '#'}
|
||||||
|
])
|
||||||
|
)
|
||||||
|
),
|
||||||
|
not lists:member('#', L) orelse lists:last(L) == '#'
|
||||||
|
).
|
||||||
|
|
||||||
|
topic_level_pattern() ->
|
||||||
|
frequency([
|
||||||
|
{5, level},
|
||||||
|
{2, '+'},
|
||||||
|
{1, '#'}
|
||||||
|
]).
|
||||||
|
|
||||||
|
topic_filter_pattern() ->
|
||||||
|
list(topic_level_pattern()).
|
||||||
|
|
||||||
|
topic_filter(Topic) ->
|
||||||
|
?LET({T, Pat}, {Topic, topic_filter_pattern()}, make_topic_filter(Pat, T)).
|
||||||
|
|
||||||
|
make_topic_filter([], _) ->
|
||||||
|
[];
|
||||||
|
make_topic_filter(_, []) ->
|
||||||
|
[];
|
||||||
|
make_topic_filter(['#' | _], _) ->
|
||||||
|
['#'];
|
||||||
|
make_topic_filter(['+' | Rest], [_ | Levels]) ->
|
||||||
|
['+' | make_topic_filter(Rest, Levels)];
|
||||||
|
make_topic_filter([level | Rest], [L | Levels]) ->
|
||||||
|
[L | make_topic_filter(Rest, Levels)].
|
||||||
|
|
||||||
|
% topic() ->
|
||||||
|
% ?LAZY(?SIZED(S, frequency([
|
||||||
|
% {S, [topic_level() | topic()]},
|
||||||
|
% {1, []}
|
||||||
|
% ]))).
|
||||||
|
|
||||||
|
% topic_filter() ->
|
||||||
|
% ?LAZY(?SIZED(S, frequency([
|
||||||
|
% {round(S / 3 * 2), [topic_level() | topic_filter()]},
|
||||||
|
% {round(S / 3 * 1), ['+' | topic_filter()]},
|
||||||
|
% {1, []},
|
||||||
|
% {1, ['#']}
|
||||||
|
% ]))).
|
||||||
|
|
||||||
|
topic_level() ->
|
||||||
|
?LET(L, list(oneof([range($a, $z), range($0, $9)])), iolist_to_binary(L)).
|
||||||
|
|
||||||
|
topic_level(Entropy) ->
|
||||||
|
S = floor(1 + math:log2(Entropy) / 4),
|
||||||
|
?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))).
|
||||||
|
|
||||||
|
topic_level_fixed() ->
|
||||||
|
oneof([
|
||||||
|
<<"foo">>,
|
||||||
|
<<"bar">>,
|
||||||
|
<<"baz">>,
|
||||||
|
<<"xyzzy">>
|
||||||
|
]).
|
||||||
|
|
||||||
|
keymapper() ->
|
||||||
|
?LET(
|
||||||
|
{TimestampBits, TopicBits, Epoch},
|
||||||
|
{
|
||||||
|
range(0, 128),
|
||||||
|
non_empty(list(range(1, 32))),
|
||||||
|
pos_integer()
|
||||||
|
},
|
||||||
|
make_keymapper(TimestampBits, TopicBits, Epoch * 100)
|
||||||
|
).
|
||||||
|
|
||||||
|
keyspace_filter() ->
|
||||||
|
?LET(
|
||||||
|
{TopicFilter, StartTime, Keymapper},
|
||||||
|
{topic_filter(), pos_integer(), keymapper()},
|
||||||
|
emqx_replay_message_storage:make_keyspace_filter(TopicFilter, StartTime, Keymapper)
|
||||||
|
).
|
||||||
|
|
||||||
|
messages(Topic) ->
|
||||||
|
?LET(
|
||||||
|
Ts,
|
||||||
|
list(Topic),
|
||||||
|
interleaved(
|
||||||
|
?LET(Messages, vector(length(Ts), scaled(4, list(message()))), lists:zip(Ts, Messages))
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
||||||
|
message() ->
|
||||||
|
?LET({Timestamp, Payload}, {timestamp(), binary()}, {emqx_guid:gen(), Timestamp, Payload}).
|
||||||
|
|
||||||
|
message_streams(Topic) ->
|
||||||
|
?LET(Topics, list(Topic), [{T, payload_gen:binary_stream_gen(64)} || T <- Topics]).
|
||||||
|
|
||||||
|
timestamp() ->
|
||||||
|
scaled(20, pos_integer()).
|
||||||
|
|
||||||
|
start_time() ->
|
||||||
|
scaled(10, pos_integer()).
|
||||||
|
|
||||||
|
bitstr(Size) ->
|
||||||
|
?LET(B, binary(1 + (Size div 8)), binary:decode_unsigned(B) band (1 bsl Size - 1)).
|
||||||
|
|
||||||
|
nth(L) ->
|
||||||
|
?LET(I, range(1, length(L)), lists:nth(I, L)).
|
||||||
|
|
||||||
|
scaled(Factor, T) ->
|
||||||
|
?SIZED(S, resize(ceil(S * Factor), T)).
|
||||||
|
|
||||||
|
interleaved(T) ->
|
||||||
|
?LET({L, Seed}, {T, integer()}, interleave(L, rand:seed_s(exsss, Seed))).
|
||||||
|
|
||||||
|
shuffled(T) ->
|
||||||
|
?LET({L, Seed}, {T, integer()}, shuffle(L, rand:seed_s(exsss, Seed))).
|
||||||
|
|
||||||
|
flat(T) ->
|
||||||
|
?LET(L, T, lists:flatten(L)).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
make_keymapper(TimestampBits, TopicBits, MaxEpoch) ->
|
||||||
|
emqx_replay_message_storage:make_keymapper(#{
|
||||||
|
timestamp_bits => TimestampBits,
|
||||||
|
topic_bits_per_level => TopicBits,
|
||||||
|
epoch => MaxEpoch
|
||||||
|
}).
|
||||||
|
|
||||||
|
-spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}).
|
||||||
|
interleave(Seqs, Rng) ->
|
||||||
|
interleave(Seqs, length(Seqs), Rng).
|
||||||
|
|
||||||
|
interleave(Seqs, L, Rng) when L > 0 ->
|
||||||
|
{N, RngNext} = rand:uniform_s(L, Rng),
|
||||||
|
{SeqHead, SeqTail} = lists:split(N - 1, Seqs),
|
||||||
|
case SeqTail of
|
||||||
|
[{Tag, [M | Rest]} | SeqRest] ->
|
||||||
|
[{Tag, M} | interleave(SeqHead ++ [{Tag, Rest} | SeqRest], L, RngNext)];
|
||||||
|
[{_, []} | SeqRest] ->
|
||||||
|
interleave(SeqHead ++ SeqRest, L - 1, RngNext)
|
||||||
|
end;
|
||||||
|
interleave([], 0, _) ->
|
||||||
|
[].
|
||||||
|
|
||||||
|
-spec shuffle(list(E), rand:state()) -> list(E).
|
||||||
|
shuffle(L, Rng) ->
|
||||||
|
{Rands, _} = randoms(length(L), Rng),
|
||||||
|
[E || {_, E} <- lists:sort(lists:zip(Rands, L))].
|
||||||
|
|
||||||
|
randoms(N, Rng) when N > 0 ->
|
||||||
|
{Rand, RngNext} = rand:uniform_s(Rng),
|
||||||
|
{Tail, RngFinal} = randoms(N - 1, RngNext),
|
||||||
|
{[Rand | Tail], RngFinal};
|
||||||
|
randoms(_, Rng) ->
|
||||||
|
{[], Rng}.
|
Loading…
Reference in New Issue