%% emqx/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl
%% (Erlang, 210 lines, 6.3 KiB)

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_assembler_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("kernel/include/file.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% Common Test callback: the testcases of this suite, in the order they are
%% listed to the test runner.
all() ->
    AssemblyCases = [
        t_assemble_empty_transfer,
        t_assemble_complete_local_transfer
    ],
    % NOTE
    % `t_list_transfers` has to stay last: it depends on the side effects
    % of all the testcases listed before it.
    AssemblyCases ++ [t_list_transfers].
%% Common Test callback: suite-level setup.
%% Nothing is prepared here; `Config` is handed back untouched.
init_per_suite(Config) ->
    Config.
%% Common Test callback: suite-level teardown.
%% There is nothing to release, so the configuration is ignored and `ok` is
%% returned.
end_per_suite(_Config) ->
    ok.
%% Common Test callback: per-testcase setup.
%% Starts a snabbkaffe trace and an assembler supervisor, then prepends the
%% following entries to `Config`:
%%   * `storage_root`  - the (relative) directory name "file_transfer_root";
%%   * `file_id`       - the testcase name `TC` converted to a binary;
%%   * `assembler_sup` - pid of the supervisor started here, so that
%%                       end_per_testcase/2 can stop it.
init_per_testcase(TC, Config) ->
    ok = snabbkaffe:start_trace(),
    {ok, SupPid} = emqx_ft_assembler_sup:start_link(),
    FileId = atom_to_binary(TC),
    TestcaseConfig = [
        {storage_root, "file_transfer_root"},
        {file_id, FileId},
        {assembler_sup, SupPid}
    ],
    TestcaseConfig ++ Config.
%% Common Test callback: per-testcase teardown.
%% Prints the contents of the storage root to the CT log (diagnostics only),
%% then stops the assembler supervisor whose pid was stored under
%% `assembler_sup` by init_per_testcase/2, and finally stops snabbkaffe.
%% Returns `ok`.
end_per_testcase(_TC, Config) ->
    try
        ok = inspect_storage_root(Config)
    after
        % The teardown must run even when the inspection above crashes
        % (inspect_file/1 asserts that every listed file can be stat'ed),
        % otherwise the supervisor and snabbkaffe would be left running.
        % `proc_lib:stop/1` is the documented function that the internal
        % `gen:stop/1` delegates to when given a pid.
        ok = proc_lib:stop(?config(assembler_sup, Config)),
        ok = snabbkaffe:stop()
    end,
    ok.
%%
-define(CLIENTID1, <<"thatsme">>).
-define(CLIENTID2, <<"thatsnotme">>).
%% Assembles a transfer for which only the file metadata (`size => 0`) has
%% been stored, with no segments, and checks that:
%%   * the metadata is the only fragment listed for the transfer;
%%   * the assembly callback reports `ok`;
%%   * the assembled file exists and is empty;
%%   * the single listed result has size 0 and reading 0 bytes at offset 0
%%     from it yields `{error, eof}`.
t_assemble_empty_transfer(Config) ->
    Filename = "important.pdf",
    Meta = #{name => Filename, size => 0, expire_at => 42},
    Storage = storage(Config),
    Transfer = {?CLIENTID1, ?config(file_id, Config)},
    ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
    ?assertMatch(
        {ok, [
            #{
                path := _,
                timestamp := {{_, _, _}, {_, _, _}},
                fragment := {filemeta, Meta}
            }
        ]},
        emqx_ft_storage_fs:list(Storage, Transfer, fragment)
    ),
    % Assembly is asynchronous: wait for the trace point emitted by the
    % completion callback before looking at the results.
    {ok, _Assembler} = emqx_ft_storage_fs:assemble(
        Storage,
        Transfer,
        fun on_assembly_finished/1
    ),
    {ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}),
    ?assertMatch(#{result := ok}, Event),
    ?assertEqual(
        {ok, <<>>},
        % TODO (carried over as-is; the original note does not say what is left to do)
        file:read_file(mk_assembly_filename(Config, Transfer, Filename))
    ),
    {ok, [#{size := Size = 0} = Result]} = emqx_ft_storage_fs:list(Storage, Transfer, result),
    ?assertEqual(
        {error, eof},
        emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size)
    ),
    ok.
%% Stores a complete transfer in local storage (file metadata with a sha256
%% checksum, plus generated content written segment by segment), assembles it
%% and checks that:
%%   * the expected number of fragments was stored, exactly one of them
%%     being the file metadata;
%%   * the assembly callback reports `ok`;
%%   * the single listed result points at the expected path and has the
%%     size of the whole transfer;
%%   * the assembled file is a regular file of that size whose content
%%     passes `emqx_ft_content_gen:check_file_consistency/3`.
t_assemble_complete_local_transfer(Config) ->
    Storage = storage(Config),
    Transfer = {?CLIENTID2, ?config(file_id, Config)},
    Filename = "topsecret.pdf",
    SegmentSize = 4096,
    % Random size in the range 10001..60000 bytes.
    TransferSize = 10000 + rand:uniform(50000),
    Gen = emqx_ft_content_gen:new({Transfer, TransferSize}, SegmentSize),
    Hash = emqx_ft_content_gen:hash(Gen, crypto:hash_init(sha256)),
    Meta = #{
        name => Filename,
        checksum => {sha256, Hash},
        expire_at => 42
    },
    ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
    StoreSegment = fun({Content, SegmentNum, _SegmentCount}) ->
        % The offset computation presumes segment numbers start at 1.
        Offset = (SegmentNum - 1) * SegmentSize,
        ?assertEqual(
            ok,
            emqx_ft_storage_fs:store_segment(Storage, Transfer, {Offset, Content})
        )
    end,
    _ = emqx_ft_content_gen:consume(Gen, StoreSegment),
    {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer, fragment),
    % Presumably: full segments + 1 trailing partial segment + 1 file metadata.
    % NOTE(review): `div + 1` equals the number of segments only when
    % TransferSize is not an exact multiple of SegmentSize, which the random
    % size above does not rule out. Confirm how emqx_ft_content_gen splits
    % such sizes.
    ?assertEqual((TransferSize div SegmentSize) + 1 + 1, length(Fragments)),
    ?assertEqual(
        [Meta],
        [FM || #{fragment := {filemeta, FM}} <- Fragments],
        Fragments
    ),
    % Assembly is asynchronous: wait for the trace point emitted by the
    % completion callback before looking at the results.
    {ok, _Assembler} = emqx_ft_storage_fs:assemble(
        Storage,
        Transfer,
        fun on_assembly_finished/1
    ),
    {ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}),
    ?assertMatch(#{result := ok}, Event),
    AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename),
    ?assertMatch(
        {ok, [
            #{
                path := AssemblyFilename,
                size := TransferSize,
                fragment := {result, #{}}
            }
        ]},
        emqx_ft_storage_fs:list(Storage, Transfer, result)
    ),
    ?assertMatch(
        {ok, #file_info{type = regular, size = TransferSize}},
        file:read_file_info(AssemblyFilename)
    ),
    % NOTE(review): the meaning of the literal 100 is defined by
    % emqx_ft_content_gen:check_file_consistency/3 - confirm there.
    ok = emqx_ft_content_gen:check_file_consistency(
        {Transfer, TransferSize},
        100,
        AssemblyFilename
    ).
%% Builds the path at which the testcases expect the assembled file of a
%% transfer: `<storage_root>/<ClientID>/<FileID>/result/<Filename>`, where
%% `storage_root` is looked up in `Config`.
mk_assembly_filename(Config, {ClientID, FileID}, Filename) ->
    StorageRoot = ?config(storage_root, Config),
    PathComponents = [StorageRoot, ClientID, FileID, result, Filename],
    filename:join(PathComponents).
%% Completion callback the testcases pass to `emqx_ft_storage_fs:assemble/3`.
%% Emits a `test_assembly_finished` trace point carrying the assembly result,
%% which the testcases wait for with `?block_until`.
on_assembly_finished(Result) ->
    Event = #{result => Result},
    ?tp(test_assembly_finished, Event).
%%
% Checks that `emqx_ft_storage_fs:transfers/1` reports both transfers created
% by the assembly testcases as complete, each with a single result entry:
% size 0 for the empty transfer, and a size greater than 0 for the complete
% local transfer.
% NOTE
% The transfers are keyed by the client ids and testcase names used by
% `t_assemble_empty_transfer` and `t_assemble_complete_local_transfer`, so
% this testcase relies on the storage root still holding their results.
t_list_transfers(Config) ->
Storage = storage(Config),
?assertMatch(
{ok, #{
{?CLIENTID1, <<"t_assemble_empty_transfer">>} := #{
status := complete,
result := [#{path := _, size := 0, fragment := {result, _}}]
},
{?CLIENTID2, <<"t_assemble_complete_local_transfer">>} := #{
status := complete,
result := [#{path := _, size := Size, fragment := {result, _}}]
}
}} when Size > 0,
emqx_ft_storage_fs:transfers(Storage)
).
%%
-include_lib("kernel/include/file.hrl").
%% Prints the contents of the directory configured as `storage_root` to the
%% CT log; see inspect_dir/1.
inspect_storage_root(Config) ->
    StorageRoot = ?config(storage_root, Config),
    inspect_dir(StorageRoot).
%% Recursively walks `Dir` and prints, via `ct:pal/2`, an orddict mapping
%% every file found to its `{Type, Size, MTime}` summary (see inspect_file/1).
inspect_dir(Dir) ->
    % Accumulate one orddict entry per file name.
    Collect = fun(Filename, Acc) ->
        orddict:store(Filename, inspect_file(Filename), Acc)
    end,
    % ".*" matches every file name; `true` makes the fold recursive.
    FileInfos = filelib:fold_files(Dir, ".*", true, Collect, orddict:new()),
    ct:pal("inspect '~s': ~p", [Dir, FileInfos]).
%% Returns `{Type, Size, MTime}` taken from the `#file_info{}` of `Filename`.
%% Crashes with `badmatch` when the file information cannot be read.
inspect_file(Filename) ->
    {ok, #file_info{type = Type, size = Size, mtime = MTime}} =
        file:read_file_info(Filename),
    {Type, Size, MTime}.
%% Generates a file id: the current system time in milliseconds, rendered as
%% a decimal binary.
mk_fileid() ->
    NowMs = erlang:system_time(millisecond),
    integer_to_binary(NowMs).
%% Builds the storage description passed to `emqx_ft_storage_fs` by the
%% testcases: a map whose `root` is the `storage_root` entry of `Config`.
storage(Config) ->
    Root = ?config(storage_root, Config),
    #{root => Root}.