283 lines
8.5 KiB
Erlang
283 lines
8.5 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_s3_profile_conf_SUITE).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("emqx/include/asserts.hrl").
|
|
|
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
|
|
|
suite() -> [{timetrap, {minutes, 1}}].
|
|
|
|
init_per_suite(Config) ->
|
|
{ok, _} = application:ensure_all_started(emqx_s3),
|
|
Config.
|
|
|
|
end_per_suite(_Config) ->
|
|
ok = application:stop(emqx_s3).
|
|
|
|
init_per_testcase(_TestCase, Config) ->
|
|
ok = snabbkaffe:start_trace(),
|
|
TestAwsConfig = emqx_s3_test_helpers:aws_config(tcp),
|
|
|
|
Bucket = emqx_s3_test_helpers:unique_bucket(),
|
|
ok = erlcloud_s3:create_bucket(Bucket, TestAwsConfig),
|
|
|
|
ProfileBaseConfig = emqx_s3_test_helpers:base_config(tcp),
|
|
ProfileConfig = ProfileBaseConfig#{bucket => Bucket},
|
|
ok = emqx_s3:start_profile(profile_id(), ProfileConfig),
|
|
|
|
[{profile_config, ProfileConfig} | Config].
|
|
|
|
end_per_testcase(_TestCase, _Config) ->
|
|
ok = snabbkaffe:stop(),
|
|
_ = emqx_s3:stop_profile(profile_id()).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases
|
|
%%--------------------------------------------------------------------
|
|
|
|
t_regular_outdated_pool_cleanup(Config) ->
|
|
_ = process_flag(trap_exit, true),
|
|
Key = emqx_s3_test_helpers:unique_key(),
|
|
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
|
|
|
|
[OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
|
|
|
|
ProfileBaseConfig = ?config(profile_config, Config),
|
|
ProfileConfig = emqx_utils_maps:deep_put(
|
|
[transport_options, pool_size], ProfileBaseConfig, 16
|
|
),
|
|
ok = emqx_s3:update_profile(profile_id(), ProfileConfig),
|
|
|
|
?assertEqual(
|
|
2,
|
|
length(emqx_s3_profile_http_pools:all(profile_id()))
|
|
),
|
|
|
|
?assertWaitEvent(
|
|
ok = emqx_s3_uploader:abort(Pid),
|
|
#{?snk_kind := "s3_stop_http_pool", pool_name := OldPool},
|
|
1000
|
|
),
|
|
|
|
[NewPool] = emqx_s3_profile_http_pools:all(profile_id()),
|
|
|
|
?assertWaitEvent(
|
|
ok = emqx_s3:stop_profile(profile_id()),
|
|
#{?snk_kind := "s3_stop_http_pool", pool_name := NewPool},
|
|
1000
|
|
),
|
|
|
|
?assertEqual(
|
|
0,
|
|
length(emqx_s3_profile_http_pools:all(profile_id()))
|
|
).
|
|
|
|
t_timeout_pool_cleanup(Config) ->
|
|
_ = process_flag(trap_exit, true),
|
|
|
|
%% We restart the profile to set `http_pool_timeout` value suitable for test
|
|
ok = emqx_s3:stop_profile(profile_id()),
|
|
ProfileBaseConfig = ?config(profile_config, Config),
|
|
ProfileConfig = ProfileBaseConfig#{
|
|
http_pool_timeout => 500,
|
|
http_pool_cleanup_interval => 100
|
|
},
|
|
ok = emqx_s3:start_profile(profile_id(), ProfileConfig),
|
|
|
|
%% Start uploader
|
|
Key = emqx_s3_test_helpers:unique_key(),
|
|
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
|
|
ok = emqx_s3_uploader:write(Pid, <<"data">>),
|
|
|
|
[OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
|
|
|
|
NewProfileConfig = emqx_utils_maps:deep_put(
|
|
[transport_options, pool_size], ProfileConfig, 16
|
|
),
|
|
|
|
%% We update profile to create new pool and wait for the old one to be stopped by timeout
|
|
?assertWaitEvent(
|
|
ok = emqx_s3:update_profile(profile_id(), NewProfileConfig),
|
|
#{?snk_kind := "s3_stop_http_pool", pool_name := OldPool},
|
|
1000
|
|
),
|
|
|
|
%% The uploader now has no valid pool and should fail
|
|
?assertMatch(
|
|
{error, _},
|
|
emqx_s3_uploader:complete(Pid)
|
|
).
|
|
|
|
t_checkout_no_profile(_Config) ->
|
|
?assertEqual(
|
|
{error, profile_not_found},
|
|
emqx_s3_profile_conf:checkout_config(<<"no_such_profile">>)
|
|
).
|
|
|
|
t_httpc_pool_start_error(Config) ->
|
|
%% `ehhtpc_pool`s are lazy so it is difficult to trigger an error
|
|
%% passing some bad connection options.
|
|
%% So we emulate some unknown crash with `meck`.
|
|
meck:new(ehttpc_pool, [passthrough]),
|
|
meck:expect(ehttpc_pool, init, fun(_) -> meck:raise(error, badarg) end),
|
|
|
|
?assertMatch(
|
|
{error, _},
|
|
emqx_s3:start_profile(<<"profile">>, ?config(profile_config, Config))
|
|
).
|
|
|
|
t_httpc_pool_update_error(Config) ->
|
|
%% `ehhtpc_pool`s are lazy so it is difficult to trigger an error
|
|
%% passing some bad connection options.
|
|
%% So we emulate some unknown crash with `meck`.
|
|
meck:new(ehttpc_pool, [passthrough]),
|
|
meck:expect(ehttpc_pool, init, fun(_) -> meck:raise(error, badarg) end),
|
|
|
|
ProfileBaseConfig = ?config(profile_config, Config),
|
|
NewProfileConfig = emqx_utils_maps:deep_put(
|
|
[transport_options, pool_size], ProfileBaseConfig, 16
|
|
),
|
|
|
|
?assertMatch(
|
|
{error, _},
|
|
emqx_s3:start_profile(<<"profile">>, NewProfileConfig)
|
|
).
|
|
|
|
t_orphaned_pools_cleanup(_Config) ->
|
|
ProfileId = profile_id(),
|
|
Pid = gproc:where({n, l, emqx_s3_profile_conf:id(ProfileId)}),
|
|
|
|
%% We kill conf and wait for it to restart
|
|
%% and create a new pool
|
|
?assertWaitEvent(
|
|
exit(Pid, kill),
|
|
#{?snk_kind := "s3_start_http_pool", profile_id := ProfileId},
|
|
1000
|
|
),
|
|
|
|
%% We should still have only one pool
|
|
?assertEqual(
|
|
1,
|
|
length(emqx_s3_profile_http_pools:all(ProfileId))
|
|
).
|
|
|
|
t_orphaned_pools_cleanup_non_graceful(_Config) ->
|
|
ProfileId = profile_id(),
|
|
Pid = gproc:where({n, l, emqx_s3_profile_conf:id(ProfileId)}),
|
|
|
|
%% We stop pool, conf server should not fail when attempting to stop it once more
|
|
[PoolName] = emqx_s3_profile_http_pools:all(ProfileId),
|
|
ok = ehttpc_pool:stop_pool(PoolName),
|
|
|
|
%% We kill conf and wait for it to restart
|
|
%% and create a new pool
|
|
?assertWaitEvent(
|
|
exit(Pid, kill),
|
|
#{?snk_kind := "s3_start_http_pool", profile_id := ProfileId},
|
|
1000
|
|
),
|
|
|
|
%% We should still have only one pool
|
|
?assertEqual(
|
|
1,
|
|
length(emqx_s3_profile_http_pools:all(ProfileId))
|
|
).
|
|
|
|
t_checkout_client(Config) ->
|
|
ProfileId = profile_id(),
|
|
Key = emqx_s3_test_helpers:unique_key(),
|
|
Caller = self(),
|
|
Pid = spawn_link(fun() ->
|
|
emqx_s3:with_client(
|
|
ProfileId,
|
|
fun(Client) ->
|
|
receive
|
|
put_object ->
|
|
Caller ! {put_object, emqx_s3_client:put_object(Client, Key, <<"data">>)}
|
|
end,
|
|
receive
|
|
list_objects ->
|
|
Caller ! {list_objects, emqx_s3_client:list(Client, [])}
|
|
end
|
|
end
|
|
),
|
|
Caller ! client_released,
|
|
receive
|
|
stop -> ok
|
|
end
|
|
end),
|
|
|
|
%% Ask spawned process to put object
|
|
Pid ! put_object,
|
|
receive
|
|
{put_object, ok} -> ok
|
|
after 1000 ->
|
|
ct:fail("put_object fail")
|
|
end,
|
|
|
|
%% Now change config for the profile
|
|
ProfileBaseConfig = ?config(profile_config, Config),
|
|
NewProfileConfig0 = ProfileBaseConfig#{bucket => <<"new_bucket">>},
|
|
NewProfileConfig1 = emqx_utils_maps:deep_put(
|
|
[transport_options, pool_size], NewProfileConfig0, 16
|
|
),
|
|
ok = emqx_s3:update_profile(profile_id(), NewProfileConfig1),
|
|
|
|
%% We should have two pools now, because the old one is still in use
|
|
%% by the spawned process
|
|
?assertEqual(
|
|
2,
|
|
length(emqx_s3_profile_http_pools:all(ProfileId))
|
|
),
|
|
|
|
%% Ask spawned process to list objects
|
|
Pid ! list_objects,
|
|
receive
|
|
{list_objects, Result} ->
|
|
{ok, OkResult} = Result,
|
|
Contents = proplists:get_value(contents, OkResult),
|
|
?assertEqual(1, length(Contents)),
|
|
?assertEqual(Key, proplists:get_value(key, hd(Contents)))
|
|
after 1000 ->
|
|
ct:fail("list_objects fail")
|
|
end,
|
|
|
|
%% Wait till spawned process releases client
|
|
receive
|
|
client_released -> ok
|
|
after 1000 ->
|
|
ct:fail("client not released")
|
|
end,
|
|
|
|
%% We should have only one pool now, because the old one is released
|
|
?assertEqual(
|
|
1,
|
|
length(emqx_s3_profile_http_pools:all(ProfileId))
|
|
).
|
|
|
|
t_unknown_messages(_Config) ->
|
|
Pid = gproc:where({n, l, emqx_s3_profile_conf:id(profile_id())}),
|
|
|
|
Pid ! unknown,
|
|
ok = gen_server:cast(Pid, unknown),
|
|
|
|
?assertEqual(
|
|
{error, not_implemented},
|
|
gen_server:call(Pid, unknown)
|
|
).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test helpers
|
|
%%--------------------------------------------------------------------
|
|
|
|
profile_id() ->
|
|
<<"test">>.
|