diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 42611e537..dba087484 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -268,8 +268,8 @@ on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) -> with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() -> case assemble(Transfer, FinalSize) of %% Assembling completed, ack through the responder right away - % ok -> - % emqx_ft_responder:ack(FinPacketKey, ok); + ok -> + emqx_ft_responder:ack(FinPacketKey, ok); %% Assembling started, packet will be acked by the responder {async, Pid} -> ok = emqx_ft_responder:kickoff(FinPacketKey, Pid), diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 767930f98..19f6c48d1 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -24,6 +24,8 @@ -export([handle_event/4]). -export([terminate/3]). +-export([where/1]). + -type stdata() :: #{ storage := emqx_ft_storage_fs:storage(), transfer := emqx_ft:transfer(), @@ -39,6 +41,9 @@ start_link(Storage, Transfer, Size) -> gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size}, []). +where(Transfer) -> + gproc:where(?NAME(Transfer)). + %% -type state() :: diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl index 9f77c8afb..71f2f2748 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl @@ -327,7 +327,9 @@ list(_Options, Query = #{transfer := _Transfer}) -> #{items := Exports = [_ | _]} -> {ok, #{items => Exports}}; #{items := [], errors := NodeErrors} -> - {error, NodeErrors} + {error, NodeErrors}; + #{items := []} -> + {ok, #{items => []}} end; list(_Options, Query) -> Result = list(Query), diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index cdc86d218..010d004a1 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -35,6 +35,7 @@ -export([read_filemeta/2]). -export([list/3]). -export([pread/5]). +-export([lookup_local_assembler/1]). -export([assemble/3]). -export([transfers/1]). @@ -211,11 +212,15 @@ pread(_Storage, _Transfer, Frag, Offset, Size) -> end. -spec assemble(storage(), transfer(), emqx_ft:bytes()) -> - {async, _Assembler :: pid()} | {error, _TODO}. + {async, _Assembler :: pid()} | ok | {error, _TODO}. assemble(Storage, Transfer, Size) -> - % TODO: ask cluster if the transfer is already assembled - {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size), - {async, Pid}. + LookupSources = [ + fun() -> lookup_local_assembler(Transfer) end, + fun() -> lookup_remote_assembler(Transfer) end, + fun() -> check_if_already_exported(Storage, Transfer) end, + fun() -> ensure_local_assembler(Storage, Transfer, Size) end + ], + lookup_assembler(LookupSources). %% @@ -252,6 +257,44 @@ stop(Storage) -> %% +lookup_assembler([LastSource]) -> + LastSource(); +lookup_assembler([Source | Sources]) -> + case Source() of + {error, not_found} -> lookup_assembler(Sources); + Result -> Result + end. + +check_if_already_exported(Storage, Transfer) -> + case files(Storage, #{transfer => Transfer}) of + {ok, #{items := [_ | _]}} -> ok; + _ -> {error, not_found} + end. + +lookup_local_assembler(Transfer) -> + case emqx_ft_assembler:where(Transfer) of + Pid when is_pid(Pid) -> {async, Pid}; + _ -> {error, not_found} + end. + +lookup_remote_assembler(Transfer) -> + Nodes = emqx:running_nodes() -- [node()], + Assemblers = lists:flatmap( + fun + ({ok, {async, Pid}}) -> [Pid]; + (_) -> [] + end, + emqx_ft_storage_fs_proto_v1:list_assemblers(Nodes, Transfer) + ), + case Assemblers of + [Pid | _] -> {async, Pid}; + _ -> {error, not_found} + end. + +ensure_local_assembler(Storage, Transfer, Size) -> + {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size), + {async, Pid}. + -spec transfers(storage()) -> {ok, #{transfer() => transferinfo()}}. transfers(Storage) -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl index 597f84091..68cd4c2fd 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl @@ -22,7 +22,8 @@ -export([ list_local/2, - pread_local/4 + pread_local/4, + lookup_local_assembler/1 ]). list_local(Transfer, What) -> @@ -30,3 +31,6 @@ list_local(Transfer, What) -> pread_local(Transfer, Frag, Offset, Size) -> emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]). + +lookup_local_assembler(Transfer) -> + emqx_ft_storage:with_storage_type(local, lookup_local_assembler, [Transfer]). diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl index 3b91684e6..d09b9ec9a 100644 --- a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl @@ -22,6 +22,7 @@ -export([multilist/3]). -export([pread/5]). +-export([list_assemblers/2]). -type offset() :: emqx_ft:offset(). -type transfer() :: emqx_ft:transfer(). @@ -41,3 +42,8 @@ multilist(Nodes, Transfer, What) -> {ok, [filefrag()]} | {error, term()} | no_return(). pread(Node, Transfer, Frag, Offset, Size) -> erpc:call(Node, emqx_ft_storage_fs_proxy, pread_local, [Transfer, Frag, Offset, Size]). + +-spec list_assemblers([node()], transfer()) -> + emqx_rpc:erpc_multicall([pid()]). +list_assemblers(Nodes, Transfer) -> + erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, lookup_local_assembler, [Transfer]). diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 6b675e0c0..97f0cbbcc 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -44,7 +44,8 @@ groups() -> group_cluster() -> [ t_switch_node, - t_unreliable_migrating_client + t_unreliable_migrating_client, + t_concurrent_fins ]. init_per_suite(Config) -> @@ -549,21 +550,11 @@ t_unreliable_migrating_client(Config) -> Exports = list_files(?config(clientid, Config)), - % NOTE - % The cluster had 2 assemblers running on two different nodes, because client sent `fin` - % twice. This is currently expected, files must be identical anyway. Node1Str = atom_to_list(Node1), - NodeSelfStr = atom_to_list(NodeSelf), % TODO: this testcase is specific to local fs storage backend ?assertMatch( - [#{"node" := Node1Str}, #{"node" := NodeSelfStr}], - lists:map( - fun(#{uri := URIString}) -> - #{query := QS} = uri_string:parse(URIString), - maps:from_list(uri_string:dissect_query(QS)) - end, - lists:sort(Exports) - ) + [#{"node" := Node1Str}], + fs_exported_file_attributes(Exports) ), [ @@ -571,6 +562,84 @@ t_unreliable_migrating_client(Config) -> || Export <- Exports ]. +t_concurrent_fins(Config) -> + NodeSelf = node(), + [Node1, Node2] = ?config(cluster_nodes, Config), + + ClientId = ?config(clientid, Config), + FileId = emqx_guid:to_hexstr(emqx_guid:gen()), + Filename = "migratory-birds-in-southern-hemisphere-2013.pdf", + Filesize = 100, + Gen = emqx_ft_content_gen:new({{ClientId, FileId}, Filesize}, 16), + Payload = iolist_to_binary(emqx_ft_content_gen:consume(Gen, fun({Chunk, _, _}) -> Chunk end)), + Meta = meta(Filename, Payload), + + %% Send filemeta and segments to Node1 + Context0 = #{ + clientid => ClientId, + fileid => FileId, + filesize => Filesize, + payload => Payload + }, + + Context1 = run_commands( + [ + {fun connect_mqtt_client/2, [Node1]}, + {fun send_filemeta/2, [Meta]}, + {fun send_segment/3, [0, 100]}, + {fun stop_mqtt_client/1, []} + ], + Context0 + ), + + %% Now send fins concurrently to the 3 nodes + Self = self(), + Nodes = [Node1, Node2, NodeSelf], + FinSenders = lists:map( + fun(Node) -> + %% takeovers and disconnects will happen due to concurrency + _ = erlang:process_flag(trap_exit, true), + _Context = run_commands( + [ + {fun connect_mqtt_client/2, [Node]}, + {fun send_finish/1, []} + ], + Context1 + ), + Self ! {done, Node} + end, + Nodes + ), + ok = lists:foreach( + fun(F) -> + _Pid = spawn_link(F) + end, + FinSenders + ), + ok = lists:foreach( + fun(Node) -> + receive + {done, Node} -> ok + after 1000 -> + ct:fail("Node ~p did not send finish successfully", [Node]) + end + end, + Nodes + ), + + %% Only one node should have the file + Exports = list_files(?config(clientid, Config)), + ?assertMatch( + [#{"node" := _Node}], + fs_exported_file_attributes(Exports) + ). + +%%------------------------------------------------------------------------------ +%% Command helpers +%%------------------------------------------------------------------------------ + +%% Command runners + run_commands(Commands, Context) -> lists:foldl(fun run_command/2, Context, Commands). @@ -578,6 +647,8 @@ run_command({Command, Args}, Context) -> ct:pal("COMMAND ~p ~p", [erlang:fun_info(Command, name), Args]), erlang:apply(Command, Args ++ [Context]). +%% Commands + connect_mqtt_client(Node, ContextIn) -> Context = #{clientid := ClientId} = disown_mqtt_client(ContextIn), NodePort = emqx_ft_test_helpers:tcp_port(Node), @@ -623,9 +694,18 @@ send_finish(Context = #{client := Client, fileid := FileId, filesize := Filesize ), Context. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Helpers -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ + +fs_exported_file_attributes(FSExports) -> + lists:map( + fun(#{uri := URIString}) -> + #{query := QS} = uri_string:parse(URIString), + maps:from_list(uri_string:dissect_query(QS)) + end, + lists:sort(FSExports) + ). mk_init_topic(FileId) -> <<"$file/", FileId/binary, "/init">>.