diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 64d81307c..3df16dc1c 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -98,8 +98,8 @@ t_03_smoke_iterate(_Config) -> ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), - {ok, Iter, Batch} = iterate(DB, Iter0, 1), - ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}). + {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter0), + ?assertEqual(Msgs, Batch, {Iter0, Iter}). %% Verify that iterators survive restart of the application. This is %% an important property, since the lifetime of the iterators is tied @@ -125,8 +125,8 @@ t_04_restart(_Config) -> {ok, _} = application:ensure_all_started(emqx_durable_storage), ok = emqx_ds:open_db(DB, opts()), %% The old iterator should be still operational: - {ok, Iter, Batch} = iterate(DB, Iter0, 1), - ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}). + {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter0), + ?assertEqual(Msgs, Batch, {Iter0, Iter}). %% Check that we can create iterators directly from DS keys. t_05_update_iterator(_Config) -> @@ -148,9 +148,8 @@ t_05_update_iterator(_Config) -> Res1 = emqx_ds:update_iterator(DB, OldIter, Key0), ?assertMatch({ok, _Iter1}, Res1), {ok, Iter1} = Res1, - {ok, FinalIter, Batch} = iterate(DB, Iter1, 1), - AllMsgs = [Msg0 | [Msg || {_Key, Msg} <- Batch]], - ?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}), + {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter1, #{batch_size => 1}), + ?assertEqual(Msgs, [Msg0 | Batch], #{from_key => Iter1, final_iter => Iter}), ok. t_06_update_config(_Config) -> @@ -190,9 +189,9 @@ t_06_update_config(_Config) -> ), Checker = fun({StartTime, Msgs0}, Acc) -> - Msgs = Msgs0 ++ Acc, - Batch = fetch_all(DB, TopicFilter, StartTime), - ?assertEqual(Msgs, Batch, {StartTime}), + Msgs = Acc ++ Msgs0, + Batch = emqx_ds_test_helpers:consume(DB, TopicFilter, StartTime), + ?assertEqual(Msgs, Batch, StartTime), Msgs end, lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)). @@ -234,9 +233,9 @@ t_07_add_generation(_Config) -> ), Checker = fun({StartTime, Msgs0}, Acc) -> - Msgs = Msgs0 ++ Acc, - Batch = fetch_all(DB, TopicFilter, StartTime), - ?assertEqual(Msgs, Batch, {StartTime}), + Msgs = Acc ++ Msgs0, + Batch = emqx_ds_test_helpers:consume(DB, TopicFilter, StartTime), + ?assertEqual(Msgs, Batch, StartTime), Msgs end, lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)). @@ -398,9 +397,8 @@ t_smoke_delete_next(_Config) -> TopicFilterHash = ['#'], [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilterHash, StartTime), - {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilterHash, StartTime), - {ok, _Iter, Batch} = iterate(DB, Iter0, 1), - ?assertEqual([Msg1, Msg3], [Msg || {_Key, Msg} <- Batch]), + Batch = emqx_ds_test_helpers:consume_stream(DB, Stream, TopicFilterHash, StartTime), + ?assertEqual([Msg1, Msg3], Batch), ok = emqx_ds:add_generation(DB), @@ -444,9 +442,9 @@ t_drop_generation_with_never_used_iterator(_Config) -> ], ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)), - ?assertMatch( - {error, unrecoverable, generation_not_found, []}, - iterate(DB, Iter0, 1) + ?assertError( + {error, unrecoverable, generation_not_found}, + emqx_ds_test_helpers:consume_iter(DB, Iter0) ), %% New iterator for the new stream will only see the later messages. @@ -454,9 +452,9 @@ t_drop_generation_with_never_used_iterator(_Config) -> ?assertNotEqual(Stream0, Stream1), {ok, Iter1} = emqx_ds:make_iterator(DB, Stream1, TopicFilter, StartTime), - {ok, Iter, Batch} = iterate(DB, Iter1, 1), + {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter1, #{batch_size => 1}), ?assertNotEqual(end_of_stream, Iter), - ?assertEqual(Msgs1, [Msg || {_Key, Msg} <- Batch]), + ?assertEqual(Msgs1, Batch), ok. @@ -496,9 +494,9 @@ t_drop_generation_with_used_once_iterator(_Config) -> ], ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)), - ?assertMatch( - {error, unrecoverable, generation_not_found, []}, - iterate(DB, Iter1, 1) + ?assertError( + {error, unrecoverable, generation_not_found}, + emqx_ds_test_helpers:consume_iter(DB, Iter1) ). t_drop_generation_update_iterator(_Config) -> @@ -702,25 +700,6 @@ update_data_set() -> ] ]. -fetch_all(DB, TopicFilter, StartTime) -> - Streams0 = emqx_ds:get_streams(DB, TopicFilter, StartTime), - Streams = lists:sort( - fun({{_, A}, _}, {{_, B}, _}) -> - A < B - end, - Streams0 - ), - lists:foldl( - fun({_, Stream}, Acc) -> - {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), - {ok, _, Msgs0} = iterate(DB, Iter0, StartTime), - Msgs = lists:map(fun({_, Msg}) -> Msg end, Msgs0), - Acc ++ Msgs - end, - [], - Streams - ). - message(ClientId, Topic, Payload, PublishedAt) -> Msg = message(Topic, Payload, PublishedAt), Msg#message{from = ClientId}. @@ -733,21 +712,6 @@ message(Topic, Payload, PublishedAt) -> id = emqx_guid:gen() }. -iterate(DB, It, BatchSize) -> - iterate(DB, It, BatchSize, []). - -iterate(DB, It0, BatchSize, Acc) -> - case emqx_ds:next(DB, It0, BatchSize) of - {ok, It, []} -> - {ok, It, Acc}; - {ok, It, Msgs} -> - iterate(DB, It, BatchSize, Acc ++ Msgs); - {ok, end_of_stream} -> - {ok, end_of_stream, Acc}; - {error, Class, Reason} -> - {error, Class, Reason, Acc} - end. - delete(DB, It, Selector, BatchSize) -> delete(DB, It, Selector, BatchSize, 0). diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 5ff1d5fb2..24e7cdafb 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -136,28 +136,7 @@ message(Topic, Payload, PublishedAt) -> }. consume(Node, DB, Shard, TopicFilter, StartTime) -> - Streams = erpc:call(Node, emqx_ds_storage_layer, get_streams, [ - {DB, Shard}, TopicFilter, StartTime - ]), - lists:flatmap( - fun({_Rank, Stream}) -> - {ok, It} = erpc:call(Node, emqx_ds_storage_layer, make_iterator, [ - {DB, Shard}, Stream, TopicFilter, StartTime - ]), - consume_stream(Node, DB, Shard, It) - end, - Streams - ). - -consume_stream(Node, DB, Shard, It) -> - case erpc:call(Node, emqx_ds_storage_layer, next, [{DB, Shard}, It, 100]) of - {ok, _NIt, _Msgs = []} -> - []; - {ok, NIt, Batch} -> - [Msg || {_Key, Msg} <- Batch] ++ consume_stream(Node, DB, Shard, NIt); - {ok, end_of_stream} -> - [] - end. + erpc:call(Node, emqx_ds_test_helpers, storage_consume, [{DB, Shard}, TopicFilter, StartTime]). probably(P, Fun) -> case rand:uniform() of diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl index a290a4c30..eaddab0c6 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl @@ -42,7 +42,7 @@ t_idempotent_store_batch(_Config) -> %% First batch should have been handled idempotently. ?assertEqual( Msgs1 ++ Msgs2, - lists:keysort(#message.timestamp, consume(Shard, ['#'])) + lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#'])) ), ok = stop_shard(Pid). @@ -79,7 +79,7 @@ t_snapshot_take_restore(_Config) -> {ok, _Pid} = emqx_ds_storage_layer:start_link(Shard, opts()), ?assertEqual( Msgs1 ++ Msgs2, - lists:keysort(#message.timestamp, consume(Shard, ['#'])) + lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#'])) ). transfer_snapshot(Reader, Writer) -> @@ -127,29 +127,6 @@ message(Topic, Payload, PublishedAt) -> id = emqx_guid:gen() }. -consume(Shard, TopicFilter) -> - consume(Shard, TopicFilter, 0). - -consume(Shard, TopicFilter, StartTime) -> - Streams = emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime), - lists:flatmap( - fun({_Rank, Stream}) -> - {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime), - consume_stream(Shard, It) - end, - Streams - ). - -consume_stream(Shard, It) -> - case emqx_ds_storage_layer:next(Shard, It, 100) of - {ok, _NIt, _Msgs = []} -> - []; - {ok, NIt, Batch} -> - [Msg || {_DSKey, Msg} <- Batch] ++ consume_stream(Shard, NIt); - {ok, end_of_stream} -> - [] - end. - stop_shard(Pid) -> _ = unlink(Pid), proc_lib:stop(Pid, shutdown, infinity). diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index d26c6dd30..44c45248b 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -56,3 +56,63 @@ mock_rpc_result(gen_rpc, ExpectFun) -> {badrpc, timeout} end end). + +%% Consuming streams and iterators + +consume(DB, TopicFilter) -> + consume(DB, TopicFilter, 0). + +consume(DB, TopicFilter, StartTime) -> + Streams = emqx_ds:get_streams(DB, TopicFilter, StartTime), + lists:flatmap( + fun({_Rank, Stream}) -> consume_stream(DB, Stream, TopicFilter, StartTime) end, + Streams + ). + +consume_stream(DB, Stream, TopicFilter, StartTime) -> + {ok, It0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), + {ok, _It, Msgs} = consume_iter(DB, It0), + Msgs. + +consume_iter(DB, It) -> + consume_iter(DB, It, #{}). + +consume_iter(DB, It, Opts) -> + consume_iter_with(fun emqx_ds:next/3, [DB], It, Opts). + +storage_consume(ShardId, TopicFilter) -> + storage_consume(ShardId, TopicFilter, 0). + +storage_consume(ShardId, TopicFilter, StartTime) -> + Streams = emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime), + lists:flatmap( + fun({_Rank, Stream}) -> + storage_consume_stream(ShardId, Stream, TopicFilter, StartTime) + end, + Streams + ). + +storage_consume_stream(ShardId, Stream, TopicFilter, StartTime) -> + {ok, It0} = emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime), + {ok, _It, Msgs} = storage_consume_iter(ShardId, It0), + Msgs. + +storage_consume_iter(ShardId, It) -> + storage_consume_iter(ShardId, It, #{}). + +storage_consume_iter(ShardId, It, Opts) -> + consume_iter_with(fun emqx_ds_storage_layer:next/3, [ShardId], It, Opts). + +consume_iter_with(NextFun, Args, It0, Opts) -> + BatchSize = maps:get(batch_size, Opts, 5), + case erlang:apply(NextFun, Args ++ [It0, BatchSize]) of + {ok, It, _Msgs = []} -> + {ok, It, []}; + {ok, It1, Batch} -> + {ok, It, Msgs} = consume_iter_with(NextFun, Args, It1, Opts), + {ok, It, [Msg || {_DSKey, Msg} <- Batch] ++ Msgs}; + {ok, Eos = end_of_stream} -> + {ok, Eos, []}; + {error, Class, Reason} -> + error({error, Class, Reason}) + end.