fix(ft): require final size in `fin` packet

Otherwise there are situations when it's not entirely clear if a
transfer is really ready to be assembled. Since the `size` field
in a filemeta is not required (and rightly so), we need client to
tell us the final transfer size at the end of the process.

Also synthesize a testcase to show why it's needed.

Also worth noting that right now `fin` packets require final size,
even if a client already told us the size through filemeta. The
latter is regarded as serving informational purposes only (which
means that, for example, it might differ from the final size, or
some tranfer progress might show >100% somewhere because of that).
This commit is contained in:
Andrew Mayorov 2023-02-17 17:27:38 +03:00 committed by Ilya Averyanov
parent c073914f75
commit 58115715dd
11 changed files with 295 additions and 81 deletions

View File

@ -0,0 +1,42 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_maybe).
-include_lib("emqx/include/types.hrl").
-export([to_list/1]).
-export([from_list/1]).
-export([apply/2]).
-spec to_list(maybe(A)) -> [A].
to_list(undefined) ->
[];
to_list(Term) ->
[Term].
-spec from_list([A]) -> maybe(A).
from_list([]) ->
undefined;
from_list([Term]) ->
Term.
-spec apply(fun((maybe(A)) -> maybe(A)), maybe(A)) ->
maybe(A).
apply(_Fun, undefined) ->
undefined;
apply(Fun, Term) when is_function(Fun) ->
erlang:apply(Fun, [Term]).

View File

@ -40,6 +40,7 @@
-export_type([ -export_type([
clientid/0, clientid/0,
transfer/0, transfer/0,
bytes/0,
offset/0, offset/0,
filemeta/0, filemeta/0,
segment/0 segment/0
@ -145,10 +146,11 @@ on_file_command(PacketId, Msg, FileCommand) ->
case string:split(FileCommand, <<"/">>, all) of case string:split(FileCommand, <<"/">>, all) of
[FileId, <<"init">>] -> [FileId, <<"init">>] ->
on_init(PacketId, Msg, transfer(Msg, FileId)); on_init(PacketId, Msg, transfer(Msg, FileId));
[FileId, <<"fin">>] -> [FileId, <<"fin">>, FinalSizeBin | ChecksumL] ->
on_fin(PacketId, Msg, transfer(Msg, FileId), undefined); validate([{size, FinalSizeBin}], fun([FinalSize]) ->
[FileId, <<"fin">>, Checksum] -> Checksum = emqx_maybe:from_list(ChecksumL),
on_fin(PacketId, Msg, transfer(Msg, FileId), Checksum); on_fin(PacketId, Msg, transfer(Msg, FileId), FinalSize, Checksum)
end);
[FileId, <<"abort">>] -> [FileId, <<"abort">>] ->
on_abort(Msg, transfer(Msg, FileId)); on_abort(Msg, transfer(Msg, FileId));
[FileId, OffsetBin] -> [FileId, OffsetBin] ->
@ -235,12 +237,13 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
end end
end). end).
on_fin(PacketId, Msg, Transfer, Checksum) -> on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "on_fin", msg => "on_fin",
mqtt_msg => Msg, mqtt_msg => Msg,
packet_id => PacketId, packet_id => PacketId,
transfer => Transfer, transfer => Transfer,
final_size => FinalSize,
checksum => Checksum checksum => Checksum
}), }),
%% TODO: handle checksum? Do we need it? %% TODO: handle checksum? Do we need it?
@ -249,7 +252,7 @@ on_fin(PacketId, Msg, Transfer, Checksum) ->
?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result) ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result)
end, end,
with_responder(FinPacketKey, Callback, ?ASSEMBLE_TIMEOUT, fun() -> with_responder(FinPacketKey, Callback, ?ASSEMBLE_TIMEOUT, fun() ->
case assemble(Transfer) of case assemble(Transfer, FinalSize) of
%% Assembling completed, ack through the responder right away %% Assembling completed, ack through the responder right away
% ok -> % ok ->
% emqx_ft_responder:ack(FinPacketKey, ok); % emqx_ft_responder:ack(FinPacketKey, ok);
@ -298,9 +301,9 @@ store_segment(Transfer, Segment) ->
{error, {internal_error, E}} {error, {internal_error, E}}
end. end.
assemble(Transfer) -> assemble(Transfer, FinalSize) ->
try try
emqx_ft_storage:assemble(Transfer) emqx_ft_storage:assemble(Transfer, FinalSize)
catch catch
C:E:S -> C:E:S ->
?SLOG(error, #{ ?SLOG(error, #{
@ -359,6 +362,13 @@ do_validate([{offset, Offset} | Rest], Parsed) ->
_ -> _ ->
{error, {invalid_offset, Offset}} {error, {invalid_offset, Offset}}
end; end;
do_validate([{size, Size} | Rest], Parsed) ->
case string:to_integer(Size) of
{Int, <<>>} ->
do_validate(Rest, [Int | Parsed]);
_ ->
{error, {invalid_size, Size}}
end;
do_validate([{checksum, Checksum} | Rest], Parsed) -> do_validate([{checksum, Checksum} | Rest], Parsed) ->
case parse_checksum(Checksum) of case parse_checksum(Checksum) of
{ok, Bin} -> {ok, Bin} ->

View File

@ -16,7 +16,7 @@
-module(emqx_ft_assembler). -module(emqx_ft_assembler).
-export([start_link/2]). -export([start_link/3]).
-behaviour(gen_statem). -behaviour(gen_statem).
-export([callback_mode/0]). -export([callback_mode/0]).
@ -36,11 +36,11 @@
%% %%
start_link(Storage, Transfer) -> start_link(Storage, Transfer, Size) ->
%% TODO %% TODO
%% Additional callbacks? They won't survive restarts by the supervisor, which brings a %% Additional callbacks? They won't survive restarts by the supervisor, which brings a
%% question if we even need to retry with the help of supervisor. %% question if we even need to retry with the help of supervisor.
gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer}, []). gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size}, []).
%% %%
@ -49,11 +49,11 @@ start_link(Storage, Transfer) ->
callback_mode() -> callback_mode() ->
handle_event_function. handle_event_function.
init({Storage, Transfer}) -> init({Storage, Transfer, Size}) ->
St = #st{ St = #st{
storage = Storage, storage = Storage,
transfer = Transfer, transfer = Transfer,
assembly = emqx_ft_assembly:new(), assembly = emqx_ft_assembly:new(Size),
hash = crypto:hash_init(sha256) hash = crypto:hash_init(sha256)
}, },
{ok, idle, St}. {ok, idle, St}.

View File

@ -17,7 +17,7 @@
-module(emqx_ft_assembler_sup). -module(emqx_ft_assembler_sup).
-export([start_link/0]). -export([start_link/0]).
-export([ensure_child/2]). -export([ensure_child/3]).
-behaviour(supervisor). -behaviour(supervisor).
-export([init/1]). -export([init/1]).
@ -25,10 +25,10 @@
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
ensure_child(Storage, Transfer) -> ensure_child(Storage, Transfer, Size) ->
Childspec = #{ Childspec = #{
id => Transfer, id => Transfer,
start => {emqx_ft_assembler, start_link, [Storage, Transfer]}, start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size]},
restart => temporary restart => temporary
}, },
case supervisor:start_child(?MODULE, Childspec) of case supervisor:start_child(?MODULE, Childspec) of

View File

@ -16,7 +16,7 @@
-module(emqx_ft_assembly). -module(emqx_ft_assembly).
-export([new/0]). -export([new/1]).
-export([append/3]). -export([append/3]).
-export([update/1]). -export([update/1]).
@ -35,12 +35,12 @@
size size
}). }).
new() -> new(Size) ->
#asm{ #asm{
status = {incomplete, {missing, filemeta}}, status = {incomplete, {missing, filemeta}},
meta = orddict:new(), meta = orddict:new(),
segs = orddict:new(), segs = orddict:new(),
size = 0 size = Size
}. }.
append(Asm, Node, Fragments) when is_list(Fragments) -> append(Asm, Node, Fragments) when is_list(Fragments) ->
@ -114,8 +114,7 @@ append_segmentinfo(Asm, Node, Fragment = #{fragment := {segment, Info}}) ->
% In theory it's possible to have two segments with same offset + size on % In theory it's possible to have two segments with same offset + size on
% different nodes but with differing content. We'd need a checksum to % different nodes but with differing content. We'd need a checksum to
% be able to disambiguate them though. % be able to disambiguate them though.
segs = orddict:store({Offset, locality(Node), -End, Node}, Fragment, Asm#asm.segs), segs = orddict:store({Offset, locality(Node), -End, Node}, Fragment, Asm#asm.segs)
size = max(End, Asm#asm.size)
}. }.
coverage([{{Offset, _, _, _}, _Segment} | Rest], Cursor, Sz) when Offset < Cursor -> coverage([{{Offset, _, _, _}, _Segment} | Rest], Cursor, Sz) when Offset < Cursor ->
@ -186,7 +185,7 @@ segsize(#{fragment := {segment, Info}}) ->
incomplete_new_test() -> incomplete_new_test() ->
?assertEqual( ?assertEqual(
{incomplete, {missing, filemeta}}, {incomplete, {missing, filemeta}},
status(update(new())) status(update(new(42)))
). ).
incomplete_test() -> incomplete_test() ->
@ -194,7 +193,7 @@ incomplete_test() ->
{incomplete, {missing, filemeta}}, {incomplete, {missing, filemeta}},
status( status(
update( update(
append(new(), node(), [ append(new(142), node(), [
segment(p1, 0, 42), segment(p1, 0, 42),
segment(p1, 42, 100) segment(p1, 42, 100)
]) ])
@ -203,13 +202,13 @@ incomplete_test() ->
). ).
consistent_test() -> consistent_test() ->
Asm1 = append(new(), n1, [filemeta(m1, "blarg")]), Asm1 = append(new(42), n1, [filemeta(m1, "blarg")]),
Asm2 = append(Asm1, n2, [segment(s2, 0, 42)]), Asm2 = append(Asm1, n2, [segment(s2, 0, 42)]),
Asm3 = append(Asm2, n3, [filemeta(m3, "blarg")]), Asm3 = append(Asm2, n3, [filemeta(m3, "blarg")]),
?assertMatch({complete, _}, status(meta, Asm3)). ?assertMatch({complete, _}, status(meta, Asm3)).
inconsistent_test() -> inconsistent_test() ->
Asm1 = append(new(), node(), [segment(s1, 0, 42)]), Asm1 = append(new(42), node(), [segment(s1, 0, 42)]),
Asm2 = append(Asm1, n1, [filemeta(m1, "blarg")]), Asm2 = append(Asm1, n1, [filemeta(m1, "blarg")]),
Asm3 = append(Asm2, n2, [segment(s2, 0, 42), filemeta(m1, "blorg")]), Asm3 = append(Asm2, n2, [segment(s2, 0, 42), filemeta(m1, "blorg")]),
Asm4 = append(Asm3, n3, [filemeta(m3, "blarg")]), Asm4 = append(Asm3, n3, [filemeta(m3, "blarg")]),
@ -231,7 +230,7 @@ simple_coverage_test() ->
{Node, segment(n3, 50, 50)}, {Node, segment(n3, 50, 50)},
{Node, segment(n4, 10, 10)} {Node, segment(n4, 10, 10)}
], ],
Asm = append_many(new(), Segs), Asm = append_many(new(100), Segs),
?assertMatch( ?assertMatch(
{complete, {complete,
[ [
@ -256,7 +255,7 @@ redundant_coverage_test() ->
{Node, segment(n7, 50, 10)}, {Node, segment(n7, 50, 10)},
{node1, segment(n8, 40, 10)} {node1, segment(n8, 40, 10)}
], ],
Asm = append_many(new(), Segs), Asm = append_many(new(70), Segs),
?assertMatch( ?assertMatch(
{complete, {complete,
[ [
@ -279,7 +278,7 @@ redundant_coverage_prefer_local_test() ->
{Node, segment(n5, 30, 10)}, {Node, segment(n5, 30, 10)},
{Node, segment(n6, 20, 10)} {Node, segment(n6, 20, 10)}
], ],
Asm = append_many(new(), Segs), Asm = append_many(new(40), Segs),
?assertMatch( ?assertMatch(
{complete, {complete,
[ [
@ -301,7 +300,7 @@ missing_coverage_test() ->
{node2, segment(n4, 50, 50)}, {node2, segment(n4, 50, 50)},
{Node, segment(n5, 40, 60)} {Node, segment(n5, 40, 60)}
], ],
Asm = append_many(new(), Segs), Asm = append_many(new(100), Segs),
?assertEqual( ?assertEqual(
% {incomplete, {missing, {segment, 30, 40}}}, ??? % {incomplete, {missing, {segment, 30, 40}}}, ???
{incomplete, {missing, {segment, 20, 40}}}, {incomplete, {missing, {segment, 20, 40}}},
@ -314,7 +313,7 @@ missing_end_coverage_test() ->
{Node, segment(n1, 0, 15)}, {Node, segment(n1, 0, 15)},
{node1, segment(n3, 10, 10)} {node1, segment(n3, 10, 10)}
], ],
Asm = append_many(new(), Segs), Asm = append_many(new(20), Segs),
?assertEqual( ?assertEqual(
{incomplete, {missing, {segment, 15, 20}}}, {incomplete, {missing, {segment, 15, 20}}},
status(coverage, Asm) status(coverage, Asm)
@ -328,7 +327,7 @@ missing_coverage_with_redudancy_test() ->
{node43, segment(n4, 10, 50)}, {node43, segment(n4, 10, 50)},
{node(), segment(n5, 40, 60)} {node(), segment(n5, 40, 60)}
], ],
Asm = append_many(new(), Segs), Asm = append_many(new(100), Segs),
?assertEqual( ?assertEqual(
% {incomplete, {missing, {segment, 50, 60}}}, ??? % {incomplete, {missing, {segment, 50, 60}}}, ???
{incomplete, {missing, {segment, 20, 40}}}, {incomplete, {missing, {segment, 20, 40}}},

View File

@ -20,7 +20,7 @@
[ [
store_filemeta/2, store_filemeta/2,
store_segment/2, store_segment/2,
assemble/1, assemble/2,
ready_transfers/0, ready_transfers/0,
get_ready_transfer/1, get_ready_transfer/1,
@ -53,7 +53,7 @@
ok | {async, pid()} | {error, term()}. ok | {async, pid()} | {error, term()}.
-callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) -> -callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) ->
ok | {async, pid()} | {error, term()}. ok | {async, pid()} | {error, term()}.
-callback assemble(storage(), emqx_ft:transfer()) -> -callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) ->
ok | {async, pid()} | {error, term()}. ok | {async, pid()} | {error, term()}.
-callback ready_transfers(storage()) -> -callback ready_transfers(storage()) ->
{ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}. {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}.
@ -76,11 +76,11 @@ store_segment(Transfer, Segment) ->
Mod = mod(), Mod = mod(),
Mod:store_segment(storage(), Transfer, Segment). Mod:store_segment(storage(), Transfer, Segment).
-spec assemble(emqx_ft:transfer()) -> -spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) ->
ok | {async, pid()} | {error, term()}. ok | {async, pid()} | {error, term()}.
assemble(Transfer) -> assemble(Transfer, Size) ->
Mod = mod(), Mod = mod(),
Mod:assemble(storage(), Transfer). Mod:assemble(storage(), Transfer, Size).
-spec ready_transfers() -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}. -spec ready_transfers() -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}.
ready_transfers() -> ready_transfers() ->

View File

@ -24,7 +24,7 @@
-export([store_segment/3]). -export([store_segment/3]).
-export([list/3]). -export([list/3]).
-export([pread/5]). -export([pread/5]).
-export([assemble/2]). -export([assemble/3]).
-export([transfers/1]). -export([transfers/1]).
@ -167,12 +167,12 @@ pread(_Storage, _Transfer, Frag, Offset, Size) ->
{error, Reason} {error, Reason}
end. end.
-spec assemble(storage(), transfer()) -> -spec assemble(storage(), transfer(), emqx_ft:bytes()) ->
% {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}. % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}.
{async, _Assembler :: pid()} | {error, _TODO}. {async, _Assembler :: pid()} | {error, _TODO}.
assemble(Storage, Transfer) -> assemble(Storage, Transfer, Size) ->
% TODO: ask cluster if the transfer is already assembled % TODO: ask cluster if the transfer is already assembled
{ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer), {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size),
{async, Pid}. {async, Pid}.
get_ready_transfer(_Storage, ReadyTransferId) -> get_ready_transfer(_Storage, ReadyTransferId) ->

View File

@ -37,8 +37,14 @@ all() ->
groups() -> groups() ->
[ [
{single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- [t_switch_node]}, {single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- group_cluster()},
{cluster, [sequence], [t_switch_node]} {cluster, [sequence], group_cluster()}
].
group_cluster() ->
[
t_switch_node,
t_unreliable_migrating_client
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
@ -61,25 +67,61 @@ set_special_configs(Config) ->
init_per_testcase(Case, Config) -> init_per_testcase(Case, Config) ->
ClientId = atom_to_binary(Case), ClientId = atom_to_binary(Case),
{ok, C} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]), case ?config(group, Config) of
{ok, _} = emqtt:connect(C), cluster ->
[{client, C}, {clientid, ClientId} | Config]. [{clientid, ClientId} | Config];
_ ->
{ok, C} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]),
{ok, _} = emqtt:connect(C),
[{client, C}, {clientid, ClientId} | Config]
end.
end_per_testcase(_Case, Config) -> end_per_testcase(_Case, Config) ->
C = ?config(client, Config), _ = [ok = emqtt:stop(C) || {client, C} <- Config],
ok = emqtt:stop(C),
ok. ok.
init_per_group(cluster, Config) -> init_per_group(Group = cluster, Config) ->
Node = emqx_ft_test_helpers:start_additional_node(Config, emqx_ft1), Cluster = mk_cluster_specs(Config),
[{additional_node, Node} | Config]; ct:pal("Starting ~p", [Cluster]),
init_per_group(_Group, Config) -> Nodes = [
Config. emqx_common_test_helpers:start_slave(Name, Opts#{join_to => node()})
|| {Name, Opts} <- Cluster
],
[{group, Group}, {cluster_nodes, Nodes} | Config];
init_per_group(Group, Config) ->
[{group, Group} | Config].
end_per_group(cluster, Config) -> end_per_group(cluster, Config) ->
ok = emqx_ft_test_helpers:stop_additional_node(Config); ok = lists:foreach(
fun emqx_ft_test_helpers:stop_additional_node/1,
?config(cluster_nodes, Config)
);
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.
mk_cluster_specs(Config) ->
Specs = [
{core, emqx_ft_SUITE1, #{listener_ports => [{tcp, 2883}]}},
{core, emqx_ft_SUITE2, #{listener_ports => [{tcp, 3883}]}}
],
CommOpts = [
{env, [{emqx, boot_modules, [broker, listeners]}]},
{apps, [emqx_ft]},
{conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]},
{env_handler, fun
(emqx_ft) ->
ok = emqx_config:put([file_transfer, storage], #{
type => local,
root => emqx_ft_test_helpers:ft_root(Config, node())
});
(_) ->
ok
end}
],
emqx_common_test_helpers:emqx_cluster(
Specs,
CommOpts
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Tests %% Tests
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -125,7 +167,7 @@ t_simple_transfer(Config) ->
Data = [<<"first">>, <<"second">>, <<"third">>], Data = [<<"first">>, <<"second">>, <<"third">>],
Meta = meta(Filename, Data), Meta = #{size := Filesize} = meta(Filename, Data),
MetaPayload = emqx_json:encode(Meta), MetaPayload = emqx_json:encode(Meta),
MetaTopic = <<"$file/", FileId/binary, "/init">>, MetaTopic = <<"$file/", FileId/binary, "/init">>,
@ -145,7 +187,7 @@ t_simple_transfer(Config) ->
with_offsets(Data) with_offsets(Data)
), ),
FinTopic = <<"$file/", FileId/binary, "/fin">>, FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>,
?assertRCName( ?assertRCName(
success, success,
emqtt:publish(C, FinTopic, <<>>, 1) emqtt:publish(C, FinTopic, <<>>, 1)
@ -194,7 +236,7 @@ t_no_meta(Config) ->
emqtt:publish(C, SegmentTopic, Data, 1) emqtt:publish(C, SegmentTopic, Data, 1)
), ),
FinTopic = <<"$file/", FileId/binary, "/fin">>, FinTopic = <<"$file/", FileId/binary, "/fin/42">>,
?assertRCName( ?assertRCName(
unspecified_error, unspecified_error,
emqtt:publish(C, FinTopic, <<>>, 1) emqtt:publish(C, FinTopic, <<>>, 1)
@ -208,7 +250,7 @@ t_no_segment(Config) ->
Data = [<<"first">>, <<"second">>, <<"third">>], Data = [<<"first">>, <<"second">>, <<"third">>],
Meta = meta(Filename, Data), Meta = #{size := Filesize} = meta(Filename, Data),
MetaPayload = emqx_json:encode(Meta), MetaPayload = emqx_json:encode(Meta),
MetaTopic = <<"$file/", FileId/binary, "/init">>, MetaTopic = <<"$file/", FileId/binary, "/init">>,
@ -229,7 +271,7 @@ t_no_segment(Config) ->
tl(with_offsets(Data)) tl(with_offsets(Data))
), ),
FinTopic = <<"$file/", FileId/binary, "/fin">>, FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>,
?assertRCName( ?assertRCName(
unspecified_error, unspecified_error,
emqtt:publish(C, FinTopic, <<>>, 1) emqtt:publish(C, FinTopic, <<>>, 1)
@ -264,7 +306,7 @@ t_invalid_checksum(Config) ->
Data = [<<"first">>, <<"second">>, <<"third">>], Data = [<<"first">>, <<"second">>, <<"third">>],
Meta = meta(Filename, Data), Meta = #{size := Filesize} = meta(Filename, Data),
MetaPayload = emqx_json:encode(Meta#{checksum => sha256hex(<<"invalid">>)}), MetaPayload = emqx_json:encode(Meta#{checksum => sha256hex(<<"invalid">>)}),
MetaTopic = <<"$file/", FileId/binary, "/init">>, MetaTopic = <<"$file/", FileId/binary, "/init">>,
@ -284,14 +326,15 @@ t_invalid_checksum(Config) ->
with_offsets(Data) with_offsets(Data)
), ),
FinTopic = <<"$file/", FileId/binary, "/fin">>, FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>,
?assertRCName( ?assertRCName(
unspecified_error, unspecified_error,
emqtt:publish(C, FinTopic, <<>>, 1) emqtt:publish(C, FinTopic, <<>>, 1)
). ).
t_switch_node(Config) -> t_switch_node(Config) ->
AdditionalNodePort = emqx_ft_test_helpers:tcp_port(?config(additional_node, Config)), [Node | _] = ?config(cluster_nodes, Config),
AdditionalNodePort = emqx_ft_test_helpers:tcp_port(Node),
ClientId = <<"t_switch_node-migrating_client">>, ClientId = <<"t_switch_node-migrating_client">>,
@ -306,7 +349,7 @@ t_switch_node(Config) ->
%% First, publist metadata and the first segment to the additional node %% First, publist metadata and the first segment to the additional node
Meta = meta(Filename, Data), Meta = #{size := Filesize} = meta(Filename, Data),
MetaPayload = emqx_json:encode(Meta), MetaPayload = emqx_json:encode(Meta),
MetaTopic = <<"$file/", FileId/binary, "/init">>, MetaTopic = <<"$file/", FileId/binary, "/init">>,
@ -335,7 +378,7 @@ t_switch_node(Config) ->
emqtt:publish(C2, <<"$file/", FileId/binary, "/", Offset2/binary>>, Data2, 1) emqtt:publish(C2, <<"$file/", FileId/binary, "/", Offset2/binary>>, Data2, 1)
), ),
FinTopic = <<"$file/", FileId/binary, "/fin">>, FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>,
?assertRCName( ?assertRCName(
success, success,
emqtt:publish(C2, FinTopic, <<>>, 1) emqtt:publish(C2, FinTopic, <<>>, 1)
@ -360,13 +403,134 @@ t_assemble_crash(Config) ->
C = ?config(client, Config), C = ?config(client, Config),
meck:new(emqx_ft_storage_fs), meck:new(emqx_ft_storage_fs),
meck:expect(emqx_ft_storage_fs, assemble, fun(_, _) -> meck:exception(error, oops) end), meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _) -> meck:exception(error, oops) end),
?assertRCName( ?assertRCName(
unspecified_error, unspecified_error,
emqtt:publish(C, <<"$file/someid/fin">>, <<>>, 1) emqtt:publish(C, <<"$file/someid/fin">>, <<>>, 1)
). ).
t_unreliable_migrating_client(Config) ->
NodeSelf = node(),
[Node1, Node2] = ?config(cluster_nodes, Config),
ClientId = ?config(clientid, Config),
FileId = emqx_guid:to_hexstr(emqx_guid:gen()),
Filename = "migratory-birds-in-southern-hemisphere-2013.pdf",
Filesize = 1000,
Gen = emqx_ft_content_gen:new({{ClientId, FileId}, Filesize}, 16),
Payload = iolist_to_binary(emqx_ft_content_gen:consume(Gen, fun({Chunk, _, _}) -> Chunk end)),
Meta = meta(Filename, Payload),
Context = #{
clientid => ClientId,
fileid => FileId,
filesize => Filesize,
payload => Payload
},
Commands = [
{fun connect_mqtt_client/2, [NodeSelf]},
{fun send_filemeta/2, [Meta]},
{fun send_segment/3, [0, 100]},
{fun send_segment/3, [100, 100]},
{fun send_segment/3, [200, 100]},
{fun stop_mqtt_client/1, []},
{fun connect_mqtt_client/2, [Node1]},
{fun connect_mqtt_client/2, [Node2]},
{fun send_filemeta/2, [Meta]},
{fun send_segment/3, [0, 200]},
{fun send_segment/3, [200, 200]},
{fun send_segment/3, [400, 100]},
{fun connect_mqtt_client/2, [Node2]},
{fun send_segment/3, [200, 200]},
{fun send_segment/3, [400, 200]},
{fun connect_mqtt_client/2, [Node1]},
{fun send_segment/3, [400, 200]},
{fun send_segment/3, [600, eof]},
{fun send_finish/1, []},
{fun connect_mqtt_client/2, [NodeSelf]},
{fun send_finish/1, []}
],
_Context = run_commands(Commands, Context),
{ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(),
ReadyTransferIds =
[Id || {#{<<"clientid">> := CId} = Id, _Info} <- ReadyTransfers, CId == ClientId],
Node1Bin = atom_to_binary(Node1),
NodeSelfBin = atom_to_binary(NodeSelf),
?assertMatch(
[#{<<"node">> := Node1Bin}, #{<<"node">> := NodeSelfBin}],
lists:sort(ReadyTransferIds)
),
[
begin
{ok, TableQH} = emqx_ft_storage:get_ready_transfer(Id),
?assertEqual(
Payload,
iolist_to_binary(qlc:eval(TableQH))
)
end
|| Id <- ReadyTransferIds
].
run_commands(Commands, Context) ->
lists:foldl(fun run_command/2, Context, Commands).
run_command({Command, Args}, Context) ->
ct:pal("COMMAND ~p ~p", [erlang:fun_info(Command, name), Args]),
erlang:apply(Command, Args ++ [Context]).
connect_mqtt_client(Node, ContextIn) ->
Context = #{clientid := ClientId} = disown_mqtt_client(ContextIn),
NodePort = emqx_ft_test_helpers:tcp_port(Node),
{ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, NodePort}]),
{ok, _} = emqtt:connect(Client),
Context#{client => Client}.
stop_mqtt_client(Context = #{client := Client}) ->
_ = emqtt:stop(Client),
maps:remove(client, Context).
disown_mqtt_client(Context = #{client := Client}) ->
_ = erlang:unlink(Client),
maps:remove(client, Context);
disown_mqtt_client(Context = #{}) ->
Context.
send_filemeta(Meta, Context = #{client := Client, fileid := FileId}) ->
Topic = <<"$file/", FileId/binary, "/init">>,
MetaPayload = emqx_json:encode(Meta),
?assertRCName(
success,
emqtt:publish(Client, Topic, MetaPayload, 1)
),
Context.
send_segment(Offset, Size, Context = #{client := Client, fileid := FileId, payload := Payload}) ->
Topic = <<"$file/", FileId/binary, "/", (integer_to_binary(Offset))/binary>>,
Data =
case Size of
eof ->
binary:part(Payload, Offset, byte_size(Payload) - Offset);
N ->
binary:part(Payload, Offset, N)
end,
?assertRCName(
success,
emqtt:publish(Client, Topic, Data, 1)
),
Context.
send_finish(Context = #{client := Client, fileid := FileId, filesize := Filesize}) ->
Topic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>,
?assertRCName(
success,
emqtt:publish(Client, Topic, <<>>, 1)
),
Context.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helpers %% Helpers
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -84,7 +84,7 @@ t_assemble_empty_transfer(Config) ->
]}, ]},
emqx_ft_storage_fs:list(Storage, Transfer, fragment) emqx_ft_storage_fs:list(Storage, Transfer, fragment)
), ),
Status = complete_assemble(Storage, Transfer), Status = complete_assemble(Storage, Transfer, 0),
?assertEqual({shutdown, ok}, Status), ?assertEqual({shutdown, ok}, Status),
?assertEqual( ?assertEqual(
{ok, <<>>}, {ok, <<>>},
@ -132,7 +132,7 @@ t_assemble_complete_local_transfer(Config) ->
Fragments Fragments
), ),
Status = complete_assemble(Storage, Transfer), Status = complete_assemble(Storage, Transfer, TransferSize),
?assertEqual({shutdown, ok}, Status), ?assertEqual({shutdown, ok}, Status),
AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename), AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename),
@ -171,20 +171,20 @@ t_assemble_incomplete_transfer(Config) ->
expire_at => 42 expire_at => 42
}, },
ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta), ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
Status = complete_assemble(Storage, Transfer), Status = complete_assemble(Storage, Transfer, TransferSize),
?assertMatch({shutdown, {error, _}}, Status). ?assertMatch({shutdown, {error, _}}, Status).
t_assemble_no_meta(Config) -> t_assemble_no_meta(Config) ->
Storage = storage(Config), Storage = storage(Config),
Transfer = {?CLIENTID2, ?config(file_id, Config)}, Transfer = {?CLIENTID2, ?config(file_id, Config)},
Status = complete_assemble(Storage, Transfer), Status = complete_assemble(Storage, Transfer, 42),
?assertMatch({shutdown, {error, {incomplete, _}}}, Status). ?assertMatch({shutdown, {error, {incomplete, _}}}, Status).
complete_assemble(Storage, Transfer) -> complete_assemble(Storage, Transfer, Size) ->
complete_assemble(Storage, Transfer, 1000). complete_assemble(Storage, Transfer, Size, 1000).
complete_assemble(Storage, Transfer, Timeout) -> complete_assemble(Storage, Transfer, Size, Timeout) ->
{async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer), {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size),
MRef = erlang:monitor(process, Pid), MRef = erlang:monitor(process, Pid),
Pid ! kickoff, Pid ! kickoff,
receive receive

View File

@ -67,7 +67,7 @@ init_per_group(_Group, Config) ->
Config. Config.
end_per_group(cluster, Config) -> end_per_group(cluster, Config) ->
ok = emqx_ft_test_helpers:stop_additional_node(Config); ok = emqx_ft_test_helpers:stop_additional_node(?config(additional_node, Config));
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.

View File

@ -21,13 +21,12 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
start_additional_node(Config, Node) -> start_additional_node(Config, Name) ->
SelfNode = node(),
emqx_common_test_helpers:start_slave( emqx_common_test_helpers:start_slave(
Node, Name,
[ [
{apps, [emqx_ft]}, {apps, [emqx_ft]},
{join_to, SelfNode}, {join_to, node()},
{configure_gen_rpc, true}, {configure_gen_rpc, true},
{env_handler, fun {env_handler, fun
(emqx_ft) -> (emqx_ft) ->
@ -40,8 +39,7 @@ start_additional_node(Config, Node) ->
] ]
). ).
stop_additional_node(Config) -> stop_additional_node(Node) ->
Node = ?config(additional_node, Config),
ok = rpc:call(Node, ekka, leave, []), ok = rpc:call(Node, ekka, leave, []),
ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_ft]]), ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_ft]]),
{ok, _} = emqx_common_test_helpers:stop_slave(Node), {ok, _} = emqx_common_test_helpers:stop_slave(Node),
@ -58,13 +56,14 @@ ft_root(Config, Node) ->
upload_file(ClientId, FileId, Data, Node) -> upload_file(ClientId, FileId, Data, Node) ->
Port = tcp_port(Node), Port = tcp_port(Node),
Size = byte_size(Data),
{ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]), {ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
Meta = #{ Meta = #{
name => FileId, name => FileId,
expire_at => erlang:system_time(_Unit = second) + 3600, expire_at => erlang:system_time(_Unit = second) + 3600,
size => byte_size(Data) size => Size
}, },
MetaPayload = emqx_json:encode(Meta), MetaPayload = emqx_json:encode(Meta),
@ -72,6 +71,6 @@ upload_file(ClientId, FileId, Data, Node) ->
{ok, _} = emqtt:publish(C1, MetaTopic, MetaPayload, 1), {ok, _} = emqtt:publish(C1, MetaTopic, MetaPayload, 1),
{ok, _} = emqtt:publish(C1, <<"$file/", FileId/binary, "/0">>, Data, 1), {ok, _} = emqtt:publish(C1, <<"$file/", FileId/binary, "/0">>, Data, 1),
FinTopic = <<"$file/", FileId/binary, "/fin">>, FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Size))/binary>>,
{ok, _} = emqtt:publish(C1, FinTopic, <<>>, 1), {ok, _} = emqtt:publish(C1, FinTopic, <<>>, 1),
ok = emqtt:stop(C1). ok = emqtt:stop(C1).