diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 6a6599366..baa0126d6 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -143,26 +143,37 @@ on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) -> %% TODO Move to emqx_ft_mqtt? on_file_command(PacketId, Msg, FileCommand) -> - case string:split(FileCommand, <<"/">>, all) of - [FileId, <<"init">>] -> - on_init(PacketId, Msg, transfer(Msg, FileId)); - [FileId, <<"fin">>, FinalSizeBin | MaybeChecksum] when length(MaybeChecksum) =< 1 -> + case emqx_topic:tokens(FileCommand) of + [FileIdIn | Rest] -> + validate([{fileid, FileIdIn}], fun([FileId]) -> + on_file_command(PacketId, FileId, Msg, Rest) + end); + [] -> + ?RC_UNSPECIFIED_ERROR + end. + +on_file_command(PacketId, FileId, Msg, FileCommand) -> + Transfer = transfer(Msg, FileId), + case FileCommand of + [<<"init">>] -> + on_init(PacketId, Msg, Transfer); + [<<"fin">>, FinalSizeBin | MaybeChecksum] when length(MaybeChecksum) =< 1 -> ChecksumBin = emqx_maybe:from_list(MaybeChecksum), validate( [{size, FinalSizeBin}, {{maybe, checksum}, ChecksumBin}], fun([FinalSize, Checksum]) -> - on_fin(PacketId, Msg, transfer(Msg, FileId), FinalSize, Checksum) + on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) end ); - [FileId, <<"abort">>] -> - on_abort(Msg, transfer(Msg, FileId)); - [FileId, OffsetBin] -> + [<<"abort">>] -> + on_abort(Msg, Transfer); + [OffsetBin] -> validate([{offset, OffsetBin}], fun([Offset]) -> - on_segment(PacketId, Msg, transfer(Msg, FileId), Offset, undefined) + on_segment(PacketId, Msg, Transfer, Offset, undefined) end); - [FileId, OffsetBin, ChecksumBin] -> + [OffsetBin, ChecksumBin] -> validate([{offset, OffsetBin}, {checksum, ChecksumBin}], fun([Offset, Checksum]) -> - on_segment(PacketId, Msg, transfer(Msg, FileId), Offset, Checksum) + on_segment(PacketId, Msg, Transfer, Offset, Checksum) end); _ -> ?RC_UNSPECIFIED_ERROR @@ -358,6 +369,13 @@ validate(Validations, Fun) -> do_validate([], Parsed) -> {ok, lists:reverse(Parsed)}; +do_validate([{fileid, FileId} | Rest], Parsed) -> + case byte_size(FileId) of + S when S > 0 -> + do_validate(Rest, [FileId | Parsed]); + 0 -> + {error, {invalid_fileid, FileId}} + end; do_validate([{offset, Offset} | Rest], Parsed) -> case string:to_integer(Offset) of {Int, <<>>} -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index d3331a091..667ca091a 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -93,7 +93,6 @@ % Quota? Some lower level errors? ok | {error, conflict} | {error, file_error()}. store_filemeta(Storage, Transfer, Meta) -> - % TODO safeguard against bad clientids / fileids. Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST), case read_file(Filepath, fun decode_filemeta/1) of {ok, Meta} -> diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index be3b19a8c..37cc38d4f 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -159,6 +159,13 @@ t_invalid_topic_format(Config) -> emqtt:publish(C, <<"$file">>, <<>>, 1) ). +t_invalid_fileid(Config) -> + C = ?config(client, Config), + ?assertRCName( + unspecified_error, + emqtt:publish(C, <<"$file//init">>, <<>>, 1) + ). + t_simple_transfer(Config) -> C = ?config(client, Config),