diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 077ebe138..ecdf8f827 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -63,6 +63,7 @@ ]). -export([ + maybe_fix_gen_rpc/0, emqx_cluster/1, emqx_cluster/2, start_epmd/0, @@ -616,6 +617,15 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) -> listener_ports => [{Type :: tcp | ssl | ws | wss, inet:port_number()}] }. +-spec maybe_fix_gen_rpc() -> ok. +maybe_fix_gen_rpc() -> + %% When many tests run in an obscure order, it may occur that + %% `gen_rpc` started with its default settings before `emqx_conf`. + %% `gen_rpc` and `emqx_conf` have different default `port_discovery` modes, + %% so we reinitialize `gen_rpc` explicitly. + ok = application:stop(gen_rpc), + ok = application:start(gen_rpc). + -spec emqx_cluster(cluster_spec()) -> [{shortname(), node_opts()}]. emqx_cluster(Specs) -> emqx_cluster(Specs, #{}). diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index c4f1caed5..ed4edd33b 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -31,6 +31,10 @@ on_message_puback/4 ]). +-export([ + decode_filemeta/1 +]). + -export([on_assemble_timeout/1]). -export_type([ @@ -86,6 +90,27 @@ unhook() -> ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}). +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +decode_filemeta(Payload) when is_binary(Payload) -> + case emqx_json:safe_decode(Payload, [return_maps]) of + {ok, Map} -> + decode_filemeta(Map); + {error, Error} -> + {error, {invalid_filemeta_json, Error}} + end; +decode_filemeta(Map) when is_map(Map) -> + Schema = emqx_ft_schema:schema(filemeta), + try + Meta = hocon_tconf:check_plain(Schema, Map, #{atom_key => true, required => false}), + {ok, Meta} + catch + throw:Error -> + {error, {invalid_filemeta, Error}} + end. + %%-------------------------------------------------------------------- %% Hooks %%-------------------------------------------------------------------- @@ -113,6 +138,8 @@ on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) -> %% Handlers for transfer messages %%-------------------------------------------------------------------- +%% TODO Move to emqx_ft_mqtt? + on_file_command(PacketId, Msg, FileCommand) -> case string:split(FileCommand, <<"/">>, all) of [FileId, <<"init">>] -> @@ -123,10 +150,14 @@ on_file_command(PacketId, Msg, FileCommand) -> on_fin(PacketId, Msg, FileId, Checksum); [FileId, <<"abort">>] -> on_abort(Msg, FileId); - [FileId, Offset] -> - on_segment(Msg, FileId, Offset, undefined); - [FileId, Offset, Checksum] -> - on_segment(Msg, FileId, Offset, Checksum); + [FileId, OffsetBin] -> + validate([{offset, OffsetBin}], fun([Offset]) -> + on_segment(Msg, FileId, Offset, undefined) + end); + [FileId, OffsetBin, ChecksumBin] -> + validate([{offset, OffsetBin}, {checksum, ChecksumBin}], fun([Offset, Checksum]) -> + on_segment(Msg, FileId, Offset, Checksum) + end); _ -> ?RC_UNSPECIFIED_ERROR end. @@ -139,11 +170,21 @@ on_init(Msg, FileId) -> }), Payload = Msg#message.payload, % %% Add validations here - Meta = emqx_json:decode(Payload, [return_maps]), - case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of - ok -> - ?RC_SUCCESS; - {error, _Reason} -> + case decode_filemeta(Payload) of + {ok, Meta} -> + case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of + ok -> + ?RC_SUCCESS; + {error, _Reason} -> + ?RC_UNSPECIFIED_ERROR + end; + {error, Reason} -> + ?SLOG(error, #{ + msg => "on_init: invalid filemeta", + mqtt_msg => Msg, + file_id => FileId, + reason => Reason + }), ?RC_UNSPECIFIED_ERROR end. @@ -161,7 +202,7 @@ on_segment(Msg, FileId, Offset, Checksum) -> }), %% TODO: handle checksum Payload = Msg#message.payload, - Segment = {binary_to_integer(Offset), Payload}, + Segment = {Offset, Payload}, %% Add offset/checksum validations case emqx_ft_storage:store_segment(transfer(Msg, FileId), Segment) of ok -> @@ -247,3 +288,42 @@ transfer(Msg, FileId) -> on_assemble_timeout({ChanPid, PacketId}) -> ?SLOG(warning, #{msg => "on_assemble_timeout", packet_id => PacketId}), erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}). + +validate(Validations, Fun) -> + case do_validate(Validations, []) of + {ok, Parsed} -> + Fun(Parsed); + {error, Reason} -> + ?SLOG(error, #{ + msg => "validate: invalid $file command", + reason => Reason + }), + ?RC_UNSPECIFIED_ERROR + end. + +do_validate([], Parsed) -> + {ok, lists:reverse(Parsed)}; +do_validate([{offset, Offset} | Rest], Parsed) -> + case string:to_integer(Offset) of + {Int, <<>>} -> + do_validate(Rest, [Int | Parsed]); + _ -> + {error, {invalid_offset, Offset}} + end; +do_validate([{checksum, Checksum} | Rest], Parsed) -> + case parse_checksum(Checksum) of + {ok, Bin} -> + do_validate(Rest, [Bin | Parsed]); + {error, _Reason} -> + {error, {invalid_checksum, Checksum}} + end. + +parse_checksum(Checksum) when is_binary(Checksum) andalso byte_size(Checksum) =:= 64 -> + try + {ok, binary:decode_hex(Checksum)} + catch + error:badarg -> + {error, invalid_checksum} + end; +parse_checksum(_Checksum) -> + {error, invalid_checksum}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index a6559d470..01a3e19c2 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -387,10 +387,14 @@ encode_filemeta(Meta) -> Term = hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}), emqx_json:encode(?PRELUDE(_Vsn = 1, Term)). -decode_filemeta(Binary) -> - Schema = emqx_ft_schema:schema(filemeta), - ?PRELUDE(_Vsn = 1, Term) = emqx_json:decode(Binary, [return_maps]), - hocon_tconf:check_plain(Schema, Term, #{atom_key => true, required => false}). +decode_filemeta(Binary) when is_binary(Binary) -> + ?PRELUDE(_Vsn = 1, Map) = emqx_json:decode(Binary, [return_maps]), + case emqx_ft:decode_filemeta(Map) of + {ok, Meta} -> + Meta; + {error, Reason} -> + error(Reason) + end. mk_segment_filename({Offset, Content}) -> lists:concat([?SEGMENT, ".", Offset, ".", byte_size(Content)]). diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl index 992d62c48..f69033b4b 100644 --- a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl index db5e35f94..f8fe02d36 100644 --- a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl new file mode 100644 index 000000000..ff7cba403 --- /dev/null +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -0,0 +1,284 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-define(assertRCName(RCName, PublishRes), + ?assertMatch( + {ok, #{reason_code_name := RCName}}, + PublishRes + ) +). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], fun set_special_configs/1), + ok = emqx_common_test_helpers:maybe_fix_gen_rpc(), + Config. + +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]), + ok. + +set_special_configs(emqx_ft) -> + {ok, _} = emqx:update_config([file_transfer, storage], #{<<"type">> => <<"local">>}), + ok; +set_special_configs(_App) -> + ok. + +init_per_testcase(_Case, Config) -> + _ = file:del_dir_r(filename:join(emqx:data_dir(), "file_transfer")), + ClientId = <<"client">>, + {ok, C} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(C), + [{client, C}, {clientid, ClientId} | Config]. + +end_per_testcase(_Case, Config) -> + C = ?config(client, Config), + ok = emqtt:stop(C), + ok. + +%%-------------------------------------------------------------------- +%% Tests +%%-------------------------------------------------------------------- + +t_invalid_topic_format(Config) -> + C = ?config(client, Config), + + %% TODO: more invalid topics + + ?assertRCName( + unspecified_error, + emqtt:publish(C, <<"$file/XYZ">>, <<>>, 1) + ), + ?assertRCName( + unspecified_error, + emqtt:publish(C, <<"$file/X/Y/Z">>, <<>>, 1) + ). + +t_simple_transfer(Config) -> + C = ?config(client, Config), + + Filename = <<"topsecret.pdf">>, + FileId = <<"f1">>, + + Data = [<<"first">>, <<"second">>, <<"third">>], + + Meta = meta(Filename, Data), + MetaPayload = emqx_json:encode(Meta), + + MetaTopic = <<"$file/", FileId/binary, "/init">>, + ?assertRCName( + success, + emqtt:publish(C, MetaTopic, MetaPayload, 1) + ), + + lists:foreach( + fun({Chunk, Offset}) -> + OffsetBin = integer_to_binary(Offset), + SegmentTopic = <<"$file/", FileId/binary, "/", OffsetBin/binary>>, + ?assertRCName( + success, + emqtt:publish(C, SegmentTopic, Chunk, 1) + ) + end, + with_offsets(Data) + ), + + FinTopic = <<"$file/", FileId/binary, "/fin">>, + ?assertRCName( + success, + emqtt:publish(C, FinTopic, <<>>, 1) + ), + + ReadyTransferId = #{ + <<"fileid">> => FileId, + <<"clientid">> => ?config(clientid, Config), + <<"node">> => atom_to_binary(node(), utf8) + }, + + {ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId), + + ?assertEqual( + iolist_to_binary(Data), + iolist_to_binary(qlc:eval(TableQH)) + ). + +t_meta_conflict(Config) -> + C = ?config(client, Config), + + Filename = <<"topsecret.pdf">>, + FileId = <<"f1">>, + + Meta = meta(Filename, [<<"x">>]), + MetaPayload = emqx_json:encode(Meta), + + MetaTopic = <<"$file/", FileId/binary, "/init">>, + ?assertRCName( + success, + emqtt:publish(C, MetaTopic, MetaPayload, 1) + ), + + ConflictMeta = Meta#{name => <<"conflict.pdf">>}, + ConflictMetaPayload = emqx_json:encode(ConflictMeta), + + ?assertRCName( + unspecified_error, + emqtt:publish(C, MetaTopic, ConflictMetaPayload, 1) + ). + +t_no_meta(Config) -> + C = ?config(client, Config), + + FileId = <<"f1">>, + Data = <<"first">>, + + SegmentTopic = <<"$file/", FileId/binary, "/0">>, + ?assertRCName( + success, + emqtt:publish(C, SegmentTopic, Data, 1) + ), + + FinTopic = <<"$file/", FileId/binary, "/fin">>, + ?assertRCName( + unspecified_error, + emqtt:publish(C, FinTopic, <<>>, 1) + ). + +t_no_segment(Config) -> + C = ?config(client, Config), + + Filename = <<"topsecret.pdf">>, + FileId = <<"f1">>, + + Data = [<<"first">>, <<"second">>, <<"third">>], + + Meta = meta(Filename, Data), + MetaPayload = emqx_json:encode(Meta), + + MetaTopic = <<"$file/", FileId/binary, "/init">>, + ?assertRCName( + success, + emqtt:publish(C, MetaTopic, MetaPayload, 1) + ), + + lists:foreach( + fun({Chunk, Offset}) -> + OffsetBin = integer_to_binary(Offset), + SegmentTopic = <<"$file/", FileId/binary, "/", OffsetBin/binary>>, + ?assertRCName( + success, + emqtt:publish(C, SegmentTopic, Chunk, 1) + ) + end, + %% Skip the first segment + tl(with_offsets(Data)) + ), + + FinTopic = <<"$file/", FileId/binary, "/fin">>, + ?assertRCName( + unspecified_error, + emqtt:publish(C, FinTopic, <<>>, 1) + ). + +t_invalid_meta(Config) -> + C = ?config(client, Config), + + FileId = <<"f1">>, + + MetaTopic = <<"$file/", FileId/binary, "/init">>, + + %% Invalid schema + Meta = #{foo => <<"bar">>}, + MetaPayload = emqx_json:encode(Meta), + ?assertRCName( + unspecified_error, + emqtt:publish(C, MetaTopic, MetaPayload, 1) + ), + + %% Invalid JSON + ?assertRCName( + unspecified_error, + emqtt:publish(C, MetaTopic, <<"{oops;">>, 1) + ). + +t_invalid_checksum(Config) -> + C = ?config(client, Config), + + Filename = <<"topsecret.pdf">>, + FileId = <<"f1">>, + + Data = [<<"first">>, <<"second">>, <<"third">>], + + Meta = meta(Filename, Data), + MetaPayload = emqx_json:encode(Meta#{checksum => sha256hex(<<"invalid">>)}), + + MetaTopic = <<"$file/", FileId/binary, "/init">>, + ?assertRCName( + success, + emqtt:publish(C, MetaTopic, MetaPayload, 1) + ), + + lists:foreach( + fun({Chunk, Offset}) -> + OffsetBin = integer_to_binary(Offset), + SegmentTopic = <<"$file/", FileId/binary, "/", OffsetBin/binary>>, + ?assertRCName( + success, + emqtt:publish(C, SegmentTopic, Chunk, 1) + ) + end, + with_offsets(Data) + ), + + FinTopic = <<"$file/", FileId/binary, "/fin">>, + ?assertRCName( + unspecified_error, + emqtt:publish(C, FinTopic, <<>>, 1) + ). + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +with_offsets(Items) -> + {List, _} = lists:mapfoldl( + fun(Item, Offset) -> + {{Item, Offset}, Offset + byte_size(Item)} + end, + 0, + Items + ), + List. + +sha256hex(Data) -> + binary:encode_hex(crypto:hash(sha256, Data)). + +meta(FileName, Data) -> + FullData = iolist_to_binary(Data), + #{ + name => FileName, + checksum => sha256hex(FullData), + expire_at => erlang:system_time(_Unit = second) + 3600, + size => byte_size(FullData) + }. diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index dd3ffedad..c5dbd418f 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/apps/emqx_ft/test/emqx_ft_content_gen.erl b/apps/emqx_ft/test/emqx_ft_content_gen.erl index feca78949..bd24d8c94 100644 --- a/apps/emqx_ft/test/emqx_ft_content_gen.erl +++ b/apps/emqx_ft/test/emqx_ft_content_gen.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl b/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl index c08986120..9098edcf6 100644 --- a/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl index e979d06fc..0ac5d2844 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.