feat(ft): add fs storage bpapi and use it in assembler

This commit is contained in:
Andrew Mayorov 2023-02-03 18:48:50 +03:00
parent c11e251902
commit 9a56c34c8d
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 118 additions and 7 deletions

View File

@ -80,13 +80,11 @@ handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
% {stop, Reason}
end;
handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
% TODO: portable "storage" ref
Args = [St#st.storage, St#st.transfer, fragment],
% TODO
% Async would better because we would not need to wait for some lagging nodes if
% the coverage is already complete.
% TODO: BP API?
Results = erpc:multicall(Nodes, emqx_ft_storage_fs, list, Args, ?RPC_LIST_TIMEOUT),
% TODO: portable "storage" ref
Results = emqx_ft_storage_fs_proto_v1:multilist(Nodes, St#st.transfer, fragment),
NodeResults = lists:zip(Nodes, Results),
NAsm = emqx_ft_assembly:update(
lists:foldl(
@ -119,9 +117,8 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
% TODO
% Currently, race is possible between getting segment info from the remote node and
% this node garbage collecting the segment itself.
Args = [St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)],
% TODO: pipelining
case erpc:call(Node, emqx_ft_storage_fs, pread, Args, ?RPC_READSEG_TIMEOUT) of
case pread(Node, Segment, St) of
{ok, Content} ->
{ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content),
{next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])}
@ -158,6 +155,11 @@ handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle, call
% handle_cast(_Cast, St) ->
% {noreply, St}.
pread(Node, Segment, St) when Node =:= node() ->
emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment));
pread(Node, Segment, St) ->
emqx_ft_storage_fs_proto_v1:pread(Node, St#st.transfer, Segment, 0, segsize(Segment)).
%%
segsize(#{fragment := {segment, Info}}) ->

View File

@ -24,6 +24,14 @@
]
).
-export([list_local/2]).
-export([pread_local/4]).
-export([local_transfers/0]).
-type offset() :: emqx_ft:offset().
-type transfer() :: emqx_ft:transfer().
-type storage() :: emqx_config:config().
-export_type([assemble_callback/0]).
@ -63,8 +71,41 @@ assemble(Transfer, Callback) ->
Mod = mod(),
Mod:assemble(storage(), Transfer, Callback).
%%--------------------------------------------------------------------
%% Local FS API
%%--------------------------------------------------------------------
-type filefrag() :: emqx_ft_storage_fs:filefrag().
-type transferinfo() :: emqx_ft_storage_fs:transferinfo().
-spec list_local(transfer(), fragment | result) ->
{ok, [filefrag()]} | {error, term()}.
list_local(Transfer, What) ->
with_local_storage(
fun(Mod, Storage) -> Mod:list(Storage, Transfer, What) end
).
-spec pread_local(transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
{ok, [filefrag()]} | {error, term()}.
pread_local(Transfer, Frag, Offset, Size) ->
with_local_storage(
fun(Mod, Storage) -> Mod:pread(Storage, Transfer, Frag, Offset, Size) end
).
-spec local_transfers() ->
{ok, node(), #{transfer() => transferinfo()}} | {error, term()}.
local_transfers() ->
with_local_storage(
fun(Mod, Storage) -> Mod:transfers(Storage) end
).
%%
mod() ->
case storage() of
mod(storage()).
mod(Storage) ->
case Storage of
#{type := local} ->
emqx_ft_storage_fs
% emqx_ft_storage_dummy
@ -72,3 +113,11 @@ mod() ->
storage() ->
emqx_config:get([file_transfer, storage]).
with_local_storage(Fun) ->
case storage() of
#{type := local} = Storage ->
Fun(mod(Storage), Storage);
#{type := Type} ->
{error, {unsupported_storage_type, Type}}
end.

View File

@ -31,6 +31,10 @@
-export([write/2]).
-export([discard/1]).
-export_type([filefrag/1]).
-export_type([filefrag/0]).
-export_type([transferinfo/0]).
-type transfer() :: emqx_ft:transfer().
-type offset() :: emqx_ft:offset().
-type filemeta() :: emqx_ft:filemeta().

View File

@ -0,0 +1,56 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage_fs_proto_v1).
-behaviour(emqx_bpapi).
-export([introduced_in/0]).
-export([list/3]).
-export([multilist/3]).
-export([pread/5]).
-export([transfers/1]).
-type offset() :: emqx_ft:offset().
-type transfer() :: emqx_ft:transfer().
-type filefrag() :: emqx_ft_storage_fs:filefrag().
-type transferinfo() :: emqx_ft_storage_fs:transferinfo().
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.0.17".
-spec list(node(), transfer(), fragment | result) ->
{ok, [filefrag()]} | {error, term()}.
list(Node, Transfer, What) ->
erpc:call(Node, emqx_ft_storage, list_local, [Transfer, What]).
-spec multilist([node()], transfer(), fragment | result) ->
emqx_rpc:erpc_multicall({ok, [filefrag()]} | {error, term()}).
multilist(Nodes, Transfer, What) ->
erpc:multicall(Nodes, emqx_ft_storage, list_local, [Transfer, What]).
-spec pread(node(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
{ok, [filefrag()]} | {error, term()}.
pread(Node, Transfer, Frag, Offset, Size) ->
erpc:call(Node, emqx_ft_storage, pread_local, [Transfer, Frag, Offset, Size]).
-spec transfers([node()]) ->
emqx_rpc:erpc_multicall({ok, #{transfer() => transferinfo()}} | {error, term()}).
transfers(Nodes) ->
erpc:multicall(Nodes, emqx_ft_storage, local_transfers, []).