emqx/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl

239 lines
7.9 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/test_macros.hrl").
-import(emqx_utils_conv, [bin/1]).
%% See `emqx_bridge_s3.hrl`.
-define(BRIDGE_TYPE, <<"s3">>).
-define(CONNECTOR_TYPE, <<"s3">>).
-define(PROXY_NAME, "minio_tcp").
-define(CONTENT_TYPE, "application/x-emqx-payload").
%% CT Setup
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
% Setup toxiproxy
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
_ = emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge_s3,
emqx_bridge,
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
[
{apps, Apps},
{proxy_host, ProxyHost},
{proxy_port, ProxyPort},
{proxy_name, ?PROXY_NAME}
| Config
].
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)).
%% Testcases
init_per_testcase(TestCase, Config) ->
ct:timetrap(timer:seconds(30)),
ok = snabbkaffe:start_trace(),
Name = iolist_to_binary(io_lib:format("~s~p", [TestCase, erlang:unique_integer()])),
ConnectorConfig = connector_config(Name, Config),
ActionConfig = action_config(Name, Name),
[
{connector_type, ?CONNECTOR_TYPE},
{connector_name, Name},
{connector_config, ConnectorConfig},
{bridge_type, ?BRIDGE_TYPE},
{bridge_name, Name},
{bridge_config, ActionConfig}
| Config
].
end_per_testcase(_TestCase, _Config) ->
ok = snabbkaffe:stop(),
ok.
connector_config(Name, _Config) ->
BaseConf = emqx_s3_test_helpers:base_raw_config(tcp),
parse_and_check_config(<<"connectors">>, ?CONNECTOR_TYPE, Name, #{
<<"enable">> => true,
<<"description">> => <<"S3 Connector">>,
<<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)),
<<"port">> => maps:get(<<"port">>, BaseConf),
<<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
<<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
<<"transport_options">> => #{
<<"headers">> => #{
<<"content-type">> => <<?CONTENT_TYPE>>
},
<<"connect_timeout">> => <<"500ms">>,
<<"request_timeout">> => <<"1s">>,
<<"pool_size">> => 4,
<<"max_retries">> => 0,
<<"enable_pipelining">> => 1
},
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"5s">>,
<<"start_timeout">> => <<"5s">>
}
}).
action_config(Name, ConnectorId) ->
parse_and_check_config(<<"actions">>, ?BRIDGE_TYPE, Name, #{
<<"enable">> => true,
<<"connector">> => ConnectorId,
<<"parameters">> => #{
<<"bucket">> => <<"${clientid}">>,
<<"key">> => <<"${topic}">>,
<<"content">> => <<"${payload}">>,
<<"acl">> => <<"public_read">>
},
<<"resource_opts">> => #{
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"3s">>,
<<"inflight_window">> => 40,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"60s">>,
<<"resume_interval">> => <<"3s">>,
<<"worker_pool_size">> => <<"4">>
}
}).
parse_and_check_config(Root, Type, Name, ConfigIn) ->
Schema =
case Root of
<<"connectors">> -> emqx_connector_schema;
<<"actions">> -> emqx_bridge_v2_schema
end,
#{Root := #{Type := #{Name := Config}}} =
hocon_tconf:check_plain(
Schema,
#{Root => #{Type => #{Name => ConfigIn}}},
#{required => false, atom_key => false}
),
ct:pal("parsed config: ~p", [Config]),
ConfigIn.
t_start_stop(Config) ->
emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
t_start_broken_update_restart(Config) ->
Name = ?config(connector_name, Config),
Type = ?config(connector_type, Config),
ConnectorConf = ?config(connector_config, Config),
ConnectorConfBroken = maps:merge(
ConnectorConf,
#{<<"secret_access_key">> => <<"imnotanadmin">>}
),
?assertMatch(
{ok, {{_HTTP, 201, _}, _, _}},
emqx_bridge_v2_testlib:create_connector_api(Name, Type, ConnectorConfBroken)
),
ConnectorId = emqx_connector_resource:resource_id(Type, Name),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ConnectorId))
),
?assertMatch(
{ok, {{_HTTP, 200, _}, _, _}},
emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConf)
),
?assertMatch(
{ok, {{_HTTP, 204, _}, _, _}},
emqx_bridge_v2_testlib:start_connector_api(Name, Type)
),
?retry(
1_000,
20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ConnectorId))
).
t_create_via_http(Config) ->
emqx_bridge_v2_testlib:t_create_via_http(Config).
t_on_get_status(Config) ->
emqx_bridge_v2_testlib:t_on_get_status(Config, #{}).
t_sync_query(Config) ->
Bucket = emqx_s3_test_helpers:unique_bucket(),
Topic = "a/b/c",
Payload = rand:bytes(1024),
AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
ok = erlcloud_s3:create_bucket(Bucket, AwsConfig),
ok = emqx_bridge_v2_testlib:t_sync_query(
Config,
fun() -> mk_message(Bucket, Topic, Payload) end,
fun(Res) -> ?assertMatch(ok, Res) end,
s3_bridge_connector_upload_ok
),
?assertMatch(
#{
content := Payload,
content_type := ?CONTENT_TYPE
},
maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
).
t_query_retry_recoverable(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
BridgeName = ?config(bridge_name, Config),
Bucket = emqx_s3_test_helpers:unique_bucket(),
Topic = "d/e/f",
Payload = rand:bytes(1024),
AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
ok = erlcloud_s3:create_bucket(Bucket, AwsConfig),
%% Create a bridge with the sample configuration.
?assertMatch(
{ok, _Bridge},
emqx_bridge_v2_testlib:create_bridge(Config)
),
%% Simulate recoverable failure.
_ = emqx_common_test_helpers:enable_failure(timeout, ?PROXY_NAME, ProxyHost, ProxyPort),
_ = timer:apply_after(
_Timeout = 5000,
emqx_common_test_helpers,
heal_failure,
[timeout, ?PROXY_NAME, ProxyHost, ProxyPort]
),
Message = mk_message(Bucket, Topic, Payload),
%% Verify that the message is sent eventually.
ok = emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{}),
?assertMatch(
#{content := Payload},
maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
).
mk_message(ClientId, Topic, Payload) ->
Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
{Event, _} = emqx_rule_events:eventmsg_publish(Message),
Event.