diff --git a/apps/emqx/src/emqx_maybe.erl b/apps/emqx/src/emqx_maybe.erl new file mode 100644 index 000000000..8c60d7ae7 --- /dev/null +++ b/apps/emqx/src/emqx_maybe.erl @@ -0,0 +1,42 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_maybe). + +-include_lib("emqx/include/types.hrl"). + +-export([to_list/1]). +-export([from_list/1]). +-export([apply/2]). + +-spec to_list(maybe(A)) -> [A]. +to_list(undefined) -> + []; +to_list(Term) -> + [Term]. + +-spec from_list([A]) -> maybe(A). +from_list([]) -> + undefined; +from_list([Term]) -> + Term. + +-spec apply(fun((maybe(A)) -> maybe(A)), maybe(A)) -> + maybe(A). +apply(_Fun, undefined) -> + undefined; +apply(Fun, Term) when is_function(Fun) -> + erlang:apply(Fun, [Term]). diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index d55d5cad9..266d38fe8 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -40,6 +40,7 @@ -export_type([ clientid/0, transfer/0, + bytes/0, offset/0, filemeta/0, segment/0 @@ -145,10 +146,11 @@ on_file_command(PacketId, Msg, FileCommand) -> case string:split(FileCommand, <<"/">>, all) of [FileId, <<"init">>] -> on_init(PacketId, Msg, transfer(Msg, FileId)); - [FileId, <<"fin">>] -> - on_fin(PacketId, Msg, transfer(Msg, FileId), undefined); - [FileId, <<"fin">>, Checksum] -> - on_fin(PacketId, Msg, transfer(Msg, FileId), Checksum); + [FileId, <<"fin">>, FinalSizeBin | ChecksumL] -> + validate([{size, FinalSizeBin}], fun([FinalSize]) -> + Checksum = emqx_maybe:from_list(ChecksumL), + on_fin(PacketId, Msg, transfer(Msg, FileId), FinalSize, Checksum) + end); [FileId, <<"abort">>] -> on_abort(Msg, transfer(Msg, FileId)); [FileId, OffsetBin] -> @@ -235,12 +237,13 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> end end). -on_fin(PacketId, Msg, Transfer, Checksum) -> +on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) -> ?SLOG(info, #{ msg => "on_fin", mqtt_msg => Msg, packet_id => PacketId, transfer => Transfer, + final_size => FinalSize, checksum => Checksum }), %% TODO: handle checksum? Do we need it? @@ -249,7 +252,7 @@ on_fin(PacketId, Msg, Transfer, Checksum) -> ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result) end, with_responder(FinPacketKey, Callback, ?ASSEMBLE_TIMEOUT, fun() -> - case assemble(Transfer) of + case assemble(Transfer, FinalSize) of %% Assembling completed, ack through the responder right away % ok -> % emqx_ft_responder:ack(FinPacketKey, ok); @@ -298,9 +301,9 @@ store_segment(Transfer, Segment) -> {error, {internal_error, E}} end. -assemble(Transfer) -> +assemble(Transfer, FinalSize) -> try - emqx_ft_storage:assemble(Transfer) + emqx_ft_storage:assemble(Transfer, FinalSize) catch C:E:S -> ?SLOG(error, #{ @@ -359,6 +362,13 @@ do_validate([{offset, Offset} | Rest], Parsed) -> _ -> {error, {invalid_offset, Offset}} end; +do_validate([{size, Size} | Rest], Parsed) -> + case string:to_integer(Size) of + {Int, <<>>} -> + do_validate(Rest, [Int | Parsed]); + _ -> + {error, {invalid_size, Size}} + end; do_validate([{checksum, Checksum} | Rest], Parsed) -> case parse_checksum(Checksum) of {ok, Bin} -> diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 8ba7e42d4..275d16499 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -16,7 +16,7 @@ -module(emqx_ft_assembler). --export([start_link/2]). +-export([start_link/3]). -behaviour(gen_statem). -export([callback_mode/0]). @@ -36,11 +36,11 @@ %% -start_link(Storage, Transfer) -> +start_link(Storage, Transfer, Size) -> %% TODO %% Additional callbacks? They won't survive restarts by the supervisor, which brings a %% question if we even need to retry with the help of supervisor. - gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer}, []). + gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size}, []). %% @@ -49,11 +49,11 @@ start_link(Storage, Transfer) -> callback_mode() -> handle_event_function. -init({Storage, Transfer}) -> +init({Storage, Transfer, Size}) -> St = #st{ storage = Storage, transfer = Transfer, - assembly = emqx_ft_assembly:new(), + assembly = emqx_ft_assembly:new(Size), hash = crypto:hash_init(sha256) }, {ok, idle, St}. diff --git a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl index 34783cbd3..bdefdac47 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl @@ -17,7 +17,7 @@ -module(emqx_ft_assembler_sup). -export([start_link/0]). --export([ensure_child/2]). +-export([ensure_child/3]). -behaviour(supervisor). -export([init/1]). @@ -25,10 +25,10 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -ensure_child(Storage, Transfer) -> +ensure_child(Storage, Transfer, Size) -> Childspec = #{ id => Transfer, - start => {emqx_ft_assembler, start_link, [Storage, Transfer]}, + start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size]}, restart => temporary }, case supervisor:start_child(?MODULE, Childspec) of diff --git a/apps/emqx_ft/src/emqx_ft_assembly.erl b/apps/emqx_ft/src/emqx_ft_assembly.erl index 0f2729f42..2d78b540b 100644 --- a/apps/emqx_ft/src/emqx_ft_assembly.erl +++ b/apps/emqx_ft/src/emqx_ft_assembly.erl @@ -16,7 +16,7 @@ -module(emqx_ft_assembly). --export([new/0]). +-export([new/1]). -export([append/3]). -export([update/1]). @@ -35,12 +35,12 @@ size }). -new() -> +new(Size) -> #asm{ status = {incomplete, {missing, filemeta}}, meta = orddict:new(), segs = orddict:new(), - size = 0 + size = Size }. append(Asm, Node, Fragments) when is_list(Fragments) -> @@ -114,8 +114,7 @@ append_segmentinfo(Asm, Node, Fragment = #{fragment := {segment, Info}}) -> % In theory it's possible to have two segments with same offset + size on % different nodes but with differing content. We'd need a checksum to % be able to disambiguate them though. - segs = orddict:store({Offset, locality(Node), -End, Node}, Fragment, Asm#asm.segs), - size = max(End, Asm#asm.size) + segs = orddict:store({Offset, locality(Node), -End, Node}, Fragment, Asm#asm.segs) }. coverage([{{Offset, _, _, _}, _Segment} | Rest], Cursor, Sz) when Offset < Cursor -> @@ -186,7 +185,7 @@ segsize(#{fragment := {segment, Info}}) -> incomplete_new_test() -> ?assertEqual( {incomplete, {missing, filemeta}}, - status(update(new())) + status(update(new(42))) ). incomplete_test() -> @@ -194,7 +193,7 @@ incomplete_test() -> {incomplete, {missing, filemeta}}, status( update( - append(new(), node(), [ + append(new(142), node(), [ segment(p1, 0, 42), segment(p1, 42, 100) ]) @@ -203,13 +202,13 @@ incomplete_test() -> ). consistent_test() -> - Asm1 = append(new(), n1, [filemeta(m1, "blarg")]), + Asm1 = append(new(42), n1, [filemeta(m1, "blarg")]), Asm2 = append(Asm1, n2, [segment(s2, 0, 42)]), Asm3 = append(Asm2, n3, [filemeta(m3, "blarg")]), ?assertMatch({complete, _}, status(meta, Asm3)). inconsistent_test() -> - Asm1 = append(new(), node(), [segment(s1, 0, 42)]), + Asm1 = append(new(42), node(), [segment(s1, 0, 42)]), Asm2 = append(Asm1, n1, [filemeta(m1, "blarg")]), Asm3 = append(Asm2, n2, [segment(s2, 0, 42), filemeta(m1, "blorg")]), Asm4 = append(Asm3, n3, [filemeta(m3, "blarg")]), @@ -231,7 +230,7 @@ simple_coverage_test() -> {Node, segment(n3, 50, 50)}, {Node, segment(n4, 10, 10)} ], - Asm = append_many(new(), Segs), + Asm = append_many(new(100), Segs), ?assertMatch( {complete, [ @@ -256,7 +255,7 @@ redundant_coverage_test() -> {Node, segment(n7, 50, 10)}, {node1, segment(n8, 40, 10)} ], - Asm = append_many(new(), Segs), + Asm = append_many(new(70), Segs), ?assertMatch( {complete, [ @@ -279,7 +278,7 @@ redundant_coverage_prefer_local_test() -> {Node, segment(n5, 30, 10)}, {Node, segment(n6, 20, 10)} ], - Asm = append_many(new(), Segs), + Asm = append_many(new(40), Segs), ?assertMatch( {complete, [ @@ -301,7 +300,7 @@ missing_coverage_test() -> {node2, segment(n4, 50, 50)}, {Node, segment(n5, 40, 60)} ], - Asm = append_many(new(), Segs), + Asm = append_many(new(100), Segs), ?assertEqual( % {incomplete, {missing, {segment, 30, 40}}}, ??? {incomplete, {missing, {segment, 20, 40}}}, @@ -314,7 +313,7 @@ missing_end_coverage_test() -> {Node, segment(n1, 0, 15)}, {node1, segment(n3, 10, 10)} ], - Asm = append_many(new(), Segs), + Asm = append_many(new(20), Segs), ?assertEqual( {incomplete, {missing, {segment, 15, 20}}}, status(coverage, Asm) @@ -328,7 +327,7 @@ missing_coverage_with_redudancy_test() -> {node43, segment(n4, 10, 50)}, {node(), segment(n5, 40, 60)} ], - Asm = append_many(new(), Segs), + Asm = append_many(new(100), Segs), ?assertEqual( % {incomplete, {missing, {segment, 50, 60}}}, ??? {incomplete, {missing, {segment, 20, 40}}}, diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 6ca9e9ecb..7a95a0454 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -20,7 +20,7 @@ [ store_filemeta/2, store_segment/2, - assemble/1, + assemble/2, ready_transfers/0, get_ready_transfer/1, @@ -53,7 +53,7 @@ ok | {async, pid()} | {error, term()}. -callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) -> ok | {async, pid()} | {error, term()}. --callback assemble(storage(), emqx_ft:transfer()) -> +-callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) -> ok | {async, pid()} | {error, term()}. -callback ready_transfers(storage()) -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}. @@ -76,11 +76,11 @@ store_segment(Transfer, Segment) -> Mod = mod(), Mod:store_segment(storage(), Transfer, Segment). --spec assemble(emqx_ft:transfer()) -> +-spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) -> ok | {async, pid()} | {error, term()}. -assemble(Transfer) -> +assemble(Transfer, Size) -> Mod = mod(), - Mod:assemble(storage(), Transfer). + Mod:assemble(storage(), Transfer, Size). -spec ready_transfers() -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}. ready_transfers() -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 81fed0f21..103f0e48d 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -24,7 +24,7 @@ -export([store_segment/3]). -export([list/3]). -export([pread/5]). --export([assemble/2]). +-export([assemble/3]). -export([transfers/1]). @@ -167,12 +167,12 @@ pread(_Storage, _Transfer, Frag, Offset, Size) -> {error, Reason} end. --spec assemble(storage(), transfer()) -> +-spec assemble(storage(), transfer(), emqx_ft:bytes()) -> % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}. {async, _Assembler :: pid()} | {error, _TODO}. -assemble(Storage, Transfer) -> +assemble(Storage, Transfer, Size) -> % TODO: ask cluster if the transfer is already assembled - {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer), + {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size), {async, Pid}. get_ready_transfer(_Storage, ReadyTransferId) -> diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 8708f9808..8f00a1884 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -37,8 +37,14 @@ all() -> groups() -> [ - {single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- [t_switch_node]}, - {cluster, [sequence], [t_switch_node]} + {single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- group_cluster()}, + {cluster, [sequence], group_cluster()} + ]. + +group_cluster() -> + [ + t_switch_node, + t_unreliable_migrating_client ]. init_per_suite(Config) -> @@ -61,25 +67,61 @@ set_special_configs(Config) -> init_per_testcase(Case, Config) -> ClientId = atom_to_binary(Case), - {ok, C} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]), - {ok, _} = emqtt:connect(C), - [{client, C}, {clientid, ClientId} | Config]. + case ?config(group, Config) of + cluster -> + [{clientid, ClientId} | Config]; + _ -> + {ok, C} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(C), + [{client, C}, {clientid, ClientId} | Config] + end. end_per_testcase(_Case, Config) -> - C = ?config(client, Config), - ok = emqtt:stop(C), + _ = [ok = emqtt:stop(C) || {client, C} <- Config], ok. -init_per_group(cluster, Config) -> - Node = emqx_ft_test_helpers:start_additional_node(Config, emqx_ft1), - [{additional_node, Node} | Config]; -init_per_group(_Group, Config) -> - Config. +init_per_group(Group = cluster, Config) -> + Cluster = mk_cluster_specs(Config), + ct:pal("Starting ~p", [Cluster]), + Nodes = [ + emqx_common_test_helpers:start_slave(Name, Opts#{join_to => node()}) + || {Name, Opts} <- Cluster + ], + [{group, Group}, {cluster_nodes, Nodes} | Config]; +init_per_group(Group, Config) -> + [{group, Group} | Config]. end_per_group(cluster, Config) -> - ok = emqx_ft_test_helpers:stop_additional_node(Config); + ok = lists:foreach( + fun emqx_ft_test_helpers:stop_additional_node/1, + ?config(cluster_nodes, Config) + ); end_per_group(_Group, _Config) -> ok. +mk_cluster_specs(Config) -> + Specs = [ + {core, emqx_ft_SUITE1, #{listener_ports => [{tcp, 2883}]}}, + {core, emqx_ft_SUITE2, #{listener_ports => [{tcp, 3883}]}} + ], + CommOpts = [ + {env, [{emqx, boot_modules, [broker, listeners]}]}, + {apps, [emqx_ft]}, + {conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]}, + {env_handler, fun + (emqx_ft) -> + ok = emqx_config:put([file_transfer, storage], #{ + type => local, + root => emqx_ft_test_helpers:ft_root(Config, node()) + }); + (_) -> + ok + end} + ], + emqx_common_test_helpers:emqx_cluster( + Specs, + CommOpts + ). + %%-------------------------------------------------------------------- %% Tests %%-------------------------------------------------------------------- @@ -125,7 +167,7 @@ t_simple_transfer(Config) -> Data = [<<"first">>, <<"second">>, <<"third">>], - Meta = meta(Filename, Data), + Meta = #{size := Filesize} = meta(Filename, Data), MetaPayload = emqx_json:encode(Meta), MetaTopic = <<"$file/", FileId/binary, "/init">>, @@ -145,7 +187,7 @@ t_simple_transfer(Config) -> with_offsets(Data) ), - FinTopic = <<"$file/", FileId/binary, "/fin">>, + FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, ?assertRCName( success, emqtt:publish(C, FinTopic, <<>>, 1) @@ -194,7 +236,7 @@ t_no_meta(Config) -> emqtt:publish(C, SegmentTopic, Data, 1) ), - FinTopic = <<"$file/", FileId/binary, "/fin">>, + FinTopic = <<"$file/", FileId/binary, "/fin/42">>, ?assertRCName( unspecified_error, emqtt:publish(C, FinTopic, <<>>, 1) @@ -208,7 +250,7 @@ t_no_segment(Config) -> Data = [<<"first">>, <<"second">>, <<"third">>], - Meta = meta(Filename, Data), + Meta = #{size := Filesize} = meta(Filename, Data), MetaPayload = emqx_json:encode(Meta), MetaTopic = <<"$file/", FileId/binary, "/init">>, @@ -229,7 +271,7 @@ t_no_segment(Config) -> tl(with_offsets(Data)) ), - FinTopic = <<"$file/", FileId/binary, "/fin">>, + FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, ?assertRCName( unspecified_error, emqtt:publish(C, FinTopic, <<>>, 1) @@ -264,7 +306,7 @@ t_invalid_checksum(Config) -> Data = [<<"first">>, <<"second">>, <<"third">>], - Meta = meta(Filename, Data), + Meta = #{size := Filesize} = meta(Filename, Data), MetaPayload = emqx_json:encode(Meta#{checksum => sha256hex(<<"invalid">>)}), MetaTopic = <<"$file/", FileId/binary, "/init">>, @@ -284,14 +326,15 @@ t_invalid_checksum(Config) -> with_offsets(Data) ), - FinTopic = <<"$file/", FileId/binary, "/fin">>, + FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, ?assertRCName( unspecified_error, emqtt:publish(C, FinTopic, <<>>, 1) ). t_switch_node(Config) -> - AdditionalNodePort = emqx_ft_test_helpers:tcp_port(?config(additional_node, Config)), + [Node | _] = ?config(cluster_nodes, Config), + AdditionalNodePort = emqx_ft_test_helpers:tcp_port(Node), ClientId = <<"t_switch_node-migrating_client">>, @@ -306,7 +349,7 @@ t_switch_node(Config) -> %% First, publist metadata and the first segment to the additional node - Meta = meta(Filename, Data), + Meta = #{size := Filesize} = meta(Filename, Data), MetaPayload = emqx_json:encode(Meta), MetaTopic = <<"$file/", FileId/binary, "/init">>, @@ -335,7 +378,7 @@ t_switch_node(Config) -> emqtt:publish(C2, <<"$file/", FileId/binary, "/", Offset2/binary>>, Data2, 1) ), - FinTopic = <<"$file/", FileId/binary, "/fin">>, + FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, ?assertRCName( success, emqtt:publish(C2, FinTopic, <<>>, 1) @@ -360,13 +403,134 @@ t_assemble_crash(Config) -> C = ?config(client, Config), meck:new(emqx_ft_storage_fs), - meck:expect(emqx_ft_storage_fs, assemble, fun(_, _) -> meck:exception(error, oops) end), + meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _) -> meck:exception(error, oops) end), ?assertRCName( unspecified_error, emqtt:publish(C, <<"$file/someid/fin">>, <<>>, 1) ). +t_unreliable_migrating_client(Config) -> + NodeSelf = node(), + [Node1, Node2] = ?config(cluster_nodes, Config), + + ClientId = ?config(clientid, Config), + FileId = emqx_guid:to_hexstr(emqx_guid:gen()), + Filename = "migratory-birds-in-southern-hemisphere-2013.pdf", + Filesize = 1000, + Gen = emqx_ft_content_gen:new({{ClientId, FileId}, Filesize}, 16), + Payload = iolist_to_binary(emqx_ft_content_gen:consume(Gen, fun({Chunk, _, _}) -> Chunk end)), + Meta = meta(Filename, Payload), + + Context = #{ + clientid => ClientId, + fileid => FileId, + filesize => Filesize, + payload => Payload + }, + Commands = [ + {fun connect_mqtt_client/2, [NodeSelf]}, + {fun send_filemeta/2, [Meta]}, + {fun send_segment/3, [0, 100]}, + {fun send_segment/3, [100, 100]}, + {fun send_segment/3, [200, 100]}, + {fun stop_mqtt_client/1, []}, + {fun connect_mqtt_client/2, [Node1]}, + {fun connect_mqtt_client/2, [Node2]}, + {fun send_filemeta/2, [Meta]}, + {fun send_segment/3, [0, 200]}, + {fun send_segment/3, [200, 200]}, + {fun send_segment/3, [400, 100]}, + {fun connect_mqtt_client/2, [Node2]}, + {fun send_segment/3, [200, 200]}, + {fun send_segment/3, [400, 200]}, + {fun connect_mqtt_client/2, [Node1]}, + {fun send_segment/3, [400, 200]}, + {fun send_segment/3, [600, eof]}, + {fun send_finish/1, []}, + {fun connect_mqtt_client/2, [NodeSelf]}, + {fun send_finish/1, []} + ], + _Context = run_commands(Commands, Context), + + {ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(), + ReadyTransferIds = + [Id || {#{<<"clientid">> := CId} = Id, _Info} <- ReadyTransfers, CId == ClientId], + + Node1Bin = atom_to_binary(Node1), + NodeSelfBin = atom_to_binary(NodeSelf), + ?assertMatch( + [#{<<"node">> := Node1Bin}, #{<<"node">> := NodeSelfBin}], + lists:sort(ReadyTransferIds) + ), + + [ + begin + {ok, TableQH} = emqx_ft_storage:get_ready_transfer(Id), + ?assertEqual( + Payload, + iolist_to_binary(qlc:eval(TableQH)) + ) + end + || Id <- ReadyTransferIds + ]. + +run_commands(Commands, Context) -> + lists:foldl(fun run_command/2, Context, Commands). + +run_command({Command, Args}, Context) -> + ct:pal("COMMAND ~p ~p", [erlang:fun_info(Command, name), Args]), + erlang:apply(Command, Args ++ [Context]). + +connect_mqtt_client(Node, ContextIn) -> + Context = #{clientid := ClientId} = disown_mqtt_client(ContextIn), + NodePort = emqx_ft_test_helpers:tcp_port(Node), + {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, NodePort}]), + {ok, _} = emqtt:connect(Client), + Context#{client => Client}. + +stop_mqtt_client(Context = #{client := Client}) -> + _ = emqtt:stop(Client), + maps:remove(client, Context). + +disown_mqtt_client(Context = #{client := Client}) -> + _ = erlang:unlink(Client), + maps:remove(client, Context); +disown_mqtt_client(Context = #{}) -> + Context. + +send_filemeta(Meta, Context = #{client := Client, fileid := FileId}) -> + Topic = <<"$file/", FileId/binary, "/init">>, + MetaPayload = emqx_json:encode(Meta), + ?assertRCName( + success, + emqtt:publish(Client, Topic, MetaPayload, 1) + ), + Context. + +send_segment(Offset, Size, Context = #{client := Client, fileid := FileId, payload := Payload}) -> + Topic = <<"$file/", FileId/binary, "/", (integer_to_binary(Offset))/binary>>, + Data = + case Size of + eof -> + binary:part(Payload, Offset, byte_size(Payload) - Offset); + N -> + binary:part(Payload, Offset, N) + end, + ?assertRCName( + success, + emqtt:publish(Client, Topic, Data, 1) + ), + Context. + +send_finish(Context = #{client := Client, fileid := FileId, filesize := Filesize}) -> + Topic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, + ?assertRCName( + success, + emqtt:publish(Client, Topic, <<>>, 1) + ), + Context. + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index d4b619e43..bb8590cc7 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -84,7 +84,7 @@ t_assemble_empty_transfer(Config) -> ]}, emqx_ft_storage_fs:list(Storage, Transfer, fragment) ), - Status = complete_assemble(Storage, Transfer), + Status = complete_assemble(Storage, Transfer, 0), ?assertEqual({shutdown, ok}, Status), ?assertEqual( {ok, <<>>}, @@ -132,7 +132,7 @@ t_assemble_complete_local_transfer(Config) -> Fragments ), - Status = complete_assemble(Storage, Transfer), + Status = complete_assemble(Storage, Transfer, TransferSize), ?assertEqual({shutdown, ok}, Status), AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename), @@ -171,20 +171,20 @@ t_assemble_incomplete_transfer(Config) -> expire_at => 42 }, ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta), - Status = complete_assemble(Storage, Transfer), + Status = complete_assemble(Storage, Transfer, TransferSize), ?assertMatch({shutdown, {error, _}}, Status). t_assemble_no_meta(Config) -> Storage = storage(Config), Transfer = {?CLIENTID2, ?config(file_id, Config)}, - Status = complete_assemble(Storage, Transfer), + Status = complete_assemble(Storage, Transfer, 42), ?assertMatch({shutdown, {error, {incomplete, _}}}, Status). -complete_assemble(Storage, Transfer) -> - complete_assemble(Storage, Transfer, 1000). +complete_assemble(Storage, Transfer, Size) -> + complete_assemble(Storage, Transfer, Size, 1000). -complete_assemble(Storage, Transfer, Timeout) -> - {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer), +complete_assemble(Storage, Transfer, Size, Timeout) -> + {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size), MRef = erlang:monitor(process, Pid), Pid ! kickoff, receive diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl index cc3da8bc3..3bda8042c 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl @@ -67,7 +67,7 @@ init_per_group(_Group, Config) -> Config. end_per_group(cluster, Config) -> - ok = emqx_ft_test_helpers:stop_additional_node(Config); + ok = emqx_ft_test_helpers:stop_additional_node(?config(additional_node, Config)); end_per_group(_Group, _Config) -> ok. diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl index f7d8bc879..956e63553 100644 --- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl +++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl @@ -21,13 +21,12 @@ -include_lib("common_test/include/ct.hrl"). -start_additional_node(Config, Node) -> - SelfNode = node(), +start_additional_node(Config, Name) -> emqx_common_test_helpers:start_slave( - Node, + Name, [ {apps, [emqx_ft]}, - {join_to, SelfNode}, + {join_to, node()}, {configure_gen_rpc, true}, {env_handler, fun (emqx_ft) -> @@ -40,8 +39,7 @@ start_additional_node(Config, Node) -> ] ). -stop_additional_node(Config) -> - Node = ?config(additional_node, Config), +stop_additional_node(Node) -> ok = rpc:call(Node, ekka, leave, []), ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_ft]]), {ok, _} = emqx_common_test_helpers:stop_slave(Node), @@ -58,13 +56,14 @@ ft_root(Config, Node) -> upload_file(ClientId, FileId, Data, Node) -> Port = tcp_port(Node), + Size = byte_size(Data), {ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]), {ok, _} = emqtt:connect(C1), Meta = #{ name => FileId, expire_at => erlang:system_time(_Unit = second) + 3600, - size => byte_size(Data) + size => Size }, MetaPayload = emqx_json:encode(Meta), @@ -72,6 +71,6 @@ upload_file(ClientId, FileId, Data, Node) -> {ok, _} = emqtt:publish(C1, MetaTopic, MetaPayload, 1), {ok, _} = emqtt:publish(C1, <<"$file/", FileId/binary, "/0">>, Data, 1), - FinTopic = <<"$file/", FileId/binary, "/fin">>, + FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Size))/binary>>, {ok, _} = emqtt:publish(C1, FinTopic, <<>>, 1), ok = emqtt:stop(C1).