239 lines
7.9 KiB
Erlang
239 lines
7.9 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_bridge_s3_SUITE).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("snabbkaffe/include/test_macros.hrl").
|
|
|
|
-import(emqx_utils_conv, [bin/1]).
|
|
|
|
%% See `emqx_bridge_s3.hrl`.
|
|
-define(BRIDGE_TYPE, <<"s3">>).
|
|
-define(CONNECTOR_TYPE, <<"s3">>).
|
|
|
|
-define(PROXY_NAME, "minio_tcp").
|
|
-define(CONTENT_TYPE, "application/x-emqx-payload").
|
|
|
|
%% CT Setup
|
|
|
|
all() ->
|
|
emqx_common_test_helpers:all(?MODULE).
|
|
|
|
init_per_suite(Config) ->
|
|
% Setup toxiproxy
|
|
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
|
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
|
_ = emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
|
Apps = emqx_cth_suite:start(
|
|
[
|
|
emqx,
|
|
emqx_conf,
|
|
emqx_connector,
|
|
emqx_bridge_s3,
|
|
emqx_bridge,
|
|
emqx_rule_engine,
|
|
emqx_management,
|
|
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
|
|
],
|
|
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
|
),
|
|
{ok, _} = emqx_common_test_http:create_default_app(),
|
|
[
|
|
{apps, Apps},
|
|
{proxy_host, ProxyHost},
|
|
{proxy_port, ProxyPort},
|
|
{proxy_name, ?PROXY_NAME}
|
|
| Config
|
|
].
|
|
|
|
end_per_suite(Config) ->
|
|
ok = emqx_cth_suite:stop(?config(apps, Config)).
|
|
|
|
%% Testcases
|
|
|
|
init_per_testcase(TestCase, Config) ->
|
|
ct:timetrap(timer:seconds(30)),
|
|
ok = snabbkaffe:start_trace(),
|
|
Name = iolist_to_binary(io_lib:format("~s~p", [TestCase, erlang:unique_integer()])),
|
|
ConnectorConfig = connector_config(Name, Config),
|
|
ActionConfig = action_config(Name, Name),
|
|
[
|
|
{connector_type, ?CONNECTOR_TYPE},
|
|
{connector_name, Name},
|
|
{connector_config, ConnectorConfig},
|
|
{bridge_type, ?BRIDGE_TYPE},
|
|
{bridge_name, Name},
|
|
{bridge_config, ActionConfig}
|
|
| Config
|
|
].
|
|
|
|
end_per_testcase(_TestCase, _Config) ->
|
|
ok = snabbkaffe:stop(),
|
|
ok.
|
|
|
|
connector_config(Name, _Config) ->
|
|
BaseConf = emqx_s3_test_helpers:base_raw_config(tcp),
|
|
parse_and_check_config(<<"connectors">>, ?CONNECTOR_TYPE, Name, #{
|
|
<<"enable">> => true,
|
|
<<"description">> => <<"S3 Connector">>,
|
|
<<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)),
|
|
<<"port">> => maps:get(<<"port">>, BaseConf),
|
|
<<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
|
|
<<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
|
|
<<"transport_options">> => #{
|
|
<<"headers">> => #{
|
|
<<"content-type">> => <<?CONTENT_TYPE>>
|
|
},
|
|
<<"connect_timeout">> => <<"500ms">>,
|
|
<<"request_timeout">> => <<"1s">>,
|
|
<<"pool_size">> => 4,
|
|
<<"max_retries">> => 0,
|
|
<<"enable_pipelining">> => 1
|
|
},
|
|
<<"resource_opts">> => #{
|
|
<<"health_check_interval">> => <<"5s">>,
|
|
<<"start_timeout">> => <<"5s">>
|
|
}
|
|
}).
|
|
|
|
action_config(Name, ConnectorId) ->
|
|
parse_and_check_config(<<"actions">>, ?BRIDGE_TYPE, Name, #{
|
|
<<"enable">> => true,
|
|
<<"connector">> => ConnectorId,
|
|
<<"parameters">> => #{
|
|
<<"bucket">> => <<"${clientid}">>,
|
|
<<"key">> => <<"${topic}">>,
|
|
<<"content">> => <<"${payload}">>,
|
|
<<"acl">> => <<"public_read">>
|
|
},
|
|
<<"resource_opts">> => #{
|
|
<<"buffer_mode">> => <<"memory_only">>,
|
|
<<"buffer_seg_bytes">> => <<"10MB">>,
|
|
<<"health_check_interval">> => <<"3s">>,
|
|
<<"inflight_window">> => 40,
|
|
<<"max_buffer_bytes">> => <<"256MB">>,
|
|
<<"metrics_flush_interval">> => <<"1s">>,
|
|
<<"query_mode">> => <<"sync">>,
|
|
<<"request_ttl">> => <<"60s">>,
|
|
<<"resume_interval">> => <<"3s">>,
|
|
<<"worker_pool_size">> => <<"4">>
|
|
}
|
|
}).
|
|
|
|
parse_and_check_config(Root, Type, Name, ConfigIn) ->
|
|
Schema =
|
|
case Root of
|
|
<<"connectors">> -> emqx_connector_schema;
|
|
<<"actions">> -> emqx_bridge_v2_schema
|
|
end,
|
|
#{Root := #{Type := #{Name := Config}}} =
|
|
hocon_tconf:check_plain(
|
|
Schema,
|
|
#{Root => #{Type => #{Name => ConfigIn}}},
|
|
#{required => false, atom_key => false}
|
|
),
|
|
ct:pal("parsed config: ~p", [Config]),
|
|
ConfigIn.
|
|
|
|
t_start_stop(Config) ->
|
|
emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
|
|
|
|
t_start_broken_update_restart(Config) ->
|
|
Name = ?config(connector_name, Config),
|
|
Type = ?config(connector_type, Config),
|
|
ConnectorConf = ?config(connector_config, Config),
|
|
ConnectorConfBroken = maps:merge(
|
|
ConnectorConf,
|
|
#{<<"secret_access_key">> => <<"imnotanadmin">>}
|
|
),
|
|
?assertMatch(
|
|
{ok, {{_HTTP, 201, _}, _, _}},
|
|
emqx_bridge_v2_testlib:create_connector_api(Name, Type, ConnectorConfBroken)
|
|
),
|
|
ConnectorId = emqx_connector_resource:resource_id(Type, Name),
|
|
?retry(
|
|
_Sleep = 1_000,
|
|
_Attempts = 20,
|
|
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ConnectorId))
|
|
),
|
|
?assertMatch(
|
|
{ok, {{_HTTP, 200, _}, _, _}},
|
|
emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConf)
|
|
),
|
|
?assertMatch(
|
|
{ok, {{_HTTP, 204, _}, _, _}},
|
|
emqx_bridge_v2_testlib:start_connector_api(Name, Type)
|
|
),
|
|
?retry(
|
|
1_000,
|
|
20,
|
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ConnectorId))
|
|
).
|
|
|
|
t_create_via_http(Config) ->
|
|
emqx_bridge_v2_testlib:t_create_via_http(Config).
|
|
|
|
t_on_get_status(Config) ->
|
|
emqx_bridge_v2_testlib:t_on_get_status(Config, #{}).
|
|
|
|
t_sync_query(Config) ->
|
|
Bucket = emqx_s3_test_helpers:unique_bucket(),
|
|
Topic = "a/b/c",
|
|
Payload = rand:bytes(1024),
|
|
AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
|
|
ok = erlcloud_s3:create_bucket(Bucket, AwsConfig),
|
|
ok = emqx_bridge_v2_testlib:t_sync_query(
|
|
Config,
|
|
fun() -> mk_message(Bucket, Topic, Payload) end,
|
|
fun(Res) -> ?assertMatch(ok, Res) end,
|
|
s3_bridge_connector_upload_ok
|
|
),
|
|
?assertMatch(
|
|
#{
|
|
content := Payload,
|
|
content_type := ?CONTENT_TYPE
|
|
},
|
|
maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
|
|
).
|
|
|
|
t_query_retry_recoverable(Config) ->
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
ProxyPort = ?config(proxy_port, Config),
|
|
BridgeName = ?config(bridge_name, Config),
|
|
Bucket = emqx_s3_test_helpers:unique_bucket(),
|
|
Topic = "d/e/f",
|
|
Payload = rand:bytes(1024),
|
|
AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
|
|
ok = erlcloud_s3:create_bucket(Bucket, AwsConfig),
|
|
%% Create a bridge with the sample configuration.
|
|
?assertMatch(
|
|
{ok, _Bridge},
|
|
emqx_bridge_v2_testlib:create_bridge(Config)
|
|
),
|
|
%% Simulate recoverable failure.
|
|
_ = emqx_common_test_helpers:enable_failure(timeout, ?PROXY_NAME, ProxyHost, ProxyPort),
|
|
_ = timer:apply_after(
|
|
_Timeout = 5000,
|
|
emqx_common_test_helpers,
|
|
heal_failure,
|
|
[timeout, ?PROXY_NAME, ProxyHost, ProxyPort]
|
|
),
|
|
Message = mk_message(Bucket, Topic, Payload),
|
|
%% Verify that the message is sent eventually.
|
|
ok = emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{}),
|
|
?assertMatch(
|
|
#{content := Payload},
|
|
maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
|
|
).
|
|
|
|
mk_message(ClientId, Topic, Payload) ->
|
|
Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
|
|
{Event, _} = emqx_rule_events:eventmsg_publish(Message),
|
|
Event.
|