emqx/apps/emqx_ft/test/emqx_ft_SUITE.erl

897 lines
27 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
%% Asserts that `PublishRes` matches `{ok, Reply}` where `Reply` is a map
%% whose `reason_code_name` key matches `RCName`.
-define(assertRCName(RCName, PublishRes),
?assertMatch(
{ok, #{reason_code_name := RCName}},
PublishRes
)
).
%% Common Test callback: the top-level groups of this suite, in run order.
all() ->
    [{group, Name} || Name <- [async_mode, sync_mode, cluster]].
%% Common Test callback: group definitions.
%%
%% `async_mode` and `sync_mode` both wrap the `single_node` group, so every
%% single-node case is listed once and run under each of the two groups.
%% `cluster` holds the multi-node cases; `t_concurrent_fins` lives in a nested
%% group with the `{repeat_until_any_fail, 8}` property.
groups() ->
    SingleNodeCases = [
        t_assemble_crash,
        t_corrupted_segment_retry,
        t_invalid_checksum,
        t_invalid_fileid,
        t_invalid_filename,
        t_invalid_meta,
        t_invalid_topic_format,
        t_meta_conflict,
        t_nasty_clientids_fileids,
        t_nasty_filenames,
        t_no_meta,
        t_no_segment,
        t_simple_transfer,
        t_assemble_timeout
    ],
    ConcurrentFins =
        {g_concurrent_fins, [{repeat_until_any_fail, 8}], [t_concurrent_fins]},
    ClusterCases = [t_switch_node, t_unreliable_migrating_client, ConcurrentFins],
    [
        {single_node, [], SingleNodeCases},
        {async_mode, [], [{group, single_node}]},
        {sync_mode, [], [{group, single_node}]},
        {cluster, [], ClusterCases}
    ].
%% Common Test callback: suite-wide defaults (a 90 second timetrap).
suite() ->
    Timetrap = {seconds, 90},
    [{timetrap, Timetrap}].
%% Common Test callback: starts `emqx_ft` with local storage and an
%% `assemble_timeout` of "2s", and records the started apps under `suite_apps`.
init_per_suite(Config) ->
    % NOTE
    % Local fs GC is inhibited (interval 0) to simulate that it isn't fast
    % enough to collect complete transfers.
    GCOverride = #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}},
    LocalStorage = emqx_ft_test_helpers:local_storage(Config),
    Storage = emqx_utils_maps:deep_merge(LocalStorage, GCOverride),
    FTConfig = emqx_ft_test_helpers:config(Storage, #{<<"assemble_timeout">> => <<"2s">>}),
    AppSpecs = [{emqx_ft, #{config => FTConfig}}],
    StartOpts = #{work_dir => emqx_cth_suite:work_dir(Config)},
    StartedApps = emqx_cth_suite:start(AppSpecs, StartOpts),
    [{suite_apps, StartedApps} | Config].
%% Common Test callback: stops the apps recorded under `suite_apps`.
end_per_suite(Config) ->
    StartedApps = ?config(suite_apps, Config),
    ok = emqx_cth_suite:stop(StartedApps).
%% Common Test callback: derives a per-testcase client id
%% (`<Case>-<unique string>`), points the file transfer directories at
%% client-specific locations and, unless the `group` config value is `cluster`,
%% starts and connects an MQTT v5 client stored under `client`.
init_per_testcase(Case, Config) ->
    CaseName = atom_to_binary(Case),
    Suffix = emqx_ft_test_helpers:unique_binary_string(),
    ClientId = iolist_to_binary([CaseName, <<"-">>, Suffix]),
    ok = set_client_specific_ft_dirs(ClientId, Config),
    BaseConfig = [{clientid, ClientId} | Config],
    case ?config(group, Config) of
        cluster ->
            % Cluster cases are handed only the client id.
            BaseConfig;
        _ ->
            {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]),
            {ok, _} = emqtt:connect(Client),
            [{client, Client} | BaseConfig]
    end.
%% Common Test callback: stops every client stored as `{client, Pid}` in
%% `Config`; all other entries are ignored.
end_per_testcase(_Case, Config) ->
    StopClient = fun
        ({client, Client}) -> ok = emqtt:stop(Client);
        (_Other) -> ok
    end,
    lists:foreach(StopClient, Config).
%% Common Test callback: per-group setup.
%%
%% * `cluster`    - starts the extra nodes described by `mk_cluster_specs/1`
%%                  (work dir is `priv_dir`) and stores them under
%%                  `cluster_nodes`, together with `{group, cluster}`.
%% * `async_mode` - stores `{mode, async}`.
%% * `sync_mode`  - stores `{mode, sync}`.
%% * any other    - stores `{group, Group}`.
%%
%% The two mode clauses previously stored the opposite mode of what the group
%% name says. Both groups run the same `single_node` cases, so fixing the
%% mapping keeps coverage identical and only makes the group names truthful.
init_per_group(Group = cluster, Config) ->
    WorkDir = ?config(priv_dir, Config),
    Cluster = mk_cluster_specs(Config),
    Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
    [{group, Group}, {cluster_nodes, Nodes} | Config];
init_per_group(_Group = async_mode, Config) ->
    [{mode, async} | Config];
init_per_group(_Group = sync_mode, Config) ->
    [{mode, sync} | Config];
init_per_group(Group, Config) ->
    [{group, Group} | Config].
%% Common Test callback: stops the nodes stored under `cluster_nodes` for the
%% `cluster` group; nothing to do for any other group.
end_per_group(cluster, Config) ->
    Nodes = ?config(cluster_nodes, Config),
    ok = emqx_cth_cluster:stop(Nodes);
end_per_group(_OtherGroup, _Config) ->
    ok.
%% Builds the specs of the two additional nodes (`emqx_ft_SUITE1` and
%% `emqx_ft_SUITE2`). Both share the same options: role `core`, joining the
%% current node, with the same app list.
mk_cluster_specs(_Config) ->
    Apps = [
        {emqx_conf, #{start => false}},
        {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}},
        {emqx_ft, "file_transfer { enable = true }"}
    ],
    NodeOpts = #{role => core, join_to => node(), apps => Apps},
    [{NodeName, NodeOpts} || NodeName <- [emqx_ft_SUITE1, emqx_ft_SUITE2]].
%%--------------------------------------------------------------------
%% Single node tests
%%--------------------------------------------------------------------
%% Publishes empty QoS 1 messages to a series of malformed `$file/...` topics
%% and expects `unspecified_error` for each of them; the bare `$file` topic is
%% expected to yield `no_matching_subscribers` instead.
t_invalid_topic_format(Config) ->
    Client = ?config(client, Config),
    MalformedTopics = [
        <<"$file/fileid">>,
        <<"$file/fileid/">>,
        <<"$file/fileid/offset">>,
        <<"$file/fileid/fin/offset">>,
        <<"$file/fileid/fin/42/xyz">>,
        <<"$file/">>,
        <<"$file/X/Y/Z">>
    ],
    lists:foreach(
        fun(Topic) ->
            ?assertRCName(
                unspecified_error,
                emqtt:publish(Client, Topic, <<>>, 1)
            )
        end,
        MalformedTopics
    ),
    %% should not be handled by `emqx_ft`
    ?assertRCName(
        no_matching_subscribers,
        emqtt:publish(Client, <<"$file">>, <<>>, 1)
    ).
%% An `init` publish for an empty file id is expected to be answered with
%% `unspecified_error`.
t_invalid_fileid(Config) ->
    Client = ?config(client, Config),
    InitTopic = mk_init_topic(Config, <<>>),
    ?assertRCName(
        unspecified_error,
        emqtt:publish(Client, InitTopic, <<>>, 1)
    ).
%% Sends `init` with a series of file names (".", "..", a relative path
%% escaping upwards, an absolute path and a 1000-character name) and expects
%% `unspecified_error` for each.
t_invalid_filename(Config) ->
    Client = ?config(client, Config),
    Cases = [
        {<<"f1">>, "."},
        {<<"f2">>, ".."},
        {<<"f2">>, "../nice"},
        {<<"f3">>, "/etc/passwd"},
        {<<"f4">>, lists:duplicate(1000, $A)}
    ],
    lists:foreach(
        fun({FileId, Filename}) ->
            MetaPayload = encode_meta(meta(Filename, <<>>)),
            ?assertRCName(
                unspecified_error,
                emqtt:publish(Client, mk_init_topic(Config, FileId), MetaPayload, 1)
            )
        end,
        Cases
    ).
%% Happy path: sends file metadata, three segments and `fin`, then checks that
%% exactly one export is listed for the client and that its content equals the
%% concatenated segments.
t_simple_transfer(Config) ->
    Client = ?config(client, Config),
    ClientId = ?config(clientid, Config),
    FileId = <<"f1">>,
    Segments = [<<"first">>, <<"second">>, <<"third">>],
    Meta = meta("topsecret.pdf", Segments),
    #{size := Filesize} = Meta,
    ?assertRCName(
        success,
        emqtt:publish(Client, mk_init_topic(Config, FileId), encode_meta(Meta), 1)
    ),
    _ = [
        ?assertRCName(
            success,
            emqtt:publish(Client, mk_segment_topic(Config, FileId, Offset), Segment, 1)
        )
     || {Segment, Offset} <- with_offsets(Segments)
    ],
    FinTopic = mk_fin_topic(Config, FileId, Filesize),
    ?assertEqual(
        ok,
        emqx_ft_test_helpers:fin_result(mode(Config), ClientId, Client, FinTopic)
    ),
    [Export] = list_files(ClientId),
    ?assertEqual(
        {ok, iolist_to_binary(Segments)},
        read_export(Export)
    ).
%% Uploads a file named "justfile" for several awkward client id / file id
%% pairs (dots, emoji, path-like and shell-like strings). Each upload uses the
%% client id as file content and must produce exactly one export with that
%% name and content.
t_nasty_clientids_fileids(Config) ->
    Transfers = [
        {<<".">>, <<".">>},
        {<<"🌚"/utf8>>, <<"🌝"/utf8>>},
        {<<"../..">>, <<"😤"/utf8>>},
        {<<"/etc/passwd">>, <<"whitehat">>},
        {<<"; rm -rf / ;">>, <<"whitehat">>}
    ],
    UploadAndVerify = fun({ClientId, FileId}) ->
        Content = ClientId,
        ok = emqx_ft_test_helpers:upload_file(
            mode(Config), ClientId, FileId, "justfile", Content
        ),
        [Export] = list_files(ClientId),
        ?assertMatch(#{meta := #{name := "justfile"}}, Export),
        ?assertEqual({ok, Content}, read_export(Export))
    end,
    ok = lists:foreach(UploadAndVerify, Transfers).
%% Uploads files with unusual names (a percent sign, emoji, CJK characters and
%% a 239-byte name). The file id and the file content are both the UTF-8
%% encoding of the file name; each upload must produce exactly one export
%% carrying that name and content.
t_nasty_filenames(Config) ->
    % 48 copies of "LONG" joined with "." make 239 bytes.
    LongName = string:join(lists:duplicate(240 div 5, "LONG"), "."),
    Filenames = [
        {<<"nasty1">>, "146%"},
        {<<"nasty2">>, "🌚"},
        {<<"nasty3">>, "中文.txt"},
        {<<"nasty4">>, LongName}
    ],
    UploadAndVerify = fun({ClientId, Filename}) ->
        FileId = unicode:characters_to_binary(Filename),
        ok = emqx_ft_test_helpers:upload_file(
            mode(Config), ClientId, FileId, Filename, FileId
        ),
        [Export] = list_files(ClientId),
        ?assertMatch(#{meta := #{name := Filename}}, Export),
        ?assertEqual({ok, FileId}, read_export(Export))
    end,
    ok = lists:foreach(UploadAndVerify, Filenames).
%% Sending `init` twice for the same file id with a different file name is
%% expected to succeed the first time and yield `unspecified_error` the second.
t_meta_conflict(Config) ->
    Client = ?config(client, Config),
    InitTopic = mk_init_topic(Config, <<"f1">>),
    OriginalMeta = meta("topsecret.pdf", [<<"x">>]),
    ?assertRCName(
        success,
        emqtt:publish(Client, InitTopic, encode_meta(OriginalMeta), 1)
    ),
    ConflictMeta = OriginalMeta#{name => "conflict.pdf"},
    ?assertRCName(
        unspecified_error,
        emqtt:publish(Client, InitTopic, encode_meta(ConflictMeta), 1)
    ).
%% Sends a segment without prior metadata (expected to be acknowledged with
%% `success`) and then `fin`, which is expected to fail with
%% `unspecified_error`.
t_no_meta(Config) ->
    Client = ?config(client, Config),
    ClientId = ?config(clientid, Config),
    FileId = <<"f1">>,
    SegmentTopic = mk_segment_topic(Config, FileId, 0),
    ?assertRCName(
        success,
        emqtt:publish(Client, SegmentTopic, <<"first">>, 1)
    ),
    FinTopic = mk_fin_topic(Config, FileId, 42),
    ?assertEqual(
        {error, unspecified_error},
        emqx_ft_test_helpers:fin_result(mode(Config), ClientId, Client, FinTopic)
    ).
%% Sends metadata and all segments except the first one; `fin` is then
%% expected to fail with `unspecified_error`.
t_no_segment(Config) ->
    Client = ?config(client, Config),
    ClientId = ?config(clientid, Config),
    FileId = <<"f1">>,
    Segments = [<<"first">>, <<"second">>, <<"third">>],
    Meta = meta("topsecret.pdf", Segments),
    #{size := Filesize} = Meta,
    ?assertRCName(
        success,
        emqtt:publish(Client, mk_init_topic(Config, FileId), encode_meta(Meta), 1)
    ),
    %% Skip the first segment
    [_Skipped | Remaining] = with_offsets(Segments),
    _ = [
        ?assertRCName(
            success,
            emqtt:publish(Client, mk_segment_topic(Config, FileId, Offset), Segment, 1)
        )
     || {Segment, Offset} <- Remaining
    ],
    FinTopic = mk_fin_topic(Config, FileId, Filesize),
    ?assertEqual(
        {error, unspecified_error},
        emqx_ft_test_helpers:fin_result(mode(Config), ClientId, Client, FinTopic)
    ).
%% `init` payloads that are either valid JSON of the wrong shape or not JSON
%% at all are both expected to yield `unspecified_error`.
t_invalid_meta(Config) ->
    Client = ?config(client, Config),
    InitTopic = mk_init_topic(Config, <<"f1">>),
    %% Invalid schema
    WrongSchemaPayload = emqx_utils_json:encode(#{foo => <<"bar">>}),
    ?assertRCName(
        unspecified_error,
        emqtt:publish(Client, InitTopic, WrongSchemaPayload, 1)
    ),
    %% Invalid JSON
    BrokenJson = <<"{oops;">>,
    ?assertRCName(
        unspecified_error,
        emqtt:publish(Client, InitTopic, BrokenJson, 1)
    ).
%% Sends metadata carrying a checksum that does not match the data, followed
%% by all segments. A `fin` without a checksum is expected to fail with
%% `unspecified_error`; a `fin` with the correct checksum appended to the
%% topic is expected to succeed.
t_invalid_checksum(Config) ->
    Client = ?config(client, Config),
    ClientId = ?config(clientid, Config),
    FileId = <<"f1">>,
    Segments = [<<"first">>, <<"second">>, <<"third">>],
    Meta = meta("topsecret.pdf", Segments),
    #{size := Filesize} = Meta,
    BogusChecksum = {sha256, sha256(<<"invalid">>)},
    MetaPayload = encode_meta(Meta#{checksum => BogusChecksum}),
    ?assertRCName(
        success,
        emqtt:publish(Client, mk_init_topic(Config, FileId), MetaPayload, 1)
    ),
    _ = [
        ?assertRCName(
            success,
            emqtt:publish(Client, mk_segment_topic(Config, FileId, Offset), Segment, 1)
        )
     || {Segment, Offset} <- with_offsets(Segments)
    ],
    % Send `fin` w/o checksum, should fail since filemeta checksum is invalid
    FinTopic = mk_fin_topic(Config, FileId, Filesize),
    ?assertEqual(
        {error, unspecified_error},
        emqx_ft_test_helpers:fin_result(mode(Config), ClientId, Client, FinTopic)
    ),
    % Send `fin` with the correct checksum
    HexChecksum = binary:encode_hex(sha256(Segments)),
    FinTopicWithChecksum = <<FinTopic/binary, "/", HexChecksum/binary>>,
    ?assertEqual(
        ok,
        emqx_ft_test_helpers:fin_result(mode(Config), ClientId, Client, FinTopicWithChecksum)
    ).
%% Sends segments on topics that carry a per-segment checksum. The second
%% segment is first sent with an extra trailing byte (expected
%% `unspecified_error`) and then resent intact; the transfer is expected to
%% finish with `ok`.
t_corrupted_segment_retry(Config) ->
    Client = ?config(client, Config),
    ClientId = ?config(clientid, Config),
    FileId = <<"4242-4242">>,
    Segments = [<<"first">>, <<"second">>, <<"third">>],
    [{Seg1, Offset1}, {Seg2, Offset2}, {Seg3, Offset3}] = with_offsets(Segments),
    [Checksum1, Checksum2, Checksum3] =
        [binary:encode_hex(sha256(Segment)) || Segment <- Segments],
    Meta = meta("corruption.pdf", Segments),
    #{size := Filesize} = Meta,
    PublishSegment = fun(Offset, Checksum, Payload) ->
        emqtt:publish(Client, mk_segment_topic(Config, FileId, Offset, Checksum), Payload, 1)
    end,
    ?assertRCName(
        success,
        emqtt:publish(Client, mk_init_topic(Config, FileId), encode_meta(Meta), 1)
    ),
    ?assertRCName(success, PublishSegment(Offset1, Checksum1, Seg1)),
    % segment is corrupted
    ?assertRCName(unspecified_error, PublishSegment(Offset2, Checksum2, <<Seg2/binary, 42>>)),
    % retry
    ?assertRCName(success, PublishSegment(Offset2, Checksum2, Seg2)),
    ?assertRCName(success, PublishSegment(Offset3, Checksum3, Seg3)),
    FinTopic = mk_fin_topic(Config, FileId, Filesize),
    ?assertEqual(
        ok,
        emqx_ft_test_helpers:fin_result(mode(Config), ClientId, Client, FinTopic)
    ).
%% Mocks `emqx_ft_storage_fs:assemble/4` to raise `error:oops` and expects a
%% `fin` publish to be answered with `unspecified_error`.
%%
%% The mock is unloaded in an `after` clause so that a failing assertion does
%% not leave `emqx_ft_storage_fs` mocked for the test cases that follow.
t_assemble_crash(Config) ->
    C = ?config(client, Config),
    meck:new(emqx_ft_storage_fs),
    try
        meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _, _) ->
            meck:exception(error, oops)
        end),
        ?assertRCName(
            unspecified_error,
            emqtt:publish(C, <<"$file/someid/fin">>, <<>>, 1)
        )
    after
        meck:unload(emqx_ft_storage_fs)
    end.
%% Mocks `emqx_ft_storage:assemble/3` (passthrough for everything else) to
%% return `{async, Pid}` for a process that never finishes, then checks that
%% `fin` fails with `unspecified_error` and that it took more than 2 seconds
%% (`timer:tc/1` reports microseconds).
%%
%% The mock is unloaded in an `after` clause so that an exception raised while
%% setting the expectation or waiting for the result does not leave
%% `emqx_ft_storage` mocked for the test cases that follow.
t_assemble_timeout(Config) ->
    C = ?config(client, Config),
    ClientId = ?config(clientid, Config),
    % Waits for a message equal to a freshly made reference that is never
    % handed to anyone, so the process blocks in `receive`.
    SleepForever = fun() ->
        Ref = make_ref(),
        receive
            Ref -> ok
        end
    end,
    ok = meck:new(emqx_ft_storage, [passthrough]),
    {Time, Res} =
        try
            ok = meck:expect(emqx_ft_storage, assemble, fun(_, _, _) ->
                {async, spawn_link(SleepForever)}
            end),
            timer:tc(
                fun() ->
                    emqx_ft_test_helpers:fin_result(
                        mode(Config), ClientId, C, <<"$file/someid/fin/9999999">>
                    )
                end
            )
        after
            meck:unload(emqx_ft_storage)
        end,
    ?assertEqual(
        {error, unspecified_error},
        Res
    ),
    ?assert(2_000_000 < Time).
%%--------------------------------------------------------------------
%% Cluster tests
%%--------------------------------------------------------------------
%% Starts an upload through the first additional cluster node, then reconnects
%% the same client id with default connection options and completes the
%% upload there. Exactly one export must exist afterwards and contain the
%% whole file.
t_switch_node(Config) ->
    [Node | _] = ?config(cluster_nodes, Config),
    AdditionalNodePort = emqx_ft_test_helpers:tcp_port(Node),
    ClientId = <<"t_switch_node-migrating_client">>,
    {ok, Client1} = emqtt:start_link(
        [{proto_ver, v5}, {clientid, ClientId}, {port, AdditionalNodePort}]
    ),
    {ok, _} = emqtt:connect(Client1),
    FileId = <<"f1">>,
    Segments = [<<"first">>, <<"second">>, <<"third">>],
    [{Seg0, Offset0}, {Seg1, Offset1}, {Seg2, Offset2}] = with_offsets(Segments),
    Publish = fun(Client, Topic, Payload) -> emqtt:publish(Client, Topic, Payload, 1) end,
    %% First, publish metadata and the first segment to the additional node
    Meta = meta("multinode_upload.txt", Segments),
    #{size := Filesize} = Meta,
    ?assertRCName(
        success,
        Publish(Client1, mk_init_topic(Config, FileId), encode_meta(Meta))
    ),
    ?assertRCName(
        success,
        Publish(Client1, mk_segment_topic(Config, FileId, Offset0), Seg0)
    ),
    %% Then, switch the client to the main node
    %% and publish the rest of the segments
    ok = emqtt:stop(Client1),
    {ok, Client2} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]),
    {ok, _} = emqtt:connect(Client2),
    ?assertRCName(
        success,
        Publish(Client2, mk_segment_topic(Config, FileId, Offset1), Seg1)
    ),
    ?assertRCName(
        success,
        Publish(Client2, mk_segment_topic(Config, FileId, Offset2), Seg2)
    ),
    ?assertRCName(
        success,
        Publish(Client2, mk_fin_topic(Config, FileId, Filesize), <<>>)
    ),
    ok = emqtt:stop(Client2),
    %% Now check consistency of the file
    [Export] = list_files(ClientId),
    ?assertEqual(
        {ok, iolist_to_binary(Segments)},
        read_export(Export)
    ).
%% Replays a scripted session of a client that keeps reconnecting to different
%% nodes, restarts its transfer with different segment sizes and resends
%% segments and `fin`. Afterwards a single export, attributed to `Node1`, must
%% exist and hold the full payload.
t_unreliable_migrating_client(Config) ->
    Self = node(),
    [Node1, Node2] = ?config(cluster_nodes, Config),
    ClientId = ?config(clientid, Config),
    FileId = emqx_guid:to_hexstr(emqx_guid:gen()),
    Filesize = 1000,
    ContentGen = emqx_ft_content_gen:new({{ClientId, FileId}, Filesize}, 16),
    Chunks = emqx_ft_content_gen:consume(ContentGen, fun({Chunk, _, _}) -> Chunk end),
    Payload = iolist_to_binary(Chunks),
    Meta = meta("migratory-birds-in-southern-hemisphere-2013.pdf", Payload),
    InitialContext = #{
        clientid => ClientId,
        fileid => FileId,
        filesize => Filesize,
        payload => Payload
    },
    % Command constructors; each command is a `{Fun, Args}` pair as expected
    % by `run_commands/2`.
    Connect = fun(Node) -> {fun connect_mqtt_client/2, [Node]} end,
    Segment = fun(Offset, Size) -> {fun send_segment/4, [Config, Offset, Size]} end,
    Filemeta = {fun send_filemeta/3, [Config, Meta]},
    Finish = {fun send_finish/2, [Config]},
    Disconnect = {fun stop_mqtt_client/1, []},
    Commands = [
        % Connect to the broker on the current node
        Connect(Self),
        % Send filemeta and 3 initial segments
        % (assuming client chose 100 bytes as a desired segment size)
        Filemeta,
        Segment(0, 100),
        Segment(100, 100),
        Segment(200, 100),
        % Disconnect the client cleanly
        Disconnect,
        % Connect to the broker on `Node1`
        Connect(Node1),
        % Connect to the broker on `Node2` without first disconnecting from `Node1`
        % Client forgot the state for some reason and started the transfer again.
        % (assuming this is usual for a client on a device that was rebooted)
        Connect(Node2),
        Filemeta,
        % This time it chose 200 bytes as a segment size
        Segment(0, 200),
        Segment(200, 200),
        % But now it downscaled back to 100 bytes segments
        Segment(400, 100),
        % Client lost connectivity and reconnected
        % (also had last few segments unacked and decided to resend them)
        Connect(Node2),
        Segment(200, 200),
        Segment(400, 200),
        % Client lost connectivity and reconnected, this time to another node
        % (also had last segment unacked and decided to resend it)
        Connect(Node1),
        Segment(400, 200),
        Segment(600, eof),
        Finish,
        % Client lost connectivity and reconnected, this time to the current node
        % (client had `fin` unacked and decided to resend it)
        Connect(Self),
        Finish
    ],
    _FinalContext = run_commands(Commands, InitialContext),
    Exports = list_files(ClientId),
    Node1Str = atom_to_list(Node1),
    % TODO: this testcase is specific to local fs storage backend
    ?assertMatch(
        [#{"node" := Node1Str}],
        fs_exported_file_attributes(Exports)
    ),
    [
        ?assertEqual({ok, Payload}, read_export(Export))
     || Export <- Exports
    ].
%% Uploads the metadata and the single 100-byte segment of a file through
%% `Node1`, then sends `fin` concurrently from three worker processes, one per
%% node (`Node1`, `Node2` and the local node), and finally inspects the
%% exported files.
t_concurrent_fins(Config) ->
% Use a 10 second timetrap for this case (the suite default is 90 seconds);
% the `receive` below has no `after` clause and relies on it.
ct:timetrap({seconds, 10}),
NodeSelf = node(),
[Node1, Node2] = ?config(cluster_nodes, Config),
% Make the client id distinct for every run of this case by appending a
% unique integer.
ClientId = iolist_to_binary([
?config(clientid, Config),
integer_to_list(erlang:unique_integer())
]),
FileId = emqx_guid:to_hexstr(emqx_guid:gen()),
Filename = "migratory-birds-in-southern-hemisphere-2013.pdf",
Filesize = 100,
Gen = emqx_ft_content_gen:new({{ClientId, FileId}, Filesize}, 16),
Payload = iolist_to_binary(emqx_ft_content_gen:consume(Gen, fun({Chunk, _, _}) -> Chunk end)),
Meta = meta(Filename, Payload),
%% Send filemeta and segments to Node1
Context0 = #{
clientid => ClientId,
fileid => FileId,
filesize => Filesize,
payload => Payload
},
Context1 = run_commands(
[
{fun connect_mqtt_client/2, [Node1]},
{fun send_filemeta/3, [Config, Meta]},
{fun send_segment/4, [Config, 0, 100]},
{fun stop_mqtt_client/1, []}
],
Context0
),
%% Now send fins concurrently to the 3 nodes
Nodes = [Node1, Node2, NodeSelf],
% Connects a fresh client to `Node` and sends `fin`; every call starts from
% `Context1`, which no longer holds a client.
SendFin = fun(Node) ->
run_commands(
[
{fun connect_mqtt_client/2, [Node]},
{fun send_finish/2, [Config]}
],
Context1
)
end,
% One monitored worker per node. A worker traps exits and, on any exception
% raised by `SendFin`, logs it, sleeps between 1 and 10 ms and retries by
% calling itself again.
PidMons = lists:map(
fun(Node) ->
erlang:spawn_monitor(fun F() ->
_ = erlang:process_flag(trap_exit, true),
try
SendFin(Node)
catch
C:E ->
% NOTE: random delay to avoid livelock conditions
ct:pal("Node ~p did not send finish successfully: ~p:~p", [Node, C, E]),
ok = timer:sleep(rand:uniform(10)),
F()
end
end)
end,
Nodes
),
% Wait until every worker has exited with reason `normal`.
ok = lists:foreach(
fun({Pid, MRef}) ->
receive
{'DOWN', MRef, process, Pid, normal} -> ok
end
end,
PidMons
),
%% Only one node should have the file
Exports = list_files(ClientId),
% NOTE: there is no clause for an empty list, so the case fails with a
% `case_clause` error when no export is found.
case fs_exported_file_attributes(Exports) of
[#{"node" := _Node}] ->
ok;
[#{"node" := _Node} | _] = Files ->
% ...But we can't really guarantee that
ct:comment({multiple_files_on_different_nodes, Files})
end.
%%------------------------------------------------------------------------------
%% Command helpers
%%------------------------------------------------------------------------------
%% Command runners
%% Runs `Commands` from left to right, threading the context through: each
%% command receives the context returned by the previous one. Returns the
%% context produced by the last command (or `Context` for an empty list).
run_commands([], Context) ->
    Context;
run_commands([Command | Rest], Context) ->
    run_commands(Rest, run_command(Command, Context)).
%% Logs the command via `ct:pal/2` and applies `Command` to `Args` with
%% `Context` appended as the last argument, returning whatever it returns.
run_command({Command, Args}, Context) ->
    NameInfo = erlang:fun_info(Command, name),
    ct:pal("COMMAND ~p ~p", [NameInfo, Args]),
    FullArgs = Args ++ [Context],
    erlang:apply(Command, FullArgs).
%% Commands
%% Starts an MQTT v5 client with the context's `clientid`, connects it to the
%% TCP port of `Node` and stores it under `client`. A client already present
%% in the context is disowned first via `disown_mqtt_client/1`.
connect_mqtt_client(Node, ContextIn) ->
    Context = disown_mqtt_client(ContextIn),
    #{clientid := ClientId} = Context,
    Port = emqx_ft_test_helpers:tcp_port(Node),
    ClientOpts = [{proto_ver, v5}, {clientid, ClientId}, {port, Port}],
    {ok, Client} = emqtt:start_link(ClientOpts),
    {ok, _} = emqtt:connect(Client),
    Context#{client => Client}.
%% Stops the client held in the context (ignoring the result of
%% `emqtt:stop/1`) and returns the context without the `client` key.
stop_mqtt_client(#{client := Client} = Context) ->
    _ = emqtt:stop(Client),
    maps:without([client], Context).
%% Unlinks the calling process from the client held in the context, if any,
%% and drops the `client` key. The client is not stopped. A context without a
%% client is returned unchanged.
disown_mqtt_client(Context) when is_map(Context) ->
    case maps:take(client, Context) of
        {Client, Rest} ->
            _ = erlang:unlink(Client),
            Rest;
        error ->
            Context
    end.
%% Publishes the encoded `Meta` on the init topic of the context's file id
%% using the context's client, asserts `success` and returns the context
%% unchanged.
send_filemeta(Config, Meta, #{client := Client, fileid := FileId} = Context) ->
    InitTopic = mk_init_topic(Config, FileId),
    MetaPayload = encode_meta(Meta),
    ?assertRCName(
        success,
        emqtt:publish(Client, InitTopic, MetaPayload, 1)
    ),
    Context.
%% Publishes the part of the context's payload starting at `Offset` on the
%% matching segment topic, asserts `success` and returns the context
%% unchanged. `Size` is either a byte count or `eof`, meaning everything from
%% `Offset` to the end of the payload.
send_segment(
    Config, Offset, Size, #{client := Client, fileid := FileId, payload := Payload} = Context
) ->
    Length =
        case Size of
            eof -> byte_size(Payload) - Offset;
            N -> N
        end,
    Segment = binary:part(Payload, Offset, Length),
    SegmentTopic = mk_segment_topic(Config, FileId, Offset),
    ?assertRCName(
        success,
        emqtt:publish(Client, SegmentTopic, Segment, 1)
    ),
    Context.
%% Publishes an empty payload on the fin topic built from the context's file
%% id and file size, asserts `success` and returns the context unchanged.
send_finish(Config, #{client := Client, fileid := FileId, filesize := Filesize} = Context) ->
    FinTopic = mk_fin_topic(Config, FileId, Filesize),
    ?assertRCName(
        success,
        emqtt:publish(Client, FinTopic, <<>>, 1)
    ),
    Context.
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------
%% For each export (taken in sorted order), parses its `uri` and returns the
%% query string parameters as a map.
fs_exported_file_attributes(FSExports) ->
    QueryAttrs = fun(#{uri := URIString}) ->
        #{query := Query} = uri_string:parse(URIString),
        maps:from_list(uri_string:dissect_query(Query))
    end,
    [QueryAttrs(Export) || Export <- lists:sort(FSExports)].
%% Topic for sending file metadata: `<request prefix>/init`.
mk_init_topic(Config, FileId) ->
    Prefix = request_topic_prefix(Config, FileId),
    Suffix = <<"/init">>,
    <<Prefix/binary, Suffix/binary>>.
%% Topic for sending a segment: `<request prefix>/<Offset>`. `Offset` may be
%% an integer or an already formatted binary.
mk_segment_topic(Config, FileId, Offset) when is_binary(Offset) ->
    Prefix = request_topic_prefix(Config, FileId),
    <<Prefix/binary, $/, Offset/binary>>;
mk_segment_topic(Config, FileId, Offset) when is_integer(Offset) ->
    mk_segment_topic(Config, FileId, integer_to_binary(Offset)).
%% Topic for sending a segment together with its checksum:
%% `<request prefix>/<Offset>/<Checksum>`. `Offset` may be an integer or an
%% already formatted binary.
mk_segment_topic(Config, FileId, Offset, Checksum) when is_binary(Offset) ->
    Prefix = request_topic_prefix(Config, FileId),
    <<Prefix/binary, $/, Offset/binary, $/, Checksum/binary>>;
mk_segment_topic(Config, FileId, Offset, Checksum) when is_integer(Offset) ->
    mk_segment_topic(Config, FileId, integer_to_binary(Offset), Checksum).
%% Topic for finishing a transfer: `<request prefix>/fin/<Size>`. `Size` may
%% be an integer or an already formatted binary.
mk_fin_topic(Config, FileId, Size) when is_binary(Size) ->
    Prefix = request_topic_prefix(Config, FileId),
    <<Prefix/binary, "/fin/", Size/binary>>;
mk_fin_topic(Config, FileId, Size) when is_integer(Size) ->
    mk_fin_topic(Config, FileId, integer_to_binary(Size)).
%% Request topic prefix for `FileId` under the transfer mode selected by
%% `Config` (see `mode/1`); delegates to `emqx_ft_test_helpers`.
request_topic_prefix(Config, FileId) ->
    Mode = mode(Config),
    emqx_ft_test_helpers:request_topic_prefix(Mode, FileId).
%% Pairs every binary in `Items` with its starting byte offset, rendered as a
%% decimal binary. Offsets start at 0 and grow by the size of each preceding
%% item.
with_offsets(Items) ->
    {ReversedPairs, _TotalSize} = lists:foldl(
        fun(Item, {Acc, Offset}) ->
            Pair = {Item, integer_to_binary(Offset)},
            {[Pair | Acc], Offset + byte_size(Item)}
        end,
        {[], 0},
        Items
    ),
    lists:reverse(ReversedPairs).
%% SHA-256 digest of `Data`, as returned by `crypto:hash/2`.
sha256(Data) ->
    Algorithm = sha256,
    crypto:hash(Algorithm, Data).
%% Builds a file metadata map for `Data` (any iodata): the given name, the
%% SHA-256 checksum and byte size of the flattened data, and an expiry one
%% hour (3600 seconds) after the current system time.
meta(FileName, Data) ->
    Content = iolist_to_binary(Data),
    Checksum = {sha256, sha256(Content)},
    ExpireAt = erlang:system_time(second) + 3600,
    #{
        name => FileName,
        checksum => Checksum,
        expire_at => ExpireAt,
        size => byte_size(Content)
    }.
%% Passes `Meta` through `emqx_ft:encode_filemeta/1` and encodes the result
%% with `emqx_utils_json:encode/1`.
encode_meta(Meta) ->
    Encoded = emqx_ft:encode_filemeta(Meta),
    emqx_utils_json:encode(Encoded).
%% Lists the files reported by `emqx_ft_storage:files/0` whose `transfer` is
%% a pair starting with `ClientId`. Entries of any other shape are skipped.
list_files(ClientId) ->
    {ok, #{items := Files}} = emqx_ft_storage:files(),
    BelongsToClient = fun
        (#{transfer := {CId, _}}) -> CId == ClientId;
        (_Other) -> false
    end,
    lists:filter(BelongsToClient, Files).
%% Reads the content of the export from the file at its `path`; returns the
%% result of `file:read_file/1`.
read_export(#{path := Path}) ->
    % TODO: only works for the local filesystem exporter right now
    Result = file:read_file(Path),
    Result.
%% Points the local storage `segments` root and the local exporter root at
%% `<ft root>/<ClientId>/segments` and `<ft root>/<ClientId>/exports`
%% respectively, via `emqx_config:put/2`.
set_client_specific_ft_dirs(ClientId, Config) ->
    FTRoot = emqx_ft_test_helpers:ft_root(Config),
    SegmentsRoot = filename:join([FTRoot, ClientId, segments]),
    ExportsRoot = filename:join([FTRoot, ClientId, exports]),
    ok = emqx_config:put([file_transfer, storage, local, segments, root], SegmentsRoot),
    ok = emqx_config:put([file_transfer, storage, local, exporter, local, root], ExportsRoot).
%% Transfer mode stored under `mode` in `Config`; defaults to `sync` when the
%% key is absent.
mode(Config) ->
    case proplists:lookup(mode, Config) of
        {_Key, Mode} -> Mode;
        _ -> sync
    end.