emqx/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl

267 lines
8.1 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_assembler_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("kernel/include/file.hrl").
all() ->
[
t_assemble_empty_transfer,
t_assemble_complete_local_transfer,
t_assemble_incomplete_transfer,
t_assemble_no_meta,
% NOTE
% It depends on the side effects of all previous testcases.
t_list_transfers
].
init_per_suite(Config) ->
{ok, Apps} = application:ensure_all_started(gproc),
[{suite_apps, Apps} | Config].
end_per_suite(Config) ->
emqx_cth_suite:stop_apps(?config(suite_apps, Config)).
init_per_testcase(TC, Config) ->
ok = snabbkaffe:start_trace(),
{ok, Pid} = emqx_ft_assembler_sup:start_link(),
[
{storage_root, <<"file_transfer_root">>},
{exports_root, <<"file_transfer_exports">>},
{file_id, atom_to_binary(TC)},
{assembler_sup, Pid}
| Config
].
end_per_testcase(_TC, Config) ->
ok = inspect_storage_root(Config),
ok = gen:stop(?config(assembler_sup, Config)),
ok = snabbkaffe:stop(),
ok.
%%
-define(CLIENTID1, <<"thatsme">>).
-define(CLIENTID2, <<"thatsnotme">>).
t_assemble_empty_transfer(Config) ->
Storage = storage(Config),
Transfer = {?CLIENTID1, ?config(file_id, Config)},
Filename = "important.pdf",
Meta = #{
name => Filename,
size => 0,
expire_at => 42
},
ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
?assertMatch(
{ok, [
#{
path := _,
timestamp := {{_, _, _}, {_, _, _}},
fragment := {filemeta, Meta}
}
]},
emqx_ft_storage_fs:list(Storage, Transfer, fragment)
),
Status = complete_assemble(Storage, Transfer, 0),
?assertEqual({shutdown, ok}, Status),
{ok, [_Result = #{size := _Size = 0}]} = list_exports(Config, Transfer),
% ?assertEqual(
% {error, eof},
% emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size)
% ),
ok.
t_assemble_complete_local_transfer(Config) ->
Storage = storage(Config),
Transfer = {?CLIENTID2, ?config(file_id, Config)},
Filename = "topsecret.pdf",
TransferSize = 10000 + rand:uniform(50000),
SegmentSize = 4096,
Gen = emqx_ft_content_gen:new({Transfer, TransferSize}, SegmentSize),
Hash = emqx_ft_content_gen:hash(Gen, crypto:hash_init(sha256)),
Meta = #{
name => Filename,
checksum => {sha256, Hash},
expire_at => 42
},
ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
_ = emqx_ft_content_gen:consume(
Gen,
fun({Content, SegmentNum, _Meta}) ->
Offset = (SegmentNum - 1) * SegmentSize,
?assertEqual(
ok,
emqx_ft_storage_fs:store_segment(Storage, Transfer, {Offset, Content})
)
end
),
{ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer, fragment),
?assertEqual((TransferSize div SegmentSize) + 1 + 1, length(Fragments)),
?assertEqual(
[Meta],
[FM || #{fragment := {filemeta, FM}} <- Fragments],
Fragments
),
Status = complete_assemble(Storage, Transfer, TransferSize),
?assertEqual({shutdown, ok}, Status),
?assertMatch(
{ok, [
#{
size := TransferSize,
meta := #{}
}
]},
list_exports(Config, Transfer)
),
{ok, [#{path := AssemblyFilename}]} = list_exports(Config, Transfer),
?assertMatch(
{ok, #file_info{type = regular, size = TransferSize}},
file:read_file_info(AssemblyFilename)
),
ok = emqx_ft_content_gen:check_file_consistency(
{Transfer, TransferSize},
100,
AssemblyFilename
).
t_assemble_incomplete_transfer(Config) ->
Storage = storage(Config),
Transfer = {?CLIENTID2, ?config(file_id, Config)},
Filename = "incomplete.pdf",
TransferSize = 10000 + rand:uniform(50000),
SegmentSize = 4096,
Gen = emqx_ft_content_gen:new({Transfer, TransferSize}, SegmentSize),
Hash = emqx_ft_content_gen:hash(Gen, crypto:hash_init(sha256)),
Meta = #{
name => Filename,
checksum => {sha256, Hash},
size => TransferSize,
expire_at => 42
},
ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
Status = complete_assemble(Storage, Transfer, TransferSize),
?assertMatch({shutdown, {error, _}}, Status).
t_assemble_no_meta(Config) ->
Storage = storage(Config),
Transfer = {?CLIENTID2, ?config(file_id, Config)},
Status = complete_assemble(Storage, Transfer, 42),
?assertMatch({shutdown, {error, {incomplete, _}}}, Status).
complete_assemble(Storage, Transfer, Size) ->
complete_assemble(Storage, Transfer, Size, 1000).
complete_assemble(Storage, Transfer, Size, Timeout) ->
{async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}),
MRef = erlang:monitor(process, Pid),
Pid ! kickoff,
receive
{'DOWN', MRef, process, Pid, Result} ->
Result
after Timeout ->
ct:fail("Assembler did not finish in time")
end.
%%
t_list_transfers(Config) ->
{ok, Exports} = list_exports(Config),
?assertMatch(
[
#{
transfer := {?CLIENTID2, <<"t_assemble_complete_local_transfer">>},
path := _,
size := Size,
meta := #{name := "topsecret.pdf"}
},
#{
transfer := {?CLIENTID1, <<"t_assemble_empty_transfer">>},
path := _,
size := 0,
meta := #{name := "important.pdf"}
}
] when Size > 0,
lists:sort(Exports)
).
%%
-include_lib("kernel/include/file.hrl").
inspect_storage_root(Config) ->
inspect_dir(?config(storage_root, Config)).
inspect_dir(Dir) ->
FileInfos = filelib:fold_files(
Dir,
".*",
true,
fun(Filename, Acc) -> orddict:store(Filename, inspect_file(Filename), Acc) end,
orddict:new()
),
ct:pal("inspect '~s': ~p", [Dir, FileInfos]).
inspect_file(Filename) ->
{ok, Info} = file:read_file_info(Filename),
{Info#file_info.type, Info#file_info.size, Info#file_info.mtime}.
mk_fileid() ->
integer_to_binary(erlang:system_time(millisecond)).
list_exports(Config) ->
{emqx_ft_storage_exporter_fs, Options} = exporter(Config),
emqx_ft_storage_exporter_fs:list_local(Options).
list_exports(Config, Transfer) ->
{emqx_ft_storage_exporter_fs, Options} = exporter(Config),
emqx_ft_storage_exporter_fs:list_local_transfer(Options, Transfer).
exporter(Config) ->
emqx_ft_storage_exporter:exporter(storage(Config)).
storage(Config) ->
emqx_utils_maps:deep_get(
[storage, local],
emqx_ft_schema:translate(#{
<<"storage">> => #{
<<"local">> => #{
<<"segments">> => #{
<<"root">> => ?config(storage_root, Config)
},
<<"exporter">> => #{
<<"local">> => #{
<<"enable">> => true,
<<"root">> => ?config(exports_root, Config)
}
}
}
}
})
).