diff --git a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl index f36dda267..afed30b88 100644 --- a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl @@ -190,13 +190,49 @@ t_iterate_multigen(_Config) -> lists:sort([binary_to_term(Payload) || Payload <- iterate(?ZONE, "a/#", 60)]) ). +t_iterate_multigen_preserve_restore(_Config) -> + ReplayID = atom_to_binary(?FUNCTION_NAME), + {ok, 1} = emqx_replay_local_store:create_generation(?ZONE, 10, ?COMPACT_CONFIG), + {ok, 2} = emqx_replay_local_store:create_generation(?ZONE, 50, ?DEFAULT_CONFIG), + {ok, 3} = emqx_replay_local_store:create_generation(?ZONE, 100, ?DEFAULT_CONFIG), + Topics = ["foo/bar", "foo/bar/baz", "a/bar"], + Timestamps = lists:seq(1, 100), + TopicFilter = "foo/#", + TopicsMatching = ["foo/bar", "foo/bar/baz"], + _ = [ + store(?ZONE, TS, Topic, term_to_binary({Topic, TS})) + || Topic <- Topics, TS <- Timestamps + ], + It0 = iterator(?ZONE, TopicFilter, 0), + {It1, Res10} = iterate(It0, 10), + % preserve mid-generation + ok = emqx_replay_local_store:preserve_iterator(It1, ReplayID), + {ok, It2} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID), + {It3, Res100} = iterate(It2, 88), + % preserve on the generation boundary + ok = emqx_replay_local_store:preserve_iterator(It3, ReplayID), + {ok, It4} = emqx_replay_local_store:restore_iterator(?ZONE, ReplayID), + {It5, Res200} = iterate(It4, 1000), + ?assertEqual(none, It5), + ?assertEqual( + lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]), + lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200]) + ), + ?assertEqual( + ok, + emqx_replay_local_store:discard_iterator(?ZONE, ReplayID) + ), + ?assertEqual( + {error, not_found}, + emqx_replay_local_store:restore_iterator(?ZONE, ReplayID) + ). + store(Zone, PublishedAt, Topic, Payload) -> ID = emqx_guid:gen(), emqx_replay_local_store:store(Zone, ID, PublishedAt, parse_topic(Topic), Payload). iterate(DB, TopicFilter, StartTime) -> - {ok, It} = emqx_replay_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}), - iterate(It). + iterate(iterator(DB, TopicFilter, StartTime)). iterate(It) -> case emqx_replay_local_store:next(It) of @@ -206,6 +242,21 @@ iterate(It) -> [] end. +iterate(It, 0) -> + {It, []}; +iterate(It, N) -> + case emqx_replay_local_store:next(It) of + {value, Payload, ItNext} -> + {ItFinal, Ps} = iterate(ItNext, N - 1), + {ItFinal, [Payload | Ps]}; + none -> + {none, []} + end. + +iterator(DB, TopicFilter, StartTime) -> + {ok, It} = emqx_replay_local_store:make_iterator(DB, {parse_topic(TopicFilter), StartTime}), + It. + parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> Topic; parse_topic(Topic) ->