feat(s3-aggreg): support custom and default S3 object HTTP headers

I.e. configured container decides default `Content-Type` header.
This commit is contained in:
Andrew Mayorov 2024-04-26 13:41:06 +02:00
parent 5b15b2d641
commit 339036045d
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
5 changed files with 58 additions and 9 deletions

View File

@ -23,7 +23,8 @@
%% Interpreting options
-export([
mk_key_template/1
mk_key_template/1,
mk_upload_options/1
]).
%% emqx_bridge_v2_schema API
@ -160,8 +161,8 @@ desc(_Name) ->
%% Interpreting options
-spec mk_key_template(string()) -> emqx_template:str().
mk_key_template(Key) ->
-spec mk_key_template(_Parameters :: map()) -> emqx_template:str().
mk_key_template(#{key := Key}) ->
Template = emqx_template:parse(Key),
{_, BindingErrors} = emqx_template:render(Template, #{}),
{UsedBindings, _} = lists:unzip(BindingErrors),
@ -188,6 +189,33 @@ mk_default_binding("datetime.") ->
mk_default_binding(Binding) ->
"${" ++ Binding ++ "}".
-spec mk_upload_options(_Parameters :: map()) -> emqx_s3_client:upload_options().
mk_upload_options(Parameters) ->
Headers = mk_upload_headers(Parameters),
#{
headers => Headers,
acl => maps:get(acl, Parameters, undefined)
}.
mk_upload_headers(Parameters = #{container := Container}) ->
Headers = normalize_headers(maps:get(headers, Parameters, #{})),
ContainerHeaders = mk_container_headers(Container),
maps:merge(ContainerHeaders, Headers).
normalize_headers(Headers) ->
maps:fold(
fun(Header, Value, Acc) ->
maps:put(string:lowercase(emqx_utils_conv:str(Header)), Value, Acc)
end,
#{},
Headers
).
mk_container_headers(#{type := csv}) ->
#{"content-type" => "text/csv"};
mk_container_headers(#{}) ->
#{}.
%% Examples
bridge_v2_examples(Method) ->

View File

@ -191,8 +191,7 @@ start_channel(State, #{
max_records := MaxRecords
},
container := Container,
bucket := Bucket,
key := Key
bucket := Bucket
}
}) ->
AggregOpts = #{
@ -202,9 +201,9 @@ start_channel(State, #{
},
DeliveryOpts = #{
bucket => Bucket,
key => emqx_bridge_s3_aggreg_upload:mk_key_template(Key),
key => emqx_bridge_s3_aggreg_upload:mk_key_template(Parameters),
container => Container,
upload_options => upload_options(Parameters),
upload_options => emqx_bridge_s3_aggreg_upload:mk_upload_options(Parameters),
client_config => maps:get(client_config, State),
uploader_config => maps:with([min_part_size, max_part_size], Parameters)
},

View File

@ -70,7 +70,7 @@ end_per_suite(Config) ->
%% Testcases
init_per_testcase(TestCase, Config) ->
ct:timetrap(timer:seconds(10)),
ct:timetrap(timer:seconds(15)),
ok = snabbkaffe:start_trace(),
TS = erlang:system_time(),
Name = iolist_to_binary(io_lib:format("~s-~p", [TestCase, TS])),
@ -124,6 +124,9 @@ action_config(Name, ConnectorId, Bucket) ->
<<"bucket">> => unicode:characters_to_binary(Bucket),
<<"key">> => <<"${action}/${node}/${datetime.rfc3339}">>,
<<"acl">> => <<"public_read">>,
<<"headers">> => #{
<<"X-AMZ-Meta-Version">> => <<"42">>
},
<<"aggregation">> => #{
<<"time_interval">> => <<"4s">>,
<<"max_records">> => ?CONF_MAX_RECORDS
@ -173,7 +176,11 @@ t_aggreg_upload(Config) ->
[BridgeNameString, NodeString, _Datetime, _Seq = "0"],
string:split(Key, "/", all)
),
_Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
?assertMatch(
#{content_type := "text/csv", "x-amz-meta-version" := "42"},
Upload
),
%% Verify that column order is respected.
?assertMatch(
{ok, [

View File

@ -102,6 +102,14 @@ fields(s3_upload) ->
desc => ?DESC("acl"),
required => false
}
)},
{headers,
hoconsc:mk(
map(),
#{
required => false,
desc => ?DESC("upload_headers")
}
)}
];
fields(s3_uploader) ->

View File

@ -18,6 +18,13 @@ key.desc:
key.label:
"""Object Key"""
upload_headers.label:
"""S3 object HTTP headers"""
upload_headers.desc:
"""HTTP headers to include in the S3 object upload request.<br/>
Useful to specify content type, content encoding, etc. of the S3 object."""
host.desc:
"""The host of the S3 endpoint."""