feat(ft-s3): store metadata in ASCII-safe format

Also ensure consistent encoding and decoding filenames throughout
the `emqx_ft` application.
This commit is contained in:
Andrew Mayorov 2023-04-07 21:58:27 +03:00
parent 0d86ef8d3a
commit 8daa38ef06
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
5 changed files with 49 additions and 34 deletions

View File

@ -195,7 +195,8 @@ schema(filemeta) ->
{name, {name,
hoconsc:mk(string(), #{ hoconsc:mk(string(), #{
required => true, required => true,
validator => validator(filename) validator => validator(filename),
converter => converter(unicode_string)
})}, })},
{size, hoconsc:mk(non_neg_integer())}, {size, hoconsc:mk(non_neg_integer())},
{expire_at, hoconsc:mk(non_neg_integer())}, {expire_at, hoconsc:mk(non_neg_integer())},
@ -220,6 +221,17 @@ converter(checksum) ->
_ = is_binary(Hex) orelse throw({expected_type, string}), _ = is_binary(Hex) orelse throw({expected_type, string}),
_ = byte_size(Hex) =:= 64 orelse throw({expected_length, 64}), _ = byte_size(Hex) =:= 64 orelse throw({expected_length, 64}),
{sha256, binary:decode_hex(Hex)} {sha256, binary:decode_hex(Hex)}
end;
converter(unicode_string) ->
fun
(undefined, #{}) ->
undefined;
(Str, #{make_serializable := true}) ->
_ = is_list(Str) orelse throw({expected_type, string}),
unicode:characters_to_binary(Str);
(Str, #{}) ->
_ = is_binary(Str) orelse throw({expected_type, string}),
unicode:characters_to_list(Str)
end. end.
ref(Ref) -> ref(Ref) ->

View File

@ -130,10 +130,13 @@ s3_headers({ClientId, FileId}, Filemeta) ->
<<"x-amz-meta-clientid">> => ClientId, <<"x-amz-meta-clientid">> => ClientId,
%% It [Topic Name] MUST be a UTF-8 Encoded String %% It [Topic Name] MUST be a UTF-8 Encoded String
<<"x-amz-meta-fileid">> => FileId, <<"x-amz-meta-fileid">> => FileId,
<<"x-amz-meta-filemeta">> => emqx_json:encode(emqx_ft:encode_filemeta(Filemeta)), <<"x-amz-meta-filemeta">> => s3_header_filemeta(Filemeta),
<<"x-amz-meta-filemeta-vsn">> => ?FILEMETA_VSN <<"x-amz-meta-filemeta-vsn">> => ?FILEMETA_VSN
}. }.
s3_header_filemeta(Filemeta) ->
emqx_json:encode(emqx_ft:encode_filemeta(Filemeta), [force_utf8, uescape]).
list(Client, Options) -> list(Client, Options) ->
case list_key_info(Client, Options) of case list_key_info(Client, Options) of
{ok, KeyInfos} -> {ok, KeyInfos} ->

View File

@ -169,39 +169,38 @@ t_invalid_filename(Config) ->
C = ?config(client, Config), C = ?config(client, Config),
?assertRCName( ?assertRCName(
unspecified_error, unspecified_error,
emqtt:publish(C, mk_init_topic(<<"f1">>), emqx_json:encode(meta(".", <<>>)), 1) emqtt:publish(C, mk_init_topic(<<"f1">>), encode_meta(meta(".", <<>>)), 1)
), ),
?assertRCName( ?assertRCName(
unspecified_error, unspecified_error,
emqtt:publish(C, mk_init_topic(<<"f2">>), emqx_json:encode(meta("..", <<>>)), 1) emqtt:publish(C, mk_init_topic(<<"f2">>), encode_meta(meta("..", <<>>)), 1)
), ),
?assertRCName( ?assertRCName(
unspecified_error, unspecified_error,
emqtt:publish(C, mk_init_topic(<<"f2">>), emqx_json:encode(meta("../nice", <<>>)), 1) emqtt:publish(C, mk_init_topic(<<"f2">>), encode_meta(meta("../nice", <<>>)), 1)
), ),
?assertRCName( ?assertRCName(
unspecified_error, unspecified_error,
emqtt:publish(C, mk_init_topic(<<"f3">>), emqx_json:encode(meta("/etc/passwd", <<>>)), 1) emqtt:publish(C, mk_init_topic(<<"f3">>), encode_meta(meta("/etc/passwd", <<>>)), 1)
), ),
?assertRCName( ?assertRCName(
success, success,
emqtt:publish(C, mk_init_topic(<<"f4">>), emqx_json:encode(meta("146%", <<>>)), 1) emqtt:publish(C, mk_init_topic(<<"f4">>), encode_meta(meta("146%", <<>>)), 1)
). ).
t_simple_transfer(Config) -> t_simple_transfer(Config) ->
C = ?config(client, Config), C = ?config(client, Config),
Filename = <<"topsecret.pdf">>, Filename = "topsecret.pdf",
FileId = <<"f1">>, FileId = <<"f1">>,
Data = [<<"first">>, <<"second">>, <<"third">>], Data = [<<"first">>, <<"second">>, <<"third">>],
Meta = #{size := Filesize} = meta(Filename, Data), Meta = #{size := Filesize} = meta(Filename, Data),
MetaPayload = emqx_json:encode(Meta),
?assertRCName( ?assertRCName(
success, success,
emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1) emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1)
), ),
lists:foreach( lists:foreach(
@ -246,23 +245,21 @@ t_nasty_clientids_fileids(_Config) ->
t_meta_conflict(Config) -> t_meta_conflict(Config) ->
C = ?config(client, Config), C = ?config(client, Config),
Filename = <<"topsecret.pdf">>, Filename = "topsecret.pdf",
FileId = <<"f1">>, FileId = <<"f1">>,
Meta = meta(Filename, [<<"x">>]), Meta = meta(Filename, [<<"x">>]),
MetaPayload = emqx_json:encode(Meta),
?assertRCName( ?assertRCName(
success, success,
emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1) emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1)
), ),
ConflictMeta = Meta#{name => <<"conflict.pdf">>}, ConflictMeta = Meta#{name => "conflict.pdf"},
ConflictMetaPayload = emqx_json:encode(ConflictMeta),
?assertRCName( ?assertRCName(
unspecified_error, unspecified_error,
emqtt:publish(C, mk_init_topic(FileId), ConflictMetaPayload, 1) emqtt:publish(C, mk_init_topic(FileId), encode_meta(ConflictMeta), 1)
). ).
t_no_meta(Config) -> t_no_meta(Config) ->
@ -284,17 +281,16 @@ t_no_meta(Config) ->
t_no_segment(Config) -> t_no_segment(Config) ->
C = ?config(client, Config), C = ?config(client, Config),
Filename = <<"topsecret.pdf">>, Filename = "topsecret.pdf",
FileId = <<"f1">>, FileId = <<"f1">>,
Data = [<<"first">>, <<"second">>, <<"third">>], Data = [<<"first">>, <<"second">>, <<"third">>],
Meta = #{size := Filesize} = meta(Filename, Data), Meta = #{size := Filesize} = meta(Filename, Data),
MetaPayload = emqx_json:encode(Meta),
?assertRCName( ?assertRCName(
success, success,
emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1) emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1)
), ),
lists:foreach( lists:foreach(
@ -335,13 +331,13 @@ t_invalid_meta(Config) ->
t_invalid_checksum(Config) -> t_invalid_checksum(Config) ->
C = ?config(client, Config), C = ?config(client, Config),
Filename = <<"topsecret.pdf">>, Filename = "topsecret.pdf",
FileId = <<"f1">>, FileId = <<"f1">>,
Data = [<<"first">>, <<"second">>, <<"third">>], Data = [<<"first">>, <<"second">>, <<"third">>],
Meta = #{size := Filesize} = meta(Filename, Data), Meta = #{size := Filesize} = meta(Filename, Data),
MetaPayload = emqx_json:encode(Meta#{checksum => sha256hex(<<"invalid">>)}), MetaPayload = encode_meta(Meta#{checksum => {sha256, sha256(<<"invalid">>)}}),
?assertRCName( ?assertRCName(
success, success,
@ -366,7 +362,7 @@ t_invalid_checksum(Config) ->
t_corrupted_segment_retry(Config) -> t_corrupted_segment_retry(Config) ->
C = ?config(client, Config), C = ?config(client, Config),
Filename = <<"corruption.pdf">>, Filename = "corruption.pdf",
FileId = <<"4242-4242">>, FileId = <<"4242-4242">>,
Data = [<<"first">>, <<"second">>, <<"third">>], Data = [<<"first">>, <<"second">>, <<"third">>],
@ -379,11 +375,11 @@ t_corrupted_segment_retry(Config) ->
Checksum1, Checksum1,
Checksum2, Checksum2,
Checksum3 Checksum3
] = [sha256hex(S) || S <- Data], ] = [binary:encode_hex(sha256(S)) || S <- Data],
Meta = #{size := Filesize} = meta(Filename, Data), Meta = #{size := Filesize} = meta(Filename, Data),
?assertRCName(success, emqtt:publish(C, mk_init_topic(FileId), emqx_json:encode(Meta), 1)), ?assertRCName(success, emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1)),
?assertRCName( ?assertRCName(
success, success,
@ -421,7 +417,7 @@ t_switch_node(Config) ->
{ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, AdditionalNodePort}]), {ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, AdditionalNodePort}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
Filename = <<"multinode_upload.txt">>, Filename = "multinode_upload.txt",
FileId = <<"f1">>, FileId = <<"f1">>,
Data = [<<"first">>, <<"second">>, <<"third">>], Data = [<<"first">>, <<"second">>, <<"third">>],
@ -430,11 +426,10 @@ t_switch_node(Config) ->
%% First, publist metadata and the first segment to the additional node %% First, publist metadata and the first segment to the additional node
Meta = #{size := Filesize} = meta(Filename, Data), Meta = #{size := Filesize} = meta(Filename, Data),
MetaPayload = emqx_json:encode(Meta),
?assertRCName( ?assertRCName(
success, success,
emqtt:publish(C1, mk_init_topic(FileId), MetaPayload, 1) emqtt:publish(C1, mk_init_topic(FileId), encode_meta(Meta), 1)
), ),
?assertRCName( ?assertRCName(
success, success,
@ -593,7 +588,7 @@ disown_mqtt_client(Context = #{}) ->
send_filemeta(Meta, Context = #{client := Client, fileid := FileId}) -> send_filemeta(Meta, Context = #{client := Client, fileid := FileId}) ->
?assertRCName( ?assertRCName(
success, success,
emqtt:publish(Client, mk_init_topic(FileId), emqx_json:encode(Meta), 1) emqtt:publish(Client, mk_init_topic(FileId), encode_meta(Meta), 1)
), ),
Context. Context.
@ -650,18 +645,21 @@ with_offsets(Items) ->
), ),
List. List.
sha256hex(Data) -> sha256(Data) ->
binary:encode_hex(crypto:hash(sha256, Data)). crypto:hash(sha256, Data).
meta(FileName, Data) -> meta(FileName, Data) ->
FullData = iolist_to_binary(Data), FullData = iolist_to_binary(Data),
#{ #{
name => FileName, name => FileName,
checksum => sha256hex(FullData), checksum => {sha256, sha256(FullData)},
expire_at => erlang:system_time(_Unit = second) + 3600, expire_at => erlang:system_time(_Unit = second) + 3600,
size => byte_size(FullData) size => byte_size(FullData)
}. }.
encode_meta(Meta) ->
emqx_json:encode(emqx_ft:encode_filemeta(Meta)).
list_files(ClientId) -> list_files(ClientId) ->
{ok, Files} = emqx_ft_storage:files(), {ok, Files} = emqx_ft_storage:files(),
[File || File = #{transfer := {CId, _}} <- Files, CId == ClientId]. [File || File = #{transfer := {CId, _}} <- Files, CId == ClientId].

View File

@ -78,11 +78,13 @@ upload_file(ClientId, FileId, Name, Data, Node) ->
{ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]), {ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
Meta = #{ Meta = #{
name => unicode:characters_to_binary(Name), name => Name,
expire_at => erlang:system_time(_Unit = second) + 3600, expire_at => erlang:system_time(_Unit = second) + 3600,
size => Size size => Size
}, },
MetaPayload = emqx_json:encode(Meta), MetaPayload = emqx_json:encode(emqx_ft:encode_filemeta(Meta)),
ct:pal("MetaPayload = ~ts", [MetaPayload]),
MetaTopic = <<"$file/", FileId/binary, "/init">>, MetaTopic = <<"$file/", FileId/binary, "/init">>,
{ok, _} = emqtt:publish(C1, MetaTopic, MetaPayload, 1), {ok, _} = emqtt:publish(C1, MetaTopic, MetaPayload, 1),

View File

@ -97,7 +97,7 @@ defmodule EMQXUmbrella.MixProject do
{:erlcloud, github: "emqx/erlcloud", tag: "3.6.8-emqx-1", override: true}, {:erlcloud, github: "emqx/erlcloud", tag: "3.6.8-emqx-1", override: true},
# erlcloud's rebar.config requires rebar3 and does not support Mix, # erlcloud's rebar.config requires rebar3 and does not support Mix,
# so it tries to fetch deps from git. We need to override this. # so it tries to fetch deps from git. We need to override this.
{:lhttpc, github: "https://github.com/erlcloud/lhttpc", tag: "1.6.2", override: true}, {:lhttpc, github: "erlcloud/lhttpc", tag: "1.6.2", override: true},
{:eini, "1.2.9", override: true}, {:eini, "1.2.9", override: true},
{:base16, "1.0.0", override: true} {:base16, "1.0.0", override: true}
# end of erlcloud's deps # end of erlcloud's deps