test(ds): verify preserve / restore works with stored iterators

This commit is contained in:
Andrew Mayorov 2023-02-09 21:35:11 +03:00 committed by ieQu1
parent d2065e0c1b
commit 8ac0bba958
1 changed files with 53 additions and 2 deletions

View File

@ -190,13 +190,49 @@ t_iterate_multigen(_Config) ->
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)])
).
t_iterate_multigen_preserve_restore(_Config) ->
ReplayID = atom_to_binary(?FUNCTION_NAME),
{ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG),
{ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG),
{ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 100, ?DEFAULT_CONFIG),
Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
Timestamps = lists:seq(1, 100),
TopicFilter = "foo/#",
TopicsMatching = ["foo/bar", "foo/bar/baz"],
_ = [
store(?ZONE, TS, Topic, term_to_binary({Topic, TS}))
|| Topic <- Topics, TS <- Timestamps
],
It0 = iterator(?ZONE, TopicFilter, 0),
{It1, Res10} = iterate(It0, 10),
% preserve mid-generation
ok = emqx_replay_local_store:preserve_iterator(It1, ReplayID),
{ok, It2} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID),
{It3, Res100} = iterate(It2, 88),
% preserve on the generation boundary
ok = emqx_replay_local_store:preserve_iterator(It3, ReplayID),
{ok, It4} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID),
{It5, Res200} = iterate(It4, 1000),
?assertEqual(none, It5),
?assertEqual(
lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]),
lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200])
),
?assertEqual(
ok,
emqx_replay_local_store:discard_iterator(?ZONE, ReplayID)
),
?assertEqual(
{error, not_found},
emqx_replay_local_store:restore_iterator(?ZONE, ReplayID)
).
store(Zone, PublishedAt, Topic, Payload) ->
ID = emqx_guid:gen(),
emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload).
iterate(DB, TopicFilter, StartTime) ->
{ok, It} = emqx_replay_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}),
iterate(It).
iterate(iterator(DB, TopicFilter, StartTime)).
iterate(It) ->
case emqx_replay_local_store:next(It) of
@ -206,6 +242,21 @@ iterate(It) ->
[]
end.
iterate(It, 0) ->
{It, []};
iterate(It, N) ->
case emqx_replay_local_store:next(It) of
{value, Payload, ItNext} ->
{ItFinal, Ps} = iterate(ItNext, N - 1),
{ItFinal, [Payload | Ps]};
none ->
{none, []}
end.
iterator(DB, TopicFilter, StartTime) ->
{ok, It} = emqx_replay_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}),
It.
parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
Topic;
parse_topic(Topic) ->