diff --git a/apps/emqx_ft/src/emqx_ft_api.erl b/apps/emqx_ft/src/emqx_ft_api.erl index 85fd8d8cb..7e1ed97ad 100644 --- a/apps/emqx_ft/src/emqx_ft_api.erl +++ b/apps/emqx_ft/src/emqx_ft_api.erl @@ -19,7 +19,6 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --include_lib("emqx/include/logger.hrl"). %% Swagger specs from hocon schema -export([ @@ -30,12 +29,14 @@ ]). -export([ - roots/0 + roots/0, + fields/1 ]). %% API callbacks -export([ - '/file_transfer/files'/2 + '/file_transfer/files'/2, + '/file_transfer/files/:clientid/:fileid'/2 ]). -import(hoconsc, [mk/2, ref/1, ref/2]). @@ -47,7 +48,8 @@ api_spec() -> paths() -> [ - "/file_transfer/files" + "/file_transfer/files", + "/file_transfer/files/:clientid/:fileid" ]. schema("/file_transfer/files") -> @@ -57,29 +59,133 @@ schema("/file_transfer/files") -> tags => [<<"file_transfer">>], summary => <<"List all uploaded files">>, description => ?DESC("file_list"), + parameters => [ + ref(following), + ref(emqx_dashboard_swagger, limit) + ], responses => #{ 200 => <<"Operation success">>, + 400 => emqx_dashboard_swagger:error_codes( + ['BAD_REQUEST'], <<"Invalid cursor">> + ), 503 => emqx_dashboard_swagger:error_codes( - ['SERVICE_UNAVAILABLE'], <<"Service unavailable">> + ['SERVICE_UNAVAILABLE'], error_desc('SERVICE_UNAVAILABLE') + ) + } + } + }; +schema("/file_transfer/files/:clientid/:fileid") -> + #{ + 'operationId' => '/file_transfer/files/:clientid/:fileid', + get => #{ + tags => [<<"file_transfer">>], + summary => <<"List files uploaded in a specific transfer">>, + description => ?DESC("file_list_transfer"), + parameters => [ + ref(client_id), + ref(file_id) + ], + responses => #{ + 200 => <<"Operation success">>, + 404 => emqx_dashboard_swagger:error_codes( + ['FILES_NOT_FOUND'], error_desc('FILES_NOT_FOUND') + ), + 503 => emqx_dashboard_swagger:error_codes( + ['SERVICE_UNAVAILABLE'], error_desc('SERVICE_UNAVAILABLE') ) } } }. 
-'/file_transfer/files'(get, #{}) -> - case emqx_ft_storage:files() of - {ok, Files} -> - {200, #{<<"files">> => lists:map(fun format_file_info/1, Files)}}; - {error, _} -> - {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} +'/file_transfer/files'(get, #{ + query_string := QueryString +}) -> + try + Limit = limit(QueryString), + Query = + case maps:get(<<"following">>, QueryString, undefined) of + undefined -> + #{limit => Limit}; + Cursor -> + #{limit => Limit, following => Cursor} + end, + case emqx_ft_storage:files(Query) of + {ok, Page} -> + {200, format_page(Page)}; + {error, _} -> + {503, error_msg('SERVICE_UNAVAILABLE')} + end + catch + error:{badarg, cursor} -> + {400, error_msg('BAD_REQUEST', <<"Invalid cursor">>)} end. +'/file_transfer/files/:clientid/:fileid'(get, #{ + bindings := #{clientid := ClientId, fileid := FileId} +}) -> + Transfer = {ClientId, FileId}, + case emqx_ft_storage:files(#{transfer => Transfer}) of + {ok, Page} -> + {200, format_page(Page)}; + {error, [{_Node, enoent} | _]} -> + {404, error_msg('FILES_NOT_FOUND')}; + {error, _} -> + {503, error_msg('SERVICE_UNAVAILABLE')} + end. + +format_page(#{items := Files, cursor := Cursor}) -> + #{ + <<"files">> => lists:map(fun format_file_info/1, Files), + <<"cursor">> => Cursor + }; +format_page(#{items := Files}) -> + #{ + <<"files">> => lists:map(fun format_file_info/1, Files) + }. + +error_msg(Code) -> + #{code => Code, message => error_desc(Code)}. + error_msg(Code, Msg) -> #{code => Code, message => emqx_utils:readable_error_msg(Msg)}. +error_desc('FILES_NOT_FOUND') -> + <<"Files requested for this transfer could not be found">>; +error_desc('SERVICE_UNAVAILABLE') -> + <<"Service unavailable">>. + roots() -> []. +-spec fields(hocon_schema:name()) -> [hoconsc:field()]. 
+fields(client_id) -> + [ + {clientid, + mk(binary(), #{ + in => path, + desc => <<"MQTT Client ID">>, + required => true + })} + ]; +fields(file_id) -> + [ + {fileid, + mk(binary(), #{ + in => path, + desc => <<"File ID">>, + required => true + })} + ]; +fields(following) -> + [ + {following, + mk(binary(), #{ + in => query, + desc => <<"Cursor to start listing files from">>, + required => false + })} + ]. + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- @@ -115,3 +221,6 @@ format_name(NameBin) when is_binary(NameBin) -> NameBin; format_name(Name) when is_list(Name) -> iolist_to_binary(Name). + +limit(QueryString) -> + maps:get(<<"limit">>, QueryString, emqx_mgmt:default_row_limit()). diff --git a/apps/emqx_ft/src/emqx_ft_fs_iterator.erl b/apps/emqx_ft/src/emqx_ft_fs_iterator.erl new file mode 100644 index 000000000..7fb8d8634 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_fs_iterator.erl @@ -0,0 +1,235 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_fs_iterator). + +-export([new/2]). +-export([next/1]). +-export([next_leaf/1]). + +-export([seek/3]). + +-export([fold/3]). +-export([fold_n/4]). + +-export_type([t/0]). 
+-export_type([glob/0]). +-export_type([pathstack/0]). + +-type root() :: file:name(). +-type glob() :: ['*' | globfun()]. +-type globfun() :: + fun((_Filename :: file:name()) -> boolean()) + | fun((_Filename :: file:name(), pathstack()) -> boolean()). + +% A path stack is a list of path components, in reverse order. +-type pathstack() :: [file:name(), ...]. + +-opaque t() :: #{ + root := root(), + queue := [_PathStack :: [file:name()]], + head := glob(), + stack := [{[pathstack()], glob()}] +}. + +-type entry() :: entry_leaf() | entry_node(). +-type entry_leaf() :: + {leaf, file:name(), file:file_info() | {error, file:posix()}, pathstack()}. +-type entry_node() :: + {node, file:name(), {error, file:posix()}, pathstack()}. + +-spec new(root(), glob()) -> + t(). +new(Root, Glob) -> + #{ + root => Root, + queue => [[]], + head => Glob, + stack => [] + }. + +-spec next(t()) -> + {entry(), t()} | none. +next(It = #{queue := [PathStack | Rest], head := []}) -> + {emit(PathStack, It), It#{queue => Rest}}; +next(It = #{queue := [PathStack | Rest], head := [Pat | _], root := Root}) -> + Filepath = mk_filepath(PathStack), + case emqx_ft_fs_util:list_dir(filename:join(Root, Filepath)) of + {ok, Filenames} -> + Sorted = lists:sort(Filenames), + Matches = [[Fn | PathStack] || Fn <- Sorted, matches_glob(Pat, Fn, [Fn | PathStack])], + ItNext = windup(It), + next(ItNext#{queue => Matches}); + {error, _} = Error -> + {{node, Filepath, Error, PathStack}, It#{queue => Rest}} + end; +next(It = #{queue := []}) -> + unwind(It). + +windup(It = #{queue := [_ | Rest], head := [Pat | Glob], stack := Stack}) -> + % NOTE + % Preserve unfinished paths and glob in the stack, so that we can resume traversal + % when the lower levels of the tree are exhausted. + It#{ + head => Glob, + stack => [{Rest, [Pat | Glob]} | Stack] + }. + +unwind(It = #{stack := [{Queue, Glob} | StackRest]}) -> + % NOTE + % Resume traversal of unfinished paths from the upper levels of the tree. 
+ next(It#{ + queue => Queue, + head => Glob, + stack => StackRest + }); +unwind(#{stack := []}) -> + none. + +emit(PathStack, #{root := Root}) -> + Filepath = mk_filepath(PathStack), + case emqx_ft_fs_util:read_info(filename:join(Root, Filepath)) of + {ok, Fileinfo} -> + {leaf, Filepath, Fileinfo, PathStack}; + {error, _} = Error -> + {leaf, Filepath, Error, PathStack} + end. + +mk_filepath([]) -> + ""; +mk_filepath(PathStack) -> + filename:join(lists:reverse(PathStack)). + +matches_glob('*', _, _) -> + true; +matches_glob(FilterFun, Filename, _PathStack) when is_function(FilterFun, 1) -> + FilterFun(Filename); +matches_glob(FilterFun, Filename, PathStack) when is_function(FilterFun, 2) -> + FilterFun(Filename, PathStack). + +%% + +-spec next_leaf(t()) -> + {entry_leaf(), t()} | none. +next_leaf(It) -> + case next(It) of + {{leaf, _, _, _} = Leaf, ItNext} -> + {Leaf, ItNext}; + {{node, _Filename, _Error, _PathStack}, ItNext} -> + % NOTE + % Intentionally skipping intermediate traversal errors here, for simplicity. + next_leaf(ItNext); + none -> + none + end. + +%% + +-spec seek([file:name()], root(), glob()) -> + t(). +seek(PathSeek, Root, Glob) -> + SeekGlob = mk_seek_glob(PathSeek, Glob), + SeekStack = lists:reverse(PathSeek), + case next_leaf(new(Root, SeekGlob)) of + {{leaf, _Filepath, _Info, SeekStack}, It} -> + fixup_glob(Glob, It); + {{leaf, _Filepath, _Info, Successor}, It = #{queue := Queue}} -> + fixup_glob(Glob, It#{queue => [Successor | Queue]}); + none -> + none(Root) + end. + +mk_seek_glob(PathSeek, Glob) -> + % NOTE + % The seek glob is a glob that skips all the nodes / leaves that are lexicographically + % smaller than the seek path. 
For example, if the seek path is ["a", "b", "c"], and + % the glob is ['*', '*', '*', '*'], then the seek glob is: + % [ fun(Path) -> Path >= ["a"] end, + % fun(Path) -> Path >= ["a", "b"] end, + % fun(Path) -> Path >= ["a", "b", "c"] end, + % '*' + % ] + L = min(length(PathSeek), length(Glob)), + merge_glob([mk_seek_pat(lists:sublist(PathSeek, N)) || N <- lists:seq(1, L)], Glob). + +mk_seek_pat(PathSeek) -> + % NOTE + % The `PathStack` and `PathSeek` are of the same length here. + fun(_Filename, PathStack) -> lists:reverse(PathStack) >= PathSeek end. + +merge_glob([Pat | SeekRest], [PatOrig | Rest]) -> + [merge_pat(Pat, PatOrig) | merge_glob(SeekRest, Rest)]; +merge_glob([], [PatOrig | Rest]) -> + [PatOrig | merge_glob([], Rest)]; +merge_glob([], []) -> + []. + +merge_pat(Pat, PatOrig) -> + fun(Filename, PathStack) -> + Pat(Filename, PathStack) andalso matches_glob(PatOrig, Filename, PathStack) + end. + +fixup_glob(Glob, It = #{head := [], stack := Stack}) -> + % NOTE + % Restoring original glob through the stack. Strictly speaking, this is not usually + % necessary, it's a kind of optimization. + fixup_glob(Glob, lists:reverse(Stack), It#{stack => []}). + +fixup_glob(Glob = [_ | Rest], [{Queue, _} | StackRest], It = #{stack := Stack}) -> + fixup_glob(Rest, StackRest, It#{stack => [{Queue, Glob} | Stack]}); +fixup_glob(Rest, [], It) -> + It#{head => Rest}. + +%% + +-spec fold(fun((entry(), Acc) -> Acc), Acc, t()) -> + Acc. +fold(FoldFun, Acc, It) -> + case next(It) of + {Entry, ItNext} -> + fold(FoldFun, FoldFun(Entry, Acc), ItNext); + none -> + Acc + end. + +%% NOTE +%% Passing negative `N` is allowed, in which case the iterator will be exhausted +%% completely, like in `fold/3`. +-spec fold_n(fun((entry(), Acc) -> Acc), Acc, t(), _N :: integer()) -> + {Acc, {more, t()} | none}. 
+fold_n(_FoldFun, Acc, It, 0) -> + {Acc, {more, It}}; +fold_n(FoldFun, Acc, It, N) -> + case next(It) of + {Entry, ItNext} -> + fold_n(FoldFun, FoldFun(Entry, Acc), ItNext, N - 1); + none -> + {Acc, none} + end. + +%% + +-spec none(root()) -> + t(). +none(Root) -> + % NOTE + % The _none_ iterator is a valid iterator, but it will never yield any entries. + #{ + root => Root, + queue => [], + head => [], + stack => [] + }. diff --git a/apps/emqx_ft/src/emqx_ft_fs_util.erl b/apps/emqx_ft/src/emqx_ft_fs_util.erl index df9135816..b731d3270 100644 --- a/apps/emqx_ft/src/emqx_ft_fs_util.erl +++ b/apps/emqx_ft/src/emqx_ft_fs_util.erl @@ -25,18 +25,16 @@ -export([read_decode_file/2]). -export([read_info/1]). +-export([list_dir/1]). -export([fold/4]). --type glob() :: ['*' | globfun()]. --type globfun() :: - fun((_Filename :: file:name()) -> boolean()). -type foldfun(Acc) :: fun( ( _Filepath :: file:name(), - _Info :: file:file_info() | {error, _IoError}, - _Stack :: [file:name()], + _Info :: file:file_info() | {error, file:posix()}, + _Stack :: emqx_ft_fs_iterator:pathstack(), Acc ) -> Acc ). @@ -153,46 +151,8 @@ read_info(AbsPath) -> % Be aware that this function is occasionally mocked in `emqx_ft_fs_util_SUITE`. file:read_link_info(AbsPath, [{time, posix}, raw]). --spec fold(foldfun(Acc), Acc, _Root :: file:name(), glob()) -> - Acc. -fold(Fun, Acc, Root, Glob) -> - fold(Fun, Acc, [], Root, Glob, []). 
- -fold(Fun, AccIn, Path, Root, [Glob | Rest], Stack) when Glob == '*' orelse is_function(Glob) -> - case list_dir(filename:join(Root, Path)) of - {ok, Filenames} -> - lists:foldl( - fun(FN, Acc) -> - case matches_glob(Glob, FN) of - true when Path == [] -> - fold(Fun, Acc, FN, Root, Rest, [FN | Stack]); - true -> - fold(Fun, Acc, filename:join(Path, FN), Root, Rest, [FN | Stack]); - false -> - Acc - end - end, - AccIn, - Filenames - ); - {error, enotdir} -> - AccIn; - {error, Reason} -> - Fun(Path, {error, Reason}, Stack, AccIn) - end; -fold(Fun, AccIn, Filepath, Root, [], Stack) -> - case ?MODULE:read_info(filename:join(Root, Filepath)) of - {ok, Info} -> - Fun(Filepath, Info, Stack, AccIn); - {error, Reason} -> - Fun(Filepath, {error, Reason}, Stack, AccIn) - end. - -matches_glob('*', _) -> - true; -matches_glob(FilterFun, Filename) when is_function(FilterFun) -> - FilterFun(Filename). - +-spec list_dir(file:name_all()) -> + {ok, [file:name()]} | {error, file:posix() | badarg}. list_dir(AbsPath) -> case ?MODULE:read_info(AbsPath) of {ok, #file_info{type = directory}} -> @@ -202,3 +162,19 @@ list_dir(AbsPath) -> {error, Reason} -> {error, Reason} end. + +-spec fold(foldfun(Acc), Acc, _Root :: file:name(), emqx_ft_fs_iterator:glob()) -> + Acc. +fold(FoldFun, Acc, Root, Glob) -> + fold(FoldFun, Acc, emqx_ft_fs_iterator:new(Root, Glob)). + +fold(FoldFun, Acc, It) -> + case emqx_ft_fs_iterator:next(It) of + {{node, _Path, {error, enotdir}, _PathStack}, ItNext} -> + fold(FoldFun, Acc, ItNext); + {{_Type, Path, Info, PathStack}, ItNext} -> + AccNext = FoldFun(Path, Info, PathStack, Acc), + fold(FoldFun, AccNext, ItNext); + none -> + Acc + end. 
diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index fee16cd09..5364211a4 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -25,6 +25,7 @@ assemble/2, files/0, + files/1, with_storage_type/2, with_storage_type/3, @@ -36,12 +37,27 @@ -type storage() :: emqx_config:config(). -export_type([assemble_callback/0]). + +-export_type([query/1]). +-export_type([page/2]). -export_type([file_info/0]). -export_type([export_data/0]). -export_type([reader/0]). -type assemble_callback() :: fun((ok | {error, term()}) -> any()). +-type query(Cursor) :: + #{transfer => emqx_ft:transfer()} + | #{ + limit => non_neg_integer(), + following => Cursor + }. + +-type page(Item, Cursor) :: #{ + items := [Item], + cursor => Cursor +}. + -type file_info() :: #{ transfer := emqx_ft:transfer(), name := file:name(), @@ -71,8 +87,8 @@ -callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) -> ok | {async, pid()} | {error, term()}. --callback files(storage()) -> - {ok, [file_info()]} | {error, term()}. +-callback files(storage(), query(Cursor)) -> + {ok, page(file_info(), Cursor)} | {error, term()}. %%-------------------------------------------------------------------- %% API @@ -105,9 +121,14 @@ assemble(Transfer, Size) -> with_storage(assemble, [Transfer, Size]). -spec files() -> - {ok, [file_info()]} | {error, term()}. + {ok, page(file_info(), _)} | {error, term()}. files() -> - with_storage(files, []). + files(#{}). + +-spec files(query(Cursor)) -> + {ok, page(file_info(), Cursor)} | {error, term()}. +files(Query) -> + with_storage(files, [Query]). -spec with_storage(atom() | function()) -> any(). 
with_storage(Fun) -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl index 72128cb40..fb44093c1 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl @@ -17,7 +17,7 @@ %% Filesystem storage exporter %% %% This is conceptually a part of the Filesystem storage backend that defines -%% how and where complete tranfers are assembled into files and stored. +%% how and where complete transfers are assembled into files and stored. -module(emqx_ft_storage_exporter). @@ -28,7 +28,7 @@ -export([discard/1]). %% Listing API --export([list/1]). +-export([list/2]). %% Lifecycle API -export([on_config_update/2]). @@ -70,8 +70,8 @@ -callback discard(ExportSt :: export_st()) -> ok | {error, _Reason}. --callback list(storage()) -> - {ok, [emqx_ft_storage:file_info()]} | {error, _Reason}. +-callback list(exporter_conf(), emqx_ft_storage:query(Cursor)) -> + {ok, emqx_ft_storage:page(emqx_ft_storage:file_info(), Cursor)} | {error, _Reason}. %% Lifecycle callbacks @@ -133,11 +133,11 @@ complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemet discard(#{mod := ExporterMod, st := ExportSt}) -> ExporterMod:discard(ExportSt). --spec list(storage()) -> - {ok, [emqx_ft_storage:file_info()]} | {error, _Reason}. -list(Storage) -> +-spec list(storage(), emqx_ft_storage:query(Cursor)) -> + {ok, emqx_ft_storage:page(emqx_ft_storage:file_info(), Cursor)} | {error, _Reason}. +list(Storage, Query) -> {ExporterMod, ExporterOpts} = exporter(Storage), - ExporterMod:list(ExporterOpts). + ExporterMod:list(ExporterOpts, Query). %% Lifecycle diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl index 4b05c9a58..9109dadbb 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl @@ -37,12 +37,23 @@ %% Internal API for RPC -export([list_local/1]). 
-export([list_local/2]). +-export([list_local_transfer/2]). -export([start_reader/3]). -% TODO -% -export([list/2]). +-export([list/2]). + +-export_type([export_st/0]). +-export_type([options/0]). + +-type options() :: #{ + root => file:name(), + _ => _ +}. + +-type query() :: emqx_ft_storage:query(cursor()). +-type page(T) :: emqx_ft_storage:page(T, cursor()). +-type cursor() :: iodata(). --type options() :: _TODO. -type transfer() :: emqx_ft:transfer(). -type filemeta() :: emqx_ft:filemeta(). -type exportinfo() :: emqx_ft_storage:file_info(). @@ -70,22 +81,6 @@ %% 2 symbols = at most 256 directories on the second level -define(BUCKET2_LEN, 2). --define(SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options), - ?SLOG(notice, "filesystem_object_unexpected", #{ - relpath => RelFilepath, - fileinfo => Fileinfo, - options => Options - }) -). - --define(SLOG_INACCESSIBLE(RelFilepath, Reason, Options), - ?SLOG(warning, "filesystem_object_inaccessible", #{ - relpath => RelFilepath, - reason => Reason, - options => Options - }) -). - %%-------------------------------------------------------------------- %% Exporter behaviour %%-------------------------------------------------------------------- @@ -162,33 +157,33 @@ update(_OldOptions, _NewOptions) -> ok. %% Internal API %%-------------------------------------------------------------------- --spec list_local(options(), transfer()) -> - {ok, [exportinfo(), ...]} | {error, file_error()}. 
-list_local(Options, Transfer) -> - TransferRoot = mk_absdir(Options, Transfer, result), - case - emqx_ft_fs_util:fold( - fun - (_Path, {error, Reason}, [], []) -> - {error, Reason}; - (_Path, Fileinfo = #file_info{type = regular}, [Filename | _], Acc) -> - RelFilepath = filename:join(mk_result_reldir(Transfer) ++ [Filename]), - Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo), - [Info | Acc]; - (RelFilepath, Fileinfo = #file_info{}, _, Acc) -> - ?SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options), - Acc; - (RelFilepath, {error, Reason}, _, Acc) -> - ?SLOG_INACCESSIBLE(RelFilepath, Reason, Options), - Acc - end, - [], - TransferRoot, - [fun filter_manifest/1] - ) - of +-type local_query() :: emqx_ft_storage:query({transfer(), file:name()}). + +-spec list_local_transfer(options(), transfer()) -> + {ok, [exportinfo()]} | {error, file_error()}. +list_local_transfer(Options, Transfer) -> + It = emqx_ft_fs_iterator:new( + mk_absdir(Options, Transfer, result), + [fun filter_manifest/1] + ), + Result = emqx_ft_fs_iterator:fold( + fun + ({leaf, _Path, Fileinfo = #file_info{type = regular}, [Filename | _]}, Acc) -> + RelFilepath = filename:join(mk_result_reldir(Transfer) ++ [Filename]), + Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo), + [Info | Acc]; + ({node, _Path, {error, Reason}, []}, []) -> + {error, Reason}; + (Entry, Acc) -> + ok = log_invalid_entry(Options, Entry), + Acc + end, + [], + It + ), + case Result of Infos = [_ | _] -> - {ok, Infos}; + {ok, lists:reverse(Infos)}; [] -> {error, enoent}; {error, Reason} -> @@ -196,9 +191,17 @@ list_local(Options, Transfer) -> end. -spec list_local(options()) -> - {ok, #{transfer() => [exportinfo(), ...]}}. + {ok, [exportinfo()]} | {error, file_error()}. list_local(Options) -> - Pattern = [ + list_local(Options, #{}). + +-spec list_local(options(), local_query()) -> + {ok, [exportinfo()]} | {error, file_error()}. 
+list_local(Options, #{transfer := Transfer}) -> + list_local_transfer(Options, Transfer); +list_local(Options, #{} = Query) -> + Root = get_storage_root(Options), + Glob = [ _Bucket1 = '*', _Bucket2 = '*', _Rest = '*', @@ -206,16 +209,30 @@ list_local(Options) -> _FileId = '*', fun filter_manifest/1 ], - Root = get_storage_root(Options), - {ok, - emqx_ft_fs_util:fold( - fun(RelFilepath, Info, Stack, Acc) -> - read_exportinfo(Options, RelFilepath, Info, Stack, Acc) - end, - [], - Root, - Pattern - )}. + It = + case Query of + #{following := Cursor} -> + emqx_ft_fs_iterator:seek(mk_path_seek(Cursor), Root, Glob); + #{} -> + emqx_ft_fs_iterator:new(Root, Glob) + end, + % NOTE + % In the rare case when some transfer contain more than one file, the paging mechanic + % here may skip over some files, when the cursor is transfer-only. + Limit = maps:get(limit, Query, -1), + {Exports, _} = emqx_ft_fs_iterator:fold_n( + fun(Entry, Acc) -> read_exportinfo(Options, Entry, Acc) end, + [], + It, + Limit + ), + {ok, Exports}. + +mk_path_seek(#{transfer := Transfer, name := Filename}) -> + mk_result_reldir(Transfer) ++ [Filename]; +mk_path_seek(#{transfer := Transfer}) -> + % NOTE: Any bitstring is greater than any list. + mk_result_reldir(Transfer) ++ [<<>>]. %%-------------------------------------------------------------------- %% Helpers @@ -227,16 +244,21 @@ filter_manifest(?MANIFEST) -> filter_manifest(Filename) -> ?MANIFEST =/= string:find(Filename, ?MANIFEST, trailing). -read_exportinfo(Options, RelFilepath, Fileinfo = #file_info{type = regular}, Stack, Acc) -> - [Filename, FileId, ClientId | _] = Stack, +read_exportinfo( + Options, + {leaf, RelFilepath, Fileinfo = #file_info{type = regular}, [Filename, FileId, ClientId | _]}, + Acc +) -> + % NOTE + % There might be more than one file for a single transfer (though + % extremely bad luck is needed for that, e.g. concurrent assemblers with + % different filemetas from different nodes). 
This might be unexpected for a + % client given the current protocol, yet might be helpful in the future. Transfer = dirnames_to_transfer(ClientId, FileId), Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo), [Info | Acc]; -read_exportinfo(Options, RelFilepath, Fileinfo = #file_info{}, _Stack, Acc) -> - ?SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options), - Acc; -read_exportinfo(Options, RelFilepath, {error, Reason}, _Stack, Acc) -> - ?SLOG_INACCESSIBLE(RelFilepath, Reason, Options), +read_exportinfo(Options, Entry, Acc) -> + ok = log_invalid_entry(Options, Entry), Acc. mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo) -> @@ -268,6 +290,19 @@ try_read_filemeta(Filepath, Info) -> mk_export_uri(RelFilepath) -> emqx_ft_storage_exporter_fs_api:mk_export_uri(node(), RelFilepath). +log_invalid_entry(Options, {_Type, RelFilepath, Fileinfo = #file_info{}, _Stack}) -> + ?SLOG(notice, "filesystem_object_unexpected", #{ + relpath => RelFilepath, + fileinfo => Fileinfo, + options => Options + }); +log_invalid_entry(Options, {_Type, RelFilepath, {error, Reason}, _Stack}) -> + ?SLOG(warning, "filesystem_object_inaccessible", #{ + relpath => RelFilepath, + reason => Reason, + options => Options + }). + -spec start_reader(options(), file:name(), _Caller :: pid()) -> {ok, reader()} | {error, enoent}. start_reader(Options, RelFilepath, CallerPid) -> @@ -282,32 +317,112 @@ start_reader(Options, RelFilepath, CallerPid) -> %% --spec list(options()) -> - {ok, [exportinfo(), ...]} | {error, [{node(), _Reason}]}. 
-list(_Options) -> - Nodes = mria_mnesia:running_nodes(), - Replies = emqx_ft_storage_exporter_fs_proto_v1:list_exports(Nodes), - {Results, Errors} = lists:foldl( - fun - ({_Node, {ok, {ok, Files}}}, {Acc, Errors}) -> - {Files ++ Acc, Errors}; - ({Node, {ok, {error, _} = Error}}, {Acc, Errors}) -> - {Acc, [{Node, Error} | Errors]}; - ({Node, Error}, {Acc, Errors}) -> - {Acc, [{Node, Error} | Errors]} - end, - {[], []}, - lists:zip(Nodes, Replies) - ), - length(Errors) > 0 andalso - ?SLOG(warning, #{msg => "list_remote_exports_failed", errors => Errors}), - case Results of - [_ | _] -> - {ok, Results}; - [] when Errors =:= [] -> - {ok, Results}; - [] -> - {error, Errors} +-spec list(options(), query()) -> + {ok, page(exportinfo())} | {error, [{node(), _Reason}]}. +list(_Options, Query = #{transfer := _Transfer}) -> + case list(Query) of + #{items := Exports = [_ | _]} -> + {ok, #{items => Exports}}; + Result = #{items := []} -> + {error, maps:get(errors, Result, [])} + end; +list(_Options, Query) -> + Result = list(Query), + case Result of + #{errors := NodeErrors} -> + ?SLOG(warning, "list_exports_errors", #{ + query => Query, + errors => NodeErrors + }); + #{} -> + ok + end, + case Result of + #{items := Exports, cursor := Cursor} -> + {ok, #{items => lists:reverse(Exports), cursor => encode_cursor(Cursor)}}; + #{items := Exports} -> + {ok, #{items => lists:reverse(Exports)}} + end. + +list(QueryIn) -> + {Nodes, NodeQuery} = decode_query(QueryIn, lists:sort(mria_mnesia:running_nodes())), + list_nodes(NodeQuery, Nodes, #{items => []}). + +list_nodes(Query, Nodes = [Node | Rest], Acc) -> + case emqx_ft_storage_exporter_fs_proto_v1:list_exports([Node], Query) of + [{ok, Result}] -> + list_accumulate(Result, Query, Nodes, Acc); + [Failure] -> + ?SLOG(warning, #{ + msg => "list_remote_exports_failed", + node => Node, + query => Query, + failure => Failure + }), + list_next(Query, Rest, Acc) + end; +list_nodes(_Query, [], Acc) -> + Acc.
+ +list_accumulate({ok, Exports}, Query, [Node | Rest], Acc = #{items := EAcc}) -> + NExports = length(Exports), + AccNext = Acc#{items := Exports ++ EAcc}, + case Query of + #{limit := Limit} when NExports < Limit -> + list_next(Query#{limit => Limit - NExports}, Rest, AccNext); + #{limit := _} -> + AccNext#{cursor => mk_cursor(Node, Exports)}; + #{} -> + list_next(Query, Rest, AccNext) + end; +list_accumulate({error, Reason}, Query, [Node | Rest], Acc) -> + EAcc = maps:get(errors, Acc, []), + list_next(Query, Rest, Acc#{errors => [{Node, Reason} | EAcc]}). + +list_next(Query, Nodes, Acc) -> + list_nodes(maps:remove(following, Query), Nodes, Acc). + +decode_query(Query = #{following := Cursor}, Nodes) -> + {Node, NodeCursor} = decode_cursor(Cursor), + {skip_query_nodes(Node, Nodes), Query#{following => NodeCursor}}; +decode_query(Query = #{}, Nodes) -> + {Nodes, Query}. + +skip_query_nodes(CNode, Nodes) -> + lists:dropwhile(fun(N) -> N < CNode end, Nodes). + +mk_cursor(Node, [_Last = #{transfer := Transfer, name := Name} | _]) -> + {Node, #{transfer => Transfer, name => Name}}. + +encode_cursor({Node, #{transfer := {ClientId, FileId}, name := Name}}) -> + emqx_utils_json:encode(#{ + <<"n">> => Node, + <<"cid">> => ClientId, + <<"fid">> => FileId, + <<"fn">> => unicode:characters_to_binary(Name) + }). + +decode_cursor(Cursor) -> + try + #{ + <<"n">> := NodeIn, + <<"cid">> := ClientId, + <<"fid">> := FileId, + <<"fn">> := NameIn + } = emqx_utils_json:decode(Cursor), + true = is_binary(ClientId), + true = is_binary(FileId), + Node = binary_to_existing_atom(NodeIn), + Name = unicode:characters_to_list(NameIn), + true = is_list(Name), + {Node, #{transfer => {ClientId, FileId}, name => Name}} + catch + error:{_, invalid_json} -> + error({badarg, cursor}); + error:{badmatch, _} -> + error({badarg, cursor}); + error:badarg -> + error({badarg, cursor}) end. 
%% @@ -352,9 +467,9 @@ mk_result_reldir(Transfer = {ClientId, FileId}) -> BucketRest/binary >> = binary:encode_hex(Hash), [ - Bucket1, - Bucket2, - BucketRest, + binary_to_list(Bucket1), + binary_to_list(Bucket2), + binary_to_list(BucketRest), emqx_ft_fs_util:escape_filename(ClientId), emqx_ft_fs_util:escape_filename(FileId) ]. diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_proxy.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_proxy.erl index 943c053ff..13160bfc6 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_proxy.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_proxy.erl @@ -21,15 +21,15 @@ -module(emqx_ft_storage_exporter_fs_proxy). -export([ - list_exports_local/0, + list_exports_local/1, read_export_file_local/2 ]). -list_exports_local() -> +list_exports_local(Query) -> emqx_ft_storage:with_storage_type(local, fun(Storage) -> case emqx_ft_storage_exporter:exporter(Storage) of {emqx_ft_storage_exporter_fs, Options} -> - emqx_ft_storage_exporter_fs:list_local(Options) + emqx_ft_storage_exporter_fs:list_local(Options, Query) % NOTE % This case clause is currently deemed unreachable by dialyzer. % InvalidExporter -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl index adf000346..c7110c74a 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl @@ -23,7 +23,7 @@ -export([write/2]). -export([complete/2]). -export([discard/1]). --export([list/1]). +-export([list/2]). -export([ start/1, @@ -43,6 +43,10 @@ filemeta => filemeta() }. +-type query() :: emqx_ft_storage:query(cursor()). +-type page(T) :: emqx_ft_storage:page(T, cursor()). +-type cursor() :: iodata(). + -type export_st() :: #{ pid := pid(), filemeta := filemeta(), @@ -92,10 +96,10 @@ complete(#{pid := Pid} = _ExportSt, _Checksum) -> discard(#{pid := Pid} = _ExportSt) -> emqx_s3_uploader:abort(Pid). 
--spec list(options()) -> - {ok, [exportinfo()]} | {error, term()}. -list(Options) -> - emqx_s3:with_client(?S3_PROFILE_ID, fun(Client) -> list(Client, Options) end). +-spec list(options(), query()) -> + {ok, page(exportinfo())} | {error, term()}. +list(Options, Query) -> + emqx_s3:with_client(?S3_PROFILE_ID, fun(Client) -> list(Client, Options, Query) end). %%-------------------------------------------------------------------- %% Exporter behaviour (lifecycle) @@ -117,12 +121,11 @@ update(_OldOptions, NewOptions) -> %% Internal functions %% ------------------------------------------------------------------- -s3_key({ClientId, FileId} = _Transfer, #{name := Filename}) -> - filename:join([ - emqx_ft_fs_util:escape_filename(ClientId), - emqx_ft_fs_util:escape_filename(FileId), - Filename - ]). +s3_key(Transfer, #{name := Filename}) -> + s3_prefix(Transfer) ++ "/" ++ Filename. + +s3_prefix({ClientId, FileId} = _Transfer) -> + emqx_ft_fs_util:escape_filename(ClientId) ++ "/" ++ emqx_ft_fs_util:escape_filename(FileId). s3_headers({ClientId, FileId}, Filemeta) -> #{ @@ -137,54 +140,97 @@ s3_headers({ClientId, FileId}, Filemeta) -> s3_header_filemeta(Filemeta) -> emqx_utils_json:encode(emqx_ft:encode_filemeta(Filemeta), [force_utf8, uescape]). 
-list(Client, Options) -> - case list_key_info(Client, Options) of - {ok, KeyInfos} -> - MaybeExportInfos = lists:map( - fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo, Options) end, KeyInfos - ), - ExportInfos = [ExportInfo || {ok, ExportInfo} <- MaybeExportInfos], - {ok, ExportInfos}; +list(Client, _Options, #{transfer := Transfer}) -> + case list_key_info(Client, [{prefix, s3_prefix(Transfer)}, {max_keys, ?S3_LIST_LIMIT}]) of + {ok, {Exports, _Marker}} -> + {ok, #{items => Exports}}; + {error, _Reason} = Error -> + Error + end; +list(Client, _Options, Query) -> + Limit = maps:get(limit, Query, undefined), + Marker = emqx_maybe:apply(fun decode_cursor/1, maps:get(following, Query, undefined)), + case list_pages(Client, Marker, Limit, []) of + {ok, {Exports, undefined}} -> + {ok, #{items => Exports}}; + {ok, {Exports, NextMarker}} -> + {ok, #{items => Exports, cursor => encode_cursor(NextMarker)}}; {error, _Reason} = Error -> Error end. -list_key_info(Client, Options) -> - list_key_info(Client, Options, _Marker = [], _Acc = []). +list_pages(Client, Marker, Limit, Acc) -> + MaxKeys = min(?S3_LIST_LIMIT, Limit), + ListOptions = [{marker, Marker} || Marker =/= undefined], + case list_key_info(Client, [{max_keys, MaxKeys} | ListOptions]) of + {ok, {Exports, NextMarker}} -> + list_accumulate(Client, Limit, NextMarker, [Exports | Acc]); + {error, _Reason} = Error -> + Error + end. -list_key_info(Client, Options, Marker, Acc) -> - ListOptions = [{max_keys, ?S3_LIST_LIMIT}] ++ Marker, +list_accumulate(_Client, _Limit, undefined, Acc) -> + {ok, {flatten_pages(Acc), undefined}}; +list_accumulate(Client, undefined, Marker, Acc) -> + list_pages(Client, Marker, undefined, Acc); +list_accumulate(Client, Limit, Marker, Acc = [Exports | _]) -> + case Limit - length(Exports) of + 0 -> + {ok, {flatten_pages(Acc), Marker}}; + Left -> + list_pages(Client, Marker, Left, Acc) + end. + +flatten_pages(Pages) -> + lists:append(lists:reverse(Pages)).
+ +list_key_info(Client, ListOptions) -> case emqx_s3_client:list(Client, ListOptions) of {ok, Result} -> ?SLOG(debug, #{msg => "list_key_info", result => Result}), KeyInfos = proplists:get_value(contents, Result, []), - case proplists:get_value(is_truncated, Result, false) of - true -> - NewMarker = next_marker(KeyInfos), - list_key_info(Client, Options, NewMarker, [KeyInfos | Acc]); - false -> - {ok, lists:append(lists:reverse([KeyInfos | Acc]))} - end; + Exports = lists:filtermap( + fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo) end, KeyInfos + ), + Marker = + case proplists:get_value(is_truncated, Result, false) of + true -> + next_marker(KeyInfos); + false -> + undefined + end, + {ok, {Exports, Marker}}; {error, _Reason} = Error -> Error end. -next_marker(KeyInfos) -> - [{marker, proplists:get_value(key, lists:last(KeyInfos))}]. +encode_cursor(Key) -> + unicode:characters_to_binary(Key). -key_info_to_exportinfo(Client, KeyInfo, _Options) -> +decode_cursor(Cursor) -> + case unicode:characters_to_list(Cursor) of + Key when is_list(Key) -> + Key; + _ -> + error({badarg, cursor}) + end. + +next_marker(KeyInfos) -> + proplists:get_value(key, lists:last(KeyInfos)). + +key_info_to_exportinfo(Client, KeyInfo) -> Key = proplists:get_value(key, KeyInfo), case parse_transfer_and_name(Key) of {ok, {Transfer, Name}} -> - {ok, #{ + {true, #{ transfer => Transfer, name => unicode:characters_to_binary(Name), uri => emqx_s3_client:uri(Client, Key), timestamp => datetime_to_epoch_second(proplists:get_value(last_modified, KeyInfo)), size => proplists:get_value(size, KeyInfo) }}; - {error, _Reason} = Error -> - Error + {error, _Reason} -> + false end. -define(EPOCH_START, 62167219200). diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 823407307..bd88727c0 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -45,7 +45,7 @@ -export([get_subdir/2]). -export([get_subdir/3]). 
--export([files/1]). +-export([files/2]). -export([on_config_update/2]). @@ -217,8 +217,8 @@ assemble(Storage, Transfer, Size) -> %% -files(Storage) -> - emqx_ft_storage_exporter:list(Storage). +files(Storage, Query) -> + emqx_ft_storage_exporter:list(Storage, Query). %% diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_exporter_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_exporter_fs_proto_v1.erl index 4c64011de..9ca6db786 100644 --- a/apps/emqx_ft/src/proto/emqx_ft_storage_exporter_fs_proto_v1.erl +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_exporter_fs_proto_v1.erl @@ -20,7 +20,7 @@ -export([introduced_in/0]). --export([list_exports/1]). +-export([list_exports/2]). -export([read_export_file/3]). -include_lib("emqx/include/bpapi.hrl"). @@ -28,14 +28,17 @@ introduced_in() -> "5.0.17". --spec list_exports([node()]) -> - emqx_rpc:erpc_multicall([emqx_ft_storage:file_info()]). -list_exports(Nodes) -> +-spec list_exports([node()], emqx_ft_storage:query(_LocalCursor)) -> + emqx_rpc:erpc_multicall( + {ok, [emqx_ft_storage:file_info()]} + | {error, file:posix() | disabled | {invalid_storage_type, _}} + ). +list_exports(Nodes, Query) -> erpc:multicall( Nodes, emqx_ft_storage_exporter_fs_proxy, list_exports_local, - [] + [Query] ). -spec read_export_file(node(), file:name(), pid()) -> diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index d3a3aee21..929665ca9 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -671,7 +671,7 @@ encode_meta(Meta) -> emqx_utils_json:encode(emqx_ft:encode_filemeta(Meta)). list_files(ClientId) -> - {ok, Files} = emqx_ft_storage:files(), + {ok, #{items := Files}} = emqx_ft_storage:files(), [File || File = #{transfer := {CId, _}} <- Files, CId == ClientId]. 
read_export(#{path := AbsFilepath}) -> diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl index 523026d5a..4efa31205 100644 --- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl @@ -22,11 +22,19 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). --include_lib("emqx/include/asserts.hrl"). - -import(emqx_dashboard_api_test_helpers, [host/0, uri/1]). -all() -> emqx_common_test_helpers:all(?MODULE). +all() -> + [ + {group, single}, + {group, cluster} + ]. + +groups() -> + [ + {single, [], emqx_common_test_helpers:all(?MODULE)}, + {cluster, [], emqx_common_test_helpers:all(?MODULE)} + ]. init_per_suite(Config) -> ok = emqx_mgmt_api_test_util:init_suite( @@ -38,6 +46,41 @@ end_per_suite(_Config) -> ok = emqx_mgmt_api_test_util:end_suite([emqx_ft, emqx_conf]), ok. +init_per_group(Group = cluster, Config) -> + Cluster = mk_cluster_specs(Config), + ct:pal("Starting ~p", [Cluster]), + Nodes = [ + emqx_common_test_helpers:start_slave(Name, Opts#{join_to => node()}) + || {Name, Opts} <- Cluster + ], + [{group, Group}, {cluster_nodes, Nodes} | Config]; +init_per_group(Group, Config) -> + [{group, Group} | Config]. + +end_per_group(cluster, Config) -> + ok = lists:foreach( + fun emqx_ft_test_helpers:stop_additional_node/1, + ?config(cluster_nodes, Config) + ); +end_per_group(_Group, _Config) -> + ok. + +mk_cluster_specs(Config) -> + Specs = [ + {core, emqx_ft_api_SUITE1, #{listener_ports => [{tcp, 2883}]}}, + {core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}} + ], + CommOpts = [ + {env, [{emqx, boot_modules, [broker, listeners]}]}, + {apps, [emqx_ft]}, + {conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]}, + {env_handler, emqx_ft_test_helpers:env_handler(Config)} + ], + emqx_common_test_helpers:emqx_cluster( + Specs, + CommOpts + ). + init_per_testcase(Case, Config) -> [{tc, Case} | Config]. 
end_per_testcase(_Case, _Config) -> @@ -47,30 +90,46 @@ end_per_testcase(_Case, _Config) -> %% Tests %%-------------------------------------------------------------------- -t_list_ready_transfers(Config) -> +t_list_files(Config) -> ClientId = client_id(Config), + FileId = <<"f1">>, - ok = emqx_ft_test_helpers:upload_file(ClientId, <<"f1">>, "f1", <<"data">>), + Node = lists:last(cluster(Config)), + ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node), {ok, 200, #{<<"files">> := Files}} = - request(get, uri(["file_transfer", "files"]), fun json/1), + request_json(get, uri(["file_transfer", "files"])), - ?assertInclude( - #{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}, - Files + ?assertMatch( + [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}], + [File || File = #{<<"clientid">> := CId} <- Files, CId == ClientId] + ), + + {ok, 200, #{<<"files">> := FilesTransfer}} = + request_json(get, uri(["file_transfer", "files", ClientId, FileId])), + + ?assertMatch( + [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}], + FilesTransfer + ), + + ?assertMatch( + {ok, 404, #{<<"code">> := <<"FILES_NOT_FOUND">>}}, + request_json(get, uri(["file_transfer", "files", ClientId, <<"no-such-file">>])) ). 
t_download_transfer(Config) -> ClientId = client_id(Config), + FileId = <<"f1">>, - ok = emqx_ft_test_helpers:upload_file(ClientId, <<"f1">>, "f1", <<"data">>), + Node = lists:last(cluster(Config)), + ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node), ?assertMatch( {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, - request( + request_json( get, - uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>}), - fun json/1 + uri(["file_transfer", "file"]) ++ query(#{fileref => FileId}) ) ), @@ -80,7 +139,7 @@ t_download_transfer(Config) -> get, uri(["file_transfer", "file"]) ++ query(#{ - fileref => <<"f1">>, + fileref => FileId, node => <<"nonode@nohost">> }) ) @@ -99,7 +158,7 @@ t_download_transfer(Config) -> ), {ok, 200, #{<<"files">> := [File]}} = - request(get, uri(["file_transfer", "files"]), fun json/1), + request_json(get, uri(["file_transfer", "files", ClientId, FileId])), {ok, 200, Response} = request(get, host() ++ maps:get(<<"uri">>, File)), @@ -108,20 +167,85 @@ t_download_transfer(Config) -> Response ). 
+t_list_files_paging(Config) -> + ClientId = client_id(Config), + NFiles = 20, + Nodes = cluster(Config), + Uploads = [ + {mk_file_id("file:", N), mk_file_name(N), pick(N, Nodes)} + || N <- lists:seq(1, NFiles) + ], + ok = lists:foreach( + fun({FileId, Name, Node}) -> + ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Name, <<"data">>, Node) + end, + Uploads + ), + + ?assertMatch( + {ok, 200, #{<<"files">> := [_, _, _], <<"cursor">> := _}}, + request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 3})) + ), + + {ok, 200, #{<<"files">> := Files}} = + request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100})), + + ?assert(length(Files) >= NFiles), + + ?assertNotMatch( + {ok, 200, #{<<"cursor">> := _}}, + request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100})) + ), + + ?assertMatch( + {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, + request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0})) + ), + + ?assertMatch( + {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, + request_json( + get, + uri(["file_transfer", "files"]) ++ query(#{following => <<"whatsthat!?">>}) + ) + ), + + PageThrough = fun PageThrough(Query, Acc) -> + case request_json(get, uri(["file_transfer", "files"]) ++ query(Query)) of + {ok, 200, #{<<"files">> := FilesPage, <<"cursor">> := Cursor}} -> + PageThrough(Query#{following => Cursor}, Acc ++ FilesPage); + {ok, 200, #{<<"files">> := FilesPage}} -> + Acc ++ FilesPage + end + end, + + ?assertEqual(Files, PageThrough(#{limit => 1}, [])), + ?assertEqual(Files, PageThrough(#{limit => 8}, [])), + ?assertEqual(Files, PageThrough(#{limit => NFiles}, [])). + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- +cluster(Config) -> + [node() | proplists:get_value(cluster_nodes, Config, [])]. + client_id(Config) -> - atom_to_binary(?config(tc, Config), utf8). 
+ iolist_to_binary(io_lib:format("~s.~s", [?config(group, Config), ?config(tc, Config)])). + +mk_file_id(Prefix, N) -> + iolist_to_binary([Prefix, integer_to_list(N)]). + +mk_file_name(N) -> + "file." ++ integer_to_list(N). request(Method, Url) -> emqx_mgmt_api_test_util:request(Method, Url, []). -request(Method, Url, Decoder) when is_function(Decoder) -> +request_json(Method, Url) -> case emqx_mgmt_api_test_util:request(Method, Url, []) of {ok, Code, Body} -> - {ok, Code, Decoder(Body)}; + {ok, Code, json(Body)}; Otherwise -> Otherwise end. @@ -138,7 +262,12 @@ uri_encode(T) -> to_list(A) when is_atom(A) -> atom_to_list(A); +to_list(A) when is_integer(A) -> + integer_to_list(A); to_list(B) when is_binary(B) -> binary_to_list(B); to_list(L) when is_list(L) -> L. + +pick(N, List) -> + lists:nth(1 + (N rem length(List)), List). diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index a7323fc0e..24a4593c2 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -240,7 +240,7 @@ list_exports(Config) -> list_exports(Config, Transfer) -> {emqx_ft_storage_exporter_fs, Options} = exporter(Config), - emqx_ft_storage_exporter_fs:list_local(Options, Transfer). + emqx_ft_storage_exporter_fs:list_local_transfer(Options, Transfer). exporter(Config) -> emqx_ft_storage_exporter:exporter(storage(Config)). diff --git a/apps/emqx_ft/test/emqx_ft_fs_util_SUITE.erl b/apps/emqx_ft/test/emqx_ft_fs_util_SUITE.erl index 81a483651..e4aa70f81 100644 --- a/apps/emqx_ft/test/emqx_ft_fs_util_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_fs_util_SUITE.erl @@ -34,7 +34,7 @@ t_fold_single_level(Config) -> {"c", #file_info{type = directory}, ["c"]}, {"d", #file_info{type = directory}, ["d"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*'])) + sort(fold(fun cons/4, [], Root, ['*'])) ). 
t_fold_multi_level(Config) -> @@ -45,7 +45,7 @@ t_fold_multi_level(Config) -> {"a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]}, {"d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', '*'])) + sort(fold(fun cons/4, [], Root, ['*', '*', '*', '*'])) ), ?assertMatch( [ @@ -53,32 +53,32 @@ t_fold_multi_level(Config) -> {"c/bar/中文", #file_info{type = regular}, ["中文", "bar", "c"]}, {"d/e/baz", #file_info{type = directory}, ["baz", "e", "d"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*'])) + sort(fold(fun cons/4, [], Root, ['*', '*', '*'])) ). t_fold_no_glob(Config) -> Root = ?config(data_dir, Config), ?assertMatch( [{"", #file_info{type = directory}, []}], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, [])) + sort(fold(fun cons/4, [], Root, [])) ). t_fold_glob_too_deep(Config) -> Root = ?config(data_dir, Config), ?assertMatch( [], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', '*', '*'])) + sort(fold(fun cons/4, [], Root, ['*', '*', '*', '*', '*'])) ). t_fold_invalid_root(Config) -> Root = ?config(data_dir, Config), ?assertMatch( [], - sort(emqx_ft_fs_util:fold(fun cons/4, [], filename:join([Root, "a", "link"]), ['*'])) + sort(fold(fun cons/4, [], filename:join([Root, "a", "link"]), ['*'])) ), ?assertMatch( [], - sort(emqx_ft_fs_util:fold(fun cons/4, [], filename:join([Root, "d", "haystack"]), ['*'])) + sort(fold(fun cons/4, [], filename:join([Root, "d", "haystack"]), ['*'])) ). 
t_fold_filter_unicode(Config) -> @@ -88,13 +88,13 @@ t_fold_filter_unicode(Config) -> {"a/b/foo/42", #file_info{type = regular}, ["42", "foo", "b", "a"]}, {"d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', fun is_latin1/1])) + sort(fold(fun cons/4, [], Root, ['*', '*', '*', fun is_latin1/1])) ), ?assertMatch( [ {"a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', is_not(fun is_latin1/1)])) + sort(fold(fun cons/4, [], Root, ['*', '*', '*', is_not(fun is_latin1/1)])) ). t_fold_filter_levels(Config) -> @@ -104,7 +104,7 @@ t_fold_filter_levels(Config) -> {"a/b/foo", #file_info{type = directory}, ["foo", "b", "a"]}, {"d/e/baz", #file_info{type = directory}, ["baz", "e", "d"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, [fun is_letter/1, fun is_letter/1, '*'])) + sort(fold(fun cons/4, [], Root, [fun is_letter/1, fun is_letter/1, '*'])) ). t_fold_errors(Config) -> @@ -128,11 +128,99 @@ t_fold_errors(Config) -> {"c/link", {error, enotsup}, ["link", "c"]}, {"d/e/baz/needle", {error, ebusy}, ["needle", "baz", "e", "d"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', '*'])) + sort(fold(fun cons/4, [], Root, ['*', '*', '*', '*'])) + ). 
+ +t_seek_fold(Config) -> + Root = ?config(data_dir, Config), + ?assertMatch( + [ + {leaf, "a/b/foo/42", #file_info{type = regular}, ["42", "foo", "b", "a"]}, + {leaf, "a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]}, + {leaf, "d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]} + | _Nodes + ], + sort( + emqx_ft_fs_iterator:fold( + fun cons/2, + [], + emqx_ft_fs_iterator:seek(["a", "a"], Root, ['*', '*', '*', '*']) + ) + ) + ), + ?assertMatch( + [ + {leaf, "a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]}, + {leaf, "d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]} + | _Nodes + ], + sort( + emqx_ft_fs_iterator:fold( + fun cons/2, + [], + emqx_ft_fs_iterator:seek(["a", "b", "foo", "42"], Root, ['*', '*', '*', '*']) + ) + ) + ), + ?assertMatch( + [ + {leaf, "d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]} + | _Nodes + ], + sort( + emqx_ft_fs_iterator:fold( + fun cons/2, + [], + emqx_ft_fs_iterator:seek(["c", "d", "e", "f"], Root, ['*', '*', '*', '*']) + ) + ) + ). + +t_seek_empty(Config) -> + Root = ?config(data_dir, Config), + ?assertEqual( + emqx_ft_fs_iterator:fold( + fun cons/2, + [], + emqx_ft_fs_iterator:new(Root, ['*', '*', '*', '*']) + ), + emqx_ft_fs_iterator:fold( + fun cons/2, + [], + emqx_ft_fs_iterator:seek([], Root, ['*', '*', '*', '*']) + ) + ). + +t_seek_past_end(Config) -> + Root = ?config(data_dir, Config), + ?assertEqual( + none, + emqx_ft_fs_iterator:next( + emqx_ft_fs_iterator:seek(["g", "h"], Root, ['*', '*', '*', '*']) + ) + ). + +t_seek_with_filter(Config) -> + Root = ?config(data_dir, Config), + ?assertMatch( + [ + {leaf, "d/e/baz", #file_info{type = directory}, ["baz", "e", "d"]} + | _Nodes + ], + sort( + emqx_ft_fs_iterator:fold( + fun cons/2, + [], + emqx_ft_fs_iterator:seek(["a", "link"], Root, ['*', fun is_letter/1, '*']) + ) + ) ). %% +fold(FoldFun, Acc, Root, Glob) -> + emqx_ft_fs_util:fold(FoldFun, Acc, Root, Glob). 
+ is_not(F) -> fun(X) -> not F(X) end. @@ -155,5 +243,8 @@ is_letter(Filename) -> cons(Path, Info, Stack, Acc) -> [{Path, Info, Stack} | Acc]. +cons(Entry, Acc) -> + [Entry | Acc]. + sort(L) when is_list(L) -> lists:sort(L). diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl index d4c13f7d1..5635d981b 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl @@ -27,7 +27,7 @@ all() -> {group, cluster} ]. --define(CLUSTER_CASES, [t_multinode_ready_transfers]). +-define(CLUSTER_CASES, [t_multinode_exports]). groups() -> [ @@ -61,7 +61,7 @@ end_per_group(_Group, _Config) -> %% Tests %%-------------------------------------------------------------------- -t_multinode_ready_transfers(Config) -> +t_multinode_exports(Config) -> Node1 = ?config(additional_node, Config), ok = emqx_ft_test_helpers:upload_file(<<"c/1">>, <<"f:1">>, "fn1", <<"data">>, Node1), @@ -87,5 +87,5 @@ storage(Config) -> emqx_ft_test_helpers:local_storage(Config). list_files(Config) -> - {ok, Files} = emqx_ft_storage_fs:files(storage(Config)), + {ok, #{items := Files}} = emqx_ft_storage_fs:files(storage(Config), #{}), Files. diff --git a/rel/i18n/emqx_ft_api.hocon b/rel/i18n/emqx_ft_api.hocon index 0c67db554..bf6c22411 100644 --- a/rel/i18n/emqx_ft_api.hocon +++ b/rel/i18n/emqx_ft_api.hocon @@ -3,6 +3,9 @@ emqx_ft_api { file_list.desc: """List all uploaded files.""" +file_list_transfer.desc +"""List a file uploaded during specified transfer, identified by client id and file id.""" + } emqx_ft_storage_exporter_fs_api {