emqx/apps/emqx_s3/test/emqx_s3_uploader_SUITE.erl

594 lines
15 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_s3_uploader_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(assertProcessExited(Reason, Pid),
receive
{'DOWN', _, _, Pid, Reason} ->
ok
after 3000 ->
ct:fail("uploader process did not exit")
end
).
-define(assertObjectEqual(Value, AwsConfig, Bucket, Key),
?assertEqual(
Value,
proplists:get_value(
content,
erlcloud_s3:get_object(
Bucket,
Key,
AwsConfig
)
)
)
).
all() ->
[
{group, tcp},
{group, tls}
].
groups() ->
[
{tcp, [
{group, common_cases},
{group, tcp_cases}
]},
{tls, [
{group, common_cases},
{group, tls_cases}
]},
{common_cases, [], [
t_happy_path_simple_put,
t_happy_path_multi,
t_abort_multi,
t_abort_simple_put,
t_signed_url_download,
t_signed_nonascii_url_download,
{group, noconn_errors},
{group, timeout_errors},
{group, http_errors}
]},
{tcp_cases, [
t_config_switch,
t_too_large,
t_no_profile
]},
{tls_cases, [
t_tls_error,
t_config_switch_http_settings
]},
{noconn_errors, [{group, transport_errors}]},
{timeout_errors, [{group, transport_errors}]},
{http_errors, [{group, transport_errors}]},
{transport_errors, [
t_start_multipart_error,
t_upload_part_error,
t_complete_multipart_error,
t_abort_multipart_error,
t_put_object_error
]}
].
suite() -> [{timetrap, {minutes, 1}}].
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(emqx_s3),
Config.
end_per_suite(_Config) ->
ok = application:stop(emqx_s3).
init_per_group(Group, Config) when Group =:= tcp orelse Group =:= tls ->
[{conn_type, Group} | Config];
init_per_group(noconn_errors, Config) ->
[{failure, down} | Config];
init_per_group(timeout_errors, Config) ->
[{failure, timeout} | Config];
init_per_group(http_errors, Config) ->
[{failure, ehttpc_500} | Config];
init_per_group(_ConnType, Config) ->
Config.
end_per_group(_ConnType, _Config) ->
ok.
init_per_testcase(_TestCase, Config) ->
ok = snabbkaffe:start_trace(),
ConnType = ?config(conn_type, Config),
TestAwsConfig = emqx_s3_test_helpers:aws_config(ConnType),
Bucket = emqx_s3_test_helpers:unique_bucket(),
ok = erlcloud_s3:create_bucket(Bucket, TestAwsConfig),
ProfileBaseConfig = emqx_s3_test_helpers:base_config(ConnType),
ProfileConfig = ProfileBaseConfig#{bucket => Bucket},
ok = emqx_s3:start_profile(profile_id(), ProfileConfig),
[{bucket, Bucket}, {test_aws_config, TestAwsConfig}, {profile_config, ProfileConfig} | Config].
end_per_testcase(_TestCase, _Config) ->
ok = snabbkaffe:stop(),
_ = emqx_s3:stop_profile(profile_id()).
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_happy_path_simple_put(Config) ->
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
Data = data($a, 1024, 10),
lists:foreach(
fun(Chunk) ->
?assertEqual(
ok,
emqx_s3_uploader:write(Pid, Chunk)
)
end,
Data
),
ok = emqx_s3_uploader:complete(Pid),
?assertProcessExited(
normal,
Pid
),
?assertObjectEqual(
iolist_to_binary(Data),
?config(test_aws_config, Config),
?config(bucket, Config),
Key
).
t_happy_path_multi(Config) ->
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
Data = data($a, 1024 * 1024, 10),
lists:foreach(
fun(Chunk) ->
?assertEqual(
ok,
emqx_s3_uploader:write(Pid, Chunk)
)
end,
Data
),
ok = emqx_s3_uploader:complete(Pid),
?assertProcessExited(
normal,
Pid
),
?assertObjectEqual(
iolist_to_binary(Data),
?config(test_aws_config, Config),
?config(bucket, Config),
Key
).
t_signed_url_download(_Config) ->
Prefix = emqx_s3_test_helpers:unique_key(),
Key = Prefix ++ "/ascii.txt",
{ok, Data} = upload(Key, 1024, 5),
SignedUrl = emqx_s3:with_client(profile_id(), fun(Client) ->
emqx_s3_client:uri(Client, Key)
end),
HttpOpts = [{ssl, [{verify, verify_none}]}],
{ok, {_, _, Body}} = httpc:request(get, {SignedUrl, []}, HttpOpts, []),
?assertEqual(
iolist_to_binary(Data),
iolist_to_binary(Body)
).
t_signed_nonascii_url_download(_Config) ->
Prefix = emqx_s3_test_helpers:unique_key(),
Key = Prefix ++ "/unicode-🫠.txt",
{ok, Data} = upload(Key, 1024 * 1024, 8),
SignedUrl = emqx_s3:with_client(profile_id(), fun(Client) ->
emqx_s3_client:uri(Client, Key)
end),
HttpOpts = [{ssl, [{verify, verify_none}]}],
{ok, {_, _, Body}} = httpc:request(get, {SignedUrl, []}, HttpOpts, []),
?assertEqual(
iolist_to_binary(Data),
iolist_to_binary(Body)
).
t_abort_multi(Config) ->
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
[Data] = data($a, 6 * 1024 * 1024, 1),
ok = emqx_s3_uploader:write(Pid, Data),
?assertMatch(
[],
list_objects(Config)
),
ok = emqx_s3_uploader:abort(Pid),
?assertMatch(
[],
list_objects(Config)
),
?assertProcessExited(
normal,
Pid
).
t_abort_simple_put(_Config) ->
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
[Data] = data($a, 10 * 1024, 1),
ok = emqx_s3_uploader:write(Pid, Data),
ok = emqx_s3_uploader:abort(Pid),
?assertProcessExited(
normal,
Pid
).
t_config_switch(Config) ->
Key = emqx_s3_test_helpers:unique_key(),
OldBucket = ?config(bucket, Config),
{ok, Pid0} = emqx_s3:start_uploader(profile_id(), Key, #{}),
[Data0, Data1] = data($a, 6 * 1024 * 1024, 2),
ok = emqx_s3_uploader:write(Pid0, Data0),
%% Switch to the new config, but without changing HTTP settings
ProfileConfig = ?config(profile_config, Config),
NewBucket = emqx_s3_test_helpers:unique_bucket(),
ok = erlcloud_s3:create_bucket(NewBucket, ?config(test_aws_config, Config)),
NewProfileConfig = ProfileConfig#{bucket => NewBucket},
ok = emqx_s3:update_profile(profile_id(), NewProfileConfig),
%% Already started uploader should be OK and use previous config
ok = emqx_s3_uploader:write(Pid0, Data1),
ok = emqx_s3_uploader:complete(Pid0),
?assertObjectEqual(
iolist_to_binary([Data0, Data1]),
?config(test_aws_config, Config),
OldBucket,
Key
),
%% Now check that new uploader uses new config
{ok, Pid1} = emqx_s3:start_uploader(profile_id(), Key, #{}),
ok = emqx_s3_uploader:write(Pid1, Data0),
ok = emqx_s3_uploader:complete(Pid1),
?assertObjectEqual(
iolist_to_binary(Data0),
?config(test_aws_config, Config),
NewBucket,
Key
).
t_config_switch_http_settings(Config) ->
Key = emqx_s3_test_helpers:unique_key(),
OldBucket = ?config(bucket, Config),
{ok, Pid0} = emqx_s3:start_uploader(profile_id(), Key, #{}),
[Data0, Data1] = data($a, 6 * 1024 * 1024, 2),
ok = emqx_s3_uploader:write(Pid0, Data0),
%% Switch to the new config, completely changing HTTP settings (tcp -> tls)
NewBucket = emqx_s3_test_helpers:unique_bucket(),
NewTestAwsConfig = emqx_s3_test_helpers:aws_config(tls),
ok = erlcloud_s3:create_bucket(NewBucket, NewTestAwsConfig),
NewProfileConfig0 = emqx_s3_test_helpers:base_config(tls),
NewProfileConfig1 = NewProfileConfig0#{bucket => NewBucket},
ok = emqx_s3:update_profile(profile_id(), NewProfileConfig1),
%% Already started uploader should be OK and use previous config
ok = emqx_s3_uploader:write(Pid0, Data1),
ok = emqx_s3_uploader:complete(Pid0),
?assertObjectEqual(
iolist_to_binary([Data0, Data1]),
?config(test_aws_config, Config),
OldBucket,
Key
),
%% Now check that new uploader uses new config
{ok, Pid1} = emqx_s3:start_uploader(profile_id(), Key, #{}),
ok = emqx_s3_uploader:write(Pid1, Data0),
ok = emqx_s3_uploader:complete(Pid1),
?assertObjectEqual(
iolist_to_binary(Data0),
NewTestAwsConfig,
NewBucket,
Key
).
t_start_multipart_error(Config) ->
_ = process_flag(trap_exit, true),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
[Data] = data($a, 6 * 1024 * 1024, 1),
emqx_s3_test_helpers:with_failure(
?config(conn_type, Config),
?config(failure, Config),
fun() ->
?assertMatch(
{error, _},
emqx_s3_uploader:write(Pid, Data)
)
end
),
?assertProcessExited(
{error, _},
Pid
).
t_upload_part_error(Config) ->
_ = process_flag(trap_exit, true),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
[Data0, Data1] = data($a, 6 * 1024 * 1024, 2),
ok = emqx_s3_uploader:write(Pid, Data0),
emqx_s3_test_helpers:with_failure(
?config(conn_type, Config),
?config(failure, Config),
fun() ->
?assertMatch(
{error, _},
emqx_s3_uploader:write(Pid, Data1)
)
end
),
?assertProcessExited(
{error, _},
Pid
).
t_abort_multipart_error(Config) ->
_ = process_flag(trap_exit, true),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
[Data] = data($a, 6 * 1024 * 1024, 1),
ok = emqx_s3_uploader:write(Pid, Data),
emqx_s3_test_helpers:with_failure(
?config(conn_type, Config),
?config(failure, Config),
fun() ->
?assertMatch(
{error, _},
emqx_s3_uploader:abort(Pid)
)
end
),
?assertProcessExited(
{error, _},
Pid
).
t_complete_multipart_error(Config) ->
_ = process_flag(trap_exit, true),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
[Data] = data($a, 6 * 1024 * 1024, 1),
ok = emqx_s3_uploader:write(Pid, Data),
emqx_s3_test_helpers:with_failure(
?config(conn_type, Config),
?config(failure, Config),
fun() ->
?assertMatch(
{error, _},
emqx_s3_uploader:complete(Pid)
)
end
),
?assertProcessExited(
{error, _},
Pid
).
t_put_object_error(Config) ->
_ = process_flag(trap_exit, true),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
%% Little data to avoid multipart upload
[Data] = data($a, 1024, 1),
emqx_s3_test_helpers:with_failure(
?config(conn_type, Config),
?config(failure, Config),
fun() ->
ok = emqx_s3_uploader:write(Pid, Data),
?assertMatch(
{error, _},
emqx_s3_uploader:complete(Pid)
)
end
),
?assertProcessExited(
{error, _},
Pid
).
t_too_large(Config) ->
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
[Data] = data($a, 1024, 1),
[DataLarge] = data($a, 20 * 1024 * 1024, 1),
?assertMatch(
{error, {too_large, _}},
emqx_s3_uploader:write(Pid, DataLarge)
),
ok = emqx_s3_uploader:write(Pid, Data),
ok = emqx_s3_uploader:complete(Pid),
?assertProcessExited(
normal,
Pid
),
?assertObjectEqual(
iolist_to_binary(Data),
?config(test_aws_config, Config),
?config(bucket, Config),
Key
).
t_tls_error(Config) ->
_ = process_flag(trap_exit, true),
ProfileBaseConfig = ?config(profile_config, Config),
ProfileConfig = emqx_utils_maps:deep_put(
[transport_options, ssl, server_name_indication], ProfileBaseConfig, "invalid-hostname"
),
ok = emqx_s3:update_profile(profile_id(), ProfileConfig),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
[Data] = data($a, 6 * 1024 * 1024, 1),
?assertMatch(
{error, _},
emqx_s3_uploader:write(Pid, Data)
),
?assertProcessExited(
{error, _},
Pid
).
t_no_profile(_Config) ->
Key = emqx_s3_test_helpers:unique_key(),
?assertMatch(
{error, profile_not_found},
emqx_s3:start_uploader(<<"no-profile">>, Key, #{})
).
%%--------------------------------------------------------------------
%% Test helpers
%%--------------------------------------------------------------------
profile_id() ->
<<"test">>.
data(Byte, ChunkSize, ChunkCount) ->
Chunk = iolist_to_binary([Byte || _ <- lists:seq(1, ChunkSize)]),
[Chunk || _ <- lists:seq(1, ChunkCount)].
list_objects(Config) ->
Props = erlcloud_s3:list_objects(?config(bucket, Config), [], ?config(test_aws_config, Config)),
proplists:get_value(contents, Props).
upload(Key, ChunkSize, ChunkCount) ->
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
Data = data($a, ChunkSize, ChunkCount),
ok = lists:foreach(
fun(Chunk) -> ?assertEqual(ok, emqx_s3_uploader:write(Pid, Chunk)) end,
Data
),
ok = emqx_s3_uploader:complete(Pid),
ok = ?assertProcessExited(
normal,
Pid
),
{ok, Data}.