From f6a0598f270e3dd907ff3f82efd6e2a9c839d536 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 15 Feb 2023 11:42:27 +0200 Subject: [PATCH] feat(ft): add file transfer tests --- apps/emqx/test/emqx_common_test_helpers.erl | 7 +- apps/emqx_ft/src/emqx_ft.erl | 16 +- apps/emqx_ft/src/emqx_ft_conf.erl | 4 +- apps/emqx_ft/src/emqx_ft_storage_dummy.erl | 43 ----- apps/emqx_ft/src/emqx_ft_storage_fs.erl | 5 +- .../src/proto/emqx_ft_storage_fs_proto_v1.erl | 6 - apps/emqx_ft/test/emqx_ft_SUITE.erl | 167 +++++++++++++++--- apps/emqx_ft/test/emqx_ft_conf_SUITE.erl | 65 +++++++ .../emqx_ft/test/emqx_ft_storage_fs_SUITE.erl | 151 ++++++++++++++++ apps/emqx_ft/test/emqx_ft_test_helpers.erl | 77 ++++++++ 10 files changed, 456 insertions(+), 85 deletions(-) delete mode 100644 apps/emqx_ft/src/emqx_ft_storage_dummy.erl create mode 100644 apps/emqx_ft/test/emqx_ft_conf_SUITE.erl create mode 100644 apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl create mode 100644 apps/emqx_ft/test/emqx_ft_test_helpers.erl diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index ecdf8f827..18a3d9f3e 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -63,7 +63,7 @@ ]). -export([ - maybe_fix_gen_rpc/0, + set_gen_rpc_stateless/0, emqx_cluster/1, emqx_cluster/2, start_epmd/0, @@ -617,13 +617,14 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) -> listener_ports => [{Type :: tcp | ssl | ws | wss, inet:port_number()}] }. --spec maybe_fix_gen_rpc() -> ok. -maybe_fix_gen_rpc() -> +-spec set_gen_rpc_stateless() -> ok. +set_gen_rpc_stateless() -> %% When many tests run in an obscure order, it may occur that %% `gen_rpc` started with its default settings before `emqx_conf`. %% `gen_rpc` and `emqx_conf` have different default `port_discovery` modes, %% so we reinitialize `gen_rpc` explicitly. ok = application:stop(gen_rpc), + ok = application:set_env(gen_rpc, port_discovery, stateless), ok = application:start(gen_rpc). -spec emqx_cluster(cluster_spec()) -> [{shortname(), node_opts()}]. diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index ed4edd33b..2704e1e55 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -175,7 +175,13 @@ on_init(Msg, FileId) -> case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of ok -> ?RC_SUCCESS; - {error, _Reason} -> + {error, Reason} -> + ?SLOG(warning, #{ + msg => "store_filemeta_failed", + mqtt_msg => Msg, + file_id => FileId, + reason => Reason + }), ?RC_UNSPECIFIED_ERROR end; {error, Reason} -> @@ -235,7 +241,13 @@ on_fin(PacketId, Msg, FileId, Checksum) -> {ok, _} -> undefined; %% Assembling failed, unregister the packet key - {error, _} -> + {error, Reason} -> + ?SLOG(warning, #{ + msg => "assemble_not_started", + mqtt_msg => Msg, + file_id => FileId, + reason => Reason + }), case emqx_ft_responder:unregister(FinPacketKey) of %% We successfully unregistered the packet key, %% so we can send the error code at once diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl index b88fd2532..d56dd8d32 100644 --- a/apps/emqx_ft/src/emqx_ft_conf.erl +++ b/apps/emqx_ft/src/emqx_ft_conf.erl @@ -50,8 +50,8 @@ unload() -> -spec pre_config_update(list(atom()), emqx_config:update_request(), emqx_config:raw_config()) -> {ok, emqx_config:update_request()} | {error, term()}. -pre_config_update(_, _Req, Config) -> - {ok, Config}. +pre_config_update(_, Req, _Config) -> + {ok, Req}. -spec post_config_update( list(atom()), diff --git a/apps/emqx_ft/src/emqx_ft_storage_dummy.erl b/apps/emqx_ft/src/emqx_ft_storage_dummy.erl deleted file mode 100644 index 4ed8ba487..000000000 --- a/apps/emqx_ft/src/emqx_ft_storage_dummy.erl +++ /dev/null @@ -1,43 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_ft_storage_dummy). - --behaviour(emqx_ft_storage). - --export([ - store_filemeta/3, - store_segment/3, - assemble/3, - ready_transfers/1, - get_ready_transfer/2 -]). - -store_filemeta(_Storage, _Transfer, _Meta) -> - ok. - -store_segment(_Storage, _Transfer, _Segment) -> - ok. - -assemble(_Storage, _Transfer, Callback) -> - Pid = spawn(fun() -> Callback({error, not_implemented}) end), - {ok, Pid}. - -ready_transfers(_Storage) -> - {ok, []}. - -get_ready_transfer(_Storage, _Id) -> - {error, not_implemented}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 01a3e19c2..c6cb09cf2 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -93,7 +93,10 @@ store_filemeta(Storage, Transfer, Meta) -> {ok, Meta} -> _ = touch_file(Filepath), ok; - {ok, _Conflict} -> + {ok, Conflict} -> + ?SLOG(warning, #{ + msg => "filemeta_conflict", transfer => Transfer, new => Meta, old => Conflict + }), % TODO % We won't see conflicts in case of concurrent `store_filemeta` % requests. It's rather odd scenario so it's fine not to worry diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl index f69033b4b..e2c4c93d7 100644 --- a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl @@ -20,7 +20,6 @@ -export([introduced_in/0]). --export([list/3]). -export([multilist/3]). -export([pread/5]). -export([ready_transfers/1]). @@ -35,11 +34,6 @@ introduced_in() -> "5.0.17". --spec list(node(), transfer(), fragment | result) -> - {ok, [filefrag()]} | {error, term()} | no_return(). -list(Node, Transfer, What) -> - erpc:call(Node, emqx_ft_storage_fs_proxy, list_local, [Transfer, What]). - -spec multilist([node()], transfer(), fragment | result) -> emqx_rpc:erpc_multicall({ok, [filefrag()]} | {error, term()}). multilist(Nodes, Transfer, What) -> diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index ff7cba403..6a125449f 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -29,35 +29,58 @@ ) ). -all() -> emqx_common_test_helpers:all(?MODULE). +all() -> + [ + {group, single_node}, + {group, cluster} + ]. + +groups() -> + [ + {single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- [t_switch_node]}, + {cluster, [sequence], [t_switch_node]} + ]. init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], fun set_special_configs/1), - ok = emqx_common_test_helpers:maybe_fix_gen_rpc(), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], set_special_configs(Config)), + ok = emqx_common_test_helpers:set_gen_rpc_stateless(), Config. end_per_suite(_Config) -> ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]), ok. -set_special_configs(emqx_ft) -> - {ok, _} = emqx:update_config([file_transfer, storage], #{<<"type">> => <<"local">>}), - ok; -set_special_configs(_App) -> - ok. +set_special_configs(Config) -> + fun + (emqx_ft) -> + ok = emqx_config:put([file_transfer, storage], #{ + type => local, root => emqx_ft_test_helpers:ft_root(Config, node()) + }); + (_) -> + ok + end. -init_per_testcase(_Case, Config) -> - _ = file:del_dir_r(filename:join(emqx:data_dir(), "file_transfer")), - ClientId = <<"client">>, +init_per_testcase(Case, Config) -> + ClientId = atom_to_binary(Case), {ok, C} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]), {ok, _} = emqtt:connect(C), [{client, C}, {clientid, ClientId} | Config]. - end_per_testcase(_Case, Config) -> C = ?config(client, Config), ok = emqtt:stop(C), ok. +init_per_group(cluster, Config) -> + Node = emqx_ft_test_helpers:start_additional_node(Config, test2), + [{additional_node, Node} | Config]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(cluster, Config) -> + ok = emqx_ft_test_helpers:stop_additional_node(Config); +end_per_group(_Group, _Config) -> + ok. + %%-------------------------------------------------------------------- %% Tests %%-------------------------------------------------------------------- @@ -65,15 +88,34 @@ end_per_testcase(_Case, Config) -> t_invalid_topic_format(Config) -> C = ?config(client, Config), - %% TODO: more invalid topics - ?assertRCName( unspecified_error, - emqtt:publish(C, <<"$file/XYZ">>, <<>>, 1) + emqtt:publish(C, <<"$file/fileid">>, <<>>, 1) + ), + ?assertRCName( + unspecified_error, + emqtt:publish(C, <<"$file/fileid/">>, <<>>, 1) + ), + ?assertRCName( + unspecified_error, + emqtt:publish(C, <<"$file/fileid/offset">>, <<>>, 1) + ), + ?assertRCName( + unspecified_error, + emqtt:publish(C, <<"$file/fileid/fin/offset">>, <<>>, 1) + ), + ?assertRCName( + unspecified_error, + emqtt:publish(C, <<"$file/">>, <<>>, 1) ), ?assertRCName( unspecified_error, emqtt:publish(C, <<"$file/X/Y/Z">>, <<>>, 1) + ), + %% should not be handled by `emqx_ft` + ?assertRCName( + no_matching_subscribers, + emqtt:publish(C, <<"$file">>, <<>>, 1) ). t_simple_transfer(Config) -> @@ -95,8 +137,7 @@ t_simple_transfer(Config) -> lists:foreach( fun({Chunk, Offset}) -> - OffsetBin = integer_to_binary(Offset), - SegmentTopic = <<"$file/", FileId/binary, "/", OffsetBin/binary>>, + SegmentTopic = <<"$file/", FileId/binary, "/", Offset/binary>>, ?assertRCName( success, emqtt:publish(C, SegmentTopic, Chunk, 1) @@ -111,12 +152,7 @@ t_simple_transfer(Config) -> emqtt:publish(C, FinTopic, <<>>, 1) ), - ReadyTransferId = #{ - <<"fileid">> => FileId, - <<"clientid">> => ?config(clientid, Config), - <<"node">> => atom_to_binary(node(), utf8) - }, - + {ok, [{ReadyTransferId, _}]} = emqx_ft_storage:ready_transfers(), {ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId), ?assertEqual( @@ -184,8 +220,7 @@ t_no_segment(Config) -> lists:foreach( fun({Chunk, Offset}) -> - OffsetBin = integer_to_binary(Offset), - SegmentTopic = <<"$file/", FileId/binary, "/", OffsetBin/binary>>, + SegmentTopic = <<"$file/", FileId/binary, "/", Offset/binary>>, ?assertRCName( success, emqtt:publish(C, SegmentTopic, Chunk, 1) @@ -241,8 +276,7 @@ t_invalid_checksum(Config) -> lists:foreach( fun({Chunk, Offset}) -> - OffsetBin = integer_to_binary(Offset), - SegmentTopic = <<"$file/", FileId/binary, "/", OffsetBin/binary>>, + SegmentTopic = <<"$file/", FileId/binary, "/", Offset/binary>>, ?assertRCName( success, emqtt:publish(C, SegmentTopic, Chunk, 1) @@ -257,6 +291,83 @@ t_invalid_checksum(Config) -> emqtt:publish(C, FinTopic, <<>>, 1) ). +t_switch_node(Config) -> + AdditionalNodePort = emqx_ft_test_helpers:tcp_port(?config(additional_node, Config)), + + ClientId = <<"t_switch_node-migrating_client">>, + + {ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, AdditionalNodePort}]), + {ok, _} = emqtt:connect(C1), + + Filename = <<"multinode_upload.txt">>, + FileId = <<"f1">>, + + Data = [<<"first">>, <<"second">>, <<"third">>], + [{Data0, Offset0}, {Data1, Offset1}, {Data2, Offset2}] = with_offsets(Data), + + %% First, publist metadata and the first segment to the additional node + + Meta = meta(Filename, Data), + MetaPayload = emqx_json:encode(Meta), + + MetaTopic = <<"$file/", FileId/binary, "/init">>, + ?assertRCName( + success, + emqtt:publish(C1, MetaTopic, MetaPayload, 1) + ), + ?assertRCName( + success, + emqtt:publish(C1, <<"$file/", FileId/binary, "/", Offset0/binary>>, Data0, 1) + ), + + %% Then, switch the client to the main node + %% and publish the rest of the segments + + ok = emqtt:stop(C1), + {ok, C2} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(C2), + + ?assertRCName( + success, + emqtt:publish(C2, <<"$file/", FileId/binary, "/", Offset1/binary>>, Data1, 1) + ), + ?assertRCName( + success, + emqtt:publish(C2, <<"$file/", FileId/binary, "/", Offset2/binary>>, Data2, 1) + ), + + FinTopic = <<"$file/", FileId/binary, "/fin">>, + ?assertRCName( + success, + emqtt:publish(C2, FinTopic, <<>>, 1) + ), + + ok = emqtt:stop(C2), + + %% Now check consistency of the file + + {ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(), + {ReadyTransferIds, _} = lists:unzip(ReadyTransfers), + [ReadyTransferId] = [Id || #{<<"clientid">> := CId} = Id <- ReadyTransferIds, CId == ClientId], + + {ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId), + + ?assertEqual( + iolist_to_binary(Data), + iolist_to_binary(qlc:eval(TableQH)) + ). + +t_assemble_crash(Config) -> + C = ?config(client, Config), + + meck:new(emqx_ft_storage_fs), + meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _) -> meck:exception(error, oops) end), + + ?assertRCName( + unspecified_error, + emqtt:publish(C, <<"$file/someid/fin">>, <<>>, 1) + ). + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- @@ -264,7 +375,7 @@ t_invalid_checksum(Config) -> with_offsets(Items) -> {List, _} = lists:mapfoldl( fun(Item, Offset) -> - {{Item, Offset}, Offset + byte_size(Item)} + {{Item, integer_to_binary(Offset)}, Offset + byte_size(Item)} end, 0, Items diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl new file mode 100644 index 000000000..cca69796d --- /dev/null +++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl @@ -0,0 +1,65 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_conf_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft]), + Config. + +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]), + ok. + +init_per_testcase(_Case, Config) -> + Config. + +end_per_testcase(_Case, _Config) -> + ok. + +%%-------------------------------------------------------------------- +%% Tests +%%-------------------------------------------------------------------- + +t_update_config(_Config) -> + ?assertMatch( + {error, #{kind := validation_error}}, + emqx_conf:update( + [file_transfer], + #{<<"storage">> => #{<<"type">> => <<"unknown">>}}, + #{} + ) + ), + ?assertMatch( + {ok, _}, + emqx_conf:update( + [file_transfer], + #{<<"storage">> => #{<<"type">> => <<"local">>, <<"root">> => <<"/tmp/path">>}}, + #{} + ) + ), + ?assertEqual( + <<"/tmp/path">>, + emqx_config:get([file_transfer, storage, root]) + ). diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl new file mode 100644 index 000000000..20645bcd9 --- /dev/null +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl @@ -0,0 +1,151 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_fs_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-define(assertInclude(Pattern, List), + ?assert( + lists:any( + fun + (Pattern) -> true; + (_) -> false + end, + List + ) + ) +). + +all() -> + [ + {group, single_node}, + {group, cluster} + ]. + +-define(CLUSTER_CASES, [t_multinode_ready_transfers]). + +groups() -> + [ + {single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- ?CLUSTER_CASES}, + {cluster, [sequence], ?CLUSTER_CASES} + ]. + +init_per_suite(Config) -> + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], set_special_configs(Config)), + ok = emqx_common_test_helpers:set_gen_rpc_stateless(), + Config. +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]), + ok. + +set_special_configs(Config) -> + fun + (emqx_ft) -> + ok = emqx_config:put([file_transfer, storage], #{ + type => local, root => emqx_ft_test_helpers:ft_root(Config, node()) + }); + (_) -> + ok + end. + +init_per_testcase(Case, Config) -> + [{tc, Case} | Config]. +end_per_testcase(_Case, _Config) -> + ok. + +init_per_group(cluster, Config) -> + Node = emqx_ft_test_helpers:start_additional_node(Config, test2), + [{additional_node, Node} | Config]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(cluster, Config) -> + ok = emqx_ft_test_helpers:stop_additional_node(Config); +end_per_group(_Group, _Config) -> + ok. + +%%-------------------------------------------------------------------- +%% Tests +%%-------------------------------------------------------------------- + +t_invalid_ready_transfer_id(Config) -> + ?assertMatch( + {error, _}, + emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{ + <<"clientid">> => client_id(Config), + <<"fileid">> => <<"fileid">>, + <<"node">> => atom_to_binary('nonexistent@127.0.0.1') + }) + ), + ?assertMatch( + {error, _}, + emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{ + <<"clientid">> => client_id(Config), + <<"fileid">> => <<"fileid">>, + <<"node">> => <<"nonexistent_as_atom@127.0.0.1">> + }) + ), + ?assertMatch( + {error, _}, + emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{ + <<"clientid">> => client_id(Config), + <<"fileid">> => <<"nonexistent_file">>, + <<"node">> => node() + }) + ). + +t_multinode_ready_transfers(Config) -> + Node1 = ?config(additional_node, Config), + ok = emqx_ft_test_helpers:upload_file(<<"c1">>, <<"f1">>, <<"data">>, Node1), + + Node2 = node(), + ok = emqx_ft_test_helpers:upload_file(<<"c2">>, <<"f2">>, <<"data">>, Node2), + + ?assertInclude( + #{<<"clientid">> := <<"c1">>, <<"fileid">> := <<"f1">>}, + ready_transfer_ids(Config) + ), + + ?assertInclude( + #{<<"clientid">> := <<"c2">>, <<"fileid">> := <<"f2">>}, + ready_transfer_ids(Config) + ). + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +client_id(Config) -> + atom_to_binary(?config(tc, Config), utf8). + +storage(Config) -> + #{ + type => local, + root => ft_root(Config) + }. + +ft_root(Config) -> + emqx_ft_test_helpers:ft_root(Config, node()). + +ready_transfer_ids(Config) -> + {ok, ReadyTransfers} = emqx_ft_storage_fs:ready_transfers(storage(Config)), + {ReadyTransferIds, _} = lists:unzip(ReadyTransfers), + ReadyTransferIds. diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl new file mode 100644 index 000000000..ca854bda0 --- /dev/null +++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl @@ -0,0 +1,77 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_test_helpers). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). + +start_additional_node(Config, Node) -> + SelfNode = node(), + emqx_common_test_helpers:start_slave( + Node, + [ + {apps, [emqx_ft]}, + {join_to, SelfNode}, + {configure_gen_rpc, false}, + {env_handler, fun + (emqx_ft) -> + ok = emqx_config:put([file_transfer, storage], #{ + type => local, root => ft_root(Config, node()) + }); + (_) -> + ok + end} + ] + ). + +stop_additional_node(Config) -> + Node = ?config(additional_node, Config), + ok = rpc:call(Node, ekka, leave, []), + ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_ft]]), + {ok, _} = emqx_common_test_helpers:stop_slave(Node), + ok. + +tcp_port(Node) -> + {_, Port} = rpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]), + Port. + +ft_root(Config, Node) -> + filename:join([ + ?config(priv_dir, Config), <<"file_transfer">>, atom_to_binary(Node) + ]). + +upload_file(ClientId, FileId, Data, Node) -> + Port = tcp_port(Node), + + {ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]), + {ok, _} = emqtt:connect(C1), + Meta = #{ + name => FileId, + expire_at => erlang:system_time(_Unit = second) + 3600, + size => byte_size(Data) + }, + MetaPayload = emqx_json:encode(Meta), + + MetaTopic = <<"$file/", FileId/binary, "/init">>, + {ok, _} = emqtt:publish(C1, MetaTopic, MetaPayload, 1), + {ok, _} = emqtt:publish(C1, <<"$file/", FileId/binary, "/0">>, Data, 1), + + FinTopic = <<"$file/", FileId/binary, "/fin">>, + {ok, _} = emqtt:publish(C1, FinTopic, <<>>, 1), + ok = emqtt:stop(C1).