test(ft): add some basic assembler tests
This commit is contained in:
parent
cbff2e2309
commit
7b77e96ab9
|
@ -0,0 +1,163 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_ft_assembler_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("stdlib/include/assert.hrl").
|
||||
-include_lib("kernel/include/file.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
%% Common Test callback listing the test cases of this suite; the list is
%% computed by `emqx_common_test_helpers:all/1' for this module.
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
%% Suite-level setup. Currently a no-op that passes `Config' through
%% unchanged; the application-start variants below are disabled.
%% NOTE(review): presumably the test cases do not need the whole `emqx_ft'
%% application running -- confirm before re-enabling any of these.
init_per_suite(Config) ->
    % {ok, Apps} = application:ensure_all_started(emqx_ft),
    % [{suite_apps, Apps} | Config].
    % ok = emqx_common_test_helpers:start_apps([emqx_ft]),
    Config.
|
||||
|
||||
%% Suite-level teardown. Currently a no-op, matching the no-op
%% `init_per_suite/1'; the application-stop variants below are disabled.
end_per_suite(_Config) ->
    % lists:foreach(fun application:stop/1, lists:reverse(?config(suite_apps, Config))).
    % ok = emqx_common_test_helpers:stop_apps([emqx_ft]),
    ok.
|
||||
|
||||
%% Per-testcase setup: starts snabbkaffe tracing and an assembler supervisor,
%% and picks a storage root named after the test case. Both the root and the
%% supervisor pid are added to the returned config.
init_per_testcase(TC, Config) ->
    ok = snabbkaffe:start_trace(),
    StorageRoot = filename:join(["roots", TC]),
    {ok, SupPid} = emqx_ft_assembler_sup:start_link(),
    Extra = [{storage_root, StorageRoot}, {assembler_sup, SupPid}],
    Extra ++ Config.
|
||||
|
||||
%% Per-testcase teardown: logs the contents of the storage root (useful when
%% diagnosing a failure), then stops the assembler supervisor started in
%% `init_per_testcase/2' and shuts snabbkaffe down.
end_per_testcase(_TC, Config) ->
    ok = inspect_storage_root(Config),
    %% `proc_lib:stop/1' is the documented API; the undocumented `gen:stop/1'
    %% simply forwards to it when given a pid.
    ok = proc_lib:stop(?config(assembler_sup, Config)),
    ok = snabbkaffe:stop(),
    ok.
|
||||
|
||||
%%
|
||||
|
||||
-define(CLIENTID, <<"thatsme">>).
|
||||
|
||||
%% Assembling a transfer that consists of nothing but a zero-size filemeta
%% must report success and yield an empty file.
t_assemble_empty_transfer(Config) ->
    Root = ?config(storage_root, Config),
    Xfer = {?CLIENTID, mk_fileid()},
    Name = "important.pdf",
    FileMeta = #{name => Name, size => 0, expire_at => 42},
    ok = emqx_ft_storage_fs:store_filemeta(Root, Xfer, FileMeta),
    %% Only the metadata fragment has been stored so far.
    ?assertMatch(
        {ok, [
            #{
                path := _,
                timestamp := {{_, _, _}, {_, _, _}},
                fragment := {filemeta, FileMeta}
            }
        ]},
        emqx_ft_storage_fs:list(Root, Xfer)
    ),
    {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Root, Xfer, fun on_assembly_finished/1),
    %% `on_assembly_finished/1' emits this trace point when the assembler is done.
    {ok, Finished} = ?block_until(#{?snk_kind := test_assembly_finished}),
    ?assertMatch(#{result := ok}, Finished),
    AssembledPath = mk_assembly_filename(Config, Xfer, Name),
    ?assertEqual(
        {ok, <<>>},
        % TODO
        file:read_file(AssembledPath)
    ),
    ok.
|
||||
|
||||
%% Stores the filemeta and every generated segment of a randomly sized
%% transfer, assembles it, and verifies the size and the content of the
%% resulting file.
t_assemble_complete_local_transfer(Config) ->
    Storage = ?config(storage_root, Config),
    Transfer = {?CLIENTID, mk_fileid()},
    Filename = "topsecret.pdf",
    TransferSize = 10000 + rand:uniform(50000),
    SegmentSize = 4096,
    %% The generator emits ceil(TransferSize / SegmentSize) segments: when the
    %% size is an exact multiple of SegmentSize there is no trailing partial
    %% segment, so `TransferSize div SegmentSize + 1' would overcount by one.
    SegmentCount = (TransferSize + SegmentSize - 1) div SegmentSize,
    Gen = emqx_ft_content_gen:new({Transfer, TransferSize}, SegmentSize),
    Hash = emqx_ft_content_gen:hash(Gen, crypto:hash_init(sha256)),
    Meta = #{
        name => Filename,
        checksum => {sha256, Hash},
        expire_at => 42
    },

    ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
    _ = emqx_ft_content_gen:consume(
        Gen,
        fun({Content, SegmentNum, _SegmentCount}) ->
            %% Segment numbers are 1-based.
            Offset = (SegmentNum - 1) * SegmentSize,
            ?assertEqual(
                ok,
                emqx_ft_storage_fs:store_segment(Storage, Transfer, {Offset, Content})
            )
        end
    ),

    {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer),
    %% Every segment plus the single filemeta fragment.
    ?assertEqual(SegmentCount + 1, length(Fragments)),
    ?assertEqual(
        [Meta],
        [FM || #{fragment := {filemeta, FM}} <- Fragments],
        Fragments
    ),

    {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun on_assembly_finished/1),
    %% `on_assembly_finished/1' emits this trace point when the assembler is done.
    {ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}),
    ?assertMatch(#{result := ok}, Event),

    AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename),
    ?assertMatch(
        {ok, #file_info{type = regular, size = TransferSize}},
        file:read_file_info(AssemblyFilename)
    ),
    %% Spot-check 100 random bytes (plus the boundaries) of the assembled file.
    ok = emqx_ft_content_gen:check_file_consistency(
        {Transfer, TransferSize},
        100,
        AssemblyFilename
    ).
|
||||
|
||||
%% Path where the test cases expect the assembled file of a transfer:
%% `<storage_root>/<ClientID>/<FileID>/<Filename>'.
mk_assembly_filename(Config, {ClientID, FileID}, Filename) ->
    Root = ?config(storage_root, Config),
    Components = [Root, ClientID, FileID, Filename],
    filename:join(Components).
|
||||
|
||||
%% Completion callback handed to `emqx_ft_storage_fs:assemble/3' by the test
%% cases: emits a `test_assembly_finished' snabbkaffe trace point carrying the
%% assembly result, which the test cases wait for with `?block_until'.
on_assembly_finished(Result) ->
    ?tp(test_assembly_finished, #{result => Result}).
|
||||
|
||||
%%
|
||||
|
||||
-include_lib("kernel/include/file.hrl").
|
||||
|
||||
%% Log the contents of the storage root recorded in the test case config.
inspect_storage_root(Config) ->
    Root = ?config(storage_root, Config),
    inspect_dir(Root).
|
||||
|
||||
%% Walk `Dir' recursively and print, via `ct:pal/2', an ordered dictionary
%% mapping every file found to its `inspect_file/1' summary.
inspect_dir(Dir) ->
    Collect = fun(Path, Seen) ->
        orddict:store(Path, inspect_file(Path), Seen)
    end,
    %% ".*" matches every file name; `true' makes the walk recursive.
    Listing = filelib:fold_files(Dir, ".*", true, Collect, orddict:new()),
    ct:pal("inspect '~s': ~p", [Dir, Listing]).
|
||||
|
||||
%% Summarize a file as `{Type, Size, MTime}', taken from its `#file_info{}'.
%% Crashes with `badmatch' when the file info cannot be read.
inspect_file(Filename) ->
    {ok, #file_info{type = Type, size = Size, mtime = MTime}} =
        file:read_file_info(Filename),
    {Type, Size, MTime}.
|
||||
|
||||
%% File id for a test transfer: the current system time in milliseconds,
%% rendered as a decimal binary.
mk_fileid() ->
    NowMs = erlang:system_time(millisecond),
    integer_to_binary(NowMs).
|
|
@ -0,0 +1,229 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% Inspired by
|
||||
%% https://github.com/kafka4beam/kflow/blob/master/src/testbed/payload_gen.erl
|
||||
|
||||
-module(emqx_ft_content_gen).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-dialyzer(no_improper_lists).
|
||||
|
||||
-export([new/2]).
|
||||
-export([generate/3]).
|
||||
-export([next/1]).
|
||||
-export([consume/1]).
|
||||
-export([consume/2]).
|
||||
-export([fold/3]).
|
||||
|
||||
-export([hash/2]).
|
||||
-export([check_file_consistency/3]).
|
||||
|
||||
-export_type([cont/1]).
|
||||
-export_type([stream/1]).
|
||||
-export_type([binary_payload/0]).
|
||||
|
||||
-define(hash_size, 16).
|
||||
|
||||
%% Description of a generated payload: the term its content is derived from
%% and its total size in bytes.
-type payload() :: {Seed :: term(), Size :: integer()}.

%% One generated chunk: its bytes, its 1-based number, and the total number
%% of chunks in the payload.
-type binary_payload() :: {
    binary(), _ChunkNum :: non_neg_integer(), _ChunkCnt :: non_neg_integer()
}.

%% Continuation of a stream: either a zero-arity fun that produces the rest
%% of the stream when called, or the stream itself.
-type cont(Data) ::
    fun(() -> stream(Data))
    | stream(Data).

%% A stream: a list cell holding one element with the continuation as its
%% tail (hence possibly an improper list), or `eos' when exhausted.
-type stream(Data) ::
    maybe_improper_list(Data, cont(Data))
    | eos.
|
||||
|
||||
%% Generator state threaded from one chunk to the next.
-record(chunk_state, {
    %% Term the payload content is derived from.
    seed :: term(),
    %% Total payload size in bytes.
    payload_size :: non_neg_integer(),
    %% Offset at which the next chunk starts.
    offset :: non_neg_integer(),
    %% Size of every chunk, except possibly the last one.
    chunk_size :: non_neg_integer()
}).

-type chunk_state() :: #chunk_state{}.
|
||||
|
||||
%% -----------------------------------------------------------------------------
|
||||
%% Generic streams
|
||||
%% -----------------------------------------------------------------------------
|
||||
|
||||
%% @doc Advance a continuation by one step: a zero-arity fun is called and
%% its result returned; anything else is returned unchanged.
-spec next(cont(A)) -> stream(A).
next(Thunk) when is_function(Thunk, 0) ->
    Thunk();
next(Stream) ->
    Stream.
|
||||
|
||||
%% @doc Drain the stream, applying `Callback' to each element in order and
%% collecting the results into a list.
-spec consume(cont(A), fun((A) -> Ret)) -> [Ret].
consume(eos, _Callback) ->
    [];
consume(Thunk, Callback) when is_function(Thunk, 0) ->
    consume(next(Thunk), Callback);
consume([Item | Rest], Callback) ->
    %% Run the callback before advancing the stream any further.
    Result = Callback(Item),
    [Result | consume(next(Rest), Callback)].
|
||||
|
||||
%% @doc Drain the stream and return its elements as a list.
%% @equiv consume(Stream, fun(A) -> A end)
-spec consume(cont(A)) -> [A].
consume(Stream) ->
    Identity = fun(Item) -> Item end,
    consume(Stream, Identity).
|
||||
|
||||
%% @doc Left fold over the stream: `Fun(Element, Acc)' is applied to every
%% element in order and the final accumulator is returned.
-spec fold(fun((A, Acc) -> Acc), Acc, cont(A)) -> Acc.
fold(_Fun, Acc, eos) ->
    Acc;
fold(Fun, Acc, Thunk) when is_function(Thunk, 0) ->
    fold(Fun, Acc, next(Thunk));
fold(Fun, Acc0, [Item | Rest]) ->
    %% Fold the element in before advancing the stream any further.
    Acc = Fun(Item, Acc0),
    fold(Fun, Acc, next(Rest)).
|
||||
|
||||
%% -----------------------------------------------------------------------------
|
||||
%% Binary streams
|
||||
%% -----------------------------------------------------------------------------
|
||||
|
||||
%% @doc Lazy stream of binary chunks making up a payload of `Size' bytes
%% derived from `Seed'. Nothing is generated until the returned continuation
%% is advanced.
%% Limitation: `ChunkSize' must be a positive multiple of `?hash_size'; any
%% other value fails with `function_clause'.
-spec new(payload(), integer()) -> cont(binary_payload()).
new({Seed, Size}, ChunkSize) when ChunkSize > 0, ChunkSize rem ?hash_size =:= 0 ->
    %% `ChunkSize > 0' matters: zero would crash later on a division, and a
    %% negative multiple would never let the offset reach `Size', producing an
    %% endless stream of empty chunks.
    fun() ->
        generate_next_chunk(#chunk_state{
            seed = Seed,
            payload_size = Size,
            chunk_size = ChunkSize,
            offset = 0
        })
    end.
|
||||
|
||||
%% @doc Generate the payload chunk by chunk, pass every chunk to `Callback'
%% and return the list of the callback's results.
-spec generate(payload(), integer(), fun((binary_payload()) -> A)) -> [A].
generate(Payload, ChunkSize, Callback) ->
    Stream = new(Payload, ChunkSize),
    consume(Stream, Callback).
|
||||
|
||||
%% @doc Feed the bytes of every chunk of the stream, in order, into the given
%% hash context and return the final digest.
-spec hash(cont(binary_payload()), crypto:hash_state()) -> binary().
hash(Stream, HashCtxIn) ->
    Update = fun({Chunk, _ChunkNum, _ChunkCnt}, Ctx) ->
        crypto:hash_update(Ctx, Chunk)
    end,
    FinalCtx = fold(Update, HashCtxIn, Stream),
    crypto:hash_final(FinalCtx).
|
||||
|
||||
%% Spot-check that `Callback' serves the bytes of the generated payload.
%%
%% `Callback(N)' must return `{ok, Byte}' for the byte at offset `N' and
%% `undefined' for an offset past the end. The first and the last byte plus
%% `SampleSize' random offsets are compared against the expected content, and
%% the offset right after the last byte is asserted to be absent. A mismatch
%% raises the corresponding EUnit assertion error.
-spec check_consistency(
    payload(),
    integer(),
    fun((integer()) -> {ok, byte()} | undefined)
) -> ok.
check_consistency({Seed, Size}, SampleSize, Callback) ->
    SeedHash = seed_hash(Seed),
    lists:foreach(
        fun(N) ->
            Expected = do_get_byte(N, SeedHash),
            %% The offset is on both sides so a failure reports where it happened.
            ?assertEqual(
                {N, {ok, Expected}},
                {N, Callback(N)}
            )
        end,
        sample_offsets(Size, SampleSize)
    ),
    %% One byte that should not exist: the offset right past the end.
    ?assertMatch(undefined, Callback(Size)).

%% Offsets to verify: always the first and the last byte, plus `SampleSize'
%% random ones. An empty payload has no valid offsets at all (and
%% `rand:uniform(0)' would raise), so nothing is sampled for it.
-spec sample_offsets(integer(), integer()) -> [non_neg_integer()].
sample_offsets(0, _SampleSize) ->
    [];
sample_offsets(Size, SampleSize) ->
    [0, Size - 1 | [rand:uniform(Size) - 1 || _ <- lists:seq(1, SampleSize)]].
|
||||
|
||||
%% @doc Spot-check that the file `FileName' holds exactly the generated
%% payload, reading single bytes at sampled offsets. The file is closed even
%% when an assertion fails.
-spec check_file_consistency(
    payload(),
    integer(),
    file:filename()
) -> ok.
check_file_consistency(Payload, SampleSize, FileName) ->
    {ok, FD} = file:open(FileName, [read, raw]),
    %% The file is opened without `binary', so a one-byte read comes back as a
    %% one-element list.
    ReadByte = fun(Offset) ->
        case file:pread(FD, [{Offset, 1}]) of
            {ok, [eof]} -> undefined;
            {ok, [[Byte]]} -> {ok, Byte}
        end
    end,
    try
        check_consistency(Payload, SampleSize, ReadByte)
    after
        file:close(FD)
    end.
|
||||
|
||||
%% =============================================================================
|
||||
%% Internal functions
|
||||
%% =============================================================================
|
||||
|
||||
%% @doc Produce the stream starting at the state's offset: `eos' once the
%% offset has reached the payload size, otherwise one chunk followed by a
%% lazy continuation for the rest of the payload.
-spec generate_next_chunk(chunk_state()) -> stream(binary_payload()).
generate_next_chunk(#chunk_state{offset = Offset, payload_size = Size}) when Offset >= Size ->
    eos;
generate_next_chunk(State0 = #chunk_state{offset = Offset, chunk_size = ChunkSize}) ->
    State = State0#chunk_state{offset = Offset + ChunkSize},
    Payload = generate_chunk(
        State#chunk_state.seed,
        Offset,
        ChunkSize,
        State#chunk_state.payload_size
    ),
    %% Improper list on purpose: the tail is a thunk, so the next chunk is
    %% only generated when the consumer asks for it.
    [Payload | fun() -> generate_next_chunk(State) end].
|
||||
|
||||
%% Build the chunk that starts at byte `Offset' of a `Size'-byte payload.
%%
%% Returns `{Chunk, ChunkNum, ChunkCnt}': the chunk bytes (shorter than
%% `ChunkSize' only for the last chunk), the 1-based chunk number, and the
%% total number of chunks. Expects `Offset' to be a multiple of `ChunkSize'
%% below `Size', and `ChunkSize' to be a positive multiple of `?hash_size'.
generate_chunk(Seed, Offset, ChunkSize, Size) ->
    SeedHash = seed_hash(Seed),
    %% Offset one past the last payload byte covered by this chunk.
    End = min(Offset + ChunkSize, Size),
    %% The content is a sequence of `?hash_size'-byte blocks; generate every
    %% block that overlaps [Offset, End).
    Blocks = [
        generator_fun(I, SeedHash)
     || I <- lists:seq(Offset div ?hash_size, (End - 1) div ?hash_size)
    ],
    %% Blocks are whole, so the last chunk may overshoot the payload: trim it.
    Chunk = binary:part(iolist_to_binary(Blocks), 0, End - Offset),
    ChunkNum = Offset div ChunkSize + 1,
    %% Integer ceiling division, avoiding a round trip through floats.
    ChunkCnt = (Size + ChunkSize - 1) div ChunkSize,
    {Chunk, ChunkNum, ChunkCnt}.
|
||||
|
||||
%% @doc Content of block number `N' for the given (hashed) seed: the MD5
%% digest of the 32-bit block number followed by the seed.
%% Hardly efficient, but chosen for clarity.
-spec generator_fun(integer(), binary()) -> binary().
generator_fun(N, Seed) ->
    Input = <<N:32, Seed/binary>>,
    crypto:hash(md5, Input).
|
||||
|
||||
%% @doc Turn an arbitrary term into a binary seed: the MD5 digest of the
%% term's external representation.
-spec seed_hash(term()) -> binary().
seed_hash(Seed) ->
    Encoded = term_to_binary(Seed),
    crypto:hash(md5, Encoded).
|
||||
|
||||
%% @private Expected payload byte at offset `N': locate the block the offset
%% falls into, regenerate that block, and pick the byte within it.
-spec do_get_byte(integer(), binary()) -> byte().
do_get_byte(N, Seed) ->
    BlockIndex = N div ?hash_size,
    PosInBlock = N rem ?hash_size,
    binary:at(generator_fun(BlockIndex, Seed), PosInBlock).
|
Loading…
Reference in New Issue