feat(ft-fs): introduce fs iterators concept + forward seeks

In order to support paging over filesystem contents, to serve REST
APIs effectively.
This commit is contained in:
Andrew Mayorov 2023-03-30 19:18:47 +03:00
parent 9f7e807e06
commit 573bb22ada
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 342 additions and 56 deletions

View File

@ -0,0 +1,219 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_fs_iterator).
-export([new/2]).
-export([next/1]).
-export([next_leaf/1]).
-export([seek/3]).
-export([fold/3]).
-export_type([t/0]).
-export_type([glob/0]).
-export_type([pathstack/0]).
-type root() :: file:name().
-type glob() :: ['*' | globfun()].
-type globfun() ::
fun((_Filename :: file:name()) -> boolean())
| fun((_Filename :: file:name(), pathstack()) -> boolean()).
% A path stack is a list of path components, in reverse order.
-type pathstack() :: [file:name(), ...].
-opaque t() :: #{
root := root(),
queue := [_PathStack :: [file:name()]],
head := glob(),
stack := [{[pathstack()], glob()}]
}.
-type entry() :: entry_leaf() | entry_node().
-type entry_leaf() ::
{leaf, file:name(), file:file_info() | {error, file:posix()}, pathstack()}.
-type entry_node() ::
{node, file:name(), {error, file:posix()}, pathstack()}.
-spec new(root(), glob()) ->
t().
new(Root, Glob) ->
#{
root => Root,
queue => [[]],
head => Glob,
stack => []
}.
-spec next(t()) ->
{entry(), t()} | none.
next(It = #{queue := [PathStack | Rest], head := []}) ->
{emit(PathStack, It), It#{queue => Rest}};
next(It = #{queue := [PathStack | Rest], head := [Pat | _], root := Root}) ->
Filepath = mk_filepath(PathStack),
case emqx_ft_fs_util:list_dir(filename:join(Root, Filepath)) of
{ok, Filenames} ->
Sorted = lists:sort(Filenames),
Matches = [[Fn | PathStack] || Fn <- Sorted, matches_glob(Pat, Fn, [Fn | PathStack])],
ItNext = windup(It),
next(ItNext#{queue => Matches});
{error, _} = Error ->
{{node, Filepath, Error, PathStack}, It#{queue => Rest}}
end;
next(It = #{queue := []}) ->
unwind(It).
windup(It = #{queue := [_ | Rest], head := [Pat | Glob], stack := Stack}) ->
% NOTE
% Preserve unfinished paths and glob in the stack, so that we can resume traversal
% when the lower levels of the tree are exhausted.
It#{
head => Glob,
stack => [{Rest, [Pat | Glob]} | Stack]
}.
unwind(It = #{stack := [{Queue, Glob} | StackRest]}) ->
% NOTE
% Resume traversal of unfinished paths from the upper levels of the tree.
next(It#{
queue => Queue,
head => Glob,
stack => StackRest
});
unwind(#{stack := []}) ->
none.
emit(PathStack, #{root := Root}) ->
Filepath = mk_filepath(PathStack),
case emqx_ft_fs_util:read_info(filename:join(Root, Filepath)) of
{ok, Fileinfo} ->
{leaf, Filepath, Fileinfo, PathStack};
{error, _} = Error ->
{leaf, Filepath, Error, PathStack}
end.
mk_filepath([]) ->
"";
mk_filepath(PathStack) ->
filename:join(lists:reverse(PathStack)).
matches_glob('*', _, _) ->
true;
matches_glob(FilterFun, Filename, _PathStack) when is_function(FilterFun, 1) ->
FilterFun(Filename);
matches_glob(FilterFun, Filename, PathStack) when is_function(FilterFun, 2) ->
FilterFun(Filename, PathStack).
%%
-spec next_leaf(t()) ->
{entry_leaf(), t()} | none.
next_leaf(It) ->
case next(It) of
{{leaf, _, _, _} = Leaf, ItNext} ->
{Leaf, ItNext};
{{node, _Filename, _Error, _PathStack}, ItNext} ->
% NOTE
% Intentionally skipping intermediate traversal errors here, for simplicity.
next_leaf(ItNext);
none ->
none
end.
%%
-spec seek([file:name()], root(), glob()) ->
t().
seek(PathSeek, Root, Glob) ->
SeekGlob = mk_seek_glob(PathSeek, Glob),
SeekStack = lists:reverse(PathSeek),
case next_leaf(new(Root, SeekGlob)) of
{{leaf, _Filepath, _Info, SeekStack}, It} ->
fixup_glob(Glob, It);
{{leaf, _Filepath, _Info, Successor}, It = #{queue := Queue}} ->
fixup_glob(Glob, It#{queue => [Successor | Queue]});
none ->
none(Root)
end.
mk_seek_glob(PathSeek, Glob) ->
% NOTE
% The seek glob is a glob that skips all the nodes / leaves that are lexicographically
% smaller than the seek path. For example, if the seek path is ["a", "b", "c"], and
% the glob is ['*', '*', '*', '*'], then the seek glob is:
% [ fun(Path) -> Path >= ["a"] end,
% fun(Path) -> Path >= ["a", "b"] end,
% fun(Path) -> Path >= ["a", "b", "c"] end,
% '*'
% ]
L = min(length(PathSeek), length(Glob)),
merge_glob([mk_seek_pat(lists:sublist(PathSeek, N)) || N <- lists:seq(1, L)], Glob).
mk_seek_pat(PathSeek) ->
% NOTE
% The `PathStack` and `PathSeek` are of the same length here.
fun(_Filename, PathStack) -> lists:reverse(PathStack) >= PathSeek end.
merge_glob([Pat | SeekRest], [PatOrig | Rest]) ->
[merge_pat(Pat, PatOrig) | merge_glob(SeekRest, Rest)];
merge_glob([], [PatOrig | Rest]) ->
[PatOrig | merge_glob([], Rest)];
merge_glob([], []) ->
[].
merge_pat(Pat, PatOrig) ->
fun(Filename, PathStack) ->
Pat(Filename, PathStack) andalso matches_glob(PatOrig, Filename, PathStack)
end.
fixup_glob(Glob, It = #{head := [], stack := Stack}) ->
% NOTE
% Restoring original glob through the stack. Strictly speaking, this is not usually
% necessary, it's a kind of optimization.
fixup_glob(Glob, lists:reverse(Stack), It#{stack => []}).
fixup_glob(Glob = [_ | Rest], [{Queue, _} | StackRest], It = #{stack := Stack}) ->
fixup_glob(Rest, StackRest, It#{stack => [{Queue, Glob} | Stack]});
fixup_glob(Rest, [], It) ->
It#{head => Rest}.
%%
-spec fold(fun((entry(), Acc) -> Acc), Acc, t()) ->
Acc.
fold(FoldFun, Acc, It) ->
case next(It) of
{Entry, ItNext} ->
fold(FoldFun, FoldFun(Entry, Acc), ItNext);
none ->
Acc
end.
%%
-spec none(root()) ->
t().
none(Root) ->
% NOTE
% The _none_ iterator is a valid iterator, but it will never yield any entries.
#{
root => Root,
queue => [],
head => [],
stack => []
}.

View File

@ -25,18 +25,16 @@
-export([read_decode_file/2]).
-export([read_info/1]).
-export([list_dir/1]).
-export([fold/4]).
-type glob() :: ['*' | globfun()].
-type globfun() ::
fun((_Filename :: file:name()) -> boolean()).
-type foldfun(Acc) ::
fun(
(
_Filepath :: file:name(),
_Info :: file:file_info() | {error, _IoError},
_Stack :: [file:name()],
_Info :: file:file_info() | {error, file:posix()},
_Stack :: emqx_ft_fs_iterator:pathstack(),
Acc
) -> Acc
).
@ -153,46 +151,8 @@ read_info(AbsPath) ->
% Be aware that this function is occasionally mocked in `emqx_ft_fs_util_SUITE`.
file:read_link_info(AbsPath, [{time, posix}, raw]).
-spec fold(foldfun(Acc), Acc, _Root :: file:name(), glob()) ->
Acc.
fold(Fun, Acc, Root, Glob) ->
fold(Fun, Acc, [], Root, Glob, []).
fold(Fun, AccIn, Path, Root, [Glob | Rest], Stack) when Glob == '*' orelse is_function(Glob) ->
case list_dir(filename:join(Root, Path)) of
{ok, Filenames} ->
lists:foldl(
fun(FN, Acc) ->
case matches_glob(Glob, FN) of
true when Path == [] ->
fold(Fun, Acc, FN, Root, Rest, [FN | Stack]);
true ->
fold(Fun, Acc, filename:join(Path, FN), Root, Rest, [FN | Stack]);
false ->
Acc
end
end,
AccIn,
Filenames
);
{error, enotdir} ->
AccIn;
{error, Reason} ->
Fun(Path, {error, Reason}, Stack, AccIn)
end;
fold(Fun, AccIn, Filepath, Root, [], Stack) ->
case ?MODULE:read_info(filename:join(Root, Filepath)) of
{ok, Info} ->
Fun(Filepath, Info, Stack, AccIn);
{error, Reason} ->
Fun(Filepath, {error, Reason}, Stack, AccIn)
end.
matches_glob('*', _) ->
true;
matches_glob(FilterFun, Filename) when is_function(FilterFun) ->
FilterFun(Filename).
-spec list_dir(file:name_all()) ->
{ok, [file:name()]} | {error, file:posix() | badarg}.
list_dir(AbsPath) ->
case ?MODULE:read_info(AbsPath) of
{ok, #file_info{type = directory}} ->
@ -202,3 +162,19 @@ list_dir(AbsPath) ->
{error, Reason} ->
{error, Reason}
end.
-spec fold(foldfun(Acc), Acc, _Root :: file:name(), emqx_ft_fs_iterator:glob()) ->
Acc.
fold(FoldFun, Acc, Root, Glob) ->
fold(FoldFun, Acc, emqx_ft_fs_iterator:new(Root, Glob)).
fold(FoldFun, Acc, It) ->
case emqx_ft_fs_iterator:next(It) of
{{node, _Path, {error, enotdir}, _PathStack}, ItNext} ->
fold(FoldFun, Acc, ItNext);
{{_Type, Path, Info, PathStack}, ItNext} ->
AccNext = FoldFun(Path, Info, PathStack, Acc),
fold(FoldFun, AccNext, ItNext);
none ->
Acc
end.

View File

@ -34,7 +34,7 @@ t_fold_single_level(Config) ->
{"c", #file_info{type = directory}, ["c"]},
{"d", #file_info{type = directory}, ["d"]}
],
sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*']))
sort(fold(fun cons/4, [], Root, ['*']))
).
t_fold_multi_level(Config) ->
@ -45,7 +45,7 @@ t_fold_multi_level(Config) ->
{"a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]},
{"d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]}
],
sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', '*']))
sort(fold(fun cons/4, [], Root, ['*', '*', '*', '*']))
),
?assertMatch(
[
@ -53,32 +53,32 @@ t_fold_multi_level(Config) ->
{"c/bar/中文", #file_info{type = regular}, ["中文", "bar", "c"]},
{"d/e/baz", #file_info{type = directory}, ["baz", "e", "d"]}
],
sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*']))
sort(fold(fun cons/4, [], Root, ['*', '*', '*']))
).
t_fold_no_glob(Config) ->
Root = ?config(data_dir, Config),
?assertMatch(
[{"", #file_info{type = directory}, []}],
sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, []))
sort(fold(fun cons/4, [], Root, []))
).
t_fold_glob_too_deep(Config) ->
Root = ?config(data_dir, Config),
?assertMatch(
[],
sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', '*', '*']))
sort(fold(fun cons/4, [], Root, ['*', '*', '*', '*', '*']))
).
t_fold_invalid_root(Config) ->
Root = ?config(data_dir, Config),
?assertMatch(
[],
sort(emqx_ft_fs_util:fold(fun cons/4, [], filename:join([Root, "a", "link"]), ['*']))
sort(fold(fun cons/4, [], filename:join([Root, "a", "link"]), ['*']))
),
?assertMatch(
[],
sort(emqx_ft_fs_util:fold(fun cons/4, [], filename:join([Root, "d", "haystack"]), ['*']))
sort(fold(fun cons/4, [], filename:join([Root, "d", "haystack"]), ['*']))
).
t_fold_filter_unicode(Config) ->
@ -88,13 +88,13 @@ t_fold_filter_unicode(Config) ->
{"a/b/foo/42", #file_info{type = regular}, ["42", "foo", "b", "a"]},
{"d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]}
],
sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', fun is_latin1/1]))
sort(fold(fun cons/4, [], Root, ['*', '*', '*', fun is_latin1/1]))
),
?assertMatch(
[
{"a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]}
],
sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', is_not(fun is_latin1/1)]))
sort(fold(fun cons/4, [], Root, ['*', '*', '*', is_not(fun is_latin1/1)]))
).
t_fold_filter_levels(Config) ->
@ -104,7 +104,7 @@ t_fold_filter_levels(Config) ->
{"a/b/foo", #file_info{type = directory}, ["foo", "b", "a"]},
{"d/e/baz", #file_info{type = directory}, ["baz", "e", "d"]}
],
sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, [fun is_letter/1, fun is_letter/1, '*']))
sort(fold(fun cons/4, [], Root, [fun is_letter/1, fun is_letter/1, '*']))
).
t_fold_errors(Config) ->
@ -128,11 +128,99 @@ t_fold_errors(Config) ->
{"c/link", {error, enotsup}, ["link", "c"]},
{"d/e/baz/needle", {error, ebusy}, ["needle", "baz", "e", "d"]}
],
sort(emqx_ft_fs_util:fold(fun cons/4, [], Root, ['*', '*', '*', '*']))
sort(fold(fun cons/4, [], Root, ['*', '*', '*', '*']))
).
t_seek_fold(Config) ->
Root = ?config(data_dir, Config),
?assertMatch(
[
{leaf, "a/b/foo/42", #file_info{type = regular}, ["42", "foo", "b", "a"]},
{leaf, "a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]},
{leaf, "d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]}
| _Nodes
],
sort(
emqx_ft_fs_iterator:fold(
fun cons/2,
[],
emqx_ft_fs_iterator:seek(["a", "a"], Root, ['*', '*', '*', '*'])
)
)
),
?assertMatch(
[
{leaf, "a/b/foo/Я", #file_info{type = regular}, ["Я", "foo", "b", "a"]},
{leaf, "d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]}
| _Nodes
],
sort(
emqx_ft_fs_iterator:fold(
fun cons/2,
[],
emqx_ft_fs_iterator:seek(["a", "b", "foo", "42"], Root, ['*', '*', '*', '*'])
)
)
),
?assertMatch(
[
{leaf, "d/e/baz/needle", #file_info{type = regular}, ["needle", "baz", "e", "d"]}
| _Nodes
],
sort(
emqx_ft_fs_iterator:fold(
fun cons/2,
[],
emqx_ft_fs_iterator:seek(["c", "d", "e", "f"], Root, ['*', '*', '*', '*'])
)
)
).
t_seek_empty(Config) ->
Root = ?config(data_dir, Config),
?assertEqual(
emqx_ft_fs_iterator:fold(
fun cons/2,
[],
emqx_ft_fs_iterator:new(Root, ['*', '*', '*', '*'])
),
emqx_ft_fs_iterator:fold(
fun cons/2,
[],
emqx_ft_fs_iterator:seek([], Root, ['*', '*', '*', '*'])
)
).
t_seek_past_end(Config) ->
Root = ?config(data_dir, Config),
?assertEqual(
none,
emqx_ft_fs_iterator:next(
emqx_ft_fs_iterator:seek(["g", "h"], Root, ['*', '*', '*', '*'])
)
).
t_seek_with_filter(Config) ->
Root = ?config(data_dir, Config),
?assertMatch(
[
{leaf, "d/e/baz", #file_info{type = directory}, ["baz", "e", "d"]}
| _Nodes
],
sort(
emqx_ft_fs_iterator:fold(
fun cons/2,
[],
emqx_ft_fs_iterator:seek(["a", "link"], Root, ['*', fun is_letter/1, '*'])
)
)
).
%%
fold(FoldFun, Acc, Root, Glob) ->
emqx_ft_fs_util:fold(FoldFun, Acc, Root, Glob).
is_not(F) ->
fun(X) -> not F(X) end.
@ -155,5 +243,8 @@ is_letter(Filename) ->
cons(Path, Info, Stack, Acc) ->
[{Path, Info, Stack} | Acc].
cons(Entry, Acc) ->
[Entry | Acc].
sort(L) when is_list(L) ->
lists:sort(L).