feat(ft): add file transfer tests

This commit is contained in:
Ilya Averyanov 2023-02-15 11:42:27 +02:00
parent 2e889f4ac7
commit f6a0598f27
10 changed files with 456 additions and 85 deletions

View File

@ -63,7 +63,7 @@
]).
-export([
maybe_fix_gen_rpc/0,
set_gen_rpc_stateless/0,
emqx_cluster/1,
emqx_cluster/2,
start_epmd/0,
@ -617,13 +617,14 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
listener_ports => [{Type :: tcp | ssl | ws | wss, inet:port_number()}]
}.
-spec maybe_fix_gen_rpc() -> ok.
maybe_fix_gen_rpc() ->
-spec set_gen_rpc_stateless() -> ok.
set_gen_rpc_stateless() ->
%% When many tests run in an obscure order, it may occur that
%% `gen_rpc` started with its default settings before `emqx_conf`.
%% `gen_rpc` and `emqx_conf` have different default `port_discovery` modes,
%% so we reinitialize `gen_rpc` explicitly.
ok = application:stop(gen_rpc),
ok = application:set_env(gen_rpc, port_discovery, stateless),
ok = application:start(gen_rpc).
-spec emqx_cluster(cluster_spec()) -> [{shortname(), node_opts()}].

View File

@ -175,7 +175,13 @@ on_init(Msg, FileId) ->
case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of
ok ->
?RC_SUCCESS;
{error, _Reason} ->
{error, Reason} ->
?SLOG(warning, #{
msg => "store_filemeta_failed",
mqtt_msg => Msg,
file_id => FileId,
reason => Reason
}),
?RC_UNSPECIFIED_ERROR
end;
{error, Reason} ->
@ -235,7 +241,13 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
{ok, _} ->
undefined;
%% Assembling failed, unregister the packet key
{error, _} ->
{error, Reason} ->
?SLOG(warning, #{
msg => "assemble_not_started",
mqtt_msg => Msg,
file_id => FileId,
reason => Reason
}),
case emqx_ft_responder:unregister(FinPacketKey) of
%% We successfully unregistered the packet key,
%% so we can send the error code at once

View File

@ -50,8 +50,8 @@ unload() ->
-spec pre_config_update(list(atom()), emqx_config:update_request(), emqx_config:raw_config()) ->
{ok, emqx_config:update_request()} | {error, term()}.
pre_config_update(_, _Req, Config) ->
{ok, Config}.
pre_config_update(_, Req, _Config) ->
{ok, Req}.
-spec post_config_update(
list(atom()),

View File

@ -1,43 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage_dummy).
-behaviour(emqx_ft_storage).
-export([
store_filemeta/3,
store_segment/3,
assemble/3,
ready_transfers/1,
get_ready_transfer/2
]).
store_filemeta(_Storage, _Transfer, _Meta) ->
ok.
store_segment(_Storage, _Transfer, _Segment) ->
ok.
assemble(_Storage, _Transfer, Callback) ->
Pid = spawn(fun() -> Callback({error, not_implemented}) end),
{ok, Pid}.
ready_transfers(_Storage) ->
{ok, []}.
get_ready_transfer(_Storage, _Id) ->
{error, not_implemented}.

View File

@ -93,7 +93,10 @@ store_filemeta(Storage, Transfer, Meta) ->
{ok, Meta} ->
_ = touch_file(Filepath),
ok;
{ok, _Conflict} ->
{ok, Conflict} ->
?SLOG(warning, #{
msg => "filemeta_conflict", transfer => Transfer, new => Meta, old => Conflict
}),
% TODO
% We won't see conflicts in case of concurrent `store_filemeta`
% requests. It's rather odd scenario so it's fine not to worry

View File

@ -20,7 +20,6 @@
-export([introduced_in/0]).
-export([list/3]).
-export([multilist/3]).
-export([pread/5]).
-export([ready_transfers/1]).
@ -35,11 +34,6 @@
introduced_in() ->
"5.0.17".
-spec list(node(), transfer(), fragment | result) ->
{ok, [filefrag()]} | {error, term()} | no_return().
list(Node, Transfer, What) ->
erpc:call(Node, emqx_ft_storage_fs_proxy, list_local, [Transfer, What]).
-spec multilist([node()], transfer(), fragment | result) ->
emqx_rpc:erpc_multicall({ok, [filefrag()]} | {error, term()}).
multilist(Nodes, Transfer, What) ->

View File

@ -29,35 +29,58 @@
)
).
all() -> emqx_common_test_helpers:all(?MODULE).
all() ->
[
{group, single_node},
{group, cluster}
].
groups() ->
[
{single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- [t_switch_node]},
{cluster, [sequence], [t_switch_node]}
].
init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], fun set_special_configs/1),
ok = emqx_common_test_helpers:maybe_fix_gen_rpc(),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], set_special_configs(Config)),
ok = emqx_common_test_helpers:set_gen_rpc_stateless(),
Config.
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
ok.
set_special_configs(emqx_ft) ->
{ok, _} = emqx:update_config([file_transfer, storage], #{<<"type">> => <<"local">>}),
ok;
set_special_configs(_App) ->
ok.
set_special_configs(Config) ->
fun
(emqx_ft) ->
ok = emqx_config:put([file_transfer, storage], #{
type => local, root => emqx_ft_test_helpers:ft_root(Config, node())
});
(_) ->
ok
end.
init_per_testcase(_Case, Config) ->
_ = file:del_dir_r(filename:join(emqx:data_dir(), "file_transfer")),
ClientId = <<"client">>,
init_per_testcase(Case, Config) ->
ClientId = atom_to_binary(Case),
{ok, C} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]),
{ok, _} = emqtt:connect(C),
[{client, C}, {clientid, ClientId} | Config].
end_per_testcase(_Case, Config) ->
C = ?config(client, Config),
ok = emqtt:stop(C),
ok.
init_per_group(cluster, Config) ->
Node = emqx_ft_test_helpers:start_additional_node(Config, test2),
[{additional_node, Node} | Config];
init_per_group(_Group, Config) ->
Config.
end_per_group(cluster, Config) ->
ok = emqx_ft_test_helpers:stop_additional_node(Config);
end_per_group(_Group, _Config) ->
ok.
%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------
@ -65,15 +88,34 @@ end_per_testcase(_Case, Config) ->
t_invalid_topic_format(Config) ->
C = ?config(client, Config),
%% TODO: more invalid topics
?assertRCName(
unspecified_error,
emqtt:publish(C, <<"$file/XYZ">>, <<>>, 1)
emqtt:publish(C, <<"$file/fileid">>, <<>>, 1)
),
?assertRCName(
unspecified_error,
emqtt:publish(C, <<"$file/fileid/">>, <<>>, 1)
),
?assertRCName(
unspecified_error,
emqtt:publish(C, <<"$file/fileid/offset">>, <<>>, 1)
),
?assertRCName(
unspecified_error,
emqtt:publish(C, <<"$file/fileid/fin/offset">>, <<>>, 1)
),
?assertRCName(
unspecified_error,
emqtt:publish(C, <<"$file/">>, <<>>, 1)
),
?assertRCName(
unspecified_error,
emqtt:publish(C, <<"$file/X/Y/Z">>, <<>>, 1)
),
%% should not be handled by `emqx_ft`
?assertRCName(
no_matching_subscribers,
emqtt:publish(C, <<"$file">>, <<>>, 1)
).
t_simple_transfer(Config) ->
@ -95,8 +137,7 @@ t_simple_transfer(Config) ->
lists:foreach(
fun({Chunk, Offset}) ->
OffsetBin = integer_to_binary(Offset),
SegmentTopic = <<"$file/", FileId/binary, "/", OffsetBin/binary>>,
SegmentTopic = <<"$file/", FileId/binary, "/", Offset/binary>>,
?assertRCName(
success,
emqtt:publish(C, SegmentTopic, Chunk, 1)
@ -111,12 +152,7 @@ t_simple_transfer(Config) ->
emqtt:publish(C, FinTopic, <<>>, 1)
),
ReadyTransferId = #{
<<"fileid">> => FileId,
<<"clientid">> => ?config(clientid, Config),
<<"node">> => atom_to_binary(node(), utf8)
},
{ok, [{ReadyTransferId, _}]} = emqx_ft_storage:ready_transfers(),
{ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId),
?assertEqual(
@ -184,8 +220,7 @@ t_no_segment(Config) ->
lists:foreach(
fun({Chunk, Offset}) ->
OffsetBin = integer_to_binary(Offset),
SegmentTopic = <<"$file/", FileId/binary, "/", OffsetBin/binary>>,
SegmentTopic = <<"$file/", FileId/binary, "/", Offset/binary>>,
?assertRCName(
success,
emqtt:publish(C, SegmentTopic, Chunk, 1)
@ -241,8 +276,7 @@ t_invalid_checksum(Config) ->
lists:foreach(
fun({Chunk, Offset}) ->
OffsetBin = integer_to_binary(Offset),
SegmentTopic = <<"$file/", FileId/binary, "/", OffsetBin/binary>>,
SegmentTopic = <<"$file/", FileId/binary, "/", Offset/binary>>,
?assertRCName(
success,
emqtt:publish(C, SegmentTopic, Chunk, 1)
@ -257,6 +291,83 @@ t_invalid_checksum(Config) ->
emqtt:publish(C, FinTopic, <<>>, 1)
).
t_switch_node(Config) ->
AdditionalNodePort = emqx_ft_test_helpers:tcp_port(?config(additional_node, Config)),
ClientId = <<"t_switch_node-migrating_client">>,
{ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, AdditionalNodePort}]),
{ok, _} = emqtt:connect(C1),
Filename = <<"multinode_upload.txt">>,
FileId = <<"f1">>,
Data = [<<"first">>, <<"second">>, <<"third">>],
[{Data0, Offset0}, {Data1, Offset1}, {Data2, Offset2}] = with_offsets(Data),
%% First, publist metadata and the first segment to the additional node
Meta = meta(Filename, Data),
MetaPayload = emqx_json:encode(Meta),
MetaTopic = <<"$file/", FileId/binary, "/init">>,
?assertRCName(
success,
emqtt:publish(C1, MetaTopic, MetaPayload, 1)
),
?assertRCName(
success,
emqtt:publish(C1, <<"$file/", FileId/binary, "/", Offset0/binary>>, Data0, 1)
),
%% Then, switch the client to the main node
%% and publish the rest of the segments
ok = emqtt:stop(C1),
{ok, C2} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]),
{ok, _} = emqtt:connect(C2),
?assertRCName(
success,
emqtt:publish(C2, <<"$file/", FileId/binary, "/", Offset1/binary>>, Data1, 1)
),
?assertRCName(
success,
emqtt:publish(C2, <<"$file/", FileId/binary, "/", Offset2/binary>>, Data2, 1)
),
FinTopic = <<"$file/", FileId/binary, "/fin">>,
?assertRCName(
success,
emqtt:publish(C2, FinTopic, <<>>, 1)
),
ok = emqtt:stop(C2),
%% Now check consistency of the file
{ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(),
{ReadyTransferIds, _} = lists:unzip(ReadyTransfers),
[ReadyTransferId] = [Id || #{<<"clientid">> := CId} = Id <- ReadyTransferIds, CId == ClientId],
{ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId),
?assertEqual(
iolist_to_binary(Data),
iolist_to_binary(qlc:eval(TableQH))
).
t_assemble_crash(Config) ->
C = ?config(client, Config),
meck:new(emqx_ft_storage_fs),
meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _) -> meck:exception(error, oops) end),
?assertRCName(
unspecified_error,
emqtt:publish(C, <<"$file/someid/fin">>, <<>>, 1)
).
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
@ -264,7 +375,7 @@ t_invalid_checksum(Config) ->
with_offsets(Items) ->
{List, _} = lists:mapfoldl(
fun(Item, Offset) ->
{{Item, Offset}, Offset + byte_size(Item)}
{{Item, integer_to_binary(Offset)}, Offset + byte_size(Item)}
end,
0,
Items

View File

@ -0,0 +1,65 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_conf_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft]),
Config.
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
ok.
init_per_testcase(_Case, Config) ->
Config.
end_per_testcase(_Case, _Config) ->
ok.
%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------
t_update_config(_Config) ->
?assertMatch(
{error, #{kind := validation_error}},
emqx_conf:update(
[file_transfer],
#{<<"storage">> => #{<<"type">> => <<"unknown">>}},
#{}
)
),
?assertMatch(
{ok, _},
emqx_conf:update(
[file_transfer],
#{<<"storage">> => #{<<"type">> => <<"local">>, <<"root">> => <<"/tmp/path">>}},
#{}
)
),
?assertEqual(
<<"/tmp/path">>,
emqx_config:get([file_transfer, storage, root])
).

View File

@ -0,0 +1,151 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage_fs_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(assertInclude(Pattern, List),
?assert(
lists:any(
fun
(Pattern) -> true;
(_) -> false
end,
List
)
)
).
all() ->
[
{group, single_node},
{group, cluster}
].
-define(CLUSTER_CASES, [t_multinode_ready_transfers]).
groups() ->
[
{single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- ?CLUSTER_CASES},
{cluster, [sequence], ?CLUSTER_CASES}
].
init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], set_special_configs(Config)),
ok = emqx_common_test_helpers:set_gen_rpc_stateless(),
Config.
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
ok.
set_special_configs(Config) ->
fun
(emqx_ft) ->
ok = emqx_config:put([file_transfer, storage], #{
type => local, root => emqx_ft_test_helpers:ft_root(Config, node())
});
(_) ->
ok
end.
init_per_testcase(Case, Config) ->
[{tc, Case} | Config].
end_per_testcase(_Case, _Config) ->
ok.
init_per_group(cluster, Config) ->
Node = emqx_ft_test_helpers:start_additional_node(Config, test2),
[{additional_node, Node} | Config];
init_per_group(_Group, Config) ->
Config.
end_per_group(cluster, Config) ->
ok = emqx_ft_test_helpers:stop_additional_node(Config);
end_per_group(_Group, _Config) ->
ok.
%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------
t_invalid_ready_transfer_id(Config) ->
?assertMatch(
{error, _},
emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{
<<"clientid">> => client_id(Config),
<<"fileid">> => <<"fileid">>,
<<"node">> => atom_to_binary('nonexistent@127.0.0.1')
})
),
?assertMatch(
{error, _},
emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{
<<"clientid">> => client_id(Config),
<<"fileid">> => <<"fileid">>,
<<"node">> => <<"nonexistent_as_atom@127.0.0.1">>
})
),
?assertMatch(
{error, _},
emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{
<<"clientid">> => client_id(Config),
<<"fileid">> => <<"nonexistent_file">>,
<<"node">> => node()
})
).
t_multinode_ready_transfers(Config) ->
Node1 = ?config(additional_node, Config),
ok = emqx_ft_test_helpers:upload_file(<<"c1">>, <<"f1">>, <<"data">>, Node1),
Node2 = node(),
ok = emqx_ft_test_helpers:upload_file(<<"c2">>, <<"f2">>, <<"data">>, Node2),
?assertInclude(
#{<<"clientid">> := <<"c1">>, <<"fileid">> := <<"f1">>},
ready_transfer_ids(Config)
),
?assertInclude(
#{<<"clientid">> := <<"c2">>, <<"fileid">> := <<"f2">>},
ready_transfer_ids(Config)
).
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
client_id(Config) ->
atom_to_binary(?config(tc, Config), utf8).
storage(Config) ->
#{
type => local,
root => ft_root(Config)
}.
ft_root(Config) ->
emqx_ft_test_helpers:ft_root(Config, node()).
ready_transfer_ids(Config) ->
{ok, ReadyTransfers} = emqx_ft_storage_fs:ready_transfers(storage(Config)),
{ReadyTransferIds, _} = lists:unzip(ReadyTransfers),
ReadyTransferIds.

View File

@ -0,0 +1,77 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_test_helpers).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
start_additional_node(Config, Node) ->
SelfNode = node(),
emqx_common_test_helpers:start_slave(
Node,
[
{apps, [emqx_ft]},
{join_to, SelfNode},
{configure_gen_rpc, false},
{env_handler, fun
(emqx_ft) ->
ok = emqx_config:put([file_transfer, storage], #{
type => local, root => ft_root(Config, node())
});
(_) ->
ok
end}
]
).
stop_additional_node(Config) ->
Node = ?config(additional_node, Config),
ok = rpc:call(Node, ekka, leave, []),
ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_ft]]),
{ok, _} = emqx_common_test_helpers:stop_slave(Node),
ok.
tcp_port(Node) ->
{_, Port} = rpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
Port.
ft_root(Config, Node) ->
filename:join([
?config(priv_dir, Config), <<"file_transfer">>, atom_to_binary(Node)
]).
upload_file(ClientId, FileId, Data, Node) ->
Port = tcp_port(Node),
{ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
{ok, _} = emqtt:connect(C1),
Meta = #{
name => FileId,
expire_at => erlang:system_time(_Unit = second) + 3600,
size => byte_size(Data)
},
MetaPayload = emqx_json:encode(Meta),
MetaTopic = <<"$file/", FileId/binary, "/init">>,
{ok, _} = emqtt:publish(C1, MetaTopic, MetaPayload, 1),
{ok, _} = emqtt:publish(C1, <<"$file/", FileId/binary, "/0">>, Data, 1),
FinTopic = <<"$file/", FileId/binary, "/fin">>,
{ok, _} = emqtt:publish(C1, FinTopic, <<>>, 1),
ok = emqtt:stop(C1).