Merge pull request #10350 from keynslug/fix/EMQX-9486/sigv4-s3-urls
fix(ft-s3): use AWS4 signed urls for S3 export URIs
This commit is contained in:
commit
a99eb082d1
|
@ -195,7 +195,8 @@ schema(filemeta) ->
|
|||
{name,
|
||||
hoconsc:mk(string(), #{
|
||||
required => true,
|
||||
validator => validator(filename)
|
||||
validator => validator(filename),
|
||||
converter => converter(unicode_string)
|
||||
})},
|
||||
{size, hoconsc:mk(non_neg_integer())},
|
||||
{expire_at, hoconsc:mk(non_neg_integer())},
|
||||
|
@ -220,6 +221,17 @@ converter(checksum) ->
|
|||
_ = is_binary(Hex) orelse throw({expected_type, string}),
|
||||
_ = byte_size(Hex) =:= 64 orelse throw({expected_length, 64}),
|
||||
{sha256, binary:decode_hex(Hex)}
|
||||
end;
|
||||
converter(unicode_string) ->
|
||||
fun
|
||||
(undefined, #{}) ->
|
||||
undefined;
|
||||
(Str, #{make_serializable := true}) ->
|
||||
_ = is_list(Str) orelse throw({expected_type, string}),
|
||||
unicode:characters_to_binary(Str);
|
||||
(Str, #{}) ->
|
||||
_ = is_binary(Str) orelse throw({expected_type, string}),
|
||||
unicode:characters_to_list(Str)
|
||||
end.
|
||||
|
||||
ref(Ref) ->
|
||||
|
|
|
@ -130,10 +130,13 @@ s3_headers({ClientId, FileId}, Filemeta) ->
|
|||
<<"x-amz-meta-clientid">> => ClientId,
|
||||
%% It [Topic Name] MUST be a UTF-8 Encoded String
|
||||
<<"x-amz-meta-fileid">> => FileId,
|
||||
<<"x-amz-meta-filemeta">> => emqx_json:encode(emqx_ft:encode_filemeta(Filemeta)),
|
||||
<<"x-amz-meta-filemeta">> => s3_header_filemeta(Filemeta),
|
||||
<<"x-amz-meta-filemeta-vsn">> => ?FILEMETA_VSN
|
||||
}.
|
||||
|
||||
s3_header_filemeta(Filemeta) ->
|
||||
emqx_json:encode(emqx_ft:encode_filemeta(Filemeta), [force_utf8, uescape]).
|
||||
|
||||
list(Client, Options) ->
|
||||
case list_key_info(Client, Options) of
|
||||
{ok, KeyInfos} ->
|
||||
|
|
|
@ -169,39 +169,38 @@ t_invalid_filename(Config) ->
|
|||
C = ?config(client, Config),
|
||||
?assertRCName(
|
||||
unspecified_error,
|
||||
emqtt:publish(C, mk_init_topic(<<"f1">>), emqx_json:encode(meta(".", <<>>)), 1)
|
||||
emqtt:publish(C, mk_init_topic(<<"f1">>), encode_meta(meta(".", <<>>)), 1)
|
||||
),
|
||||
?assertRCName(
|
||||
unspecified_error,
|
||||
emqtt:publish(C, mk_init_topic(<<"f2">>), emqx_json:encode(meta("..", <<>>)), 1)
|
||||
emqtt:publish(C, mk_init_topic(<<"f2">>), encode_meta(meta("..", <<>>)), 1)
|
||||
),
|
||||
?assertRCName(
|
||||
unspecified_error,
|
||||
emqtt:publish(C, mk_init_topic(<<"f2">>), emqx_json:encode(meta("../nice", <<>>)), 1)
|
||||
emqtt:publish(C, mk_init_topic(<<"f2">>), encode_meta(meta("../nice", <<>>)), 1)
|
||||
),
|
||||
?assertRCName(
|
||||
unspecified_error,
|
||||
emqtt:publish(C, mk_init_topic(<<"f3">>), emqx_json:encode(meta("/etc/passwd", <<>>)), 1)
|
||||
emqtt:publish(C, mk_init_topic(<<"f3">>), encode_meta(meta("/etc/passwd", <<>>)), 1)
|
||||
),
|
||||
?assertRCName(
|
||||
success,
|
||||
emqtt:publish(C, mk_init_topic(<<"f4">>), emqx_json:encode(meta("146%", <<>>)), 1)
|
||||
emqtt:publish(C, mk_init_topic(<<"f4">>), encode_meta(meta("146%", <<>>)), 1)
|
||||
).
|
||||
|
||||
t_simple_transfer(Config) ->
|
||||
C = ?config(client, Config),
|
||||
|
||||
Filename = <<"topsecret.pdf">>,
|
||||
Filename = "topsecret.pdf",
|
||||
FileId = <<"f1">>,
|
||||
|
||||
Data = [<<"first">>, <<"second">>, <<"third">>],
|
||||
|
||||
Meta = #{size := Filesize} = meta(Filename, Data),
|
||||
MetaPayload = emqx_json:encode(Meta),
|
||||
|
||||
?assertRCName(
|
||||
success,
|
||||
emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1)
|
||||
emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1)
|
||||
),
|
||||
|
||||
lists:foreach(
|
||||
|
@ -246,23 +245,21 @@ t_nasty_clientids_fileids(_Config) ->
|
|||
t_meta_conflict(Config) ->
|
||||
C = ?config(client, Config),
|
||||
|
||||
Filename = <<"topsecret.pdf">>,
|
||||
Filename = "topsecret.pdf",
|
||||
FileId = <<"f1">>,
|
||||
|
||||
Meta = meta(Filename, [<<"x">>]),
|
||||
MetaPayload = emqx_json:encode(Meta),
|
||||
|
||||
?assertRCName(
|
||||
success,
|
||||
emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1)
|
||||
emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1)
|
||||
),
|
||||
|
||||
ConflictMeta = Meta#{name => <<"conflict.pdf">>},
|
||||
ConflictMetaPayload = emqx_json:encode(ConflictMeta),
|
||||
ConflictMeta = Meta#{name => "conflict.pdf"},
|
||||
|
||||
?assertRCName(
|
||||
unspecified_error,
|
||||
emqtt:publish(C, mk_init_topic(FileId), ConflictMetaPayload, 1)
|
||||
emqtt:publish(C, mk_init_topic(FileId), encode_meta(ConflictMeta), 1)
|
||||
).
|
||||
|
||||
t_no_meta(Config) ->
|
||||
|
@ -284,17 +281,16 @@ t_no_meta(Config) ->
|
|||
t_no_segment(Config) ->
|
||||
C = ?config(client, Config),
|
||||
|
||||
Filename = <<"topsecret.pdf">>,
|
||||
Filename = "topsecret.pdf",
|
||||
FileId = <<"f1">>,
|
||||
|
||||
Data = [<<"first">>, <<"second">>, <<"third">>],
|
||||
|
||||
Meta = #{size := Filesize} = meta(Filename, Data),
|
||||
MetaPayload = emqx_json:encode(Meta),
|
||||
|
||||
?assertRCName(
|
||||
success,
|
||||
emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1)
|
||||
emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1)
|
||||
),
|
||||
|
||||
lists:foreach(
|
||||
|
@ -335,13 +331,13 @@ t_invalid_meta(Config) ->
|
|||
t_invalid_checksum(Config) ->
|
||||
C = ?config(client, Config),
|
||||
|
||||
Filename = <<"topsecret.pdf">>,
|
||||
Filename = "topsecret.pdf",
|
||||
FileId = <<"f1">>,
|
||||
|
||||
Data = [<<"first">>, <<"second">>, <<"third">>],
|
||||
|
||||
Meta = #{size := Filesize} = meta(Filename, Data),
|
||||
MetaPayload = emqx_json:encode(Meta#{checksum => sha256hex(<<"invalid">>)}),
|
||||
MetaPayload = encode_meta(Meta#{checksum => {sha256, sha256(<<"invalid">>)}}),
|
||||
|
||||
?assertRCName(
|
||||
success,
|
||||
|
@ -366,7 +362,7 @@ t_invalid_checksum(Config) ->
|
|||
t_corrupted_segment_retry(Config) ->
|
||||
C = ?config(client, Config),
|
||||
|
||||
Filename = <<"corruption.pdf">>,
|
||||
Filename = "corruption.pdf",
|
||||
FileId = <<"4242-4242">>,
|
||||
|
||||
Data = [<<"first">>, <<"second">>, <<"third">>],
|
||||
|
@ -379,11 +375,11 @@ t_corrupted_segment_retry(Config) ->
|
|||
Checksum1,
|
||||
Checksum2,
|
||||
Checksum3
|
||||
] = [sha256hex(S) || S <- Data],
|
||||
] = [binary:encode_hex(sha256(S)) || S <- Data],
|
||||
|
||||
Meta = #{size := Filesize} = meta(Filename, Data),
|
||||
|
||||
?assertRCName(success, emqtt:publish(C, mk_init_topic(FileId), emqx_json:encode(Meta), 1)),
|
||||
?assertRCName(success, emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1)),
|
||||
|
||||
?assertRCName(
|
||||
success,
|
||||
|
@ -421,7 +417,7 @@ t_switch_node(Config) ->
|
|||
{ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, AdditionalNodePort}]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
|
||||
Filename = <<"multinode_upload.txt">>,
|
||||
Filename = "multinode_upload.txt",
|
||||
FileId = <<"f1">>,
|
||||
|
||||
Data = [<<"first">>, <<"second">>, <<"third">>],
|
||||
|
@ -430,11 +426,10 @@ t_switch_node(Config) ->
|
|||
%% First, publist metadata and the first segment to the additional node
|
||||
|
||||
Meta = #{size := Filesize} = meta(Filename, Data),
|
||||
MetaPayload = emqx_json:encode(Meta),
|
||||
|
||||
?assertRCName(
|
||||
success,
|
||||
emqtt:publish(C1, mk_init_topic(FileId), MetaPayload, 1)
|
||||
emqtt:publish(C1, mk_init_topic(FileId), encode_meta(Meta), 1)
|
||||
),
|
||||
?assertRCName(
|
||||
success,
|
||||
|
@ -593,7 +588,7 @@ disown_mqtt_client(Context = #{}) ->
|
|||
send_filemeta(Meta, Context = #{client := Client, fileid := FileId}) ->
|
||||
?assertRCName(
|
||||
success,
|
||||
emqtt:publish(Client, mk_init_topic(FileId), emqx_json:encode(Meta), 1)
|
||||
emqtt:publish(Client, mk_init_topic(FileId), encode_meta(Meta), 1)
|
||||
),
|
||||
Context.
|
||||
|
||||
|
@ -650,18 +645,21 @@ with_offsets(Items) ->
|
|||
),
|
||||
List.
|
||||
|
||||
sha256hex(Data) ->
|
||||
binary:encode_hex(crypto:hash(sha256, Data)).
|
||||
sha256(Data) ->
|
||||
crypto:hash(sha256, Data).
|
||||
|
||||
meta(FileName, Data) ->
|
||||
FullData = iolist_to_binary(Data),
|
||||
#{
|
||||
name => FileName,
|
||||
checksum => sha256hex(FullData),
|
||||
checksum => {sha256, sha256(FullData)},
|
||||
expire_at => erlang:system_time(_Unit = second) + 3600,
|
||||
size => byte_size(FullData)
|
||||
}.
|
||||
|
||||
encode_meta(Meta) ->
|
||||
emqx_json:encode(emqx_ft:encode_filemeta(Meta)).
|
||||
|
||||
list_files(ClientId) ->
|
||||
{ok, Files} = emqx_ft_storage:files(),
|
||||
[File || File = #{transfer := {CId, _}} <- Files, CId == ClientId].
|
||||
|
|
|
@ -78,11 +78,13 @@ upload_file(ClientId, FileId, Name, Data, Node) ->
|
|||
{ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
Meta = #{
|
||||
name => unicode:characters_to_binary(Name),
|
||||
name => Name,
|
||||
expire_at => erlang:system_time(_Unit = second) + 3600,
|
||||
size => Size
|
||||
},
|
||||
MetaPayload = emqx_json:encode(Meta),
|
||||
MetaPayload = emqx_json:encode(emqx_ft:encode_filemeta(Meta)),
|
||||
|
||||
ct:pal("MetaPayload = ~ts", [MetaPayload]),
|
||||
|
||||
MetaTopic = <<"$file/", FileId/binary, "/init">>,
|
||||
{ok, _} = emqtt:publish(C1, MetaTopic, MetaPayload, 1),
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{deps, [
|
||||
{emqx, {path, "../../apps/emqx"}},
|
||||
{erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.6.7-emqx-1"}}}
|
||||
{erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.6.8-emqx-1"}}}
|
||||
]}.
|
||||
|
||||
{project_plugins, [erlfmt]}.
|
||||
|
|
|
@ -189,7 +189,7 @@ list(#{bucket := Bucket, aws_config := AwsConfig}, Options) ->
|
|||
|
||||
-spec uri(client(), key()) -> iodata().
|
||||
uri(#{bucket := Bucket, aws_config := AwsConfig, url_expire_time := ExpireTime}, Key) ->
|
||||
erlcloud_s3:make_get_url(ExpireTime, Bucket, erlcloud_key(Key), AwsConfig).
|
||||
erlcloud_s3:make_presigned_v4_url(ExpireTime, Bucket, get, erlcloud_key(Key), [], AwsConfig).
|
||||
|
||||
-spec format(client()) -> term().
|
||||
format(#{aws_config := AwsConfig} = Client) ->
|
||||
|
|
|
@ -54,6 +54,8 @@ groups() ->
|
|||
t_happy_path_multi,
|
||||
t_abort_multi,
|
||||
t_abort_simple_put,
|
||||
t_signed_url_download,
|
||||
t_signed_nonascii_url_download,
|
||||
|
||||
{group, noconn_errors},
|
||||
{group, timeout_errors},
|
||||
|
@ -193,6 +195,40 @@ t_happy_path_multi(Config) ->
|
|||
Key
|
||||
).
|
||||
|
||||
t_signed_url_download(_Config) ->
|
||||
Prefix = emqx_s3_test_helpers:unique_key(),
|
||||
Key = Prefix ++ "/ascii.txt",
|
||||
|
||||
{ok, Data} = upload(Key, 1024, 5),
|
||||
|
||||
SignedUrl = emqx_s3:with_client(profile_id(), fun(Client) ->
|
||||
emqx_s3_client:uri(Client, Key)
|
||||
end),
|
||||
|
||||
{ok, {_, _, Body}} = httpc:request(get, {SignedUrl, []}, [], []),
|
||||
|
||||
?assertEqual(
|
||||
iolist_to_binary(Data),
|
||||
iolist_to_binary(Body)
|
||||
).
|
||||
|
||||
t_signed_nonascii_url_download(_Config) ->
|
||||
Prefix = emqx_s3_test_helpers:unique_key(),
|
||||
Key = Prefix ++ "/unicode-🫠.txt",
|
||||
|
||||
{ok, Data} = upload(Key, 1024 * 1024, 8),
|
||||
|
||||
SignedUrl = emqx_s3:with_client(profile_id(), fun(Client) ->
|
||||
emqx_s3_client:uri(Client, Key)
|
||||
end),
|
||||
|
||||
{ok, {_, _, Body}} = httpc:request(get, {SignedUrl, []}, [], []),
|
||||
|
||||
?assertEqual(
|
||||
iolist_to_binary(Data),
|
||||
iolist_to_binary(Body)
|
||||
).
|
||||
|
||||
t_abort_multi(Config) ->
|
||||
Key = emqx_s3_test_helpers:unique_key(),
|
||||
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
|
||||
|
@ -532,3 +568,24 @@ data(Byte, ChunkSize, ChunkCount) ->
|
|||
list_objects(Config) ->
|
||||
Props = erlcloud_s3:list_objects(?config(bucket, Config), [], ?config(test_aws_config, Config)),
|
||||
proplists:get_value(contents, Props).
|
||||
|
||||
upload(Key, ChunkSize, ChunkCount) ->
|
||||
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
|
||||
|
||||
_ = erlang:monitor(process, Pid),
|
||||
|
||||
Data = data($a, ChunkSize, ChunkCount),
|
||||
|
||||
ok = lists:foreach(
|
||||
fun(Chunk) -> ?assertEqual(ok, emqx_s3_uploader:write(Pid, Chunk)) end,
|
||||
Data
|
||||
),
|
||||
|
||||
ok = emqx_s3_uploader:complete(Pid),
|
||||
|
||||
ok = ?assertProcessExited(
|
||||
normal,
|
||||
Pid
|
||||
),
|
||||
|
||||
{ok, Data}.
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}},
|
||||
{tdengine, {git, "https://github.com/emqx/tdengine-client-erl", {tag, "0.1.6"}}},
|
||||
{clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.3"}}},
|
||||
{erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.6.7-emqx-1"}}},
|
||||
{erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.6.8-emqx-1"}}},
|
||||
{rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.5.1"}}},
|
||||
{emqx, {path, "../../apps/emqx"}}
|
||||
]}.
|
||||
|
|
4
mix.exs
4
mix.exs
|
@ -94,10 +94,10 @@ defmodule EMQXUmbrella.MixProject do
|
|||
# in conflict by grpc and eetcd
|
||||
{:gpb, "4.19.5", override: true, runtime: false},
|
||||
{:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true},
|
||||
{:erlcloud, github: "emqx/erlcloud", tag: "3.6.7-emqx-1", override: true},
|
||||
{:erlcloud, github: "emqx/erlcloud", tag: "3.6.8-emqx-1", override: true},
|
||||
# erlcloud's rebar.config requires rebar3 and does not support Mix,
|
||||
# so it tries to fetch deps from git. We need to override this.
|
||||
{:lhttpc, "1.6.2", override: true},
|
||||
{:lhttpc, github: "erlcloud/lhttpc", tag: "1.6.2", override: true},
|
||||
{:eini, "1.2.9", override: true},
|
||||
{:base16, "1.0.0", override: true}
|
||||
# end of erlcloud's deps
|
||||
|
|
|
@ -82,7 +82,7 @@
|
|||
{minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.9"}}},
|
||||
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
|
||||
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}},
|
||||
{rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}},
|
||||
{rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.5"}}},
|
||||
% NOTE: depends on recon 2.5.x
|
||||
{observer_cli, "1.7.1"},
|
||||
{system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}},
|
||||
|
|
Loading…
Reference in New Issue