%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_ft_SUITE).

-compile(export_all).
-compile(nowarn_export_all).

-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").

%% Asserts that a publish result is `{ok, #{reason_code_name := RCName}}`.
-define(assertRCName(RCName, PublishRes),
    ?assertMatch(
        {ok, #{reason_code_name := RCName}},
        PublishRes
    )
).

%% Top-level groups of the suite.
all() ->
    [
        {group, async_mode},
        {group, sync_mode},
        {group, cluster}
    ].

%% Group definitions.
%% `async_mode` and `sync_mode` both run the whole `single_node` group;
%% they differ only in the `mode` entry that `init_per_group/2` puts into
%% the config.
groups() ->
    [
        {single_node, [], [
            t_assemble_crash,
            t_corrupted_segment_retry,
            t_invalid_checksum,
            t_invalid_fileid,
            t_invalid_filename,
            t_invalid_meta,
            t_invalid_topic_format,
            t_meta_conflict,
            t_nasty_clientids_fileids,
            t_nasty_filenames,
            t_no_meta,
            t_no_segment,
            t_simple_transfer,
            t_assemble_timeout
        ]},
        {async_mode, [], [
            {group, single_node}
        ]},
        {sync_mode, [], [
            {group, single_node}
        ]},
        {cluster, [], [
            t_switch_node,
            t_unreliable_migrating_client,
            {g_concurrent_fins, [{repeat_until_any_fail, 8}], [
                t_concurrent_fins
            ]}
        ]}
    ].

%% Suite-wide defaults: every testcase gets a 90 second timetrap.
suite() ->
    [{timetrap, {seconds, 90}}].

%% Starts `emqx_ft` with local storage and a 2 second assemble timeout,
%% and stores the started apps under `suite_apps`.
init_per_suite(Config) ->
    % NOTE
    % Inhibit local fs GC to simulate it isn't fast enough to collect
    % complete transfers.
    Storage = emqx_utils_maps:deep_merge(
        emqx_ft_test_helpers:local_storage(Config),
        #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}}
    ),
    FTConfig = emqx_ft_test_helpers:config(Storage, #{<<"assemble_timeout">> => <<"2s">>}),
    Apps = emqx_cth_suite:start(
        [
            {emqx_ft, #{config => FTConfig}}
        ],
        #{work_dir => emqx_cth_suite:work_dir(Config)}
    ),
    [{suite_apps, Apps} | Config].

%% Stops the apps started by `init_per_suite/1`.
end_per_suite(Config) ->
    ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
    ok.

%% Derives a unique client id from the testcase name and points the local
%% storage directories at client-specific locations.
%% When the `group` entry is `cluster` only the client id is added; otherwise
%% an MQTT v5 client is also started, connected and stored under `client`.
init_per_testcase(Case, Config) ->
    ClientId = iolist_to_binary([
        atom_to_binary(Case), <<"-">>, emqx_ft_test_helpers:unique_binary_string()
    ]),
    ok = set_client_specific_ft_dirs(ClientId, Config),
    case ?config(group, Config) of
        cluster ->
            [{clientid, ClientId} | Config];
        _ ->
            {ok, C} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]),
            {ok, _} = emqtt:connect(C),
            [{client, C}, {clientid, ClientId} | Config]
    end.

%% Stops every MQTT client stored in the config under `client`.
end_per_testcase(_Case, Config) ->
    _ = [ok = emqtt:stop(C) || {client, C} <- Config],
    ok.

%% Group setup:
%%  * `cluster` starts the additional nodes and records them under `cluster_nodes`;
%%  * `async_mode` / `sync_mode` select the transfer mode read back by `mode/1`;
%%  * any other group just records its own name under `group`.
init_per_group(Group = cluster, Config) ->
    WorkDir = ?config(priv_dir, Config),
    Cluster = mk_cluster_specs(Config),
    Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
    [{group, Group}, {cluster_nodes, Nodes} | Config];
init_per_group(_Group = async_mode, Config) ->
    % The group name and the mode must agree (they used to be swapped).
    [{mode, async} | Config];
init_per_group(_Group = sync_mode, Config) ->
    [{mode, sync} | Config];
init_per_group(Group, Config) ->
    [{group, Group} | Config].

%% Stops the cluster nodes for the `cluster` group; other groups need no teardown.
end_per_group(cluster, Config) ->
    ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
end_per_group(_Group, _Config) ->
    ok.

%% Specs for two additional core nodes that join the current node and run
%% `emqx_ft` with file transfer enabled.
mk_cluster_specs(_Config) ->
    CommonOpts = #{
        role => core,
        join_to => node(),
        apps => [
            {emqx_conf, #{start => false}},
            {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}},
            {emqx_ft, "file_transfer { enable = true }"}
        ]
    },
    [
        {emqx_ft_SUITE1, CommonOpts},
        {emqx_ft_SUITE2, CommonOpts}
    ].
%%--------------------------------------------------------------------
%% Single node tests
%%--------------------------------------------------------------------

%% Publishing to malformed `$file/...` topics is rejected with
%% `unspecified_error`; the bare `$file` topic is not a file transfer
%% topic at all.
t_invalid_topic_format(Config) ->
    Client = ?config(client, Config),
    MalformedTopics = [
        <<"$file/fileid">>,
        <<"$file/fileid/">>,
        <<"$file/fileid/offset">>,
        <<"$file/fileid/fin/offset">>,
        <<"$file/fileid/fin/42/xyz">>,
        <<"$file/">>,
        <<"$file/X/Y/Z">>
    ],
    lists:foreach(
        fun(Topic) ->
            ?assertRCName(
                unspecified_error,
                emqtt:publish(Client, Topic, <<>>, 1)
            )
        end,
        MalformedTopics
    ),
    %% should not be handled by `emqx_ft`
    ?assertRCName(
        no_matching_subscribers,
        emqtt:publish(Client, <<"$file">>, <<>>, 1)
    ).

%% An empty file id in the init topic is rejected.
t_invalid_fileid(Config) ->
    Client = ?config(client, Config),
    InitTopic = mk_init_topic(Config, <<>>),
    ?assertRCName(
        unspecified_error,
        emqtt:publish(Client, InitTopic, <<>>, 1)
    ).

%% File metadata carrying a dangerous or overlong file name is rejected.
t_invalid_filename(Config) ->
    Client = ?config(client, Config),
    BadNames = [
        {<<"f1">>, "."},
        {<<"f2">>, ".."},
        {<<"f2">>, "../nice"},
        {<<"f3">>, "/etc/passwd"},
        {<<"f4">>, lists:duplicate(1000, $A)}
    ],
    lists:foreach(
        fun({FileId, Name}) ->
            MetaPayload = encode_meta(meta(Name, <<>>)),
            ?assertRCName(
                unspecified_error,
                emqtt:publish(Client, mk_init_topic(Config, FileId), MetaPayload, 1)
            )
        end,
        BadNames
    ).
%% Happy path: publish metadata, all segments and `fin`, then check that
%% exactly one export exists for this client and holds the concatenated data.
t_simple_transfer(Config) ->
    C = ?config(client, Config),
    ClientId = ?config(clientid, Config),

    Filename = "topsecret.pdf",
    FileId = <<"f1">>,

    Data = [<<"first">>, <<"second">>, <<"third">>],

    Meta = #{size := Filesize} = meta(Filename, Data),

    ?assertRCName(
        success,
        emqtt:publish(C, mk_init_topic(Config, FileId), encode_meta(Meta), 1)
    ),

    lists:foreach(
        fun({Chunk, Offset}) ->
            ?assertRCName(
                success,
                emqtt:publish(C, mk_segment_topic(Config, FileId, Offset), Chunk, 1)
            )
        end,
        with_offsets(Data)
    ),

    ?assertEqual(
        ok,
        emqx_ft_test_helpers:fin_result(
            mode(Config), ClientId, C, mk_fin_topic(Config, FileId, Filesize)
        )
    ),

    [Export] = list_files(ClientId),
    ?assertEqual(
        {ok, iolist_to_binary(Data)},
        read_export(Export)
    ).

%% Uploads files using client ids / file ids that look like path traversal,
%% shell injection or non-ASCII text, and checks each one is exported intact.
%% The client id doubles as the file content.
t_nasty_clientids_fileids(Config) ->
    Transfers = [
        {<<".">>, <<".">>},
        {<<"🌚"/utf8>>, <<"🌝"/utf8>>},
        {<<"../..">>, <<"😀"/utf8>>},
        {<<"/etc/passwd">>, <<"whitehat">>},
        {<<"; rm -rf / ;">>, <<"whitehat">>}
    ],

    ok = lists:foreach(
        fun({ClientId, FileId}) ->
            Data = ClientId,
            ok = emqx_ft_test_helpers:upload_file(mode(Config), ClientId, FileId, "justfile", Data),
            [Export] = list_files(ClientId),
            ?assertMatch(#{meta := #{name := "justfile"}}, Export),
            ?assertEqual({ok, Data}, read_export(Export))
        end,
        Transfers
    ).

%% Uploads files with unusual names (percent sign, emoji, CJK characters,
%% a very long name) and checks that the exported metadata keeps the name.
%% The UTF-8 encoded file name doubles as file id and file content.
t_nasty_filenames(Config) ->
    Filenames = [
        {<<"nasty1">>, "146%"},
        {<<"nasty2">>, "🌚"},
        % NOTE: this literal had been mis-decoded into Greek-looking mojibake.
        {<<"nasty3">>, "中文.txt"},
        {<<"nasty4">>, _239Bytes = string:join(lists:duplicate(240 div 5, "LONG"), ".")}
    ],
    ok = lists:foreach(
        fun({ClientId, Filename}) ->
            FileId = unicode:characters_to_binary(Filename),
            ok = emqx_ft_test_helpers:upload_file(mode(Config), ClientId, FileId, Filename, FileId),
            [Export] = list_files(ClientId),
            ?assertMatch(#{meta := #{name := Filename}}, Export),
            ?assertEqual({ok, FileId}, read_export(Export))
        end,
        Filenames
    ).
%% Re-sending metadata for the same file id with a different file name
%% is rejected.
t_meta_conflict(Config) ->
    Client = ?config(client, Config),
    InitTopic = mk_init_topic(Config, <<"f1">>),

    OriginalMeta = meta("topsecret.pdf", [<<"x">>]),
    ?assertRCName(
        success,
        emqtt:publish(Client, InitTopic, encode_meta(OriginalMeta), 1)
    ),

    ConflictingMeta = OriginalMeta#{name => "conflict.pdf"},
    ?assertRCName(
        unspecified_error,
        emqtt:publish(Client, InitTopic, encode_meta(ConflictingMeta), 1)
    ).

%% A segment sent before any metadata is accepted, but finishing such a
%% transfer fails.
t_no_meta(Config) ->
    Client = ?config(client, Config),
    ClientId = ?config(clientid, Config),
    FileId = <<"f1">>,

    SegmentTopic = mk_segment_topic(Config, FileId, 0),
    ?assertRCName(
        success,
        emqtt:publish(Client, SegmentTopic, <<"first">>, 1)
    ),

    FinTopic = mk_fin_topic(Config, FileId, 42),
    ?assertEqual(
        {error, unspecified_error},
        emqx_ft_test_helpers:fin_result(mode(Config), ClientId, Client, FinTopic)
    ).

%% Finishing a transfer whose first segment was never sent fails.
t_no_segment(Config) ->
    Client = ?config(client, Config),
    ClientId = ?config(clientid, Config),
    FileId = <<"f1">>,

    Chunks = [<<"first">>, <<"second">>, <<"third">>],
    Meta = meta("topsecret.pdf", Chunks),
    #{size := Filesize} = Meta,

    ?assertRCName(
        success,
        emqtt:publish(Client, mk_init_topic(Config, FileId), encode_meta(Meta), 1)
    ),

    %% Skip the first segment
    [_Skipped | Remaining] = with_offsets(Chunks),
    _ = [
        ?assertRCName(
            success,
            emqtt:publish(Client, mk_segment_topic(Config, FileId, Offset), Chunk, 1)
        )
     || {Chunk, Offset} <- Remaining
    ],

    FinTopic = mk_fin_topic(Config, FileId, Filesize),
    ?assertEqual(
        {error, unspecified_error},
        emqx_ft_test_helpers:fin_result(mode(Config), ClientId, Client, FinTopic)
    ).

%% Metadata that does not follow the schema, or is not JSON at all,
%% is rejected.
t_invalid_meta(Config) ->
    Client = ?config(client, Config),
    InitTopic = mk_init_topic(Config, <<"f1">>),

    %% Invalid schema
    BadSchemaPayload = emqx_utils_json:encode(#{foo => <<"bar">>}),
    ?assertRCName(
        unspecified_error,
        emqtt:publish(Client, InitTopic, BadSchemaPayload, 1)
    ),

    %% Invalid JSON
    ?assertRCName(
        unspecified_error,
        emqtt:publish(Client, InitTopic, <<"{oops;">>, 1)
    ).
%% Metadata announces a wrong checksum: a plain `fin` fails, while a `fin`
%% that carries the correct checksum in its topic succeeds.
t_invalid_checksum(Config) ->
    C = ?config(client, Config),
    ClientId = ?config(clientid, Config),

    Filename = "topsecret.pdf",
    FileId = <<"f1">>,

    Data = [<<"first">>, <<"second">>, <<"third">>],

    Meta = #{size := Filesize} = meta(Filename, Data),
    MetaPayload = encode_meta(Meta#{checksum => {sha256, sha256(<<"invalid">>)}}),

    ?assertRCName(
        success,
        emqtt:publish(C, mk_init_topic(Config, FileId), MetaPayload, 1)
    ),

    lists:foreach(
        fun({Chunk, Offset}) ->
            ?assertRCName(
                success,
                emqtt:publish(C, mk_segment_topic(Config, FileId, Offset), Chunk, 1)
            )
        end,
        with_offsets(Data)
    ),

    % Send `fin` w/o checksum, should fail since filemeta checksum is invalid
    FinTopic = mk_fin_topic(Config, FileId, Filesize),

    ?assertEqual(
        {error, unspecified_error},
        emqx_ft_test_helpers:fin_result(mode(Config), ClientId, C, FinTopic)
    ),

    % Send `fin` with the correct checksum
    Checksum = binary:encode_hex(sha256(Data)),
    ?assertEqual(
        ok,
        emqx_ft_test_helpers:fin_result(
            mode(Config), ClientId, C, <<FinTopic/binary, "/", Checksum/binary>>
        )
    ).

%% A segment whose payload does not match the checksum given in its topic is
%% rejected; re-sending the intact segment lets the transfer complete.
t_corrupted_segment_retry(Config) ->
    C = ?config(client, Config),
    ClientId = ?config(clientid, Config),

    Filename = "corruption.pdf",
    FileId = <<"4242-4242">>,

    Data = [<<"first">>, <<"second">>, <<"third">>],
    [
        {Seg1, Offset1},
        {Seg2, Offset2},
        {Seg3, Offset3}
    ] = with_offsets(Data),
    [
        Checksum1,
        Checksum2,
        Checksum3
    ] = [binary:encode_hex(sha256(S)) || S <- Data],

    Meta = #{size := Filesize} = meta(Filename, Data),

    ?assertRCName(success, emqtt:publish(C, mk_init_topic(Config, FileId), encode_meta(Meta), 1)),

    ?assertRCName(
        success,
        emqtt:publish(C, mk_segment_topic(Config, FileId, Offset1, Checksum1), Seg1, 1)
    ),

    % segment is corrupted
    ?assertRCName(
        unspecified_error,
        emqtt:publish(
            C, mk_segment_topic(Config, FileId, Offset2, Checksum2), <<Seg2/binary, 42>>, 1
        )
    ),

    % retry
    ?assertRCName(
        success,
        emqtt:publish(C, mk_segment_topic(Config, FileId, Offset2, Checksum2), Seg2, 1)
    ),

    ?assertRCName(
        success,
        emqtt:publish(C, mk_segment_topic(Config, FileId, Offset3, Checksum3), Seg3, 1)
    ),

    ?assertEqual(
        ok,
        emqx_ft_test_helpers:fin_result(
            mode(Config), ClientId, C, mk_fin_topic(Config, FileId, Filesize)
        )
    ).

%% Publishes a `fin` while `emqx_ft_storage_fs:assemble/4` is mocked to raise,
%% and expects the publish to be answered with `unspecified_error`.
t_assemble_crash(Config) ->
    C = ?config(client, Config),

    meck:new(emqx_ft_storage_fs),
    try
        meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _, _) ->
            meck:exception(error, oops)
        end),

        ?assertRCName(
            unspecified_error,
            emqtt:publish(C, <<"$file/someid/fin">>, <<>>, 1)
        )
    after
        % Always drop the mock, so a failed assertion cannot leak it
        % into the testcases that run afterwards.
        meck:unload(emqx_ft_storage_fs)
    end.

%% Mocks `emqx_ft_storage:assemble/3` to hand back a process that never
%% finishes, and expects `fin` to fail only after more than 2 seconds
%% (the suite configures `assemble_timeout` as "2s").
t_assemble_timeout(Config) ->
    C = ?config(client, Config),
    ClientId = ?config(clientid, Config),

    SleepForever = fun() ->
        Ref = make_ref(),
        receive
            Ref -> ok
        end
    end,

    ok = meck:new(emqx_ft_storage, [passthrough]),
    {Time, Res} =
        try
            ok = meck:expect(emqx_ft_storage, assemble, fun(_, _, _) ->
                {async, spawn_link(SleepForever)}
            end),
            timer:tc(
                fun() ->
                    emqx_ft_test_helpers:fin_result(
                        mode(Config), ClientId, C, <<"$file/someid/fin/9999999">>
                    )
                end
            )
        after
            % Unload even when the call above raises.
            ok = meck:unload(emqx_ft_storage)
        end,

    ?assertEqual(
        {error, unspecified_error},
        Res
    ),

    % `timer:tc/1` reports microseconds.
    ?assert(2_000_000 < Time).

%%--------------------------------------------------------------------
%% Cluster tests
%%--------------------------------------------------------------------

%% Starts a transfer through one cluster node, finishes it through the
%% current node, and checks that the exported file is complete.
t_switch_node(Config) ->
    [Node | _] = ?config(cluster_nodes, Config),
    AdditionalNodePort = emqx_ft_test_helpers:tcp_port(Node),

    ClientId = <<"t_switch_node-migrating_client">>,

    {ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, AdditionalNodePort}]),
    {ok, _} = emqtt:connect(C1),

    Filename = "multinode_upload.txt",
    FileId = <<"f1">>,

    Data = [<<"first">>, <<"second">>, <<"third">>],
    [{Data0, Offset0}, {Data1, Offset1}, {Data2, Offset2}] = with_offsets(Data),

    %% First, publish metadata and the first segment to the additional node

    Meta = #{size := Filesize} = meta(Filename, Data),

    ?assertRCName(
        success,
        emqtt:publish(C1, mk_init_topic(Config, FileId), encode_meta(Meta), 1)
    ),
    ?assertRCName(
        success,
        emqtt:publish(C1, mk_segment_topic(Config, FileId, Offset0), Data0, 1)
    ),

    %% Then, switch the client to the main node
    %% and publish the rest of the segments

    ok = emqtt:stop(C1),
    {ok, C2} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]),
    {ok, _} = emqtt:connect(C2),

    ?assertRCName(
        success,
        emqtt:publish(C2, mk_segment_topic(Config, FileId, Offset1), Data1, 1)
    ),
    ?assertRCName(
        success,
        emqtt:publish(C2, mk_segment_topic(Config, FileId, Offset2), Data2, 1)
    ),

    ?assertRCName(
        success,
        emqtt:publish(C2, mk_fin_topic(Config, FileId, Filesize), <<>>, 1)
    ),

    ok = emqtt:stop(C2),

    %% Now check consistency of the file

    [Export] = list_files(ClientId),
    ?assertEqual(
        {ok, iolist_to_binary(Data)},
        read_export(Export)
    ).

%% Replays a scripted session of a client that hops between nodes, restarts
%% the transfer, changes segment sizes and resends segments and `fin`.
%% Expects a single export, attributed to `Node1`, with the full payload.
t_unreliable_migrating_client(Config) ->
    NodeSelf = node(),
    [Node1, Node2] = ?config(cluster_nodes, Config),

    ClientId = ?config(clientid, Config),
    FileId = emqx_guid:to_hexstr(emqx_guid:gen()),
    Filename = "migratory-birds-in-southern-hemisphere-2013.pdf",
    Filesize = 1000,
    Gen = emqx_ft_content_gen:new({{ClientId, FileId}, Filesize}, 16),
    Payload = iolist_to_binary(emqx_ft_content_gen:consume(Gen, fun({Chunk, _, _}) -> Chunk end)),
    Meta = meta(Filename, Payload),

    Context = #{
        clientid => ClientId,
        fileid => FileId,
        filesize => Filesize,
        payload => Payload
    },
    Commands = [
        % Connect to the broker on the current node
        {fun connect_mqtt_client/2, [NodeSelf]},
        % Send filemeta and 3 initial segments
        % (assuming client chose 100 bytes as a desired segment size)
        {fun send_filemeta/3, [Config, Meta]},
        {fun send_segment/4, [Config, 0, 100]},
        {fun send_segment/4, [Config, 100, 100]},
        {fun send_segment/4, [Config, 200, 100]},
        % Disconnect the client cleanly
        {fun stop_mqtt_client/1, []},
        % Connect to the broker on `Node1`
        {fun connect_mqtt_client/2, [Node1]},
        % Connect to the broker on `Node2` without first disconnecting from `Node1`
        % Client forgot the state for some reason and started the transfer again.
        % (assuming this is usual for a client on a device that was rebooted)
        {fun connect_mqtt_client/2, [Node2]},
        {fun send_filemeta/3, [Config, Meta]},
        % This time it chose 200 bytes as a segment size
        {fun send_segment/4, [Config, 0, 200]},
        {fun send_segment/4, [Config, 200, 200]},
        % But now it downscaled back to 100 bytes segments
        {fun send_segment/4, [Config, 400, 100]},
        % Client lost connectivity and reconnected
        % (also had last few segments unacked and decided to resend them)
        {fun connect_mqtt_client/2, [Node2]},
        {fun send_segment/4, [Config, 200, 200]},
        {fun send_segment/4, [Config, 400, 200]},
        % Client lost connectivity and reconnected, this time to another node
        % (also had last segment unacked and decided to resend it)
        {fun connect_mqtt_client/2, [Node1]},
        {fun send_segment/4, [Config, 400, 200]},
        {fun send_segment/4, [Config, 600, eof]},
        {fun send_finish/2, [Config]},
        % Client lost connectivity and reconnected, this time to the current node
        % (client had `fin` unacked and decided to resend it)
        {fun connect_mqtt_client/2, [NodeSelf]},
        {fun send_finish/2, [Config]}
    ],
    _Context = run_commands(Commands, Context),

    Exports = list_files(?config(clientid, Config)),

    Node1Str = atom_to_list(Node1),
    % TODO: this testcase is specific to local fs storage backend
    ?assertMatch(
        [#{"node" := Node1Str}],
        fs_exported_file_attributes(Exports)
    ),

    [
        ?assertEqual({ok, Payload}, read_export(Export))
     || Export <- Exports
    ].
%% Uploads a complete file through `Node1`, then sends `fin` to all three
%% nodes at once from separate processes and waits for each of them to
%% exit normally. At least one export is expected; more than one is
%% tolerated and reported as a testcase comment.
t_concurrent_fins(Config) ->
    ct:timetrap({seconds, 10}),

    ThisNode = node(),
    [Node1, Node2] = ?config(cluster_nodes, Config),

    ClientId = iolist_to_binary([
        ?config(clientid, Config),
        integer_to_list(erlang:unique_integer())
    ]),
    FileId = emqx_guid:to_hexstr(emqx_guid:gen()),
    Filename = "migratory-birds-in-southern-hemisphere-2013.pdf",
    Filesize = 100,
    Gen = emqx_ft_content_gen:new({{ClientId, FileId}, Filesize}, 16),
    Payload = iolist_to_binary(emqx_ft_content_gen:consume(Gen, fun({Chunk, _, _}) -> Chunk end)),
    Meta = meta(Filename, Payload),

    %% Send filemeta and segments to Node1
    InitialContext = #{
        clientid => ClientId,
        fileid => FileId,
        filesize => Filesize,
        payload => Payload
    },
    UploadCommands = [
        {fun connect_mqtt_client/2, [Node1]},
        {fun send_filemeta/3, [Config, Meta]},
        {fun send_segment/4, [Config, 0, 100]},
        {fun stop_mqtt_client/1, []}
    ],
    UploadedContext = run_commands(UploadCommands, InitialContext),

    %% Now send fins concurrently to the 3 nodes
    SendFin = fun(Node) ->
        FinCommands = [
            {fun connect_mqtt_client/2, [Node]},
            {fun send_finish/2, [Config]}
        ],
        run_commands(FinCommands, UploadedContext)
    end,
    %% Builds the body of a worker that keeps retrying until `fin` goes through.
    RetryingSender = fun(Node) ->
        fun Retry() ->
            _ = erlang:process_flag(trap_exit, true),
            try
                SendFin(Node)
            catch
                Class:Reason ->
                    % NOTE: random delay to avoid livelock conditions
                    ct:pal("Node ~p did not send finish successfully: ~p:~p", [Node, Class, Reason]),
                    ok = timer:sleep(rand:uniform(10)),
                    Retry()
            end
        end
    end,
    Workers = [
        erlang:spawn_monitor(RetryingSender(Node))
     || Node <- [Node1, Node2, ThisNode]
    ],
    ok = lists:foreach(
        fun({WorkerPid, MonitorRef}) ->
            receive
                {'DOWN', MonitorRef, process, WorkerPid, normal} -> ok
            end
        end,
        Workers
    ),

    %% Only one node should have the file
    Exports = list_files(ClientId),
    case fs_exported_file_attributes(Exports) of
        [#{"node" := _Node}] ->
            ok;
        [#{"node" := _Node} | _] = Files ->
            % ...But we can't really guarantee that
            ct:comment({multiple_files_on_different_nodes, Files})
    end.
%%------------------------------------------------------------------------------
%% Command helpers
%%------------------------------------------------------------------------------

%% Command runners

%% Runs the commands in order, threading the context map through them;
%% returns the context produced by the last command.
run_commands(Commands, Context) ->
    lists:foldl(
        fun(Command, ContextAcc) -> run_command(Command, ContextAcc) end,
        Context,
        Commands
    ).

%% Logs a `{Fun, Args}` command and applies `Fun` to `Args` with the context
%% appended as the last argument.
run_command({Fun, Args}, Context) ->
    FunName = erlang:fun_info(Fun, name),
    ct:pal("COMMAND ~p ~p", [FunName, Args]),
    erlang:apply(Fun, Args ++ [Context]).

%% Commands

%% Starts an MQTT v5 client against the TCP port of `Node`, connects it and
%% stores it in the context. A client already in the context is unlinked
%% first, but not stopped.
connect_mqtt_client(Node, ContextIn) ->
    Context = disown_mqtt_client(ContextIn),
    #{clientid := ClientId} = Context,
    Port = emqx_ft_test_helpers:tcp_port(Node),
    ClientOpts = [{proto_ver, v5}, {clientid, ClientId}, {port, Port}],
    {ok, Client} = emqtt:start_link(ClientOpts),
    {ok, _} = emqtt:connect(Client),
    Context#{client => Client}.

%% Stops the context's client and drops it from the context.
stop_mqtt_client(#{client := Client} = Context) ->
    _ = emqtt:stop(Client),
    maps:remove(client, Context).

%% Unlinks the context's client, if there is one, and drops it from the context.
disown_mqtt_client(#{client := Client} = Context) ->
    _ = erlang:unlink(Client),
    maps:remove(client, Context);
disown_mqtt_client(#{} = Context) ->
    Context.

%% Publishes the file metadata for the context's file id; expects `success`.
send_filemeta(Config, Meta, Context = #{client := Client, fileid := FileId}) ->
    InitTopic = mk_init_topic(Config, FileId),
    ?assertRCName(
        success,
        emqtt:publish(Client, InitTopic, encode_meta(Meta), 1)
    ),
    Context.

%% Publishes `Size` bytes of the payload starting at `Offset`
%% (`eof` means "up to the end of the payload"); expects `success`.
send_segment(
    Config,
    Offset,
    Size,
    Context = #{client := Client, fileid := FileId, payload := Payload}
) ->
    Length =
        case Size of
            eof -> byte_size(Payload) - Offset;
            N -> N
        end,
    Segment = binary:part(Payload, Offset, Length),
    ?assertRCName(
        success,
        emqtt:publish(Client, mk_segment_topic(Config, FileId, Offset), Segment, 1)
    ),
    Context.

%% Publishes `fin` for the context's file id and file size; expects `success`.
send_finish(Config, Context = #{client := Client, fileid := FileId, filesize := Filesize}) ->
    FinTopic = mk_fin_topic(Config, FileId, Filesize),
    ?assertRCName(
        success,
        emqtt:publish(Client, FinTopic, <<>>, 1)
    ),
    Context.
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------

%% Sorts the exports and turns the query string of each export's `uri` into
%% a map of its key-value pairs.
fs_exported_file_attributes(FSExports) ->
    lists:map(
        fun(#{uri := URIString}) ->
            #{query := QS} = uri_string:parse(URIString),
            maps:from_list(uri_string:dissect_query(QS))
        end,
        lists:sort(FSExports)
    ).

%% Topic that starts a transfer: `<prefix>/init`.
mk_init_topic(Config, FileId) ->
    RequestTopicPrefix = request_topic_prefix(Config, FileId),
    <<RequestTopicPrefix/binary, "/init">>.

%% Topic for a segment at `Offset` (integer or binary): `<prefix>/<offset>`.
mk_segment_topic(Config, FileId, Offset) when is_integer(Offset) ->
    mk_segment_topic(Config, FileId, integer_to_binary(Offset));
mk_segment_topic(Config, FileId, Offset) when is_binary(Offset) ->
    RequestTopicPrefix = request_topic_prefix(Config, FileId),
    <<RequestTopicPrefix/binary, "/", Offset/binary>>.

%% Topic for a segment with a checksum: `<prefix>/<offset>/<checksum>`.
mk_segment_topic(Config, FileId, Offset, Checksum) when is_integer(Offset) ->
    mk_segment_topic(Config, FileId, integer_to_binary(Offset), Checksum);
mk_segment_topic(Config, FileId, Offset, Checksum) when is_binary(Offset) ->
    RequestTopicPrefix = request_topic_prefix(Config, FileId),
    <<RequestTopicPrefix/binary, "/", Offset/binary, "/", Checksum/binary>>.

%% Topic that finishes a transfer of `Size` bytes (integer or binary):
%% `<prefix>/fin/<size>`.
mk_fin_topic(Config, FileId, Size) when is_integer(Size) ->
    mk_fin_topic(Config, FileId, integer_to_binary(Size));
mk_fin_topic(Config, FileId, Size) when is_binary(Size) ->
    RequestTopicPrefix = request_topic_prefix(Config, FileId),
    <<RequestTopicPrefix/binary, "/fin/", Size/binary>>.

%% Request topic prefix for `FileId` in the mode selected by the config.
request_topic_prefix(Config, FileId) ->
    emqx_ft_test_helpers:request_topic_prefix(mode(Config), FileId).

%% Pairs every binary item with its byte offset in the concatenation of all
%% items; offsets are returned as decimal binaries.
with_offsets(Items) ->
    {List, _} = lists:mapfoldl(
        fun(Item, Offset) ->
            {{Item, integer_to_binary(Offset)}, Offset + byte_size(Item)}
        end,
        0,
        Items
    ),
    List.

%% Raw SHA-256 digest of `Data`.
sha256(Data) ->
    crypto:hash(sha256, Data).

%% Builds file metadata for `Data` (iodata): name, SHA-256 checksum, size in
%% bytes, and an expiry one hour (3600 seconds) from now.
meta(FileName, Data) ->
    FullData = iolist_to_binary(Data),
    #{
        name => FileName,
        checksum => {sha256, sha256(FullData)},
        expire_at => erlang:system_time(_Unit = second) + 3600,
        size => byte_size(FullData)
    }.

%% Serializes metadata into the JSON payload of an `init` message.
encode_meta(Meta) ->
    emqx_utils_json:encode(emqx_ft:encode_filemeta(Meta)).

%% Lists the exported files whose transfer belongs to `ClientId`.
list_files(ClientId) ->
    {ok, #{items := Files}} = emqx_ft_storage:files(),
    [File || File = #{transfer := {CId, _}} <- Files, CId =:= ClientId].
%% Reads the content of an exported file from its `path`.
read_export(#{path := ExportPath}) ->
    % TODO: only works for the local filesystem exporter right now
    file:read_file(ExportPath).

%% Points the local storage's segments root and local exporter root at
%% directories dedicated to `ClientId` under the file transfer root.
set_client_specific_ft_dirs(ClientId, Config) ->
    FTRoot = emqx_ft_test_helpers:ft_root(Config),
    ClientDir = fun(Kind) -> filename:join([FTRoot, ClientId, Kind]) end,
    RootSettings = [
        {[file_transfer, storage, local, segments, root], segments},
        {[file_transfer, storage, local, exporter, local, root], exports}
    ],
    lists:foreach(
        fun({ConfigPath, Kind}) ->
            ok = emqx_config:put(ConfigPath, ClientDir(Kind))
        end,
        RootSettings
    ).

%% Transfer mode stored in the config under `mode`; defaults to `sync`.
mode(Config) ->
    DefaultMode = sync,
    proplists:get_value(mode, Config, DefaultMode).