test(replay): verify preserve / restore works with stored iterators
This commit is contained in:
parent
1f033f92b5
commit
cfd23d76d3
|
@ -190,13 +190,49 @@ t_iterate_multigen(_Config) ->
|
||||||
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)])
|
lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)])
|
||||||
).
|
).
|
||||||
|
|
||||||
|
t_iterate_multigen_preserve_restore(_Config) ->
|
||||||
|
ReplayID = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
{ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG),
|
||||||
|
{ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG),
|
||||||
|
{ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 100, ?DEFAULT_CONFIG),
|
||||||
|
Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
|
||||||
|
Timestamps = lists:seq(1, 100),
|
||||||
|
TopicFilter = "foo/#",
|
||||||
|
TopicsMatching = ["foo/bar", "foo/bar/baz"],
|
||||||
|
_ = [
|
||||||
|
store(?ZONE, TS, Topic, term_to_binary({Topic, TS}))
|
||||||
|
|| Topic <- Topics, TS <- Timestamps
|
||||||
|
],
|
||||||
|
It0 = iterator(?ZONE, TopicFilter, 0),
|
||||||
|
{It1, Res10} = iterate(It0, 10),
|
||||||
|
% preserve mid-generation
|
||||||
|
ok = emqx_replay_local_store:preserve_iterator(It1, ReplayID),
|
||||||
|
{ok, It2} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID),
|
||||||
|
{It3, Res100} = iterate(It2, 88),
|
||||||
|
% preserve on the generation boundary
|
||||||
|
ok = emqx_replay_local_store:preserve_iterator(It3, ReplayID),
|
||||||
|
{ok, It4} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID),
|
||||||
|
{It5, Res200} = iterate(It4, 1000),
|
||||||
|
?assertEqual(none, It5),
|
||||||
|
?assertEqual(
|
||||||
|
lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]),
|
||||||
|
lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
ok,
|
||||||
|
emqx_replay_local_store:discard_iterator(?ZONE, ReplayID)
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
{error, not_found},
|
||||||
|
emqx_replay_local_store:restore_iterator(?ZONE, ReplayID)
|
||||||
|
).
|
||||||
|
|
||||||
store(Zone, PublishedAt, Topic, Payload) ->
|
store(Zone, PublishedAt, Topic, Payload) ->
|
||||||
ID = emqx_guid:gen(),
|
ID = emqx_guid:gen(),
|
||||||
emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload).
|
emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload).
|
||||||
|
|
||||||
iterate(DB, TopicFilter, StartTime) ->
|
iterate(DB, TopicFilter, StartTime) ->
|
||||||
{ok, It} = emqx_replay_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}),
|
iterate(iterator(DB, TopicFilter, StartTime)).
|
||||||
iterate(It).
|
|
||||||
|
|
||||||
iterate(It) ->
|
iterate(It) ->
|
||||||
case emqx_replay_local_store:next(It) of
|
case emqx_replay_local_store:next(It) of
|
||||||
|
@ -206,6 +242,21 @@ iterate(It) ->
|
||||||
[]
|
[]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
iterate(It, 0) ->
|
||||||
|
{It, []};
|
||||||
|
iterate(It, N) ->
|
||||||
|
case emqx_replay_local_store:next(It) of
|
||||||
|
{value, Payload, ItNext} ->
|
||||||
|
{ItFinal, Ps} = iterate(ItNext, N - 1),
|
||||||
|
{ItFinal, [Payload | Ps]};
|
||||||
|
none ->
|
||||||
|
{none, []}
|
||||||
|
end.
|
||||||
|
|
||||||
|
iterator(DB, TopicFilter, StartTime) ->
|
||||||
|
{ok, It} = emqx_replay_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}),
|
||||||
|
It.
|
||||||
|
|
||||||
parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
|
parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
|
||||||
Topic;
|
Topic;
|
||||||
parse_topic(Topic) ->
|
parse_topic(Topic) ->
|
||||||
|
|
Loading…
Reference in New Issue