emqx/apps/emqx_s3/test/emqx_s3_profile_conf_SUITE.erl

283 lines
8.5 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_s3_profile_conf_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/asserts.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
suite() -> [{timetrap, {minutes, 1}}].
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(emqx_s3),
Config.
end_per_suite(_Config) ->
ok = application:stop(emqx_s3).
init_per_testcase(_TestCase, Config) ->
ok = snabbkaffe:start_trace(),
TestAwsConfig = emqx_s3_test_helpers:aws_config(tcp),
Bucket = emqx_s3_test_helpers:unique_bucket(),
ok = erlcloud_s3:create_bucket(Bucket, TestAwsConfig),
ProfileBaseConfig = emqx_s3_test_helpers:base_config(tcp),
ProfileConfig = ProfileBaseConfig#{bucket => Bucket},
ok = emqx_s3:start_profile(profile_id(), ProfileConfig),
[{profile_config, ProfileConfig} | Config].
end_per_testcase(_TestCase, _Config) ->
ok = snabbkaffe:stop(),
_ = emqx_s3:stop_profile(profile_id()).
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_regular_outdated_pool_cleanup(Config) ->
_ = process_flag(trap_exit, true),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
[OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
ProfileBaseConfig = ?config(profile_config, Config),
ProfileConfig = emqx_utils_maps:deep_put(
[transport_options, pool_size], ProfileBaseConfig, 16
),
ok = emqx_s3:update_profile(profile_id(), ProfileConfig),
?assertEqual(
2,
length(emqx_s3_profile_http_pools:all(profile_id()))
),
?assertWaitEvent(
ok = emqx_s3_uploader:abort(Pid),
#{?snk_kind := "s3_stop_http_pool", pool_name := OldPool},
1000
),
[NewPool] = emqx_s3_profile_http_pools:all(profile_id()),
?assertWaitEvent(
ok = emqx_s3:stop_profile(profile_id()),
#{?snk_kind := "s3_stop_http_pool", pool_name := NewPool},
1000
),
?assertEqual(
0,
length(emqx_s3_profile_http_pools:all(profile_id()))
).
t_timeout_pool_cleanup(Config) ->
_ = process_flag(trap_exit, true),
%% We restart the profile to set `http_pool_timeout` value suitable for test
ok = emqx_s3:stop_profile(profile_id()),
ProfileBaseConfig = ?config(profile_config, Config),
ProfileConfig = ProfileBaseConfig#{
http_pool_timeout => 500,
http_pool_cleanup_interval => 100
},
ok = emqx_s3:start_profile(profile_id(), ProfileConfig),
%% Start uploader
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
ok = emqx_s3_uploader:write(Pid, <<"data">>),
[OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
NewProfileConfig = emqx_utils_maps:deep_put(
[transport_options, pool_size], ProfileConfig, 16
),
%% We update profile to create new pool and wait for the old one to be stopped by timeout
?assertWaitEvent(
ok = emqx_s3:update_profile(profile_id(), NewProfileConfig),
#{?snk_kind := "s3_stop_http_pool", pool_name := OldPool},
1000
),
%% The uploader now has no valid pool and should fail
?assertMatch(
{error, _},
emqx_s3_uploader:complete(Pid)
).
t_checkout_no_profile(_Config) ->
?assertEqual(
{error, profile_not_found},
emqx_s3_profile_conf:checkout_config(<<"no_such_profile">>)
).
t_httpc_pool_start_error(Config) ->
%% `ehhtpc_pool`s are lazy so it is difficult to trigger an error
%% passing some bad connection options.
%% So we emulate some unknown crash with `meck`.
meck:new(ehttpc_pool, [passthrough]),
meck:expect(ehttpc_pool, init, fun(_) -> meck:raise(error, badarg) end),
?assertMatch(
{error, _},
emqx_s3:start_profile(<<"profile">>, ?config(profile_config, Config))
).
t_httpc_pool_update_error(Config) ->
%% `ehhtpc_pool`s are lazy so it is difficult to trigger an error
%% passing some bad connection options.
%% So we emulate some unknown crash with `meck`.
meck:new(ehttpc_pool, [passthrough]),
meck:expect(ehttpc_pool, init, fun(_) -> meck:raise(error, badarg) end),
ProfileBaseConfig = ?config(profile_config, Config),
NewProfileConfig = emqx_utils_maps:deep_put(
[transport_options, pool_size], ProfileBaseConfig, 16
),
?assertMatch(
{error, _},
emqx_s3:start_profile(<<"profile">>, NewProfileConfig)
).
t_orphaned_pools_cleanup(_Config) ->
ProfileId = profile_id(),
Pid = gproc:where({n, l, emqx_s3_profile_conf:id(ProfileId)}),
%% We kill conf and wait for it to restart
%% and create a new pool
?assertWaitEvent(
exit(Pid, kill),
#{?snk_kind := "s3_start_http_pool", profile_id := ProfileId},
1000
),
%% We should still have only one pool
?assertEqual(
1,
length(emqx_s3_profile_http_pools:all(ProfileId))
).
t_orphaned_pools_cleanup_non_graceful(_Config) ->
ProfileId = profile_id(),
Pid = gproc:where({n, l, emqx_s3_profile_conf:id(ProfileId)}),
%% We stop pool, conf server should not fail when attempting to stop it once more
[PoolName] = emqx_s3_profile_http_pools:all(ProfileId),
ok = ehttpc_pool:stop_pool(PoolName),
%% We kill conf and wait for it to restart
%% and create a new pool
?assertWaitEvent(
exit(Pid, kill),
#{?snk_kind := "s3_start_http_pool", profile_id := ProfileId},
1000
),
%% We should still have only one pool
?assertEqual(
1,
length(emqx_s3_profile_http_pools:all(ProfileId))
).
t_checkout_client(Config) ->
ProfileId = profile_id(),
Key = emqx_s3_test_helpers:unique_key(),
Caller = self(),
Pid = spawn_link(fun() ->
emqx_s3:with_client(
ProfileId,
fun(Client) ->
receive
put_object ->
Caller ! {put_object, emqx_s3_client:put_object(Client, Key, <<"data">>)}
end,
receive
list_objects ->
Caller ! {list_objects, emqx_s3_client:list(Client, [])}
end
end
),
Caller ! client_released,
receive
stop -> ok
end
end),
%% Ask spawned process to put object
Pid ! put_object,
receive
{put_object, ok} -> ok
after 1000 ->
ct:fail("put_object fail")
end,
%% Now change config for the profile
ProfileBaseConfig = ?config(profile_config, Config),
NewProfileConfig0 = ProfileBaseConfig#{bucket => <<"new_bucket">>},
NewProfileConfig1 = emqx_utils_maps:deep_put(
[transport_options, pool_size], NewProfileConfig0, 16
),
ok = emqx_s3:update_profile(profile_id(), NewProfileConfig1),
%% We should have two pools now, because the old one is still in use
%% by the spawned process
?assertEqual(
2,
length(emqx_s3_profile_http_pools:all(ProfileId))
),
%% Ask spawned process to list objects
Pid ! list_objects,
receive
{list_objects, Result} ->
{ok, OkResult} = Result,
Contents = proplists:get_value(contents, OkResult),
?assertEqual(1, length(Contents)),
?assertEqual(Key, proplists:get_value(key, hd(Contents)))
after 1000 ->
ct:fail("list_objects fail")
end,
%% Wait till spawned process releases client
receive
client_released -> ok
after 1000 ->
ct:fail("client not released")
end,
%% We should have only one pool now, because the old one is released
?assertEqual(
1,
length(emqx_s3_profile_http_pools:all(ProfileId))
).
t_unknown_messages(_Config) ->
Pid = gproc:where({n, l, emqx_s3_profile_conf:id(profile_id())}),
Pid ! unknown,
ok = gen_server:cast(Pid, unknown),
?assertEqual(
{error, not_implemented},
gen_server:call(Pid, unknown)
).
%%--------------------------------------------------------------------
%% Test helpers
%%--------------------------------------------------------------------
profile_id() ->
<<"test">>.