From 58115715ddbc11bf1a40d749d477424f8f967ed8 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 17 Feb 2023 17:27:38 +0300 Subject: [PATCH] fix(ft): require final size in `fin` packet Otherwise there are situations when it's not entirely clear if a transfer is really ready to be assembled. Since the `size` field in a filemeta is not required (and rightly so), we need client to tell us the final transfer size at the end of the process. Also synthesize a testcase to show why it's needed. Also worth noting that right now `fin` packets require final size, even if a client already told us the size through filemeta. The latter is regarded as serving informational purposes only (which means that, for example, it might differ from the final size, or some tranfer progress might show >100% somewhere because of that). --- apps/emqx/src/emqx_maybe.erl | 42 ++++ apps/emqx_ft/src/emqx_ft.erl | 26 ++- apps/emqx_ft/src/emqx_ft_assembler.erl | 10 +- apps/emqx_ft/src/emqx_ft_assembler_sup.erl | 6 +- apps/emqx_ft/src/emqx_ft_assembly.erl | 29 ++- apps/emqx_ft/src/emqx_ft_storage.erl | 10 +- apps/emqx_ft/src/emqx_ft_storage_fs.erl | 8 +- apps/emqx_ft/test/emqx_ft_SUITE.erl | 212 ++++++++++++++++-- apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl | 16 +- .../emqx_ft/test/emqx_ft_storage_fs_SUITE.erl | 2 +- apps/emqx_ft/test/emqx_ft_test_helpers.erl | 15 +- 11 files changed, 295 insertions(+), 81 deletions(-) create mode 100644 apps/emqx/src/emqx_maybe.erl diff --git a/apps/emqx/src/emqx_maybe.erl b/apps/emqx/src/emqx_maybe.erl new file mode 100644 index 000000000..8c60d7ae7 --- /dev/null +++ b/apps/emqx/src/emqx_maybe.erl @@ -0,0 +1,42 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_maybe). + +-include_lib("emqx/include/types.hrl"). + +-export([to_list/1]). +-export([from_list/1]). +-export([apply/2]). + +-spec to_list(maybe(A)) -> [A]. +to_list(undefined) -> + []; +to_list(Term) -> + [Term]. + +-spec from_list([A]) -> maybe(A). +from_list([]) -> + undefined; +from_list([Term]) -> + Term. + +-spec apply(fun((maybe(A)) -> maybe(A)), maybe(A)) -> + maybe(A). +apply(_Fun, undefined) -> + undefined; +apply(Fun, Term) when is_function(Fun) -> + erlang:apply(Fun, [Term]). diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index d55d5cad9..266d38fe8 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -40,6 +40,7 @@ -export_type([ clientid/0, transfer/0, + bytes/0, offset/0, filemeta/0, segment/0 @@ -145,10 +146,11 @@ on_file_command(PacketId, Msg, FileCommand) -> case string:split(FileCommand, <<"/">>, all) of [FileId, <<"init">>] -> on_init(PacketId, Msg, transfer(Msg, FileId)); - [FileId, <<"fin">>] -> - on_fin(PacketId, Msg, transfer(Msg, FileId), undefined); - [FileId, <<"fin">>, Checksum] -> - on_fin(PacketId, Msg, transfer(Msg, FileId), Checksum); + [FileId, <<"fin">>, FinalSizeBin | ChecksumL] -> + validate([{size, FinalSizeBin}], fun([FinalSize]) -> + Checksum = emqx_maybe:from_list(ChecksumL), + on_fin(PacketId, Msg, transfer(Msg, FileId), FinalSize, Checksum) + end); [FileId, <<"abort">>] -> on_abort(Msg, transfer(Msg, FileId)); [FileId, OffsetBin] -> @@ -235,12 +237,13 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> end end). -on_fin(PacketId, Msg, Transfer, Checksum) -> +on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) -> ?SLOG(info, #{ msg => "on_fin", mqtt_msg => Msg, packet_id => PacketId, transfer => Transfer, + final_size => FinalSize, checksum => Checksum }), %% TODO: handle checksum? Do we need it? @@ -249,7 +252,7 @@ on_fin(PacketId, Msg, Transfer, Checksum) -> ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result) end, with_responder(FinPacketKey, Callback, ?ASSEMBLE_TIMEOUT, fun() -> - case assemble(Transfer) of + case assemble(Transfer, FinalSize) of %% Assembling completed, ack through the responder right away % ok -> % emqx_ft_responder:ack(FinPacketKey, ok); @@ -298,9 +301,9 @@ store_segment(Transfer, Segment) -> {error, {internal_error, E}} end. -assemble(Transfer) -> +assemble(Transfer, FinalSize) -> try - emqx_ft_storage:assemble(Transfer) + emqx_ft_storage:assemble(Transfer, FinalSize) catch C:E:S -> ?SLOG(error, #{ @@ -359,6 +362,13 @@ do_validate([{offset, Offset} | Rest], Parsed) -> _ -> {error, {invalid_offset, Offset}} end; +do_validate([{size, Size} | Rest], Parsed) -> + case string:to_integer(Size) of + {Int, <<>>} -> + do_validate(Rest, [Int | Parsed]); + _ -> + {error, {invalid_size, Size}} + end; do_validate([{checksum, Checksum} | Rest], Parsed) -> case parse_checksum(Checksum) of {ok, Bin} -> diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 8ba7e42d4..275d16499 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -16,7 +16,7 @@ -module(emqx_ft_assembler). --export([start_link/2]). +-export([start_link/3]). -behaviour(gen_statem). -export([callback_mode/0]). @@ -36,11 +36,11 @@ %% -start_link(Storage, Transfer) -> +start_link(Storage, Transfer, Size) -> %% TODO %% Additional callbacks? They won't survive restarts by the supervisor, which brings a %% question if we even need to retry with the help of supervisor. - gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer}, []). + gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size}, []). %% @@ -49,11 +49,11 @@ start_link(Storage, Transfer) -> callback_mode() -> handle_event_function. -init({Storage, Transfer}) -> +init({Storage, Transfer, Size}) -> St = #st{ storage = Storage, transfer = Transfer, - assembly = emqx_ft_assembly:new(), + assembly = emqx_ft_assembly:new(Size), hash = crypto:hash_init(sha256) }, {ok, idle, St}. diff --git a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl index 34783cbd3..bdefdac47 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl @@ -17,7 +17,7 @@ -module(emqx_ft_assembler_sup). -export([start_link/0]). --export([ensure_child/2]). +-export([ensure_child/3]). -behaviour(supervisor). -export([init/1]). @@ -25,10 +25,10 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -ensure_child(Storage, Transfer) -> +ensure_child(Storage, Transfer, Size) -> Childspec = #{ id => Transfer, - start => {emqx_ft_assembler, start_link, [Storage, Transfer]}, + start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size]}, restart => temporary }, case supervisor:start_child(?MODULE, Childspec) of diff --git a/apps/emqx_ft/src/emqx_ft_assembly.erl b/apps/emqx_ft/src/emqx_ft_assembly.erl index 0f2729f42..2d78b540b 100644 --- a/apps/emqx_ft/src/emqx_ft_assembly.erl +++ b/apps/emqx_ft/src/emqx_ft_assembly.erl @@ -16,7 +16,7 @@ -module(emqx_ft_assembly). --export([new/0]). +-export([new/1]). -export([append/3]). -export([update/1]). @@ -35,12 +35,12 @@ size }). -new() -> +new(Size) -> #asm{ status = {incomplete, {missing, filemeta}}, meta = orddict:new(), segs = orddict:new(), - size = 0 + size = Size }. append(Asm, Node, Fragments) when is_list(Fragments) -> @@ -114,8 +114,7 @@ append_segmentinfo(Asm, Node, Fragment = #{fragment := {segment, Info}}) -> % In theory it's possible to have two segments with same offset + size on % different nodes but with differing content. We'd need a checksum to % be able to disambiguate them though. - segs = orddict:store({Offset, locality(Node), -End, Node}, Fragment, Asm#asm.segs), - size = max(End, Asm#asm.size) + segs = orddict:store({Offset, locality(Node), -End, Node}, Fragment, Asm#asm.segs) }. coverage([{{Offset, _, _, _}, _Segment} | Rest], Cursor, Sz) when Offset < Cursor -> @@ -186,7 +185,7 @@ segsize(#{fragment := {segment, Info}}) -> incomplete_new_test() -> ?assertEqual( {incomplete, {missing, filemeta}}, - status(update(new())) + status(update(new(42))) ). incomplete_test() -> @@ -194,7 +193,7 @@ incomplete_test() -> {incomplete, {missing, filemeta}}, status( update( - append(new(), node(), [ + append(new(142), node(), [ segment(p1, 0, 42), segment(p1, 42, 100) ]) @@ -203,13 +202,13 @@ incomplete_test() -> ). consistent_test() -> - Asm1 = append(new(), n1, [filemeta(m1, "blarg")]), + Asm1 = append(new(42), n1, [filemeta(m1, "blarg")]), Asm2 = append(Asm1, n2, [segment(s2, 0, 42)]), Asm3 = append(Asm2, n3, [filemeta(m3, "blarg")]), ?assertMatch({complete, _}, status(meta, Asm3)). inconsistent_test() -> - Asm1 = append(new(), node(), [segment(s1, 0, 42)]), + Asm1 = append(new(42), node(), [segment(s1, 0, 42)]), Asm2 = append(Asm1, n1, [filemeta(m1, "blarg")]), Asm3 = append(Asm2, n2, [segment(s2, 0, 42), filemeta(m1, "blorg")]), Asm4 = append(Asm3, n3, [filemeta(m3, "blarg")]), @@ -231,7 +230,7 @@ simple_coverage_test() -> {Node, segment(n3, 50, 50)}, {Node, segment(n4, 10, 10)} ], - Asm = append_many(new(), Segs), + Asm = append_many(new(100), Segs), ?assertMatch( {complete, [ @@ -256,7 +255,7 @@ redundant_coverage_test() -> {Node, segment(n7, 50, 10)}, {node1, segment(n8, 40, 10)} ], - Asm = append_many(new(), Segs), + Asm = append_many(new(70), Segs), ?assertMatch( {complete, [ @@ -279,7 +278,7 @@ redundant_coverage_prefer_local_test() -> {Node, segment(n5, 30, 10)}, {Node, segment(n6, 20, 10)} ], - Asm = append_many(new(), Segs), + Asm = append_many(new(40), Segs), ?assertMatch( {complete, [ @@ -301,7 +300,7 @@ missing_coverage_test() -> {node2, segment(n4, 50, 50)}, {Node, segment(n5, 40, 60)} ], - Asm = append_many(new(), Segs), + Asm = append_many(new(100), Segs), ?assertEqual( % {incomplete, {missing, {segment, 30, 40}}}, ??? {incomplete, {missing, {segment, 20, 40}}}, @@ -314,7 +313,7 @@ missing_end_coverage_test() -> {Node, segment(n1, 0, 15)}, {node1, segment(n3, 10, 10)} ], - Asm = append_many(new(), Segs), + Asm = append_many(new(20), Segs), ?assertEqual( {incomplete, {missing, {segment, 15, 20}}}, status(coverage, Asm) @@ -328,7 +327,7 @@ missing_coverage_with_redudancy_test() -> {node43, segment(n4, 10, 50)}, {node(), segment(n5, 40, 60)} ], - Asm = append_many(new(), Segs), + Asm = append_many(new(100), Segs), ?assertEqual( % {incomplete, {missing, {segment, 50, 60}}}, ??? {incomplete, {missing, {segment, 20, 40}}}, diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 6ca9e9ecb..7a95a0454 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -20,7 +20,7 @@ [ store_filemeta/2, store_segment/2, - assemble/1, + assemble/2, ready_transfers/0, get_ready_transfer/1, @@ -53,7 +53,7 @@ ok | {async, pid()} | {error, term()}. -callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) -> ok | {async, pid()} | {error, term()}. --callback assemble(storage(), emqx_ft:transfer()) -> +-callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) -> ok | {async, pid()} | {error, term()}. -callback ready_transfers(storage()) -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}. @@ -76,11 +76,11 @@ store_segment(Transfer, Segment) -> Mod = mod(), Mod:store_segment(storage(), Transfer, Segment). --spec assemble(emqx_ft:transfer()) -> +-spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) -> ok | {async, pid()} | {error, term()}. -assemble(Transfer) -> +assemble(Transfer, Size) -> Mod = mod(), - Mod:assemble(storage(), Transfer). + Mod:assemble(storage(), Transfer, Size). -spec ready_transfers() -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}. ready_transfers() -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 81fed0f21..103f0e48d 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -24,7 +24,7 @@ -export([store_segment/3]). -export([list/3]). -export([pread/5]). --export([assemble/2]). +-export([assemble/3]). -export([transfers/1]). @@ -167,12 +167,12 @@ pread(_Storage, _Transfer, Frag, Offset, Size) -> {error, Reason} end. --spec assemble(storage(), transfer()) -> +-spec assemble(storage(), transfer(), emqx_ft:bytes()) -> % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}. {async, _Assembler :: pid()} | {error, _TODO}. -assemble(Storage, Transfer) -> +assemble(Storage, Transfer, Size) -> % TODO: ask cluster if the transfer is already assembled - {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer), + {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size), {async, Pid}. get_ready_transfer(_Storage, ReadyTransferId) -> diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 8708f9808..8f00a1884 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -37,8 +37,14 @@ all() -> groups() -> [ - {single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- [t_switch_node]}, - {cluster, [sequence], [t_switch_node]} + {single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- group_cluster()}, + {cluster, [sequence], group_cluster()} + ]. + +group_cluster() -> + [ + t_switch_node, + t_unreliable_migrating_client ]. init_per_suite(Config) -> @@ -61,25 +67,61 @@ set_special_configs(Config) -> init_per_testcase(Case, Config) -> ClientId = atom_to_binary(Case), - {ok, C} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]), - {ok, _} = emqtt:connect(C), - [{client, C}, {clientid, ClientId} | Config]. + case ?config(group, Config) of + cluster -> + [{clientid, ClientId} | Config]; + _ -> + {ok, C} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(C), + [{client, C}, {clientid, ClientId} | Config] + end. end_per_testcase(_Case, Config) -> - C = ?config(client, Config), - ok = emqtt:stop(C), + _ = [ok = emqtt:stop(C) || {client, C} <- Config], ok. -init_per_group(cluster, Config) -> - Node = emqx_ft_test_helpers:start_additional_node(Config, emqx_ft1), - [{additional_node, Node} | Config]; -init_per_group(_Group, Config) -> - Config. +init_per_group(Group = cluster, Config) -> + Cluster = mk_cluster_specs(Config), + ct:pal("Starting ~p", [Cluster]), + Nodes = [ + emqx_common_test_helpers:start_slave(Name, Opts#{join_to => node()}) + || {Name, Opts} <- Cluster + ], + [{group, Group}, {cluster_nodes, Nodes} | Config]; +init_per_group(Group, Config) -> + [{group, Group} | Config]. end_per_group(cluster, Config) -> - ok = emqx_ft_test_helpers:stop_additional_node(Config); + ok = lists:foreach( + fun emqx_ft_test_helpers:stop_additional_node/1, + ?config(cluster_nodes, Config) + ); end_per_group(_Group, _Config) -> ok. +mk_cluster_specs(Config) -> + Specs = [ + {core, emqx_ft_SUITE1, #{listener_ports => [{tcp, 2883}]}}, + {core, emqx_ft_SUITE2, #{listener_ports => [{tcp, 3883}]}} + ], + CommOpts = [ + {env, [{emqx, boot_modules, [broker, listeners]}]}, + {apps, [emqx_ft]}, + {conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]}, + {env_handler, fun + (emqx_ft) -> + ok = emqx_config:put([file_transfer, storage], #{ + type => local, + root => emqx_ft_test_helpers:ft_root(Config, node()) + }); + (_) -> + ok + end} + ], + emqx_common_test_helpers:emqx_cluster( + Specs, + CommOpts + ). + %%-------------------------------------------------------------------- %% Tests %%-------------------------------------------------------------------- @@ -125,7 +167,7 @@ t_simple_transfer(Config) -> Data = [<<"first">>, <<"second">>, <<"third">>], - Meta = meta(Filename, Data), + Meta = #{size := Filesize} = meta(Filename, Data), MetaPayload = emqx_json:encode(Meta), MetaTopic = <<"$file/", FileId/binary, "/init">>, @@ -145,7 +187,7 @@ t_simple_transfer(Config) -> with_offsets(Data) ), - FinTopic = <<"$file/", FileId/binary, "/fin">>, + FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, ?assertRCName( success, emqtt:publish(C, FinTopic, <<>>, 1) @@ -194,7 +236,7 @@ t_no_meta(Config) -> emqtt:publish(C, SegmentTopic, Data, 1) ), - FinTopic = <<"$file/", FileId/binary, "/fin">>, + FinTopic = <<"$file/", FileId/binary, "/fin/42">>, ?assertRCName( unspecified_error, emqtt:publish(C, FinTopic, <<>>, 1) @@ -208,7 +250,7 @@ t_no_segment(Config) -> Data = [<<"first">>, <<"second">>, <<"third">>], - Meta = meta(Filename, Data), + Meta = #{size := Filesize} = meta(Filename, Data), MetaPayload = emqx_json:encode(Meta), MetaTopic = <<"$file/", FileId/binary, "/init">>, @@ -229,7 +271,7 @@ t_no_segment(Config) -> tl(with_offsets(Data)) ), - FinTopic = <<"$file/", FileId/binary, "/fin">>, + FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, ?assertRCName( unspecified_error, emqtt:publish(C, FinTopic, <<>>, 1) @@ -264,7 +306,7 @@ t_invalid_checksum(Config) -> Data = [<<"first">>, <<"second">>, <<"third">>], - Meta = meta(Filename, Data), + Meta = #{size := Filesize} = meta(Filename, Data), MetaPayload = emqx_json:encode(Meta#{checksum => sha256hex(<<"invalid">>)}), MetaTopic = <<"$file/", FileId/binary, "/init">>, @@ -284,14 +326,15 @@ t_invalid_checksum(Config) -> with_offsets(Data) ), - FinTopic = <<"$file/", FileId/binary, "/fin">>, + FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, ?assertRCName( unspecified_error, emqtt:publish(C, FinTopic, <<>>, 1) ). t_switch_node(Config) -> - AdditionalNodePort = emqx_ft_test_helpers:tcp_port(?config(additional_node, Config)), + [Node | _] = ?config(cluster_nodes, Config), + AdditionalNodePort = emqx_ft_test_helpers:tcp_port(Node), ClientId = <<"t_switch_node-migrating_client">>, @@ -306,7 +349,7 @@ t_switch_node(Config) -> %% First, publist metadata and the first segment to the additional node - Meta = meta(Filename, Data), + Meta = #{size := Filesize} = meta(Filename, Data), MetaPayload = emqx_json:encode(Meta), MetaTopic = <<"$file/", FileId/binary, "/init">>, @@ -335,7 +378,7 @@ t_switch_node(Config) -> emqtt:publish(C2, <<"$file/", FileId/binary, "/", Offset2/binary>>, Data2, 1) ), - FinTopic = <<"$file/", FileId/binary, "/fin">>, + FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, ?assertRCName( success, emqtt:publish(C2, FinTopic, <<>>, 1) @@ -360,13 +403,134 @@ t_assemble_crash(Config) -> C = ?config(client, Config), meck:new(emqx_ft_storage_fs), - meck:expect(emqx_ft_storage_fs, assemble, fun(_, _) -> meck:exception(error, oops) end), + meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _) -> meck:exception(error, oops) end), ?assertRCName( unspecified_error, emqtt:publish(C, <<"$file/someid/fin">>, <<>>, 1) ). +t_unreliable_migrating_client(Config) -> + NodeSelf = node(), + [Node1, Node2] = ?config(cluster_nodes, Config), + + ClientId = ?config(clientid, Config), + FileId = emqx_guid:to_hexstr(emqx_guid:gen()), + Filename = "migratory-birds-in-southern-hemisphere-2013.pdf", + Filesize = 1000, + Gen = emqx_ft_content_gen:new({{ClientId, FileId}, Filesize}, 16), + Payload = iolist_to_binary(emqx_ft_content_gen:consume(Gen, fun({Chunk, _, _}) -> Chunk end)), + Meta = meta(Filename, Payload), + + Context = #{ + clientid => ClientId, + fileid => FileId, + filesize => Filesize, + payload => Payload + }, + Commands = [ + {fun connect_mqtt_client/2, [NodeSelf]}, + {fun send_filemeta/2, [Meta]}, + {fun send_segment/3, [0, 100]}, + {fun send_segment/3, [100, 100]}, + {fun send_segment/3, [200, 100]}, + {fun stop_mqtt_client/1, []}, + {fun connect_mqtt_client/2, [Node1]}, + {fun connect_mqtt_client/2, [Node2]}, + {fun send_filemeta/2, [Meta]}, + {fun send_segment/3, [0, 200]}, + {fun send_segment/3, [200, 200]}, + {fun send_segment/3, [400, 100]}, + {fun connect_mqtt_client/2, [Node2]}, + {fun send_segment/3, [200, 200]}, + {fun send_segment/3, [400, 200]}, + {fun connect_mqtt_client/2, [Node1]}, + {fun send_segment/3, [400, 200]}, + {fun send_segment/3, [600, eof]}, + {fun send_finish/1, []}, + {fun connect_mqtt_client/2, [NodeSelf]}, + {fun send_finish/1, []} + ], + _Context = run_commands(Commands, Context), + + {ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(), + ReadyTransferIds = + [Id || {#{<<"clientid">> := CId} = Id, _Info} <- ReadyTransfers, CId == ClientId], + + Node1Bin = atom_to_binary(Node1), + NodeSelfBin = atom_to_binary(NodeSelf), + ?assertMatch( + [#{<<"node">> := Node1Bin}, #{<<"node">> := NodeSelfBin}], + lists:sort(ReadyTransferIds) + ), + + [ + begin + {ok, TableQH} = emqx_ft_storage:get_ready_transfer(Id), + ?assertEqual( + Payload, + iolist_to_binary(qlc:eval(TableQH)) + ) + end + || Id <- ReadyTransferIds + ]. + +run_commands(Commands, Context) -> + lists:foldl(fun run_command/2, Context, Commands). + +run_command({Command, Args}, Context) -> + ct:pal("COMMAND ~p ~p", [erlang:fun_info(Command, name), Args]), + erlang:apply(Command, Args ++ [Context]). + +connect_mqtt_client(Node, ContextIn) -> + Context = #{clientid := ClientId} = disown_mqtt_client(ContextIn), + NodePort = emqx_ft_test_helpers:tcp_port(Node), + {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, NodePort}]), + {ok, _} = emqtt:connect(Client), + Context#{client => Client}. + +stop_mqtt_client(Context = #{client := Client}) -> + _ = emqtt:stop(Client), + maps:remove(client, Context). + +disown_mqtt_client(Context = #{client := Client}) -> + _ = erlang:unlink(Client), + maps:remove(client, Context); +disown_mqtt_client(Context = #{}) -> + Context. + +send_filemeta(Meta, Context = #{client := Client, fileid := FileId}) -> + Topic = <<"$file/", FileId/binary, "/init">>, + MetaPayload = emqx_json:encode(Meta), + ?assertRCName( + success, + emqtt:publish(Client, Topic, MetaPayload, 1) + ), + Context. + +send_segment(Offset, Size, Context = #{client := Client, fileid := FileId, payload := Payload}) -> + Topic = <<"$file/", FileId/binary, "/", (integer_to_binary(Offset))/binary>>, + Data = + case Size of + eof -> + binary:part(Payload, Offset, byte_size(Payload) - Offset); + N -> + binary:part(Payload, Offset, N) + end, + ?assertRCName( + success, + emqtt:publish(Client, Topic, Data, 1) + ), + Context. + +send_finish(Context = #{client := Client, fileid := FileId, filesize := Filesize}) -> + Topic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, + ?assertRCName( + success, + emqtt:publish(Client, Topic, <<>>, 1) + ), + Context. + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index d4b619e43..bb8590cc7 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -84,7 +84,7 @@ t_assemble_empty_transfer(Config) -> ]}, emqx_ft_storage_fs:list(Storage, Transfer, fragment) ), - Status = complete_assemble(Storage, Transfer), + Status = complete_assemble(Storage, Transfer, 0), ?assertEqual({shutdown, ok}, Status), ?assertEqual( {ok, <<>>}, @@ -132,7 +132,7 @@ t_assemble_complete_local_transfer(Config) -> Fragments ), - Status = complete_assemble(Storage, Transfer), + Status = complete_assemble(Storage, Transfer, TransferSize), ?assertEqual({shutdown, ok}, Status), AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename), @@ -171,20 +171,20 @@ t_assemble_incomplete_transfer(Config) -> expire_at => 42 }, ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta), - Status = complete_assemble(Storage, Transfer), + Status = complete_assemble(Storage, Transfer, TransferSize), ?assertMatch({shutdown, {error, _}}, Status). t_assemble_no_meta(Config) -> Storage = storage(Config), Transfer = {?CLIENTID2, ?config(file_id, Config)}, - Status = complete_assemble(Storage, Transfer), + Status = complete_assemble(Storage, Transfer, 42), ?assertMatch({shutdown, {error, {incomplete, _}}}, Status). -complete_assemble(Storage, Transfer) -> - complete_assemble(Storage, Transfer, 1000). +complete_assemble(Storage, Transfer, Size) -> + complete_assemble(Storage, Transfer, Size, 1000). -complete_assemble(Storage, Transfer, Timeout) -> - {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer), +complete_assemble(Storage, Transfer, Size, Timeout) -> + {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size), MRef = erlang:monitor(process, Pid), Pid ! kickoff, receive diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl index cc3da8bc3..3bda8042c 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl @@ -67,7 +67,7 @@ init_per_group(_Group, Config) -> Config. end_per_group(cluster, Config) -> - ok = emqx_ft_test_helpers:stop_additional_node(Config); + ok = emqx_ft_test_helpers:stop_additional_node(?config(additional_node, Config)); end_per_group(_Group, _Config) -> ok. diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl index f7d8bc879..956e63553 100644 --- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl +++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl @@ -21,13 +21,12 @@ -include_lib("common_test/include/ct.hrl"). -start_additional_node(Config, Node) -> - SelfNode = node(), +start_additional_node(Config, Name) -> emqx_common_test_helpers:start_slave( - Node, + Name, [ {apps, [emqx_ft]}, - {join_to, SelfNode}, + {join_to, node()}, {configure_gen_rpc, true}, {env_handler, fun (emqx_ft) -> @@ -40,8 +39,7 @@ start_additional_node(Config, Node) -> ] ). -stop_additional_node(Config) -> - Node = ?config(additional_node, Config), +stop_additional_node(Node) -> ok = rpc:call(Node, ekka, leave, []), ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_ft]]), {ok, _} = emqx_common_test_helpers:stop_slave(Node), @@ -58,13 +56,14 @@ ft_root(Config, Node) -> upload_file(ClientId, FileId, Data, Node) -> Port = tcp_port(Node), + Size = byte_size(Data), {ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]), {ok, _} = emqtt:connect(C1), Meta = #{ name => FileId, expire_at => erlang:system_time(_Unit = second) + 3600, - size => byte_size(Data) + size => Size }, MetaPayload = emqx_json:encode(Meta), @@ -72,6 +71,6 @@ upload_file(ClientId, FileId, Data, Node) -> {ok, _} = emqtt:publish(C1, MetaTopic, MetaPayload, 1), {ok, _} = emqtt:publish(C1, <<"$file/", FileId/binary, "/0">>, Data, 1), - FinTopic = <<"$file/", FileId/binary, "/fin">>, + FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Size))/binary>>, {ok, _} = emqtt:publish(C1, FinTopic, <<>>, 1), ok = emqtt:stop(C1).