From 573bb22ada94110485dc3660a4033981d188cadb Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 30 Mar 2023 19:18:47 +0300 Subject: [PATCH] feat(ft-fs): introduce fs iterators concept + forward seeks In order to support paging over filesystem contents, to serve REST APIs effectively. --- apps/emqx_ft/src/emqx_ft_fs_iterator.erl | 219 ++++++++++++++++++++ apps/emqx_ft/src/emqx_ft_fs_util.erl | 66 ++---- apps/emqx_ft/test/emqx_ft_fs_util_SUITE.erl | 113 +++++++++- 3 files changed, 342 insertions(+), 56 deletions(-) create mode 100644 apps/emqx_ft/src/emqx_ft_fs_iterator.erl diff --git a/apps/emqx_ft/src/emqx_ft_fs_iterator.erl b/apps/emqx_ft/src/emqx_ft_fs_iterator.erl new file mode 100644 index 000000000..5c8857ab0 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_fs_iterator.erl @@ -0,0 +1,219 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_fs_iterator). + +-export([new/2]). +-export([next/1]). +-export([next_leaf/1]). + +-export([seek/3]). + +-export([fold/3]). + +-export_type([t/0]). +-export_type([glob/0]). +-export_type([pathstack/0]). + +-type root() :: file:name(). +-type glob() :: ['*' | globfun()]. 
+-type globfun() ::
+    fun((_Filename :: file:name()) -> boolean())
+    | fun((_Filename :: file:name(), pathstack()) -> boolean()).
+
+% A path stack is a list of path components, in reverse order.
+% NOTE
+% The root itself is represented by the empty path stack: `new/2` seeds the queue
+% with `[[]]`, so entries emitted for the root carry `[]` here. Hence the type must
+% not be restricted to non-empty lists.
+-type pathstack() :: [file:name()].
+
+% Iterator state:
+%  * `root`  -- the directory the traversal is rooted at;
+%  * `queue` -- path stacks still to be visited at the current level;
+%  * `head`  -- glob patterns yet to be applied to the children of queued paths,
+%               empty when the queued paths are leaves;
+%  * `stack` -- unfinished queues of the upper levels, each with the glob that was
+%               in effect at that level.
+-opaque t() :: #{
+    root := root(),
+    queue := [_PathStack :: [file:name()]],
+    head := glob(),
+    stack := [{[pathstack()], glob()}]
+}.
+
+-type entry() :: entry_leaf() | entry_node().
+% A leaf is a path matched by the whole glob, along with its file info, or the error
+% returned when reading that info.
+-type entry_leaf() ::
+    {leaf, file:name(), file:file_info() | {error, file:posix() | badarg}, pathstack()}.
+% A node is an intermediate path that could not be listed. The error type mirrors
+% the spec of `emqx_ft_fs_util:list_dir/1`.
+-type entry_node() ::
+    {node, file:name(), {error, file:posix() | badarg}, pathstack()}.
+
+%% Creates an iterator over paths under `Root` that match `Glob`, one pattern per
+%% directory level. Nothing is read from the filesystem until `next/1` is called.
+-spec new(root(), glob()) ->
+    t().
+new(Root, Glob) ->
+    #{
+        root => Root,
+        % The empty path stack stands for the root itself.
+        queue => [[]],
+        head => Glob,
+        stack => []
+    }.
+
+%% Advances the iterator, returning the next entry and the updated iterator, or
+%% `none` when the traversal is exhausted.
+-spec next(t()) ->
+    {entry(), t()} | none.
+next(It = #{queue := [PathStack | Rest], head := []}) ->
+    % No patterns left: the queued path matched the whole glob, emit it as a leaf.
+    {emit(PathStack, It), It#{queue => Rest}};
+next(It = #{queue := [PathStack | Rest], head := [Pat | _], root := Root}) ->
+    Filepath = mk_filepath(PathStack),
+    case emqx_ft_fs_util:list_dir(filename:join(Root, Filepath)) of
+        {ok, Filenames} ->
+            % Entries are visited in sorted order, so that traversal order is stable.
+            Sorted = lists:sort(Filenames),
+            Matches = [[Fn | PathStack] || Fn <- Sorted, matches_glob(Pat, Fn, [Fn | PathStack])],
+            % Descend one level: the rest of the current queue is saved on the stack.
+            ItNext = windup(It),
+            next(ItNext#{queue => Matches});
+        {error, _} = Error ->
+            % Report the failure as a node entry and carry on with the rest.
+            {{node, Filepath, Error, PathStack}, It#{queue => Rest}}
+    end;
+next(It = #{queue := []}) ->
+    unwind(It).
+
+windup(It = #{queue := [_ | Rest], head := [Pat | Glob], stack := Stack}) ->
+    % NOTE
+    % Preserve unfinished paths and glob in the stack, so that we can resume traversal
+    % when the lower levels of the tree are exhausted.
+    It#{
+        head => Glob,
+        stack => [{Rest, [Pat | Glob]} | Stack]
+    }.
+
+unwind(It = #{stack := [{Queue, Glob} | StackRest]}) ->
+    % NOTE
+    % Resume traversal of unfinished paths from the upper levels of the tree.
+    next(It#{
+        queue => Queue,
+        head => Glob,
+        stack => StackRest
+    });
+unwind(#{stack := []}) ->
+    % Nothing left on any level: the traversal is over.
+    none.
+
+% Builds the leaf entry for a fully matched path. The entry carries either the file
+% info or the error tuple returned by `emqx_ft_fs_util:read_info/1`.
+emit(PathStack, #{root := Root}) ->
+    RelPath = mk_filepath(PathStack),
+    Info =
+        case emqx_ft_fs_util:read_info(filename:join(Root, RelPath)) of
+            {ok, Fileinfo} -> Fileinfo;
+            {error, _} = Error -> Error
+        end,
+    {leaf, RelPath, Info, PathStack}.
+
+% Converts a path stack (components in reverse order) into a relative file path.
+% The empty stack, i.e. the root, maps to the empty path.
+mk_filepath([]) ->
+    "";
+mk_filepath(PathStack) ->
+    Components = lists:reverse(PathStack),
+    filename:join(Components).
+
+% Applies a single glob pattern to a filename. Filter functions may take either the
+% filename alone, or the filename along with its path stack.
+matches_glob('*', _Filename, _PathStack) ->
+    true;
+matches_glob(Pred, Filename, _PathStack) when is_function(Pred, 1) ->
+    Pred(Filename);
+matches_glob(Pred, Filename, PathStack) when is_function(Pred, 2) ->
+    Pred(Filename, PathStack).
+
+%%
+
+%% Same as `next/1`, but yields leaves only.
+-spec next_leaf(t()) ->
+    {entry_leaf(), t()} | none.
+next_leaf(It) ->
+    case next(It) of
+        none ->
+            none;
+        {{node, _Filename, _Error, _PathStack}, ItRest} ->
+            % NOTE
+            % Errors hit while traversing intermediate nodes are deliberately dropped
+            % here, for the sake of simplicity.
+            next_leaf(ItRest);
+        {{leaf, _, _, _} = Leaf, ItRest} ->
+            {Leaf, ItRest}
+    end.
+
+%%
+
+%% Creates an iterator over `Glob` under `Root`, positioned right after `PathSeek`
+%% (a list of path components, in natural order).
+-spec seek([file:name()], root(), glob()) ->
+    t().
+seek(PathSeek, Root, Glob) ->
+    SeekGlob = mk_seek_glob(PathSeek, Glob),
+    Target = lists:reverse(PathSeek),
+    It0 = new(Root, SeekGlob),
+    case next_leaf(It0) of
+        none ->
+            % Nothing at or past the seek path.
+            none(Root);
+        {{leaf, _, _, Target}, It} ->
+            % The seek path itself was found: resume right after it.
+            fixup_glob(Glob, It);
+        {{leaf, _, _, Successor}, It = #{queue := Queue}} ->
+            % Landed on a successor of the seek path: put it back into the queue,
+            % so that it is the first entry to be yielded.
+            fixup_glob(Glob, It#{queue => [Successor | Queue]})
+    end.
+
+mk_seek_glob(PathSeek, Glob) ->
+    % NOTE
+    % A seek glob filters out every node / leaf that sorts lexicographically before
+    % the seek path. E.g. given the seek path ["a", "b", "c"] and the glob
+    % ['*', '*', '*', '*'], the resulting seek glob is:
+    %   [ fun(Path) -> Path >= ["a"] end,
+    %     fun(Path) -> Path >= ["a", "b"] end,
+    %     fun(Path) -> Path >= ["a", "b", "c"] end,
+    %     '*'
+    %   ]
+    Depth = min(length(PathSeek), length(Glob)),
+    SeekPats = [mk_seek_pat(lists:sublist(PathSeek, N)) || N <- lists:seq(1, Depth)],
+    merge_glob(SeekPats, Glob).
+
+% Makes a two-argument glob filter that accepts a path only if its components,
+% taken in natural order, compare `>=` to `PathSeek` in Erlang term order.
+mk_seek_pat(PathSeek) ->
+    % NOTE
+    % The `PathStack` and `PathSeek` are of the same length here.
+    fun(_Filename, PathStack) -> lists:reverse(PathStack) >= PathSeek end.
+
+% Combines seek patterns with the original glob, level by level. Once the seek
+% patterns run out, the remaining original patterns are kept as is. There is no
+% clause for seek patterns outnumbering the glob patterns.
+merge_glob([Pat | SeekRest], [PatOrig | Rest]) ->
+    [merge_pat(Pat, PatOrig) | merge_glob(SeekRest, Rest)];
+merge_glob([], [PatOrig | Rest]) ->
+    [PatOrig | merge_glob([], Rest)];
+merge_glob([], []) ->
+    [].
+
+% Makes a filter that accepts a filename only if both the seek pattern and the
+% original pattern accept it.
+merge_pat(Pat, PatOrig) ->
+    fun(Filename, PathStack) ->
+        Pat(Filename, PathStack) andalso matches_glob(PatOrig, Filename, PathStack)
+    end.
+
+% Replaces the globs recorded in the iterator with the respective parts of `Glob`.
+% Only accepts an iterator whose `head` is empty.
+fixup_glob(Glob, It = #{head := [], stack := Stack}) ->
+    % NOTE
+    % Restoring original glob through the stack. Strictly speaking, this is not usually
+    % necessary, it's a kind of optimization.
+    fixup_glob(Glob, lists:reverse(Stack), It#{stack => []}).
+
+% Walks the glob and the reversed stack in lockstep: each stack entry keeps its queue
+% but gets the matching suffix of the glob, and the stack is rebuilt in its initial
+% order along the way. Whatever is left of the glob becomes the new `head`.
+fixup_glob(Glob = [_ | Rest], [{Queue, _} | StackRest], It = #{stack := Stack}) ->
+    fixup_glob(Rest, StackRest, It#{stack => [{Queue, Glob} | Stack]});
+fixup_glob(Rest, [], It) ->
+    It#{head => Rest}.
+
+%%
+
+%% Folds `FoldFun` over every entry `next/1` yields (leaves and nodes alike), until
+%% the iterator is exhausted.
+-spec fold(fun((entry(), Acc) -> Acc), Acc, t()) ->
+    Acc.
+fold(FoldFun, Acc, It) ->
+    case next(It) of
+        {Entry, ItNext} ->
+            fold(FoldFun, FoldFun(Entry, Acc), ItNext);
+        none ->
+            Acc
+    end.
+
+%%
+
+-spec none(root()) ->
+    t().
+none(Root) ->
+    % NOTE
+    % The _none_ iterator is a valid iterator, but it will never yield any entries.
+    #{
+        root => Root,
+        queue => [],
+        head => [],
+        stack => []
+    }.
diff --git a/apps/emqx_ft/src/emqx_ft_fs_util.erl b/apps/emqx_ft/src/emqx_ft_fs_util.erl index df9135816..b731d3270 100644 --- a/apps/emqx_ft/src/emqx_ft_fs_util.erl +++ b/apps/emqx_ft/src/emqx_ft_fs_util.erl @@ -25,18 +25,16 @@ -export([read_decode_file/2]). -export([read_info/1]). +-export([list_dir/1]). -export([fold/4]). --type glob() :: ['*' | globfun()]. --type globfun() :: - fun((_Filename :: file:name()) -> boolean()).
-type foldfun(Acc) :: fun( ( _Filepath :: file:name(), - _Info :: file:file_info() | {error, _IoError}, - _Stack :: [file:name()], + _Info :: file:file_info() | {error, file:posix()}, + _Stack :: emqx_ft_fs_iterator:pathstack(), Acc ) -> Acc ). @@ -153,46 +151,8 @@ read_info(AbsPath) -> % Be aware that this function is occasionally mocked in `emqx_ft_fs_util_SUITE`. file:read_link_info(AbsPath, [{time, posix}, raw]). --spec fold(foldfun(Acc), Acc, _Root :: file:name(), glob()) -> - Acc. -fold(Fun, Acc, Root, Glob) -> - fold(Fun, Acc, [], Root, Glob, []). - -fold(Fun, AccIn, Path, Root, [Glob | Rest], Stack) when Glob == '*' orelse is_function(Glob) -> - case list_dir(filename:join(Root, Path)) of - {ok, Filenames} -> - lists:foldl( - fun(FN, Acc) -> - case matches_glob(Glob, FN) of - true when Path == [] -> - fold(Fun, Acc, FN, Root, Rest, [FN | Stack]); - true -> - fold(Fun, Acc, filename:join(Path, FN), Root, Rest, [FN | Stack]); - false -> - Acc - end - end, - AccIn, - Filenames - ); - {error, enotdir} -> - AccIn; - {error, Reason} -> - Fun(Path, {error, Reason}, Stack, AccIn) - end; -fold(Fun, AccIn, Filepath, Root, [], Stack) -> - case ?MODULE:read_info(filename:join(Root, Filepath)) of - {ok, Info} -> - Fun(Filepath, Info, Stack, AccIn); - {error, Reason} -> - Fun(Filepath, {error, Reason}, Stack, AccIn) - end. - -matches_glob('*', _) -> - true; -matches_glob(FilterFun, Filename) when is_function(FilterFun) -> - FilterFun(Filename). - +-spec list_dir(file:name_all()) -> + {ok, [file:name()]} | {error, file:posix() | badarg}. list_dir(AbsPath) -> case ?MODULE:read_info(AbsPath) of {ok, #file_info{type = directory}} -> @@ -202,3 +162,19 @@ list_dir(AbsPath) -> {error, Reason} -> {error, Reason} end.
+
+%% Folds `FoldFun` over the entries under `Root` that match `Glob`. The traversal
+%% itself is delegated to an iterator made with `emqx_ft_fs_iterator:new/2`.
+-spec fold(foldfun(Acc), Acc, _Root :: file:name(), emqx_ft_fs_iterator:glob()) ->
+    Acc.
+fold(FoldFun, Acc, Root, Glob) ->
+    fold(FoldFun, Acc, emqx_ft_fs_iterator:new(Root, Glob)).
+
+% Drives the iterator to exhaustion, calling `FoldFun` with the path, info and path
+% stack of each entry it yields.
+fold(FoldFun, Acc, It) ->
+    case emqx_ft_fs_iterator:next(It) of
+        {{node, _Path, {error, enotdir}, _PathStack}, ItNext} ->
+            % Intermediate paths that turned out not to be directories are skipped
+            % without calling `FoldFun`.
+            fold(FoldFun, Acc, ItNext);
+        {{_Type, Path, Info, PathStack}, ItNext} ->
+            % Leaves and any other traversal errors are both passed to `FoldFun`.
+            AccNext = FoldFun(Path, Info, PathStack, Acc),
+            fold(FoldFun, AccNext, ItNext);
+        none ->
+            Acc
+    end.
diff --git a/apps/emqx_ft/test/emqx_ft_fs_util_SUITE.erl b/apps/emqx_ft/test/emqx_ft_fs_util_SUITE.erl index 81a483651..e4aa70f81 100644 --- a/apps/emqx_ft/test/emqx_ft_fs_util_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_fs_util_SUITE.erl @@ -34,7 +34,7 @@ t_fold_single_level(Config) -> {"c", #file_info{type = directory}, ["c"]}, {"d", #file_info{type = directory}, ["d"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*'])) + sort(fold(fun cons/4, [], Root, ['*'])) ). t_fold_multi_level(Config) -> @@ -45,7 +45,7 @@ t_fold_multi_level(Config) -> {"a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]}, {"d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', '*'])) + sort(fold(fun cons/4, [], Root, ['*', '*', '*', '*'])) ), ?assertMatch( [ @@ -53,32 +53,32 @@ t_fold_multi_level(Config) -> {"c/bar/中文", #file_info{type = regular}, ["中文", "bar", "c"]}, {"d/e/baz", #file_info{type = directory}, ["baz", "e", "d"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*'])) + sort(fold(fun cons/4, [], Root, ['*', '*', '*'])) ). t_fold_no_glob(Config) -> Root = ?config(data_dir, Config), ?assertMatch( [{"", #file_info{type = directory}, []}], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, [])) + sort(fold(fun cons/4, [], Root, [])) ). t_fold_glob_too_deep(Config) -> Root = ?config(data_dir, Config), ?assertMatch( [], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', '*', '*'])) + sort(fold(fun cons/4, [], Root, ['*', '*', '*', '*', '*'])) ).
t_fold_invalid_root(Config) -> Root = ?config(data_dir, Config), ?assertMatch( [], - sort(emqx_ft_fs_util:fold(fun cons/4, [], filename:join([Root, "a", "link"]), ['*'])) + sort(fold(fun cons/4, [], filename:join([Root, "a", "link"]), ['*'])) ), ?assertMatch( [], - sort(emqx_ft_fs_util:fold(fun cons/4, [], filename:join([Root, "d", "haystack"]), ['*'])) + sort(fold(fun cons/4, [], filename:join([Root, "d", "haystack"]), ['*'])) ). t_fold_filter_unicode(Config) -> @@ -88,13 +88,13 @@ t_fold_filter_unicode(Config) -> {"a/b/foo/42", #file_info{type = regular}, ["42", "foo", "b", "a"]}, {"d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', fun is_latin1/1])) + sort(fold(fun cons/4, [], Root, ['*', '*', '*', fun is_latin1/1])) ), ?assertMatch( [ {"a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', is_not(fun is_latin1/1)])) + sort(fold(fun cons/4, [], Root, ['*', '*', '*', is_not(fun is_latin1/1)])) ). t_fold_filter_levels(Config) -> @@ -104,7 +104,7 @@ t_fold_filter_levels(Config) -> {"a/b/foo", #file_info{type = directory}, ["foo", "b", "a"]}, {"d/e/baz", #file_info{type = directory}, ["baz", "e", "d"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, [fun is_letter/1, fun is_letter/1, '*'])) + sort(fold(fun cons/4, [], Root, [fun is_letter/1, fun is_letter/1, '*'])) ). t_fold_errors(Config) -> @@ -128,11 +128,99 @@ t_fold_errors(Config) -> {"c/link", {error, enotsup}, ["link", "c"]}, {"d/e/baz/needle", {error, ebusy}, ["needle", "baz", "e", "d"]} ], - sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', '*'])) + sort(fold(fun cons/4, [], Root, ['*', '*', '*', '*'])) + ). 
+
+%% Seeking yields only the entries past the seek path. A seek path that names an
+%% existing leaf is itself excluded from the results.
+t_seek_fold(Config) ->
+    Root = ?config(data_dir, Config),
+    Glob = ['*', '*', '*', '*'],
+    SeekFold = fun(PathSeek) ->
+        It = emqx_ft_fs_iterator:seek(PathSeek, Root, Glob),
+        sort(emqx_ft_fs_iterator:fold(fun cons/2, [], It))
+    end,
+    ?assertMatch(
+        [
+            {leaf, "a/b/foo/42", #file_info{type = regular}, ["42", "foo", "b", "a"]},
+            {leaf, "a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]},
+            {leaf, "d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]}
+            | _Nodes
+        ],
+        SeekFold(["a", "a"])
+    ),
+    ?assertMatch(
+        [
+            {leaf, "a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]},
+            {leaf, "d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]}
+            | _Nodes
+        ],
+        SeekFold(["a", "b", "foo", "42"])
+    ),
+    ?assertMatch(
+        [
+            {leaf, "d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]}
+            | _Nodes
+        ],
+        SeekFold(["c", "d", "e", "f"])
+    ).
+
+%% Seeking to the empty path is expected to be the same as starting from scratch.
+t_seek_empty(Config) ->
+    Root = ?config(data_dir, Config),
+    Glob = ['*', '*', '*', '*'],
+    ItNew = emqx_ft_fs_iterator:new(Root, Glob),
+    ItSeek = emqx_ft_fs_iterator:seek([], Root, Glob),
+    ?assertEqual(
+        emqx_ft_fs_iterator:fold(fun cons/2, [], ItNew),
+        emqx_ft_fs_iterator:fold(fun cons/2, [], ItSeek)
+    ).
+
+%% Seeking past the last entry is expected to give an exhausted iterator.
+t_seek_past_end(Config) ->
+    Root = ?config(data_dir, Config),
+    It = emqx_ft_fs_iterator:seek(["g", "h"], Root, ['*', '*', '*', '*']),
+    ?assertEqual(none, emqx_ft_fs_iterator:next(It)).
+
+%% Filters of the original glob are expected to stay in effect while seeking.
+t_seek_with_filter(Config) ->
+    Root = ?config(data_dir, Config),
+    It = emqx_ft_fs_iterator:seek(["a", "link"], Root, ['*', fun is_letter/1, '*']),
+    ?assertMatch(
+        [
+            {leaf, "d/e/baz", #file_info{type = directory}, ["baz", "e", "d"]}
+            | _Nodes
+        ],
+        sort(emqx_ft_fs_iterator:fold(fun cons/2, [], It))
     ).
 
 %%
 
+%% Shorthand for `emqx_ft_fs_util:fold/4`.
+fold(Fun, Acc0, Root, Glob) ->
+    emqx_ft_fs_util:fold(Fun, Acc0, Root, Glob).
+
 is_not(F) ->
     fun(X) -> not F(X) end.
 
@@ -155,5 +243,8 @@ is_letter(Filename) ->
 cons(Path, Info, Stack, Acc) ->
     [{Path, Info, Stack} | Acc].
 
+%% Fold callback for `emqx_ft_fs_iterator:fold/3`: prepends an entry to the accumulator.
+cons(Entry, Acc) ->
+    [Entry | Acc].
+
 sort(L) when is_list(L) ->
     lists:sort(L).