From 339036045dcc57aff165b185742123dbbdcec38f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 26 Apr 2024 13:41:06 +0200 Subject: [PATCH] feat(s3-aggreg): support custom and default S3 object HTTP headers I.e. configured container decides default `Content-Type` header. --- .../src/emqx_bridge_s3_aggreg_upload.erl | 34 +++++++++++++++++-- .../src/emqx_bridge_s3_connector.erl | 7 ++-- .../emqx_bridge_s3_aggreg_upload_SUITE.erl | 11 ++++-- apps/emqx_s3/src/emqx_s3_schema.erl | 8 +++++ rel/i18n/emqx_s3_schema.hocon | 7 ++++ 5 files changed, 58 insertions(+), 9 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl index 497f59ca9..02d43f4f0 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl @@ -23,7 +23,8 @@ %% Interpreting options -export([ - mk_key_template/1 + mk_key_template/1, + mk_upload_options/1 ]). %% emqx_bridge_v2_schema API @@ -160,8 +161,8 @@ desc(_Name) -> %% Interpreting options --spec mk_key_template(string()) -> emqx_template:str(). -mk_key_template(Key) -> +-spec mk_key_template(_Parameters :: map()) -> emqx_template:str(). +mk_key_template(#{key := Key}) -> Template = emqx_template:parse(Key), {_, BindingErrors} = emqx_template:render(Template, #{}), {UsedBindings, _} = lists:unzip(BindingErrors), @@ -188,6 +189,33 @@ mk_default_binding("datetime.") -> mk_default_binding(Binding) -> "${" ++ Binding ++ "}". +-spec mk_upload_options(_Parameters :: map()) -> emqx_s3_client:upload_options(). +mk_upload_options(Parameters) -> + Headers = mk_upload_headers(Parameters), + #{ + headers => Headers, + acl => maps:get(acl, Parameters, undefined) + }. + +mk_upload_headers(Parameters = #{container := Container}) -> + Headers = normalize_headers(maps:get(headers, Parameters, #{})), + ContainerHeaders = mk_container_headers(Container), + maps:merge(ContainerHeaders, Headers). + +normalize_headers(Headers) -> + maps:fold( + fun(Header, Value, Acc) -> + maps:put(string:lowercase(emqx_utils_conv:str(Header)), Value, Acc) + end, + #{}, + Headers + ). + +mk_container_headers(#{type := csv}) -> + #{"content-type" => "text/csv"}; +mk_container_headers(#{}) -> + #{}. + %% Examples bridge_v2_examples(Method) -> diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 972294ef7..d135a087a 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -191,8 +191,7 @@ start_channel(State, #{ max_records := MaxRecords }, container := Container, - bucket := Bucket, - key := Key + bucket := Bucket } }) -> AggregOpts = #{ @@ -202,9 +201,9 @@ start_channel(State, #{ }, DeliveryOpts = #{ bucket => Bucket, - key => emqx_bridge_s3_aggreg_upload:mk_key_template(Key), + key => emqx_bridge_s3_aggreg_upload:mk_key_template(Parameters), container => Container, - upload_options => upload_options(Parameters), + upload_options => emqx_bridge_s3_aggreg_upload:mk_upload_options(Parameters), client_config => maps:get(client_config, State), uploader_config => maps:with([min_part_size, max_part_size], Parameters) }, diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl index 45a830294..c7eeee2bb 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -70,7 +70,7 @@ end_per_suite(Config) -> %% Testcases init_per_testcase(TestCase, Config) -> - ct:timetrap(timer:seconds(10)), + ct:timetrap(timer:seconds(15)), ok = snabbkaffe:start_trace(), TS = erlang:system_time(), Name = iolist_to_binary(io_lib:format("~s-~p", [TestCase, TS])), @@ -124,6 +124,9 @@ action_config(Name, ConnectorId, Bucket) -> <<"bucket">> => unicode:characters_to_binary(Bucket), <<"key">> => <<"${action}/${node}/${datetime.rfc3339}">>, <<"acl">> => <<"public_read">>, + <<"headers">> => #{ + <<"X-AMZ-Meta-Version">> => <<"42">> + }, <<"aggregation">> => #{ <<"time_interval">> => <<"4s">>, <<"max_records">> => ?CONF_MAX_RECORDS @@ -173,7 +176,11 @@ t_aggreg_upload(Config) -> [BridgeNameString, NodeString, _Datetime, _Seq = "0"], string:split(Key, "/", all) ), - _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), + Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), + ?assertMatch( + #{content_type := "text/csv", "x-amz-meta-version" := "42"}, + Upload + ), %% Verify that column order is respected. ?assertMatch( {ok, [ diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl index 1199948d0..1fa6d31cd 100644 --- a/apps/emqx_s3/src/emqx_s3_schema.erl +++ b/apps/emqx_s3/src/emqx_s3_schema.erl @@ -102,6 +102,14 @@ fields(s3_upload) -> desc => ?DESC("acl"), required => false } + )}, + {headers, + hoconsc:mk( + map(), + #{ + required => false, + desc => ?DESC("upload_headers") + } )} ]; fields(s3_uploader) -> diff --git a/rel/i18n/emqx_s3_schema.hocon b/rel/i18n/emqx_s3_schema.hocon index 44f4bbc56..b8b963539 100644 --- a/rel/i18n/emqx_s3_schema.hocon +++ b/rel/i18n/emqx_s3_schema.hocon @@ -18,6 +18,13 @@ key.desc: key.label: """Object Key""" +upload_headers.label: +"""S3 object HTTP headers""" + +upload_headers.desc: +"""HTTP headers to include in the S3 object upload request.
+Useful to specify content type, content encoding, etc. of the S3 object.""" + host.desc: """The host of the S3 endpoint."""