test(ds): factor out storage iteration into helper module

This commit is contained in:
Andrew Mayorov 2024-03-28 15:11:45 +01:00
parent 7cebf598a8
commit c666c65c6a
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 85 additions and 105 deletions

View File

@ -98,8 +98,8 @@ t_03_smoke_iterate(_Config) ->
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
[{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
{ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
{ok, Iter, Batch} = iterate(DB, Iter0, 1), {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter0),
?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}). ?assertEqual(Msgs, Batch, {Iter0, Iter}).
%% Verify that iterators survive restart of the application. This is %% Verify that iterators survive restart of the application. This is
%% an important property, since the lifetime of the iterators is tied %% an important property, since the lifetime of the iterators is tied
@ -125,8 +125,8 @@ t_04_restart(_Config) ->
{ok, _} = application:ensure_all_started(emqx_durable_storage), {ok, _} = application:ensure_all_started(emqx_durable_storage),
ok = emqx_ds:open_db(DB, opts()), ok = emqx_ds:open_db(DB, opts()),
%% The old iterator should be still operational: %% The old iterator should be still operational:
{ok, Iter, Batch} = iterate(DB, Iter0, 1), {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter0),
?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}). ?assertEqual(Msgs, Batch, {Iter0, Iter}).
%% Check that we can create iterators directly from DS keys. %% Check that we can create iterators directly from DS keys.
t_05_update_iterator(_Config) -> t_05_update_iterator(_Config) ->
@ -148,9 +148,8 @@ t_05_update_iterator(_Config) ->
Res1 = emqx_ds:update_iterator(DB, OldIter, Key0), Res1 = emqx_ds:update_iterator(DB, OldIter, Key0),
?assertMatch({ok, _Iter1}, Res1), ?assertMatch({ok, _Iter1}, Res1),
{ok, Iter1} = Res1, {ok, Iter1} = Res1,
{ok, FinalIter, Batch} = iterate(DB, Iter1, 1), {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter1, #{batch_size => 1}),
AllMsgs = [Msg0 | [Msg || {_Key, Msg} <- Batch]], ?assertEqual(Msgs, [Msg0 | Batch], #{from_key => Iter1, final_iter => Iter}),
?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}),
ok. ok.
t_06_update_config(_Config) -> t_06_update_config(_Config) ->
@ -190,9 +189,9 @@ t_06_update_config(_Config) ->
), ),
Checker = fun({StartTime, Msgs0}, Acc) -> Checker = fun({StartTime, Msgs0}, Acc) ->
Msgs = Msgs0 ++ Acc, Msgs = Acc ++ Msgs0,
Batch = fetch_all(DB, TopicFilter, StartTime), Batch = emqx_ds_test_helpers:consume(DB, TopicFilter, StartTime),
?assertEqual(Msgs, Batch, {StartTime}), ?assertEqual(Msgs, Batch, StartTime),
Msgs Msgs
end, end,
lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)). lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
@ -234,9 +233,9 @@ t_07_add_generation(_Config) ->
), ),
Checker = fun({StartTime, Msgs0}, Acc) -> Checker = fun({StartTime, Msgs0}, Acc) ->
Msgs = Msgs0 ++ Acc, Msgs = Acc ++ Msgs0,
Batch = fetch_all(DB, TopicFilter, StartTime), Batch = emqx_ds_test_helpers:consume(DB, TopicFilter, StartTime),
?assertEqual(Msgs, Batch, {StartTime}), ?assertEqual(Msgs, Batch, StartTime),
Msgs Msgs
end, end,
lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)). lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)).
@ -398,9 +397,8 @@ t_smoke_delete_next(_Config) ->
TopicFilterHash = ['#'], TopicFilterHash = ['#'],
[{_, Stream}] = emqx_ds:get_streams(DB, TopicFilterHash, StartTime), [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilterHash, StartTime),
{ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilterHash, StartTime), Batch = emqx_ds_test_helpers:consume_stream(DB, Stream, TopicFilterHash, StartTime),
{ok, _Iter, Batch} = iterate(DB, Iter0, 1), ?assertEqual([Msg1, Msg3], Batch),
?assertEqual([Msg1, Msg3], [Msg || {_Key, Msg} <- Batch]),
ok = emqx_ds:add_generation(DB), ok = emqx_ds:add_generation(DB),
@ -444,9 +442,9 @@ t_drop_generation_with_never_used_iterator(_Config) ->
], ],
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)), ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)),
?assertMatch( ?assertError(
{error, unrecoverable, generation_not_found, []}, {error, unrecoverable, generation_not_found},
iterate(DB, Iter0, 1) emqx_ds_test_helpers:consume_iter(DB, Iter0)
), ),
%% New iterator for the new stream will only see the later messages. %% New iterator for the new stream will only see the later messages.
@ -454,9 +452,9 @@ t_drop_generation_with_never_used_iterator(_Config) ->
?assertNotEqual(Stream0, Stream1), ?assertNotEqual(Stream0, Stream1),
{ok, Iter1} = emqx_ds:make_iterator(DB, Stream1, TopicFilter, StartTime), {ok, Iter1} = emqx_ds:make_iterator(DB, Stream1, TopicFilter, StartTime),
{ok, Iter, Batch} = iterate(DB, Iter1, 1), {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter1, #{batch_size => 1}),
?assertNotEqual(end_of_stream, Iter), ?assertNotEqual(end_of_stream, Iter),
?assertEqual(Msgs1, [Msg || {_Key, Msg} <- Batch]), ?assertEqual(Msgs1, Batch),
ok. ok.
@ -496,9 +494,9 @@ t_drop_generation_with_used_once_iterator(_Config) ->
], ],
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)), ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)),
?assertMatch( ?assertError(
{error, unrecoverable, generation_not_found, []}, {error, unrecoverable, generation_not_found},
iterate(DB, Iter1, 1) emqx_ds_test_helpers:consume_iter(DB, Iter1)
). ).
t_drop_generation_update_iterator(_Config) -> t_drop_generation_update_iterator(_Config) ->
@ -702,25 +700,6 @@ update_data_set() ->
] ]
]. ].
%% Fetch every message matching `TopicFilter` from all streams of `DB`,
%% starting at `StartTime`, concatenated in shard order.
%% Crashes (badmatch) if iterator creation or iteration fails.
fetch_all(DB, TopicFilter, StartTime) ->
    %% Fixed, sensible batch size. BUG FIX: previously `StartTime` was
    %% passed as the batch-size argument of iterate/3.
    BatchSize = 100,
    Streams0 = emqx_ds:get_streams(DB, TopicFilter, StartTime),
    %% Sort streams by the second element of the rank (presumably the
    %% shard id — TODO confirm) so the result order is deterministic.
    Streams = lists:sort(
        fun({{_, A}, _}, {{_, B}, _}) ->
            A =< B
        end,
        Streams0
    ),
    lists:flatmap(
        fun({_Rank, Stream}) ->
            {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
            {ok, _Iter, KVs} = iterate(DB, Iter0, BatchSize),
            %% iterate/3 yields {Key, Message} pairs; keep only the messages.
            [Msg || {_Key, Msg} <- KVs]
        end,
        Streams
    ).
message(ClientId, Topic, Payload, PublishedAt) -> message(ClientId, Topic, Payload, PublishedAt) ->
Msg = message(Topic, Payload, PublishedAt), Msg = message(Topic, Payload, PublishedAt),
Msg#message{from = ClientId}. Msg#message{from = ClientId}.
@ -733,21 +712,6 @@ message(Topic, Payload, PublishedAt) ->
id = emqx_guid:gen() id = emqx_guid:gen()
}. }.
%% Drain iterator `It` against database `DB`, fetching up to `BatchSize`
%% entries per call to emqx_ds:next/3.
%%
%% Returns:
%%   {ok, LastIter, KVs}            -- an empty batch was returned
%%   {ok, end_of_stream, KVs}       -- the stream is exhausted
%%   {error, Class, Reason, KVs}    -- iteration failed; KVs collected so far
%% `KVs` are {Key, Message} pairs in stream order.
iterate(DB, It, BatchSize) ->
    iterate(DB, It, BatchSize, []).

iterate(DB, It, BatchSize, Acc) ->
    %% Accumulate whole batches in reverse and flatten once at the end:
    %% the previous `Acc ++ Msgs` per step was accidentally O(n^2).
    iterate_loop(DB, It, BatchSize, [Acc]).

iterate_loop(DB, It0, BatchSize, Batches) ->
    case emqx_ds:next(DB, It0, BatchSize) of
        {ok, It, []} ->
            {ok, It, lists:append(lists:reverse(Batches))};
        {ok, It, Msgs} ->
            iterate_loop(DB, It, BatchSize, [Msgs | Batches]);
        {ok, end_of_stream} ->
            {ok, end_of_stream, lists:append(lists:reverse(Batches))};
        {error, Class, Reason} ->
            {error, Class, Reason, lists:append(lists:reverse(Batches))}
    end.
delete(DB, It, Selector, BatchSize) -> delete(DB, It, Selector, BatchSize) ->
delete(DB, It, Selector, BatchSize, 0). delete(DB, It, Selector, BatchSize, 0).

View File

@ -136,28 +136,7 @@ message(Topic, Payload, PublishedAt) ->
}. }.
consume(Node, DB, Shard, TopicFilter, StartTime) -> consume(Node, DB, Shard, TopicFilter, StartTime) ->
Streams = erpc:call(Node, emqx_ds_storage_layer, get_streams, [ erpc:call(Node, emqx_ds_test_helpers, storage_consume, [{DB, Shard}, TopicFilter, StartTime]).
{DB, Shard}, TopicFilter, StartTime
]),
lists:flatmap(
fun({_Rank, Stream}) ->
{ok, It} = erpc:call(Node, emqx_ds_storage_layer, make_iterator, [
{DB, Shard}, Stream, TopicFilter, StartTime
]),
consume_stream(Node, DB, Shard, It)
end,
Streams
).
%% Drain iterator `It` on remote `Node` via the storage layer, 100
%% entries per RPC, returning the bare messages (DS keys stripped) in
%% stream order. Stops on an empty batch or `end_of_stream`.
consume_stream(Node, DB, Shard, It0) ->
    Reply = erpc:call(Node, emqx_ds_storage_layer, next, [{DB, Shard}, It0, 100]),
    case Reply of
        {ok, _It, []} ->
            [];
        {ok, It, Batch} ->
            Msgs = [Msg || {_Key, Msg} <- Batch],
            Msgs ++ consume_stream(Node, DB, Shard, It);
        {ok, end_of_stream} ->
            []
    end.
probably(P, Fun) -> probably(P, Fun) ->
case rand:uniform() of case rand:uniform() of

View File

@ -42,7 +42,7 @@ t_idempotent_store_batch(_Config) ->
%% First batch should have been handled idempotently. %% First batch should have been handled idempotently.
?assertEqual( ?assertEqual(
Msgs1 ++ Msgs2, Msgs1 ++ Msgs2,
lists:keysort(#message.timestamp, consume(Shard, ['#'])) lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#']))
), ),
ok = stop_shard(Pid). ok = stop_shard(Pid).
@ -79,7 +79,7 @@ t_snapshot_take_restore(_Config) ->
{ok, _Pid} = emqx_ds_storage_layer:start_link(Shard, opts()), {ok, _Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
?assertEqual( ?assertEqual(
Msgs1 ++ Msgs2, Msgs1 ++ Msgs2,
lists:keysort(#message.timestamp, consume(Shard, ['#'])) lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#']))
). ).
transfer_snapshot(Reader, Writer) -> transfer_snapshot(Reader, Writer) ->
@ -127,29 +127,6 @@ message(Topic, Payload, PublishedAt) ->
id = emqx_guid:gen() id = emqx_guid:gen()
}. }.
%% Read every message matching `TopicFilter` from all streams of
%% `Shard`, starting at time 0.
consume(Shard, TopicFilter) ->
    consume(Shard, TopicFilter, _StartTime = 0).

%% Same, but starting at `StartTime`. Crashes (badmatch) if an iterator
%% cannot be created.
consume(Shard, TopicFilter, StartTime) ->
    Streams = emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime),
    lists:append([
        begin
            {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime),
            consume_stream(Shard, It)
        end
     || {_Rank, Stream} <- Streams
    ]).

%% Drain one storage-layer iterator, 100 entries per call, stripping DS
%% keys. Stops on an empty batch or `end_of_stream`.
consume_stream(Shard, It0) ->
    case emqx_ds_storage_layer:next(Shard, It0, 100) of
        {ok, _It, []} ->
            [];
        {ok, It, Batch} ->
            [Msg || {_DSKey, Msg} <- Batch] ++ consume_stream(Shard, It);
        {ok, end_of_stream} ->
            []
    end.
stop_shard(Pid) -> stop_shard(Pid) ->
_ = unlink(Pid), _ = unlink(Pid),
proc_lib:stop(Pid, shutdown, infinity). proc_lib:stop(Pid, shutdown, infinity).

View File

@ -56,3 +56,63 @@ mock_rpc_result(gen_rpc, ExpectFun) ->
{badrpc, timeout} {badrpc, timeout}
end end
end). end).
%% Consuming streams and iterators

%% Read all messages matching `TopicFilter` from `DB`, starting at time 0.
consume(DB, TopicFilter) ->
    consume(DB, TopicFilter, _StartTime = 0).

%% Read all messages matching `TopicFilter` from every stream of `DB`,
%% starting at `StartTime`, in the order get_streams/3 returns them.
consume(DB, TopicFilter, StartTime) ->
    lists:append([
        consume_stream(DB, Stream, TopicFilter, StartTime)
     || {_Rank, Stream} <- emqx_ds:get_streams(DB, TopicFilter, StartTime)
    ]).
%% Fully consume one stream of `DB`: make an iterator at `StartTime`
%% and drain it. Crashes (badmatch) if the iterator cannot be created.
consume_stream(DB, Stream, TopicFilter, StartTime) ->
    {ok, It0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
    {ok, _FinalIt, Msgs} = consume_iter(DB, It0),
    Msgs.
%% Drain iterator `It` against `DB` with default options.
consume_iter(DB, It) ->
    consume_iter_with(fun emqx_ds:next/3, [DB], It, #{}).

%% Drain iterator `It`; `Opts` may carry `batch_size` (see
%% consume_iter_with/4 for the default).
consume_iter(DB, It, Opts) ->
    consume_iter_with(fun emqx_ds:next/3, [DB], It, Opts).
%% Read all messages matching `TopicFilter` directly from the storage
%% layer of `ShardId`, starting at time 0.
storage_consume(ShardId, TopicFilter) ->
    storage_consume(ShardId, TopicFilter, _StartTime = 0).

%% Same, but starting at `StartTime`, in the order get_streams/3
%% returns the streams.
storage_consume(ShardId, TopicFilter, StartTime) ->
    Streams = emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime),
    lists:append([
        storage_consume_stream(ShardId, Stream, TopicFilter, StartTime)
     || {_Rank, Stream} <- Streams
    ]).
%% Fully consume one storage-layer stream of `ShardId`. Crashes
%% (badmatch) if the iterator cannot be created.
storage_consume_stream(ShardId, Stream, TopicFilter, StartTime) ->
    {ok, It0} = emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime),
    {ok, _FinalIt, Msgs} = storage_consume_iter(ShardId, It0),
    Msgs.
%% Drain a storage-layer iterator with default options.
storage_consume_iter(ShardId, It) ->
    consume_iter_with(fun emqx_ds_storage_layer:next/3, [ShardId], It, #{}).

%% Drain a storage-layer iterator; `Opts` may carry `batch_size` (see
%% consume_iter_with/4 for the default).
storage_consume_iter(ShardId, It, Opts) ->
    consume_iter_with(fun emqx_ds_storage_layer:next/3, [ShardId], It, Opts).
%% Generic iterator-draining loop. `NextFun` is called as
%% apply(NextFun, Args ++ [It, BatchSize]) and must follow the
%% emqx_ds:next/3 protocol. `Opts` may carry `batch_size` (default 5).
%%
%% Returns {ok, LastIter, Msgs} on an empty batch,
%% {ok, end_of_stream, Msgs} on stream end, and raises
%% error({error, Class, Reason}) on failure. `Msgs` are the bare
%% messages (DS keys stripped) in stream order.
consume_iter_with(NextFun, Args, It, Opts) ->
    %% The batch size is constant for the whole run; resolve it once.
    BatchSize = maps:get(batch_size, Opts, 5),
    do_consume_iter(NextFun, Args, It, BatchSize, []).

%% Tail-recursive worker: batches are stacked in reverse and flattened
%% once at each terminal clause.
do_consume_iter(NextFun, Args, It0, BatchSize, Acc) ->
    case erlang:apply(NextFun, Args ++ [It0, BatchSize]) of
        {ok, It, []} ->
            {ok, It, lists:append(lists:reverse(Acc))};
        {ok, It, Batch} ->
            Msgs = [Msg || {_DSKey, Msg} <- Batch],
            do_consume_iter(NextFun, Args, It, BatchSize, [Msgs | Acc]);
        {ok, end_of_stream} ->
            {ok, end_of_stream, lists:append(lists:reverse(Acc))};
        {error, Class, Reason} ->
            error({error, Class, Reason})
    end.