diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index be2f80831..ce3a7a97d 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -66,7 +66,8 @@ %% TTL of individual segments %% Somewhat confusing that we won't know it on the nodes where the filemeta %% is missing. - segments_ttl => _Seconds :: pos_integer() + segments_ttl => _Seconds :: pos_integer(), + user_data => emqx_ft_schema:json_value() }. -type segment() :: {offset(), _Content :: binary()}. diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 4fd8d6e75..ef4daf000 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -66,7 +66,7 @@ init({Storage, Transfer, Callback}) -> handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) -> % TODO: what we do with non-transients errors here (e.g. `eacces`)? - {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer), + {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer, fragment), NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)), NSt = St#st{assembly = NAsm}, case emqx_ft_assembly:status(NAsm) of @@ -80,13 +80,11 @@ handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) -> % {stop, Reason} end; handle_event(internal, _, {list_remote_fragments, Nodes}, St) -> - % TODO: portable "storage" ref - Args = [St#st.storage, St#st.transfer], % TODO % Async would better because we would not need to wait for some lagging nodes if % the coverage is already complete. - % TODO: BP API? - Results = erpc:multicall(Nodes, emqx_ft_storage_fs, list, Args, ?RPC_LIST_TIMEOUT), + % TODO: portable "storage" ref + Results = emqx_ft_storage_fs_proto_v1:multilist(Nodes, St#st.transfer, fragment), NodeResults = lists:zip(Nodes, Results), NAsm = emqx_ft_assembly:update( lists:foldl( @@ -119,9 +117,8 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) -> % TODO % Currently, race is possible between getting segment info from the remote node and % this node garbage collecting the segment itself. - Args = [St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)], % TODO: pipelining - case erpc:call(Node, emqx_ft_storage_fs, read_segment, Args, ?RPC_READSEG_TIMEOUT) of + case pread(Node, Segment, St) of {ok, Content} -> {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content), {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])} @@ -158,6 +155,11 @@ handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle, call % handle_cast(_Cast, St) -> % {noreply, St}. +pread(Node, Segment, St) when Node =:= node() -> + emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)); +pread(Node, Segment, St) -> + emqx_ft_storage_fs_proto_v1:pread(Node, St#st.transfer, Segment, 0, segsize(Segment)). + %% segsize(#{fragment := {segment, Info}}) -> diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index f40d2f40e..70acb8322 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -23,6 +23,20 @@ -export([namespace/0, roots/0, fields/1, tags/0]). +-export([schema/1]). + +-type json_value() :: + null + | boolean() + | binary() + | number() + | [json_value()] + | #{binary() => json_value()}. + +-reflect_type([json_value/0]). + +%% + namespace() -> file_transfer. tags() -> @@ -47,3 +61,29 @@ fields(local_storage) -> desc => ?DESC("local") }} ]. + +schema(filemeta) -> + #{ + roots => [ + {name, hoconsc:mk(string(), #{required => true})}, + {size, hoconsc:mk(non_neg_integer())}, + {expire_at, hoconsc:mk(non_neg_integer())}, + {checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})}, + {segments_ttl, hoconsc:mk(pos_integer())}, + {user_data, hoconsc:mk(json_value())} + ] + }. + +converter(checksum) -> + fun + (undefined, #{}) -> + undefined; + ({sha256, Bin}, #{make_serializable := true}) -> + _ = is_binary(Bin) orelse throw({expected_type, string}), + _ = byte_size(Bin) =:= 32 orelse throw({expected_length, 32}), + binary:encode_hex(Bin); + (Hex, #{}) -> + _ = is_binary(Hex) orelse throw({expected_type, string}), + _ = byte_size(Hex) =:= 64 orelse throw({expected_length, 64}), + {sha256, binary:decode_hex(Hex)} + end. diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 8729a2ad4..819656551 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -24,6 +24,14 @@ ] ). +-export([list_local/2]). +-export([pread_local/4]). + +-export([local_transfers/0]). + +-type offset() :: emqx_ft:offset(). +-type transfer() :: emqx_ft:transfer(). + -type storage() :: emqx_config:config(). -export_type([assemble_callback/0]). @@ -63,8 +71,41 @@ assemble(Transfer, Callback) -> Mod = mod(), Mod:assemble(storage(), Transfer, Callback). +%%-------------------------------------------------------------------- +%% Local FS API +%%-------------------------------------------------------------------- + +-type filefrag() :: emqx_ft_storage_fs:filefrag(). +-type transferinfo() :: emqx_ft_storage_fs:transferinfo(). + +-spec list_local(transfer(), fragment | result) -> + {ok, [filefrag()]} | {error, term()}. +list_local(Transfer, What) -> + with_local_storage( + fun(Mod, Storage) -> Mod:list(Storage, Transfer, What) end + ). + +-spec pread_local(transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> + {ok, [filefrag()]} | {error, term()}. +pread_local(Transfer, Frag, Offset, Size) -> + with_local_storage( + fun(Mod, Storage) -> Mod:pread(Storage, Transfer, Frag, Offset, Size) end + ). + +-spec local_transfers() -> + {ok, node(), #{transfer() => transferinfo()}} | {error, term()}. +local_transfers() -> + with_local_storage( + fun(Mod, Storage) -> Mod:transfers(Storage) end + ). + +%% + mod() -> - case storage() of + mod(storage()). + +mod(Storage) -> + case Storage of #{type := local} -> emqx_ft_storage_fs % emqx_ft_storage_dummy @@ -72,3 +113,11 @@ mod() -> storage() -> emqx_config:get([file_transfer, storage]). + +with_local_storage(Fun) -> + case storage() of + #{type := local} = Storage -> + Fun(mod(Storage), Storage); + #{type := Type} -> + {error, {unsupported_storage_type, Type}} + end. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index a530c9cf8..4afbc5276 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -16,45 +16,59 @@ -module(emqx_ft_storage_fs). --include_lib("typerefl/include/types.hrl"). --include_lib("hocon/include/hoconsc.hrl"). - -behaviour(emqx_ft_storage). -export([store_filemeta/3]). -export([store_segment/3]). --export([list/2]). --export([read_segment/5]). +-export([list/3]). +-export([pread/5]). -export([assemble/3]). +-export([transfers/1]). + -export([open_file/3]). -export([complete/4]). -export([write/2]). -export([discard/1]). +-export_type([filefrag/1]). +-export_type([filefrag/0]). +-export_type([transferinfo/0]). + -type transfer() :: emqx_ft:transfer(). -type offset() :: emqx_ft:offset(). - -type filemeta() :: emqx_ft:filemeta(). - --type segment() :: {offset(), _Content :: binary()}. +-type segment() :: emqx_ft:segment(). -type segmentinfo() :: #{ offset := offset(), size := _Bytes :: non_neg_integer() }. +-type transferinfo() :: #{ + status := complete | incomplete, + result => [filefrag({result, #{}})] +}. + +% TODO naming -type filefrag(T) :: #{ path := file:name(), timestamp := emqx_datetime:epoch_second(), + size := _Bytes :: non_neg_integer(), fragment := T }. --type filefrag() :: filefrag({filemeta, filemeta()} | {segment, segmentinfo()}). +-type filefrag() :: filefrag( + {filemeta, filemeta()} + | {segment, segmentinfo()} + | {result, #{}} +). +-define(FRAGDIR, frags). +-define(TEMPDIR, tmp). +-define(RESULTDIR, result). -define(MANIFEST, "MANIFEST.json"). -define(SEGMENT, "SEG"). --define(TEMP, "TMP"). -type root() :: file:name(). @@ -80,7 +94,8 @@ % Quota? Some lower level errors? {ok, emqx_ft_storage:ctx()} | {error, conflict} | {error, _TODO}. store_filemeta(Storage, Transfer, Meta) -> - Filepath = mk_filepath(Storage, Transfer, ?MANIFEST), + % TODO safeguard against bad clientids / fileids. + Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST), case read_file(Filepath, fun decode_filemeta/1) of {ok, Meta} -> _ = touch_file(Filepath), @@ -92,7 +107,7 @@ store_filemeta(Storage, Transfer, Meta) -> % about it too much now. {error, conflict}; {error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent -> - write_file_atomic(Filepath, encode_filemeta(Meta)) + write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta)) end. %% Store a segment in the backing filesystem. @@ -102,32 +117,47 @@ store_filemeta(Storage, Transfer, Meta) -> % Quota? Some lower level errors? ok | {error, _TODO}. store_segment(Storage, Transfer, Segment = {_Offset, Content}) -> - Filepath = mk_filepath(Storage, Transfer, mk_segment_filename(Segment)), - write_file_atomic(Filepath, Content). + Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)), + write_file_atomic(Storage, Transfer, Filepath, Content). --spec list(storage(), transfer()) -> +-spec list(storage(), transfer(), _What :: fragment | result) -> % Some lower level errors? {error, notfound}? % Result will contain zero or only one filemeta. - {ok, list(filefrag())} | {error, _TODO}. -list(Storage, Transfer) -> - Dirname = mk_filedir(Storage, Transfer), + {ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]} | {error, _TODO}. +list(Storage, Transfer, What) -> + Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(What)), case file:list_dir(Dirname) of {ok, Filenames} -> - {ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)}; + % TODO + % In case of `What = result` there might be more than one file (though + % extremely bad luck is needed for that, e.g. concurrent assemblers with + % different filemetas from different nodes). This might be unexpected for a + % client given the current protocol, yet might be helpful in the future. + {ok, filtermap_files(get_filefrag_fun_for(What), Dirname, Filenames)}; {error, enoent} -> {ok, []}; {error, _} = Error -> Error end. --spec read_segment( - storage(), transfer(), filefrag(segmentinfo()), offset(), _Size :: non_neg_integer() -) -> +get_subdirs_for(fragment) -> + [?FRAGDIR]; +get_subdirs_for(result) -> + [?RESULTDIR]. + +get_filefrag_fun_for(fragment) -> + fun mk_filefrag/2; +get_filefrag_fun_for(result) -> + fun mk_result_filefrag/2. + +-spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> {ok, _Content :: iodata()} | {error, _TODO}. -read_segment(_Storage, _Transfer, Segment, Offset, Size) -> - Filepath = maps:get(path, Segment), - case file:open(Filepath, [raw, read]) of +pread(_Storage, _Transfer, Frag, Offset, Size) -> + Filepath = maps:get(path, Frag), + case file:open(Filepath, [read, raw, binary]) of {ok, IoDevice} -> + % NOTE + % Reading empty file is always `eof`. Read = file:pread(IoDevice, Offset, Size), ok = file:close(IoDevice), case Read of @@ -148,15 +178,63 @@ read_segment(_Storage, _Transfer, Segment, Offset, Size) -> assemble(Storage, Transfer, Callback) -> emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback). +%% + +-spec transfers(storage()) -> + {ok, #{transfer() => transferinfo()}}. +transfers(Storage) -> + % TODO `Continuation` + % There might be millions of transfers on the node, we need a protocol and + % storage schema to iterate through them effectively. + ClientIds = try_list_dir(get_storage_root(Storage)), + {ok, + lists:foldl( + fun(ClientId, Acc) -> transfers(Storage, ClientId, Acc) end, + #{}, + ClientIds + )}. + +transfers(Storage, ClientId, AccIn) -> + Dirname = mk_client_filedir(Storage, ClientId), + case file:list_dir(Dirname) of + {ok, FileIds} -> + lists:foldl( + fun(FileId, Acc) -> + Transfer = {filename_to_binary(ClientId), filename_to_binary(FileId)}, + read_transferinfo(Storage, Transfer, Acc) + end, + AccIn, + FileIds + ); + {error, _Reason} -> + % TODO worth logging + AccIn + end. + +read_transferinfo(Storage, Transfer, Acc) -> + case list(Storage, Transfer, result) of + {ok, Result = [_ | _]} -> + Info = #{status => complete, result => Result}, + Acc#{Transfer => Info}; + {ok, []} -> + Info = #{status => incomplete}, + Acc#{Transfer => Info}; + {error, _Reason} -> + % TODO worth logging + Acc + end. + +%% + -type handle() :: {file:name(), io:device(), crypto:hash_state()}. -spec open_file(storage(), transfer(), filemeta()) -> {ok, handle()} | {error, _TODO}. open_file(Storage, Transfer, Filemeta) -> Filename = maps:get(name, Filemeta), - Filepath = mk_filepath(Storage, Transfer, Filename), - TempFilepath = mk_temp_filepath(Filepath), - case file:open(TempFilepath, [write, raw]) of + TempFilepath = mk_temp_filepath(Storage, Transfer, Filename), + _ = filelib:ensure_dir(TempFilepath), + case file:open(TempFilepath, [write, raw, binary]) of {ok, Handle} -> _ = file:truncate(Handle), {ok, {TempFilepath, Handle, init_checksum(Filemeta)}}; @@ -177,11 +255,11 @@ write({Filepath, IoDevice, Ctx}, IoData) -> -spec complete(storage(), transfer(), filemeta(), handle()) -> ok | {error, {checksum, _Algo, _Computed}} | {error, _TODO}. complete(Storage, Transfer, Filemeta, Handle = {Filepath, IoDevice, Ctx}) -> - TargetFilepath = mk_filepath(Storage, Transfer, maps:get(name, Filemeta)), + TargetFilepath = mk_filepath(Storage, Transfer, [?RESULTDIR], maps:get(name, Filemeta)), case verify_checksum(Ctx, Filemeta) of ok -> ok = file:close(IoDevice), - file:rename(Filepath, TargetFilepath); + mv_temp_file(Filepath, TargetFilepath); {error, _} = Error -> _ = discard(Handle), Error @@ -224,17 +302,6 @@ verify_checksum(undefined, _) -> -define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]). -schema() -> - #{ - roots => [ - {name, hoconsc:mk(string(), #{required => true})}, - {size, hoconsc:mk(non_neg_integer())}, - {expire_at, hoconsc:mk(non_neg_integer())}, - {checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})}, - {segments_ttl, hoconsc:mk(pos_integer())} - ] - }. - % encode_filemeta(Meta) -> % emqx_json:encode( % ?PRELUDE( @@ -261,26 +328,14 @@ schema() -> encode_filemeta(Meta) -> % TODO: Looks like this should be hocon's responsibility. - Term = hocon_tconf:make_serializable(schema(), emqx_map_lib:binary_key_map(Meta), #{}), + Schema = emqx_ft_schema:schema(filemeta), + Term = hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}), emqx_json:encode(?PRELUDE(_Vsn = 1, Term)). decode_filemeta(Binary) -> + Schema = emqx_ft_schema:schema(filemeta), ?PRELUDE(_Vsn = 1, Term) = emqx_json:decode(Binary, [return_maps]), - hocon_tconf:check_plain(schema(), Term, #{atom_key => true, required => false}). - -converter(checksum) -> - fun - (undefined, #{}) -> - undefined; - ({sha256, Bin}, #{make_serializable := true}) -> - _ = is_binary(Bin) orelse throw({expected_type, string}), - _ = byte_size(Bin) =:= 32 orelse throw({expected_length, 32}), - binary:encode_hex(Bin); - (Hex, #{}) -> - _ = is_binary(Hex) orelse throw({expected_type, string}), - _ = byte_size(Hex) =:= 64 orelse throw({expected_length, 64}), - {sha256, binary:decode_hex(Hex)} - end. + hocon_tconf:check_plain(Schema, Term, #{atom_key => true, required => false}). % map_into(Fun, Into, Ks, Map) -> % map_foldr(map_into_fn(Fun, Into), Into, Ks, Map). @@ -310,11 +365,20 @@ break_segment_filename(Filename) -> {error, invalid} end. -mk_filedir(Storage, {ClientId, FileId}) -> - filename:join([get_storage_root(Storage), ClientId, FileId]). +mk_filedir(Storage, {ClientId, FileId}, SubDirs) -> + filename:join([get_storage_root(Storage), ClientId, FileId | SubDirs]). -mk_filepath(Storage, Transfer, Filename) -> - filename:join(mk_filedir(Storage, Transfer), Filename). +mk_client_filedir(Storage, ClientId) -> + filename:join([get_storage_root(Storage), ClientId]). + +mk_filepath(Storage, Transfer, SubDirs, Filename) -> + filename:join(mk_filedir(Storage, Transfer, SubDirs), Filename). + +try_list_dir(Dirname) -> + case file:list_dir(Dirname) of + {ok, List} -> List; + {error, _} -> [] + end. get_storage_root(Storage) -> maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")). @@ -341,13 +405,13 @@ safe_decode(Content, DecodeFun) -> {error, corrupted} end. -write_file_atomic(Filepath, Content) when is_binary(Content) -> - TempFilepath = mk_temp_filepath(Filepath), +write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) -> + TempFilepath = mk_temp_filepath(Storage, Transfer, filename:basename(Filepath)), Result = emqx_misc:pipeline( [ fun filelib:ensure_dir/1, fun write_contents/2, - fun(FP) -> mv_temp_file(Filepath, FP) end + fun(_) -> mv_temp_file(TempFilepath, Filepath) end ], TempFilepath, Content @@ -360,11 +424,9 @@ write_file_atomic(Filepath, Content) when is_binary(Content) -> {error, Reason} end. -mk_temp_filepath(Filepath) -> - Dirname = filename:dirname(Filepath), - Filename = filename:basename(Filepath), +mk_temp_filepath(Storage, Transfer, Filename) -> Unique = erlang:unique_integer([positive]), - filename:join(Dirname, mk_filename([?TEMP, Unique, ".", Filename])). + filename:join(mk_filedir(Storage, Transfer, [?TEMPDIR]), mk_filename([Unique, ".", Filename])). mk_filename(Comps) -> lists:append(lists:map(fun mk_filename_component/1, Comps)). @@ -377,7 +439,8 @@ mk_filename_component(S) when is_list(S) -> S. write_contents(Filepath, Content) -> file:write_file(Filepath, Content). -mv_temp_file(Filepath, TempFilepath) -> +mv_temp_file(TempFilepath, Filepath) -> + _ = filelib:ensure_dir(Filepath), file:rename(TempFilepath, Filepath). touch_file(Filepath) -> @@ -391,9 +454,16 @@ mk_filefrag(Dirname, Filename = ?MANIFEST) -> mk_filefrag(Dirname, Filename, filemeta, fun read_filemeta/2); mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) -> mk_filefrag(Dirname, Filename, segment, fun read_segmentinfo/2); -mk_filefrag(_Dirname, _) -> +mk_filefrag(_Dirname, _Filename) -> + % TODO this is unexpected, worth logging? false. +mk_result_filefrag(Dirname, Filename) -> + % NOTE + % Any file in the `?RESULTDIR` subdir is currently considered the result of + % the file transfer. + mk_filefrag(Dirname, Filename, result, fun(_, _) -> {ok, #{}} end). + mk_filefrag(Dirname, Filename, Tag, Fun) -> Filepath = filename:join(Dirname, Filename), % TODO error handling? @@ -403,9 +473,11 @@ mk_filefrag(Dirname, Filename, Tag, Fun) -> {true, #{ path => Filepath, timestamp => Fileinfo#file_info.mtime, + size => Fileinfo#file_info.size, fragment => {Tag, Frag} }}; {error, _Reason} -> + % TODO loss of information false end. @@ -414,3 +486,6 @@ read_filemeta(_Filename, Filepath) -> read_segmentinfo(Filename, _Filepath) -> break_segment_filename(Filename). + +filename_to_binary(S) when is_list(S) -> unicode:characters_to_binary(S); +filename_to_binary(B) when is_binary(B) -> B. diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl new file mode 100644 index 000000000..45dd93ab8 --- /dev/null +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl @@ -0,0 +1,56 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_fs_proto_v1). + +-behaviour(emqx_bpapi). + +-export([introduced_in/0]). + +-export([list/3]). +-export([multilist/3]). +-export([pread/5]). +-export([transfers/1]). + +-type offset() :: emqx_ft:offset(). +-type transfer() :: emqx_ft:transfer(). +-type filefrag() :: emqx_ft_storage_fs:filefrag(). +-type transferinfo() :: emqx_ft_storage_fs:transferinfo(). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.17". + +-spec list(node(), transfer(), fragment | result) -> + {ok, [filefrag()]} | {error, term()}. +list(Node, Transfer, What) -> + erpc:call(Node, emqx_ft_storage, list_local, [Transfer, What]). + +-spec multilist([node()], transfer(), fragment | result) -> + emqx_rpc:erpc_multicall({ok, [filefrag()]} | {error, term()}). +multilist(Nodes, Transfer, What) -> + erpc:multicall(Nodes, emqx_ft_storage, list_local, [Transfer, What]). + +-spec pread(node(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> + {ok, [filefrag()]} | {error, term()}. +pread(Node, Transfer, Frag, Offset, Size) -> + erpc:call(Node, emqx_ft_storage, pread_local, [Transfer, Frag, Offset, Size]). + +-spec transfers([node()]) -> + emqx_rpc:erpc_multicall({ok, #{transfer() => transferinfo()}} | {error, term()}). +transfers(Nodes) -> + erpc:multicall(Nodes, emqx_ft_storage, local_transfers, []). diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index 9ec2fce61..336580584 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -24,24 +24,31 @@ -include_lib("kernel/include/file.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -all() -> emqx_common_test_helpers:all(?MODULE). +all() -> + [ + t_assemble_empty_transfer, + t_assemble_complete_local_transfer, + + % NOTE + % It depends on the side effects of all previous testcases. + t_list_transfers + ]. init_per_suite(Config) -> - % {ok, Apps} = application:ensure_all_started(emqx_ft), - % [{suite_apps, Apps} | Config]. - % ok = emqx_common_test_helpers:start_apps([emqx_ft]), Config. end_per_suite(_Config) -> - % lists:foreach(fun application:stop/1, lists:reverse(?config(suite_apps, Config))). - % ok = emqx_common_test_helpers:stop_apps([emqx_ft]), ok. init_per_testcase(TC, Config) -> ok = snabbkaffe:start_trace(), - Root = filename:join(["roots", TC]), {ok, Pid} = emqx_ft_assembler_sup:start_link(), - [{storage_root, Root}, {assembler_sup, Pid} | Config]. + [ + {storage_root, "file_transfer_root"}, + {file_id, atom_to_binary(TC)}, + {assembler_sup, Pid} + | Config + ]. end_per_testcase(_TC, Config) -> ok = inspect_storage_root(Config), @@ -51,11 +58,12 @@ end_per_testcase(_TC, Config) -> %% --define(CLIENTID, <<"thatsme">>). +-define(CLIENTID1, <<"thatsme">>). +-define(CLIENTID2, <<"thatsnotme">>). t_assemble_empty_transfer(Config) -> Storage = storage(Config), - Transfer = {?CLIENTID, mk_fileid()}, + Transfer = {?CLIENTID1, ?config(file_id, Config)}, Filename = "important.pdf", Meta = #{ name => Filename, @@ -71,7 +79,7 @@ t_assemble_empty_transfer(Config) -> fragment := {filemeta, Meta} } ]}, - emqx_ft_storage_fs:list(Storage, Transfer) + emqx_ft_storage_fs:list(Storage, Transfer, fragment) ), {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun on_assembly_finished/1), {ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}), @@ -81,11 +89,16 @@ t_assemble_empty_transfer(Config) -> % TODO file:read_file(mk_assembly_filename(Config, Transfer, Filename)) ), + {ok, [Result = #{size := Size = 0}]} = emqx_ft_storage_fs:list(Storage, Transfer, result), + ?assertEqual( + {error, eof}, + emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size) + ), ok. t_assemble_complete_local_transfer(Config) -> Storage = storage(Config), - Transfer = {?CLIENTID, mk_fileid()}, + Transfer = {?CLIENTID2, ?config(file_id, Config)}, Filename = "topsecret.pdf", TransferSize = 10000 + rand:uniform(50000), SegmentSize = 4096, @@ -109,7 +122,7 @@ t_assemble_complete_local_transfer(Config) -> end ), - {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer), + {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer, fragment), ?assertEqual((TransferSize div SegmentSize) + 1 + 1, length(Fragments)), ?assertEqual( [Meta], @@ -122,6 +135,16 @@ t_assemble_complete_local_transfer(Config) -> ?assertMatch(#{result := ok}, Event), AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename), + ?assertMatch( + {ok, [ + #{ + path := AssemblyFilename, + size := TransferSize, + fragment := {result, #{}} + } + ]}, + emqx_ft_storage_fs:list(Storage, Transfer, result) + ), ?assertMatch( {ok, #file_info{type = regular, size = TransferSize}}, file:read_file_info(AssemblyFilename) @@ -133,13 +156,31 @@ t_assemble_complete_local_transfer(Config) -> ). mk_assembly_filename(Config, {ClientID, FileID}, Filename) -> - filename:join([?config(storage_root, Config), ClientID, FileID, Filename]). + filename:join([?config(storage_root, Config), ClientID, FileID, result, Filename]). on_assembly_finished(Result) -> ?tp(test_assembly_finished, #{result => Result}). %% +t_list_transfers(Config) -> + Storage = storage(Config), + ?assertMatch( + {ok, #{ + {?CLIENTID1, <<"t_assemble_empty_transfer">>} := #{ + status := complete, + result := [#{path := _, size := 0, fragment := {result, _}}] + }, + {?CLIENTID2, <<"t_assemble_complete_local_transfer">>} := #{ + status := complete, + result := [#{path := _, size := Size, fragment := {result, _}}] + } + }} when Size > 0, + emqx_ft_storage_fs:transfers(Storage) + ). + +%% + -include_lib("kernel/include/file.hrl"). inspect_storage_root(Config) ->