diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index af50dee3b..ef4daf000 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -80,13 +80,11 @@ handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) -> % {stop, Reason} end; handle_event(internal, _, {list_remote_fragments, Nodes}, St) -> - % TODO: portable "storage" ref - Args = [St#st.storage, St#st.transfer, fragment], % TODO % Async would better because we would not need to wait for some lagging nodes if % the coverage is already complete. - % TODO: BP API? - Results = erpc:multicall(Nodes, emqx_ft_storage_fs, list, Args, ?RPC_LIST_TIMEOUT), + % TODO: portable "storage" ref + Results = emqx_ft_storage_fs_proto_v1:multilist(Nodes, St#st.transfer, fragment), NodeResults = lists:zip(Nodes, Results), NAsm = emqx_ft_assembly:update( lists:foldl( @@ -119,9 +117,8 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) -> % TODO % Currently, race is possible between getting segment info from the remote node and % this node garbage collecting the segment itself. - Args = [St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)], % TODO: pipelining - case erpc:call(Node, emqx_ft_storage_fs, pread, Args, ?RPC_READSEG_TIMEOUT) of + case pread(Node, Segment, St) of {ok, Content} -> {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content), {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])} @@ -158,6 +155,11 @@ handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle, call % handle_cast(_Cast, St) -> % {noreply, St}. +pread(Node, Segment, St) when Node =:= node() -> + emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)); +pread(Node, Segment, St) -> + emqx_ft_storage_fs_proto_v1:pread(Node, St#st.transfer, Segment, 0, segsize(Segment)). + %% segsize(#{fragment := {segment, Info}}) -> diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 8729a2ad4..819656551 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -24,6 +24,14 @@ ] ). +-export([list_local/2]). +-export([pread_local/4]). + +-export([local_transfers/0]). + +-type offset() :: emqx_ft:offset(). +-type transfer() :: emqx_ft:transfer(). + -type storage() :: emqx_config:config(). -export_type([assemble_callback/0]). @@ -63,8 +71,41 @@ assemble(Transfer, Callback) -> Mod = mod(), Mod:assemble(storage(), Transfer, Callback). +%%-------------------------------------------------------------------- +%% Local FS API +%%-------------------------------------------------------------------- + +-type filefrag() :: emqx_ft_storage_fs:filefrag(). +-type transferinfo() :: emqx_ft_storage_fs:transferinfo(). + +-spec list_local(transfer(), fragment | result) -> + {ok, [filefrag()]} | {error, term()}. +list_local(Transfer, What) -> + with_local_storage( + fun(Mod, Storage) -> Mod:list(Storage, Transfer, What) end + ). + +-spec pread_local(transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> + {ok, [filefrag()]} | {error, term()}. +pread_local(Transfer, Frag, Offset, Size) -> + with_local_storage( + fun(Mod, Storage) -> Mod:pread(Storage, Transfer, Frag, Offset, Size) end + ). + +-spec local_transfers() -> + {ok, node(), #{transfer() => transferinfo()}} | {error, term()}. +local_transfers() -> + with_local_storage( + fun(Mod, Storage) -> Mod:transfers(Storage) end + ). + +%% + mod() -> - case storage() of + mod(storage()). + +mod(Storage) -> + case Storage of #{type := local} -> emqx_ft_storage_fs % emqx_ft_storage_dummy @@ -72,3 +113,11 @@ mod() -> storage() -> emqx_config:get([file_transfer, storage]). + +with_local_storage(Fun) -> + case storage() of + #{type := local} = Storage -> + Fun(mod(Storage), Storage); + #{type := Type} -> + {error, {unsupported_storage_type, Type}} + end. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 3a78559c1..4afbc5276 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -31,6 +31,10 @@ -export([write/2]). -export([discard/1]). +-export_type([filefrag/1]). +-export_type([filefrag/0]). +-export_type([transferinfo/0]). + -type transfer() :: emqx_ft:transfer(). -type offset() :: emqx_ft:offset(). -type filemeta() :: emqx_ft:filemeta(). diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl new file mode 100644 index 000000000..45dd93ab8 --- /dev/null +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl @@ -0,0 +1,56 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_fs_proto_v1). + +-behaviour(emqx_bpapi). + +-export([introduced_in/0]). + +-export([list/3]). +-export([multilist/3]). +-export([pread/5]). +-export([transfers/1]). + +-type offset() :: emqx_ft:offset(). +-type transfer() :: emqx_ft:transfer(). +-type filefrag() :: emqx_ft_storage_fs:filefrag(). +-type transferinfo() :: emqx_ft_storage_fs:transferinfo(). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.17". + +-spec list(node(), transfer(), fragment | result) -> + {ok, [filefrag()]} | {error, term()}. +list(Node, Transfer, What) -> + erpc:call(Node, emqx_ft_storage, list_local, [Transfer, What]). + +-spec multilist([node()], transfer(), fragment | result) -> + emqx_rpc:erpc_multicall({ok, [filefrag()]} | {error, term()}). +multilist(Nodes, Transfer, What) -> + erpc:multicall(Nodes, emqx_ft_storage, list_local, [Transfer, What]). + +-spec pread(node(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> + {ok, [filefrag()]} | {error, term()}. +pread(Node, Transfer, Frag, Offset, Size) -> + erpc:call(Node, emqx_ft_storage, pread_local, [Transfer, Frag, Offset, Size]). + +-spec transfers([node()]) -> + emqx_rpc:erpc_multicall({ok, #{transfer() => transferinfo()}} | {error, term()}). +transfers(Nodes) -> + erpc:multicall(Nodes, emqx_ft_storage, local_transfers, []).