diff --git a/apps/emqx/src/proto/emqx_cm_proto_v2.erl b/apps/emqx/src/proto/emqx_cm_proto_v2.erl index 2981dbd40..4208df97f 100644 --- a/apps/emqx/src/proto/emqx_cm_proto_v2.erl +++ b/apps/emqx/src/proto/emqx_cm_proto_v2.erl @@ -78,7 +78,7 @@ takeover_finish(ConnMod, ChanPid) -> erpc:call( node(ChanPid), emqx_cm, - takeover_session_finish, + takeover_finish, [ConnMod, ChanPid], ?T_TAKEOVER * 2 ). diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index ef4daf000..623e11714 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -34,7 +34,7 @@ storage :: _Storage, transfer :: emqx_ft:transfer(), assembly :: _TODO, - file :: io:device(), + file :: {file:filename(), io:device(), term()} | undefined, hash, callback :: fun((ok | {error, term()}) -> any()) }). @@ -120,10 +120,16 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) -> % TODO: pipelining case pread(Node, Segment, St) of {ok, Content} -> - {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content), - {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])} - % {error, _} -> - % ... + case emqx_ft_storage_fs:write(St#st.file, Content) of + {ok, NHandle} -> + {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])}; + %% TODO: better error handling + {error, Error} -> + error(Error) + end; + {error, Error} -> + %% TODO: better error handling + error(Error) end; handle_event(internal, _, {assemble, []}, St = #st{}) -> {next_state, complete, St, ?internal([])}; diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index a120a4067..a6559d470 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -85,7 +85,7 @@ %% Atomic operation. -spec store_filemeta(storage(), transfer(), filemeta()) -> % Quota? Some lower level errors? - {ok, emqx_ft_storage:ctx()} | {error, conflict} | {error, _TODO}. + ok | {error, conflict} | {error, _TODO}. store_filemeta(Storage, Transfer, Meta) -> % TODO safeguard against bad clientids / fileids. Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST), @@ -226,8 +226,16 @@ ready_transfers(_Storage) -> end, Results ), - ?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}), - {ok, [File || {ok, Files} <- GoodResults, File <- Files]}. + case {GoodResults, BadResults} of + {[], _} -> + ?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}), + {error, no_nodes}; + {_, []} -> + {ok, [File || {ok, Files} <- GoodResults, File <- Files]}; + {_, _} -> + ?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}), + {ok, [File || {ok, Files} <- GoodResults, File <- Files]} + end. ready_transfers_local(Storage) -> {ok, Transfers} = transfers(Storage), @@ -323,7 +331,7 @@ open_file(Storage, Transfer, Filemeta) -> end. -spec write(handle(), iodata()) -> - ok | {error, _TODO}. + {ok, handle()} | {error, _TODO}. write({Filepath, IoDevice, Ctx}, IoData) -> case file:write(IoDevice, IoData) of ok -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl index 782959e19..373b92753 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl @@ -58,12 +58,9 @@ table(ReaderPid, Bytes) when is_pid(ReaderPid) andalso is_integer(Bytes) andalso eof -> []; {ok, Data} -> - [Data | fun() -> NextFun(Pid) end]; + [Data] ++ fun() -> NextFun(Pid) end; {error, Reason} -> ?SLOG(warning, #{msg => "file_read_error", reason => Reason}), - []; - {BadRPC, Reason} when BadRPC =:= badrpc orelse BadRPC =:= badtcp -> - ?SLOG(warning, #{msg => "file_read_rpc_error", kind => BadRPC, reason => Reason}), [] end end, diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl index 082df9ac0..992d62c48 100644 --- a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl @@ -51,8 +51,12 @@ pread(Node, Transfer, Frag, Offset, Size) -> erpc:call(Node, emqx_ft_storage_fs_proxy, pread_local, [Transfer, Frag, Offset, Size]). -spec ready_transfers([node()]) -> - {ok, [{emqx_ft_storage:ready_transfer_id(), emqx_ft_storage:ready_transfer_info()}]} - | {error, term()}. + [ + {ok, [{emqx_ft_storage:ready_transfer_id(), emqx_ft_storage:ready_transfer_info()}]} + | {error, term()} + | {exit, term()} + | {throw, term()} + ]. ready_transfers(Nodes) -> erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, ready_transfers_local, []).