diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index ac419cb36..348916920 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -35,7 +35,7 @@ decode_filemeta/1 ]). --export([on_assemble/2]). +-export([on_complete/4]). -export_type([ clientid/0, @@ -76,7 +76,8 @@ -type segment() :: {offset(), _Content :: binary()}. --define(ASSEMBLE_TIMEOUT, 5000). +-define(STORE_SEGMENT_TIMEOUT, 10000). +-define(ASSEMBLE_TIMEOUT, 60000). %%-------------------------------------------------------------------- %% API for app @@ -143,52 +144,59 @@ on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) -> on_file_command(PacketId, Msg, FileCommand) -> case string:split(FileCommand, <<"/">>, all) of [FileId, <<"init">>] -> - on_init(Msg, FileId); + on_init(PacketId, Msg, transfer(Msg, FileId)); [FileId, <<"fin">>] -> - on_fin(PacketId, Msg, FileId, undefined); + on_fin(PacketId, Msg, transfer(Msg, FileId), undefined); [FileId, <<"fin">>, Checksum] -> - on_fin(PacketId, Msg, FileId, Checksum); + on_fin(PacketId, Msg, transfer(Msg, FileId), Checksum); [FileId, <<"abort">>] -> - on_abort(Msg, FileId); + on_abort(Msg, transfer(Msg, FileId)); [FileId, OffsetBin] -> validate([{offset, OffsetBin}], fun([Offset]) -> - on_segment(Msg, FileId, Offset, undefined) + on_segment(PacketId, Msg, transfer(Msg, FileId), Offset, undefined) end); [FileId, OffsetBin, ChecksumBin] -> validate([{offset, OffsetBin}, {checksum, ChecksumBin}], fun([Offset, Checksum]) -> - on_segment(Msg, FileId, Offset, Checksum) + on_segment(PacketId, Msg, transfer(Msg, FileId), Offset, Checksum) end); _ -> ?RC_UNSPECIFIED_ERROR end. 
-on_init(Msg, FileId) -> +on_init(PacketId, Msg, Transfer) -> ?SLOG(info, #{ msg => "on_init", mqtt_msg => Msg, - file_id => FileId + packet_id => PacketId, + transfer => Transfer }), Payload = Msg#message.payload, + PacketKey = {self(), PacketId}, % %% Add validations here case decode_filemeta(Payload) of {ok, Meta} -> - case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of - ok -> - ?RC_SUCCESS; - {error, Reason} -> - ?SLOG(warning, #{ - msg => "store_filemeta_failed", - mqtt_msg => Msg, - file_id => FileId, - reason => Reason - }), - ?RC_UNSPECIFIED_ERROR - end; + Callback = fun(Result) -> + ?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result) + end, + with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() -> + case store_filemeta(Transfer, Meta) of + % Stored, ack through the responder right away + ok -> + emqx_ft_responder:ack(PacketKey, ok); + % Storage operation started, packet will be acked by the responder + {async, Pid} -> + ok = emqx_ft_responder:kickoff(PacketKey, Pid), + ok; + %% Storage operation failed, ack through the responder + {error, _} = Error -> + emqx_ft_responder:ack(PacketKey, Error) + end + end); {error, Reason} -> ?SLOG(error, #{ msg => "on_init: invalid filemeta", mqtt_msg => Msg, - file_id => FileId, + transfer => Transfer, reason => Reason }), ?RC_UNSPECIFIED_ERROR @@ -198,48 +206,69 @@ on_abort(_Msg, _FileId) -> %% TODO ?RC_SUCCESS. 
-on_segment(Msg, FileId, Offset, Checksum) -> +on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> ?SLOG(info, #{ msg => "on_segment", mqtt_msg => Msg, - file_id => FileId, + packet_id => PacketId, + transfer => Transfer, offset => Offset, checksum => Checksum }), %% TODO: handle checksum Payload = Msg#message.payload, Segment = {Offset, Payload}, + PacketKey = {self(), PacketId}, + Callback = fun(Result) -> + ?MODULE:on_complete("store_segment", PacketKey, Transfer, Result) + end, %% Add offset/checksum validations - case emqx_ft_storage:store_segment(transfer(Msg, FileId), Segment) of - ok -> - ?RC_SUCCESS; - {error, _Reason} -> - ?RC_UNSPECIFIED_ERROR - end. + with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() -> + case store_segment(Transfer, Segment) of + ok -> + emqx_ft_responder:ack(PacketKey, ok); + {async, Pid} -> + ok = emqx_ft_responder:kickoff(PacketKey, Pid), + ok; + {error, _} = Error -> + emqx_ft_responder:ack(PacketKey, Error) + end + end). -on_fin(PacketId, Msg, FileId, Checksum) -> +on_fin(PacketId, Msg, Transfer, Checksum) -> ?SLOG(info, #{ msg => "on_fin", mqtt_msg => Msg, - file_id => FileId, - checksum => Checksum, - packet_id => PacketId + packet_id => PacketId, + transfer => Transfer, + checksum => Checksum }), %% TODO: handle checksum? Do we need it? 
FinPacketKey = {self(), PacketId}, - case emqx_ft_responder:start(FinPacketKey, fun ?MODULE:on_assemble/2, ?ASSEMBLE_TIMEOUT) of - %% We have new fin packet + Callback = fun(Result) -> + ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result) + end, + with_responder(FinPacketKey, Callback, ?ASSEMBLE_TIMEOUT, fun() -> + case assemble(Transfer) of + %% Assembling completed, ack through the responder right away + ok -> + emqx_ft_responder:ack(FinPacketKey, ok); + %% Assembling started, packet will be acked by the responder + {async, Pid} -> + ok = emqx_ft_responder:kickoff(FinPacketKey, Pid), + ok; + %% Assembling failed, ack through the responder + {error, _} = Error -> + emqx_ft_responder:ack(FinPacketKey, Error) + end + end). + +with_responder(Key, Callback, Timeout, CriticalSection) -> + case emqx_ft_responder:start(Key, Callback, Timeout) of + %% We have new packet {ok, _} -> - Callback = fun(Result) -> emqx_ft_responder:ack(FinPacketKey, Result) end, - case assemble(transfer(Msg, FileId), Callback) of - %% Assembling started, packet will be acked by the callback or the responder - {ok, _} -> - ok; - %% Assembling failed, ack through the responder - {error, _} = Error -> - emqx_ft_responder:ack(FinPacketKey, Error) - end; - %% Fin packet already received. + CriticalSection(); + %% Packet already received. %% Since we are still handling the previous one, %% we probably have retransmit here {error, {already_started, _}} -> @@ -247,13 +276,35 @@ on_fin(PacketId, Msg, FileId, Checksum) -> end, undefined. -assemble(Transfer, Callback) -> +store_filemeta(Transfer, Meta) -> try - emqx_ft_storage:assemble(Transfer, Callback) + emqx_ft_storage:store_filemeta(Transfer, Meta) catch C:E:S -> - ?SLOG(warning, #{ - msg => "file_assemble_failed", class => C, reason => E, stacktrace => S + ?SLOG(error, #{ + msg => "start_store_filemeta_failed", class => C, reason => E, stacktrace => S }), {error, {internal_error, E}} end.
+ +store_segment(Transfer, Segment) -> + try + emqx_ft_storage:store_segment(Transfer, Segment) + catch + C:E:S -> + ?SLOG(error, #{ + msg => "start_store_segment_failed", class => C, reason => E, stacktrace => S + }), + {error, {internal_error, E}} + end. + +assemble(Transfer) -> + try + emqx_ft_storage:assemble(Transfer) + catch + C:E:S -> + ?SLOG(error, #{ + msg => "start_assemble_failed", class => C, reason => E, stacktrace => S }), {error, {internal_error, E}} end. @@ -262,14 +313,28 @@ transfer(Msg, FileId) -> ClientId = Msg#message.from, {ClientId, FileId}. -on_assemble({ChanPid, PacketId}, Result) -> - ?SLOG(debug, #{msg => "on_assemble", packet_id => PacketId, result => Result}), +on_complete(Op, {ChanPid, PacketId}, Transfer, Result) -> + ?SLOG(debug, #{ + msg => "on_complete", + operation => Op, + packet_id => PacketId, + transfer => Transfer + }), case Result of - {ack, ok} -> + {Mode, ok} when Mode == ack orelse Mode == down -> erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS}); - {ack, {error, _}} -> + {Mode, {error, _} = Reason} when Mode == ack orelse Mode == down -> + ?SLOG(error, #{ + msg => Op ++ "_failed", + transfer => Transfer, + reason => Reason + }), erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}); timeout -> + ?SLOG(error, #{ + msg => Op ++ "_timed_out", + transfer => Transfer + }), erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}) end. diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 38faebf03..083b4afcc 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -18,36 +18,31 @@ -include_lib("emqx/include/logger.hrl"). --export([start_link/3]). +-export([start_link/2]). -behaviour(gen_statem). -export([callback_mode/0]). -export([init/1]). -% -export([list_local_fragments/3]). -% -export([list_remote_fragments/3]). -% -export([start_assembling/3]). -export([handle_event/4]). 
-% -export([handle_continue/2]). -% -export([handle_call/3]). -% -export([handle_cast/2]). - -record(st, { storage :: _Storage, transfer :: emqx_ft:transfer(), assembly :: _TODO, file :: {file:filename(), io:device(), term()} | undefined, - hash, - callback :: fun((ok | {error, term()}) -> any()) + hash }). --define(RPC_LIST_TIMEOUT, 1000). --define(RPC_READSEG_TIMEOUT, 5000). +-define(NAME(Transfer), {n, l, {?MODULE, Transfer}}). +-define(REF(Transfer), {via, gproc, ?NAME(Transfer)}). %% -start_link(Storage, Transfer, Callback) -> - gen_statem:start_link(?MODULE, {Storage, Transfer, Callback}, []). +start_link(Storage, Transfer) -> + %% TODO + %% Additional callbacks? They won't survive restarts by the supervisor, which raises the + %% question of whether we even need to retry with the help of the supervisor. + gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer}, []). %% @@ -56,16 +51,21 @@ start_link(Storage, Transfer, Callback) -> callback_mode() -> handle_event_function. -init({Storage, Transfer, Callback}) -> +init({Storage, Transfer}) -> St = #st{ storage = Storage, transfer = Transfer, assembly = emqx_ft_assembly:new(), - hash = crypto:hash_init(sha256), - callback = Callback + hash = crypto:hash_init(sha256) }, - {ok, list_local_fragments, St, ?internal([])}. + {ok, idle, St}. +handle_event(info, kickoff, idle, St) -> + % NOTE + % Someone has told us to start the work, which usually means that they have set up a monitor. + % We could wait for this message and handle it at the end of the assembling rather than at + % the beginning, however that would make error handling much messier. + {next_state, list_local_fragments, St, ?internal([])}; handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) -> % TODO: what we do with non-transients errors here (e.g. `eacces`)?
{ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer, fragment), @@ -76,10 +76,10 @@ handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) -> {next_state, start_assembling, NSt, ?internal([])}; {incomplete, _} -> Nodes = mria_mnesia:running_nodes() -- [node()], - {next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])} + {next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])}; % TODO: recovery? - % {error, _} = Reason -> - % {stop, Reason} + {error, _} = Error -> + {stop, {shutdown, Error}} end; handle_event(internal, _, {list_remote_fragments, Nodes}, St) -> % TODO @@ -107,12 +107,14 @@ handle_event(internal, _, {list_remote_fragments, Nodes}, St) -> {next_state, start_assembling, NSt, ?internal([])}; % TODO: retries / recovery? {incomplete, _} = Status -> - {next_state, {failure, {error, Status}}, NSt, ?internal([])} + {stop, {shutdown, {error, Status}}}; + {error, _} = Error -> + {stop, {shutdown, Error}} end; handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) -> Filemeta = emqx_ft_assembly:filemeta(Asm), Coverage = emqx_ft_assembly:coverage(Asm), - % TODO: errors + % TODO: better error handling {ok, Handle} = emqx_ft_storage_fs:open_file(St#st.storage, St#st.transfer, Filemeta), {next_state, {assemble, Coverage}, St#st{file = Handle}, ?internal([])}; handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) -> @@ -120,50 +122,16 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) -> % Currently, race is possible between getting segment info from the remote node and % this node garbage collecting the segment itself. 
% TODO: pipelining - case pread(Node, Segment, St) of - {ok, Content} -> - case emqx_ft_storage_fs:write(St#st.file, Content) of - {ok, NHandle} -> - {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])}; - %% TODO: better error handling - {error, _} = Error -> - {next_state, {failure, Error}, St, ?internal([])} - end; - {error, _} = Error -> - %% TODO: better error handling - {next_state, {failure, Error}, St, ?internal([])} - end; + % TODO: better error handling + {ok, Content} = pread(Node, Segment, St), + {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content), + {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])}; handle_event(internal, _, {assemble, []}, St = #st{}) -> {next_state, complete, St, ?internal([])}; -handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle, callback = Callback}) -> +handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle}) -> Filemeta = emqx_ft_assembly:filemeta(Asm), Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle), - _ = safe_apply(Callback, Result), - {stop, shutdown}; -handle_event(internal, _, {failure, Error}, #st{callback = Callback}) -> - _ = safe_apply(Callback, Error), - {stop, Error}. - -% handle_continue(list_local, St = #st{storage = Storage, transfer = Transfer, assembly = Asm}) -> -% % TODO: what we do with non-transients errors here (e.g. `eacces`)? -% {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer), -% NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)), -% NSt = St#st{assembly = NAsm}, -% case emqx_ft_assembly:status(NAsm) of -% complete -> -% {noreply, NSt, {continue}}; -% {more, _} -> -% error(noimpl); -% {error, _} -> -% error(noimpl) -% end, -% {noreply, St}. - -% handle_call(_Call, _From, St) -> -% {reply, {error, badcall}, St}. - -% handle_cast(_Cast, St) -> -% {noreply, St}. + {stop, {shutdown, Result}}. 
pread(Node, Segment, St) when Node =:= node() -> emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)); diff --git a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl index 17aa4d998..34783cbd3 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl @@ -17,7 +17,7 @@ -module(emqx_ft_assembler_sup). -export([start_link/0]). --export([start_child/3]). +-export([ensure_child/2]). -behaviour(supervisor). -export([init/1]). @@ -25,13 +25,18 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -start_child(Storage, Transfer, Callback) -> +ensure_child(Storage, Transfer) -> Childspec = #{ - id => {Storage, Transfer}, - start => {emqx_ft_assembler, start_link, [Storage, Transfer, Callback]}, + id => Transfer, + start => {emqx_ft_assembler, start_link, [Storage, Transfer]}, restart => temporary }, - supervisor:start_child(?MODULE, Childspec). + case supervisor:start_child(?MODULE, Childspec) of + {ok, Pid} -> + {ok, Pid}; + {error, {already_started, Pid}} -> + {ok, Pid} + end. init(_) -> SupFlags = #{ diff --git a/apps/emqx_ft/src/emqx_ft_responder.erl b/apps/emqx_ft/src/emqx_ft_responder.erl index 7b9220774..fb823c433 100644 --- a/apps/emqx_ft/src/emqx_ft_responder.erl +++ b/apps/emqx_ft/src/emqx_ft_responder.erl @@ -25,6 +25,7 @@ %% API -export([start/3]). +-export([kickoff/2]). -export([ack/2]). %% Supervisor API @@ -35,7 +36,7 @@ -define(REF(Key), {via, gproc, {n, l, {?MODULE, Key}}}). -type key() :: term(). --type respfun() :: fun(({ack, _Result} | timeout) -> _SideEffect). +-type respfun() :: fun(({ack, _Result} | {down, _Result} | timeout) -> _SideEffect). %%-------------------------------------------------------------------- %% API @@ -45,6 +46,10 @@ start(Key, RespFun, Timeout) -> emqx_ft_responder_sup:start_child(Key, RespFun, Timeout). +-spec kickoff(key(), pid()) -> ok. 
+kickoff(Key, Pid) -> + gen_server:call(?REF(Key), {kickoff, Pid}). + -spec ack(key(), _Result) -> _Return. ack(Key, Result) -> % TODO: it's possible to avoid term copy @@ -63,8 +68,13 @@ init({Key, RespFun, Timeout}) -> _TRef = erlang:send_after(Timeout, self(), timeout), {ok, {Key, RespFun}}. +handle_call({kickoff, Pid}, _From, St) -> + % TODO: more state? + _MRef = erlang:monitor(process, Pid), + _ = Pid ! kickoff, + {reply, ok, St}; handle_call({ack, Result}, _From, {Key, RespFun}) -> - Ret = apply(RespFun, [Key, {ack, Result}]), + Ret = apply(RespFun, [{ack, Result}]), ?tp(ft_responder_ack, #{key => Key, result => Result, return => Ret}), {stop, {shutdown, Ret}, Ret, undefined}; handle_call(Msg, _From, State) -> @@ -76,9 +86,13 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info(timeout, {Key, RespFun}) -> - Ret = apply(RespFun, [Key, timeout]), + Ret = apply(RespFun, [timeout]), ?tp(ft_responder_timeout, #{key => Key, return => Ret}), {stop, {shutdown, Ret}, undefined}; +handle_info({'DOWN', _MRef, process, _Pid, Reason}, {Key, RespFun}) -> + Ret = apply(RespFun, [{down, map_down_reason(Reason)}]), + ?tp(ft_responder_procdown, #{key => Key, reason => Reason, return => Ret}), + {stop, {shutdown, Ret}, undefined}; handle_info(Msg, State) -> ?SLOG(warning, #{msg => "unknown_message", info_msg => Msg}), {noreply, State}. @@ -86,6 +100,17 @@ handle_info(Msg, State) -> terminate(_Reason, undefined) -> ok; terminate(Reason, {Key, RespFun}) -> - Ret = apply(RespFun, [Key, timeout]), + Ret = apply(RespFun, [timeout]), ?tp(ft_responder_shutdown, #{key => Key, reason => Reason, return => Ret}), ok. + +map_down_reason(normal) -> + ok; +map_down_reason(shutdown) -> + ok; +map_down_reason({shutdown, Result}) -> + Result; +map_down_reason(noproc) -> + {error, noproc}; +map_down_reason(Error) -> + {error, {internal_error, Error}}. 
diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 0dd9d7989..6ca9e9ecb 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -20,7 +20,7 @@ [ store_filemeta/2, store_segment/2, - assemble/2, + assemble/1, ready_transfers/0, get_ready_transfer/1, @@ -43,12 +43,18 @@ %% Behaviour %%-------------------------------------------------------------------- +%% NOTE +%% An async task will wait for a `kickoff` message to start processing, to give some time +%% to set up monitors, etc. Async task will not explicitly report the processing result, +%% you are expected to receive and handle exit reason of the process, which is +%% -type result() :: `{shutdown, ok | {error, _}}`. + -callback store_filemeta(storage(), emqx_ft:transfer(), emqx_ft:filemeta()) -> - ok | {error, term()}. + ok | {async, pid()} | {error, term()}. -callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) -> - ok | {error, term()}. --callback assemble(storage(), emqx_ft:transfer(), assemble_callback()) -> - {ok, pid()} | {error, term()}. + ok | {async, pid()} | {error, term()}. +-callback assemble(storage(), emqx_ft:transfer()) -> + ok | {async, pid()} | {error, term()}. -callback ready_transfers(storage()) -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}. -callback get_ready_transfer(storage(), ready_transfer_id()) -> @@ -59,22 +65,22 @@ %%-------------------------------------------------------------------- -spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) -> - ok | {error, term()}. + ok | {async, pid()} | {error, term()}. store_filemeta(Transfer, FileMeta) -> Mod = mod(), Mod:store_filemeta(storage(), Transfer, FileMeta). -spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) -> - ok | {error, term()}. + ok | {async, pid()} | {error, term()}. store_segment(Transfer, Segment) -> Mod = mod(), Mod:store_segment(storage(), Transfer, Segment). 
--spec assemble(emqx_ft:transfer(), assemble_callback()) -> - {ok, pid()} | {error, term()}. -assemble(Transfer, Callback) -> +-spec assemble(emqx_ft:transfer()) -> + ok | {async, pid()} | {error, term()}. +assemble(Transfer) -> Mod = mod(), - Mod:assemble(storage(), Transfer, Callback). + Mod:assemble(storage(), Transfer). -spec ready_transfers() -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}. ready_transfers() -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index c6cb09cf2..ef032a639 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -24,7 +24,7 @@ -export([store_segment/3]). -export([list/3]). -export([pread/5]). --export([assemble/3]). +-export([assemble/2]). -export([transfers/1]). @@ -168,11 +168,13 @@ pread(_Storage, _Transfer, Frag, Offset, Size) -> {error, Reason} end. --spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) -> +-spec assemble(storage(), transfer()) -> % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}. - {ok, _Assembler :: pid()} | {error, _TODO}. -assemble(Storage, Transfer, Callback) -> - emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback). + {async, _Assembler :: pid()} | {error, _TODO}. +assemble(Storage, Transfer) -> + % TODO: ask cluster if the transfer is already assembled + {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer), + {async, Pid}. 
get_ready_transfer(_Storage, ReadyTransferId) -> case parse_ready_transfer_id(ReadyTransferId) of diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 6a125449f..0ca298265 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -361,7 +361,7 @@ t_assemble_crash(Config) -> C = ?config(client, Config), meck:new(emqx_ft_storage_fs), - meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _) -> meck:exception(error, oops) end), + meck:expect(emqx_ft_storage_fs, assemble, fun(_, _) -> meck:exception(error, oops) end), ?assertRCName( unspecified_error, diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index c5dbd418f..d4b619e43 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -37,7 +37,8 @@ all() -> ]. init_per_suite(Config) -> - Config. + {ok, Apps} = application:ensure_all_started(gproc), + [{suite_apps, Apps} | Config]. end_per_suite(_Config) -> ok.
@@ -83,9 +84,8 @@ t_assemble_empty_transfer(Config) -> ]}, emqx_ft_storage_fs:list(Storage, Transfer, fragment) ), - {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun on_assembly_finished/1), - {ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}), - ?assertMatch(#{result := ok}, Event), + Status = complete_assemble(Storage, Transfer), + ?assertEqual({shutdown, ok}, Status), ?assertEqual( {ok, <<>>}, % TODO @@ -132,9 +132,8 @@ t_assemble_complete_local_transfer(Config) -> Fragments ), - {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun on_assembly_finished/1), - {ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}), - ?assertMatch(#{result := ok}, Event), + Status = complete_assemble(Storage, Transfer), + ?assertEqual({shutdown, ok}, Status), AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename), ?assertMatch( @@ -172,37 +171,32 @@ t_assemble_incomplete_transfer(Config) -> expire_at => 42 }, ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta), - Self = self(), - {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun(Result) -> - Self ! {test_assembly_finished, Result} - end), - receive - {test_assembly_finished, Result} -> - ?assertMatch({error, _}, Result) - after 1000 -> - ct:fail("Assembler did not called callback") - end. + Status = complete_assemble(Storage, Transfer), + ?assertMatch({shutdown, {error, _}}, Status). t_assemble_no_meta(Config) -> Storage = storage(Config), Transfer = {?CLIENTID2, ?config(file_id, Config)}, - Self = self(), - {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun(Result) -> - Self ! {test_assembly_finished, Result} - end), + Status = complete_assemble(Storage, Transfer), + ?assertMatch({shutdown, {error, {incomplete, _}}}, Status). + +complete_assemble(Storage, Transfer) -> + complete_assemble(Storage, Transfer, 1000). 
+ +complete_assemble(Storage, Transfer, Timeout) -> + {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer), + MRef = erlang:monitor(process, Pid), + Pid ! kickoff, receive - {test_assembly_finished, Result} -> - ?assertMatch({error, _}, Result) - after 1000 -> - ct:fail("Assembler did not called callback") + {'DOWN', MRef, process, Pid, Result} -> + Result + after Timeout -> + ct:fail("Assembler did not finish in time") end. mk_assembly_filename(Config, {ClientID, FileID}, Filename) -> filename:join([?config(storage_root, Config), ClientID, FileID, result, Filename]). -on_assembly_finished(Result) -> - ?tp(test_assembly_finished, #{result => Result}). - %% t_list_transfers(Config) -> diff --git a/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl b/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl index 447d41f11..1674d05e4 100644 --- a/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl @@ -40,7 +40,7 @@ end_per_testcase(_Case, _Config) -> t_start_ack(_Config) -> Key = <<"test">>, - DefaultAction = fun(_Key, {ack, Ref}) -> Ref end, + DefaultAction = fun({ack, Ref}) -> Ref end, ?assertMatch( {ok, _Pid}, emqx_ft_responder:start(Key, DefaultAction, 1000) @@ -62,7 +62,7 @@ t_start_ack(_Config) -> t_timeout(_Config) -> Key = <<"test">>, Self = self(), - DefaultAction = fun(K, timeout) -> Self ! {timeout, K} end, + DefaultAction = fun(timeout) -> Self ! {timeout, Key} end, {ok, _Pid} = emqx_ft_responder:start(Key, DefaultAction, 20), receive {timeout, Key} -> @@ -89,7 +89,7 @@ t_timeout(_Config) -> % ). t_unknown_msgs(_Config) -> - {ok, Pid} = emqx_ft_responder:start(make_ref(), fun(_, _) -> ok end, 100), + {ok, Pid} = emqx_ft_responder:start(make_ref(), fun(_) -> ok end, 100), Pid ! {unknown_msg, <<"test">>}, ok = gen_server:cast(Pid, {unknown_msg, <<"test">>}), ?assertEqual(