diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index f54119440..c4f1caed5 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -189,7 +189,7 @@ on_fin(PacketId, Msg, FileId, Checksum) -> %% We have new fin packet ok -> Callback = callback(FinPacketKey, FileId), - case emqx_ft_storage:assemble(transfer(Msg, FileId), Callback) of + case assemble(transfer(Msg, FileId), Callback) of %% Assembling started, packet will be acked by the callback or the responder {ok, _} -> undefined; @@ -214,6 +214,17 @@ on_fin(PacketId, Msg, FileId, Checksum) -> undefined end. +assemble(Transfer, Callback) -> + try + emqx_ft_storage:assemble(Transfer, Callback) + catch + C:E:S -> + ?SLOG(warning, #{ + msg => "file_assemble_failed", class => C, reason => E, stacktrace => S + }), + {error, {internal_error, E}} + end. + callback({ChanPid, PacketId} = Key, _FileId) -> fun(Result) -> case emqx_ft_responder:unregister(Key) of diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 37a433433..088c316ae 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -28,10 +28,8 @@ -export([transfers/1]). --export([pread_local/4]). --export([list_local/2]). --export([ready_transfers_local/0, ready_transfers_local/1]). --export([get_ready_transfer_local/1, get_ready_transfer_local/2]). +-export([ready_transfers_local/1]). +-export([get_ready_transfer_local/2]). -export([ready_transfers/1]). -export([get_ready_transfer/2]). @@ -173,16 +171,6 @@ pread(_Storage, _Transfer, Frag, Offset, Size) -> assemble(Storage, Transfer, Callback) -> emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback). --spec list_local(transfer(), fragment | result) -> - {ok, [filefrag()]} | {error, term()}. -list_local(Transfer, What) -> - emqx_ft_storage:with_storage_type(local, list, [Transfer, What]). - --spec pread_local(transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> - {ok, [filefrag()]} | {error, term()}. -pread_local(Transfer, Frag, Offset, Size) -> - emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]). - get_ready_transfer(_Storage, ReadyTransferId) -> case parse_ready_transfer_id(ReadyTransferId) of {ok, {Node, Transfer}} -> @@ -198,9 +186,6 @@ get_ready_transfer(_Storage, ReadyTransferId) -> Error end. -get_ready_transfer_local(Transfer) -> - emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [Transfer]). - get_ready_transfer_local(Storage, Transfer) -> Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(result)), case file:list_dir(Dirname) of @@ -223,9 +208,6 @@ ready_transfers(_Storage) -> ?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}), {ok, [File || {ok, Files} <- GoodResults, File <- Files]}. -ready_transfers_local() -> - emqx_ft_storage:with_storage_type(local, ready_transfers_local, []). - ready_transfers_local(Storage) -> {ok, Transfers} = transfers(Storage), lists:filtermap( diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl new file mode 100644 index 000000000..0c30f5567 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl @@ -0,0 +1,40 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% This methods are called via rpc by `emqx_ft_storage_fs` +%% They populate the call with actual storage which may be configured differently +%% on a concrete node. + +-module(emqx_ft_storage_fs_proxy). + +-export([ + list_local/2, + pread_local/4, + get_ready_transfer_local/1, + ready_transfers_local/0 +]). + +list_local(Transfer, What) -> + emqx_ft_storage:with_storage_type(local, list, [Transfer, What]). + +pread_local(Transfer, Frag, Offset, Size) -> + emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]). + +get_ready_transfer_local(Transfer) -> + emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [Transfer]). + +ready_transfers_local() -> + emqx_ft_storage:with_storage_type(local, ready_transfers_local, []). diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl index 4f354be63..2e3dc8632 100644 --- a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl @@ -36,28 +36,29 @@ introduced_in() -> "5.0.17". -spec list(node(), transfer(), fragment | result) -> - {ok, [filefrag()]} | {error, term()}. + {ok, [filefrag()]} | {error, term()} | no_return(). list(Node, Transfer, What) -> - erpc:call(Node, emqx_ft_storage_fs, list_local, [Transfer, What]). + erpc:call(Node, emqx_ft_storage_fs_proxy, list_local, [Transfer, What]). -spec multilist([node()], transfer(), fragment | result) -> emqx_rpc:erpc_multicall({ok, [filefrag()]} | {error, term()}). multilist(Nodes, Transfer, What) -> - erpc:multicall(Nodes, emqx_ft_storage_fs, list_local, [Transfer, What]). + erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, list_local, [Transfer, What]). -spec pread(node(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> - {ok, [filefrag()]} | {error, term()}. + {ok, [filefrag()]} | {error, term()} | no_return(). pread(Node, Transfer, Frag, Offset, Size) -> - erpc:call(Node, emqx_ft_storage_fs, pread_local, [Transfer, Frag, Offset, Size]). + erpc:call(Node, emqx_ft_storage_fs_proxy, pread_local, [Transfer, Frag, Offset, Size]). -spec ready_transfers([node()]) -> {ok, [{emqx_ft_storage:ready_transfer_id(), emqx_ft_storage:ready_transfer_info()}]} | {error, term()}. ready_transfers(Nodes) -> - erpc:multicall(Nodes, emqx_ft_storage_fs, ready_transfers_local, []). + erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, ready_transfers_local, []). -spec get_ready_transfer(node(), emqx_ft_storage:ready_transfer_id()) -> {ok, emqx_ft_storage:ready_transfer_data()} - | {error, term()}. + | {error, term()} + | no_return(). get_ready_transfer(Node, ReadyTransferId) -> - erpc:call(Node, emqx_ft_storage_fs, get_ready_transfer_local, [ReadyTransferId]). + erpc:call(Node, emqx_ft_storage_fs_proxy, get_ready_transfer_local, [ReadyTransferId]).