Merge pull request #12506 from keynslug/fix/EMQX-11830/recoverable

fix(s3-bridge): handle recoverable AWS errors
This commit is contained in:
Andrew Mayorov 2024-02-13 12:44:23 +01:00 committed by GitHub
commit 6fbb6f6846
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 38 additions and 5 deletions

View File

@ -196,8 +196,10 @@ run_simple_upload(
map_error({socket_error, _} = Reason) ->
{recoverable_error, Reason};
map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 ->
%% https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
{recoverable_error, Reason};
map_error(Reason) ->
%% TODO: Recoverable errors.
{unrecoverable_error, Reason}.
render_bucket(Template, Data) ->

View File

@ -9,6 +9,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/test_macros.hrl").
-import(emqx_utils_conv, [bin/1]).
@ -89,8 +90,8 @@ connector_config(Name, _Config) ->
<<"headers">> => #{
<<"content-type">> => <<?CONTENT_TYPE>>
},
<<"connect_timeout">> => 1000,
<<"request_timeout">> => 1000,
<<"connect_timeout">> => <<"500ms">>,
<<"request_timeout">> => <<"1s">>,
<<"pool_size">> => 4,
<<"max_retries">> => 0,
<<"enable_pipelining">> => 1
@ -110,13 +111,13 @@ action_config(Name, ConnectorId) ->
<<"resource_opts">> => #{
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"5s">>,
<<"health_check_interval">> => <<"3s">>,
<<"inflight_window">> => 40,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"60s">>,
<<"resume_interval">> => <<"5s">>,
<<"resume_interval">> => <<"3s">>,
<<"worker_pool_size">> => <<"4">>
}
}).
@ -165,6 +166,36 @@ t_sync_query(Config) ->
maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
).
t_query_retry_recoverable(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
BridgeName = ?config(bridge_name, Config),
Bucket = emqx_s3_test_helpers:unique_bucket(),
Topic = "d/e/f",
Payload = rand:bytes(1024),
AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
ok = erlcloud_s3:create_bucket(Bucket, AwsConfig),
%% Create a bridge with the sample configuration.
?assertMatch(
{ok, _Bridge},
emqx_bridge_v2_testlib:create_bridge(Config)
),
%% Simulate recoverable failure.
_ = emqx_common_test_helpers:enable_failure(timeout, ?PROXY_NAME, ProxyHost, ProxyPort),
_ = timer:apply_after(
_Timeout = 5000,
emqx_common_test_helpers,
heal_failure,
[timeout, ?PROXY_NAME, ProxyHost, ProxyPort]
),
Message = mk_message(Bucket, Topic, Payload),
%% Verify that the message is sent eventually.
ok = emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{}),
?assertMatch(
#{content := Payload},
maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
).
mk_message(ClientId, Topic, Payload) ->
Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
{Event, _} = emqx_rule_events:eventmsg_publish(Message),