From a4eac75b25f8975dbc6a7c748f4e885181caf64f Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Mon, 12 Feb 2024 18:20:03 +0100
Subject: [PATCH] fix(s3-bridge): handle recoverable AWS errors

---
 .../src/emqx_bridge_s3_connector.erl          |  6 ++-
 .../test/emqx_bridge_s3_SUITE.erl             | 39 +++++++++++++++++--
 2 files changed, 40 insertions(+), 5 deletions(-)

diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl
index 9a0f110fe..a072c0464 100644
--- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl
@@ -196,8 +196,12 @@ run_simple_upload(
 
 map_error({socket_error, _} = Reason) ->
     {recoverable_error, Reason};
+map_error(Reason = {aws_error, Status, _, _Body}) when is_integer(Status), Status >= 500 ->
+    %% HTTP 5xx means the fault is on the server side, so the request is worth retrying:
+    %% https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
+    %% `is_integer/1` is required: any non-number term compares greater than 500.
+    {recoverable_error, Reason};
 map_error(Reason) ->
-    %% TODO: Recoverable errors.
     {unrecoverable_error, Reason}.
 
 render_bucket(Template, Data) ->
diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl
index 5de30578b..da9787911 100644
--- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl
+++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl
@@ -9,6 +9,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/test_macros.hrl").
 
 -import(emqx_utils_conv, [bin/1]).
 
@@ -89,8 +90,8 @@ connector_config(Name, _Config) ->
             <<"headers">> => #{
                 <<"content-type">> => <>
             },
-            <<"connect_timeout">> => 1000,
-            <<"request_timeout">> => 1000,
+            <<"connect_timeout">> => <<"500ms">>,
+            <<"request_timeout">> => <<"1s">>,
             <<"pool_size">> => 4,
             <<"max_retries">> => 0,
             <<"enable_pipelining">> => 1
@@ -110,13 +111,13 @@ action_config(Name, ConnectorId) ->
         <<"resource_opts">> => #{
             <<"buffer_mode">> => <<"memory_only">>,
             <<"buffer_seg_bytes">> => <<"10MB">>,
-            <<"health_check_interval">> => <<"5s">>,
+            <<"health_check_interval">> => <<"3s">>,
             <<"inflight_window">> => 40,
             <<"max_buffer_bytes">> => <<"256MB">>,
             <<"metrics_flush_interval">> => <<"1s">>,
             <<"query_mode">> => <<"sync">>,
             <<"request_ttl">> => <<"60s">>,
-            <<"resume_interval">> => <<"5s">>,
+            <<"resume_interval">> => <<"3s">>,
             <<"worker_pool_size">> => <<"4">>
         }
     }).
@@ -165,6 +166,36 @@ t_sync_query(Config) ->
         maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
     ).
 
+t_query_retry_recoverable(Config) ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    BridgeName = ?config(bridge_name, Config),
+    Bucket = emqx_s3_test_helpers:unique_bucket(),
+    Topic = "d/e/f",
+    Payload = rand:bytes(1024),
+    AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
+    ok = erlcloud_s3:create_bucket(Bucket, AwsConfig),
+    %% Create the bridge from the test case configuration; abort the test if creation fails.
+    ?assertMatch(
+        {ok, _Bridge},
+        emqx_bridge_v2_testlib:create_bridge(Config)
+    ),
+    %% Simulate a recoverable failure: inject `timeout` at the proxy now, heal it after 5000 ms.
+    _ = emqx_common_test_helpers:enable_failure(timeout, ?PROXY_NAME, ProxyHost, ProxyPort),
+    _ = timer:apply_after(
+        _Timeout = 5000,
+        emqx_common_test_helpers,
+        heal_failure,
+        [timeout, ?PROXY_NAME, ProxyHost, ProxyPort]
+    ),
+    Message = mk_message(Bucket, Topic, Payload),
+    %% Verify eventual delivery: the send must return `ok` and the object must hold the payload.
+    ok = emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{}),
+    ?assertMatch(
+        #{content := Payload},
+        maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
+    ).
+
 mk_message(ClientId, Topic, Payload) ->
     Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
     {Event, _} = emqx_rule_events:eventmsg_publish(Message),