feat(ft-api): add paging support through cursors
This commit is contained in:
parent
573bb22ada
commit
ed3756ea09
|
@ -19,7 +19,6 @@
|
|||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
%% Swagger specs from hocon schema
|
||||
-export([
|
||||
|
@ -30,12 +29,14 @@
|
|||
]).
|
||||
|
||||
-export([
|
||||
roots/0
|
||||
roots/0,
|
||||
fields/1
|
||||
]).
|
||||
|
||||
%% API callbacks
|
||||
-export([
|
||||
'/file_transfer/files'/2
|
||||
'/file_transfer/files'/2,
|
||||
'/file_transfer/files/:clientid/:fileid'/2
|
||||
]).
|
||||
|
||||
-import(hoconsc, [mk/2, ref/1, ref/2]).
|
||||
|
@ -47,7 +48,8 @@ api_spec() ->
|
|||
|
||||
paths() ->
|
||||
[
|
||||
"/file_transfer/files"
|
||||
"/file_transfer/files",
|
||||
"/file_transfer/files/:clientid/:fileid"
|
||||
].
|
||||
|
||||
schema("/file_transfer/files") ->
|
||||
|
@ -57,29 +59,133 @@ schema("/file_transfer/files") ->
|
|||
tags => [<<"file_transfer">>],
|
||||
summary => <<"List all uploaded files">>,
|
||||
description => ?DESC("file_list"),
|
||||
parameters => [
|
||||
ref(following),
|
||||
ref(emqx_dashboard_swagger, limit)
|
||||
],
|
||||
responses => #{
|
||||
200 => <<"Operation success">>,
|
||||
400 => emqx_dashboard_swagger:error_codes(
|
||||
['BAD_REQUEST'], <<"Invalid cursor">>
|
||||
),
|
||||
503 => emqx_dashboard_swagger:error_codes(
|
||||
['SERVICE_UNAVAILABLE'], <<"Service unavailable">>
|
||||
['SERVICE_UNAVAILABLE'], error_desc('SERVICE_UNAVAILABLE')
|
||||
)
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/file_transfer/files/:clientid/:fileid") ->
|
||||
#{
|
||||
'operationId' => '/file_transfer/files/:clientid/:fileid',
|
||||
get => #{
|
||||
tags => [<<"file_transfer">>],
|
||||
summary => <<"List files uploaded in a specific transfer">>,
|
||||
description => ?DESC("file_list_transfer"),
|
||||
parameters => [
|
||||
ref(client_id),
|
||||
ref(file_id)
|
||||
],
|
||||
responses => #{
|
||||
200 => <<"Operation success">>,
|
||||
404 => emqx_dashboard_swagger:error_codes(
|
||||
['FILES_NOT_FOUND'], error_desc('FILES_NOT_FOUND')
|
||||
),
|
||||
503 => emqx_dashboard_swagger:error_codes(
|
||||
['SERVICE_UNAVAILABLE'], error_desc('SERVICE_UNAVAILABLE')
|
||||
)
|
||||
}
|
||||
}
|
||||
}.
|
||||
|
||||
'/file_transfer/files'(get, #{}) ->
|
||||
case emqx_ft_storage:files() of
|
||||
{ok, Files} ->
|
||||
{200, #{<<"files">> => lists:map(fun format_file_info/1, Files)}};
|
||||
{error, _} ->
|
||||
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
|
||||
'/file_transfer/files'(get, #{
|
||||
query_string := QueryString
|
||||
}) ->
|
||||
try
|
||||
Limit = limit(QueryString),
|
||||
Query =
|
||||
case maps:get(<<"following">>, QueryString, undefined) of
|
||||
undefined ->
|
||||
#{limit => Limit};
|
||||
Cursor ->
|
||||
#{limit => Limit, following => Cursor}
|
||||
end,
|
||||
case emqx_ft_storage:files(Query) of
|
||||
{ok, Page} ->
|
||||
{200, format_page(Page)};
|
||||
{error, _} ->
|
||||
{503, error_msg('SERVICE_UNAVAILABLE')}
|
||||
end
|
||||
catch
|
||||
error:{badarg, cursor} ->
|
||||
{400, error_msg('BAD_REQUEST', <<"Invalid cursor">>)}
|
||||
end.
|
||||
|
||||
'/file_transfer/files/:clientid/:fileid'(get, #{
|
||||
bindings := #{clientid := ClientId, fileid := FileId}
|
||||
}) ->
|
||||
Transfer = {ClientId, FileId},
|
||||
case emqx_ft_storage:files(#{transfer => Transfer}) of
|
||||
{ok, Page} ->
|
||||
{200, format_page(Page)};
|
||||
{error, [{_Node, enoent} | _]} ->
|
||||
{404, error_msg('FILES_NOT_FOUND')};
|
||||
{error, _} ->
|
||||
{503, error_msg('SERVICE_UNAVAILABLE')}
|
||||
end.
|
||||
|
||||
format_page(#{items := Files, cursor := Cursor}) ->
|
||||
#{
|
||||
<<"files">> => lists:map(fun format_file_info/1, Files),
|
||||
<<"cursor">> => Cursor
|
||||
};
|
||||
format_page(#{items := Files}) ->
|
||||
#{
|
||||
<<"files">> => lists:map(fun format_file_info/1, Files)
|
||||
}.
|
||||
|
||||
error_msg(Code) ->
|
||||
#{code => Code, message => error_desc(Code)}.
|
||||
|
||||
error_msg(Code, Msg) ->
|
||||
#{code => Code, message => emqx_utils:readable_error_msg(Msg)}.
|
||||
|
||||
error_desc('FILES_NOT_FOUND') ->
|
||||
<<"Files requested for this transfer could not be found">>;
|
||||
error_desc('SERVICE_UNAVAILABLE') ->
|
||||
<<"Service unavailable">>.
|
||||
|
||||
roots() ->
|
||||
[].
|
||||
|
||||
-spec fields(hocon_schema:name()) -> [hoconsc:field()].
|
||||
fields(client_id) ->
|
||||
[
|
||||
{clientid,
|
||||
mk(binary(), #{
|
||||
in => path,
|
||||
desc => <<"MQTT Client ID">>,
|
||||
required => true
|
||||
})}
|
||||
];
|
||||
fields(file_id) ->
|
||||
[
|
||||
{fileid,
|
||||
mk(binary(), #{
|
||||
in => path,
|
||||
desc => <<"File ID">>,
|
||||
required => true
|
||||
})}
|
||||
];
|
||||
fields(following) ->
|
||||
[
|
||||
{following,
|
||||
mk(binary(), #{
|
||||
in => query,
|
||||
desc => <<"Cursor to start listing files from">>,
|
||||
required => false
|
||||
})}
|
||||
].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helpers
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -115,3 +221,6 @@ format_name(NameBin) when is_binary(NameBin) ->
|
|||
NameBin;
|
||||
format_name(Name) when is_list(Name) ->
|
||||
iolist_to_binary(Name).
|
||||
|
||||
limit(QueryString) ->
|
||||
maps:get(<<"limit">>, QueryString, emqx_mgmt:default_row_limit()).
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
-export([seek/3]).
|
||||
|
||||
-export([fold/3]).
|
||||
-export([fold_n/4]).
|
||||
|
||||
-export_type([t/0]).
|
||||
-export_type([glob/0]).
|
||||
|
@ -204,6 +205,21 @@ fold(FoldFun, Acc, It) ->
|
|||
Acc
|
||||
end.
|
||||
|
||||
%% NOTE
|
||||
%% Passing negative `N` is allowed, in which case the iterator will be exhausted
|
||||
%% completely, like in `fold/3`.
|
||||
-spec fold_n(fun((entry(), Acc) -> Acc), Acc, t(), _N :: integer()) ->
|
||||
{Acc, {more, t()} | none}.
|
||||
fold_n(_FoldFun, Acc, It, 0) ->
|
||||
{Acc, {more, It}};
|
||||
fold_n(FoldFun, Acc, It, N) ->
|
||||
case next(It) of
|
||||
{Entry, ItNext} ->
|
||||
fold_n(FoldFun, FoldFun(Entry, Acc), ItNext, N - 1);
|
||||
none ->
|
||||
{Acc, none}
|
||||
end.
|
||||
|
||||
%%
|
||||
|
||||
-spec none(root()) ->
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
assemble/2,
|
||||
|
||||
files/0,
|
||||
files/1,
|
||||
|
||||
with_storage_type/2,
|
||||
with_storage_type/3,
|
||||
|
@ -36,12 +37,27 @@
|
|||
-type storage() :: emqx_config:config().
|
||||
|
||||
-export_type([assemble_callback/0]).
|
||||
|
||||
-export_type([query/1]).
|
||||
-export_type([page/2]).
|
||||
-export_type([file_info/0]).
|
||||
-export_type([export_data/0]).
|
||||
-export_type([reader/0]).
|
||||
|
||||
-type assemble_callback() :: fun((ok | {error, term()}) -> any()).
|
||||
|
||||
-type query(Cursor) ::
|
||||
#{transfer => emqx_ft:transfer()}
|
||||
| #{
|
||||
limit => non_neg_integer(),
|
||||
following => Cursor
|
||||
}.
|
||||
|
||||
-type page(Item, Cursor) :: #{
|
||||
items := [Item],
|
||||
cursor => Cursor
|
||||
}.
|
||||
|
||||
-type file_info() :: #{
|
||||
transfer := emqx_ft:transfer(),
|
||||
name := file:name(),
|
||||
|
@ -71,8 +87,8 @@
|
|||
-callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) ->
|
||||
ok | {async, pid()} | {error, term()}.
|
||||
|
||||
-callback files(storage()) ->
|
||||
{ok, [file_info()]} | {error, term()}.
|
||||
-callback files(storage(), query(Cursor)) ->
|
||||
{ok, page(file_info(), Cursor)} | {error, term()}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
|
@ -105,9 +121,14 @@ assemble(Transfer, Size) ->
|
|||
with_storage(assemble, [Transfer, Size]).
|
||||
|
||||
-spec files() ->
|
||||
{ok, [file_info()]} | {error, term()}.
|
||||
{ok, page(file_info(), _)} | {error, term()}.
|
||||
files() ->
|
||||
with_storage(files, []).
|
||||
files(#{}).
|
||||
|
||||
-spec files(query(Cursor)) ->
|
||||
{ok, page(file_info(), Cursor)} | {error, term()}.
|
||||
files(Query) ->
|
||||
with_storage(files, [Query]).
|
||||
|
||||
-spec with_storage(atom() | function()) -> any().
|
||||
with_storage(Fun) ->
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
%% Filesystem storage exporter
|
||||
%%
|
||||
%% This is conceptually a part of the Filesystem storage backend that defines
|
||||
%% how and where complete tranfers are assembled into files and stored.
|
||||
%% how and where complete transfers are assembled into files and stored.
|
||||
|
||||
-module(emqx_ft_storage_exporter).
|
||||
|
||||
|
@ -28,7 +28,7 @@
|
|||
-export([discard/1]).
|
||||
|
||||
%% Listing API
|
||||
-export([list/1]).
|
||||
-export([list/2]).
|
||||
|
||||
%% Lifecycle API
|
||||
-export([on_config_update/2]).
|
||||
|
@ -70,8 +70,8 @@
|
|||
-callback discard(ExportSt :: export_st()) ->
|
||||
ok | {error, _Reason}.
|
||||
|
||||
-callback list(storage()) ->
|
||||
{ok, [emqx_ft_storage:file_info()]} | {error, _Reason}.
|
||||
-callback list(exporter_conf(), emqx_ft_storage:query(Cursor)) ->
|
||||
{ok, emqx_ft_storage:page(emqx_ft_storage:file_info(), Cursor)} | {error, _Reason}.
|
||||
|
||||
%% Lifecycle callbacks
|
||||
|
||||
|
@ -133,11 +133,11 @@ complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemet
|
|||
discard(#{mod := ExporterMod, st := ExportSt}) ->
|
||||
ExporterMod:discard(ExportSt).
|
||||
|
||||
-spec list(storage()) ->
|
||||
{ok, [emqx_ft_storage:file_info()]} | {error, _Reason}.
|
||||
list(Storage) ->
|
||||
-spec list(storage(), emqx_ft_storage:query(Cursor)) ->
|
||||
{ok, emqx_ft_storage:page(emqx_ft_storage:file_info(), Cursor)} | {error, _Reason}.
|
||||
list(Storage, Query) ->
|
||||
{ExporterMod, ExporterOpts} = exporter(Storage),
|
||||
ExporterMod:list(ExporterOpts).
|
||||
ExporterMod:list(ExporterOpts, Query).
|
||||
|
||||
%% Lifecycle
|
||||
|
||||
|
|
|
@ -37,12 +37,23 @@
|
|||
%% Internal API for RPC
|
||||
-export([list_local/1]).
|
||||
-export([list_local/2]).
|
||||
-export([list_local_transfer/2]).
|
||||
-export([start_reader/3]).
|
||||
|
||||
% TODO
|
||||
% -export([list/2]).
|
||||
-export([list/2]).
|
||||
|
||||
-export_type([export_st/0]).
|
||||
-export_type([options/0]).
|
||||
|
||||
-type options() :: #{
|
||||
root => file:name(),
|
||||
_ => _
|
||||
}.
|
||||
|
||||
-type query() :: emqx_ft_storage:query(cursor()).
|
||||
-type page(T) :: emqx_ft_storage:page(T, cursor()).
|
||||
-type cursor() :: iodata().
|
||||
|
||||
-type options() :: _TODO.
|
||||
-type transfer() :: emqx_ft:transfer().
|
||||
-type filemeta() :: emqx_ft:filemeta().
|
||||
-type exportinfo() :: emqx_ft_storage:file_info().
|
||||
|
@ -70,22 +81,6 @@
|
|||
%% 2 symbols = at most 256 directories on the second level
|
||||
-define(BUCKET2_LEN, 2).
|
||||
|
||||
-define(SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options),
|
||||
?SLOG(notice, "filesystem_object_unexpected", #{
|
||||
relpath => RelFilepath,
|
||||
fileinfo => Fileinfo,
|
||||
options => Options
|
||||
})
|
||||
).
|
||||
|
||||
-define(SLOG_INACCESSIBLE(RelFilepath, Reason, Options),
|
||||
?SLOG(warning, "filesystem_object_inaccessible", #{
|
||||
relpath => RelFilepath,
|
||||
reason => Reason,
|
||||
options => Options
|
||||
})
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Exporter behaviour
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -162,33 +157,33 @@ update(_OldOptions, _NewOptions) -> ok.
|
|||
%% Internal API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec list_local(options(), transfer()) ->
|
||||
{ok, [exportinfo(), ...]} | {error, file_error()}.
|
||||
list_local(Options, Transfer) ->
|
||||
TransferRoot = mk_absdir(Options, Transfer, result),
|
||||
case
|
||||
emqx_ft_fs_util:fold(
|
||||
fun
|
||||
(_Path, {error, Reason}, [], []) ->
|
||||
{error, Reason};
|
||||
(_Path, Fileinfo = #file_info{type = regular}, [Filename | _], Acc) ->
|
||||
RelFilepath = filename:join(mk_result_reldir(Transfer) ++ [Filename]),
|
||||
Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo),
|
||||
[Info | Acc];
|
||||
(RelFilepath, Fileinfo = #file_info{}, _, Acc) ->
|
||||
?SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options),
|
||||
Acc;
|
||||
(RelFilepath, {error, Reason}, _, Acc) ->
|
||||
?SLOG_INACCESSIBLE(RelFilepath, Reason, Options),
|
||||
Acc
|
||||
end,
|
||||
[],
|
||||
TransferRoot,
|
||||
[fun filter_manifest/1]
|
||||
)
|
||||
of
|
||||
-type local_query() :: emqx_ft_storage:query({transfer(), file:name()}).
|
||||
|
||||
-spec list_local_transfer(options(), transfer()) ->
|
||||
{ok, [exportinfo()]} | {error, file_error()}.
|
||||
list_local_transfer(Options, Transfer) ->
|
||||
It = emqx_ft_fs_iterator:new(
|
||||
mk_absdir(Options, Transfer, result),
|
||||
[fun filter_manifest/1]
|
||||
),
|
||||
Result = emqx_ft_fs_iterator:fold(
|
||||
fun
|
||||
({leaf, _Path, Fileinfo = #file_info{type = regular}, [Filename | _]}, Acc) ->
|
||||
RelFilepath = filename:join(mk_result_reldir(Transfer) ++ [Filename]),
|
||||
Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo),
|
||||
[Info | Acc];
|
||||
({node, _Path, {error, Reason}, []}, []) ->
|
||||
{error, Reason};
|
||||
(Entry, Acc) ->
|
||||
ok = log_invalid_entry(Options, Entry),
|
||||
Acc
|
||||
end,
|
||||
[],
|
||||
It
|
||||
),
|
||||
case Result of
|
||||
Infos = [_ | _] ->
|
||||
{ok, Infos};
|
||||
{ok, lists:reverse(Infos)};
|
||||
[] ->
|
||||
{error, enoent};
|
||||
{error, Reason} ->
|
||||
|
@ -196,9 +191,17 @@ list_local(Options, Transfer) ->
|
|||
end.
|
||||
|
||||
-spec list_local(options()) ->
|
||||
{ok, #{transfer() => [exportinfo(), ...]}}.
|
||||
{ok, [exportinfo()]} | {error, file_error()}.
|
||||
list_local(Options) ->
|
||||
Pattern = [
|
||||
list_local(Options, #{}).
|
||||
|
||||
-spec list_local(options(), local_query()) ->
|
||||
{ok, [exportinfo()]} | {error, file_error()}.
|
||||
list_local(Options, #{transfer := Transfer}) ->
|
||||
list_local_transfer(Options, Transfer);
|
||||
list_local(Options, #{} = Query) ->
|
||||
Root = get_storage_root(Options),
|
||||
Glob = [
|
||||
_Bucket1 = '*',
|
||||
_Bucket2 = '*',
|
||||
_Rest = '*',
|
||||
|
@ -206,16 +209,30 @@ list_local(Options) ->
|
|||
_FileId = '*',
|
||||
fun filter_manifest/1
|
||||
],
|
||||
Root = get_storage_root(Options),
|
||||
{ok,
|
||||
emqx_ft_fs_util:fold(
|
||||
fun(RelFilepath, Info, Stack, Acc) ->
|
||||
read_exportinfo(Options, RelFilepath, Info, Stack, Acc)
|
||||
end,
|
||||
[],
|
||||
Root,
|
||||
Pattern
|
||||
)}.
|
||||
It =
|
||||
case Query of
|
||||
#{following := Cursor} ->
|
||||
emqx_ft_fs_iterator:seek(mk_path_seek(Cursor), Root, Glob);
|
||||
#{} ->
|
||||
emqx_ft_fs_iterator:new(Root, Glob)
|
||||
end,
|
||||
% NOTE
|
||||
% In the rare case when some transfer contain more than one file, the paging mechanic
|
||||
% here may skip over some files, when the cursor is transfer-only.
|
||||
Limit = maps:get(limit, Query, -1),
|
||||
{Exports, _} = emqx_ft_fs_iterator:fold_n(
|
||||
fun(Entry, Acc) -> read_exportinfo(Options, Entry, Acc) end,
|
||||
[],
|
||||
It,
|
||||
Limit
|
||||
),
|
||||
{ok, Exports}.
|
||||
|
||||
mk_path_seek(#{transfer := Transfer, name := Filename}) ->
|
||||
mk_result_reldir(Transfer) ++ [Filename];
|
||||
mk_path_seek(#{transfer := Transfer}) ->
|
||||
% NOTE: Any bitstring is greater than any list.
|
||||
mk_result_reldir(Transfer) ++ [<<>>].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helpers
|
||||
|
@ -227,16 +244,21 @@ filter_manifest(?MANIFEST) ->
|
|||
filter_manifest(Filename) ->
|
||||
?MANIFEST =/= string:find(Filename, ?MANIFEST, trailing).
|
||||
|
||||
read_exportinfo(Options, RelFilepath, Fileinfo = #file_info{type = regular}, Stack, Acc) ->
|
||||
[Filename, FileId, ClientId | _] = Stack,
|
||||
read_exportinfo(
|
||||
Options,
|
||||
{leaf, RelFilepath, Fileinfo = #file_info{type = regular}, [Filename, FileId, ClientId | _]},
|
||||
Acc
|
||||
) ->
|
||||
% NOTE
|
||||
% There might be more than one file for a single transfer (though
|
||||
% extremely bad luck is needed for that, e.g. concurrent assemblers with
|
||||
% different filemetas from different nodes). This might be unexpected for a
|
||||
% client given the current protocol, yet might be helpful in the future.
|
||||
Transfer = dirnames_to_transfer(ClientId, FileId),
|
||||
Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo),
|
||||
[Info | Acc];
|
||||
read_exportinfo(Options, RelFilepath, Fileinfo = #file_info{}, _Stack, Acc) ->
|
||||
?SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options),
|
||||
Acc;
|
||||
read_exportinfo(Options, RelFilepath, {error, Reason}, _Stack, Acc) ->
|
||||
?SLOG_INACCESSIBLE(RelFilepath, Reason, Options),
|
||||
read_exportinfo(Options, Entry, Acc) ->
|
||||
ok = log_invalid_entry(Options, Entry),
|
||||
Acc.
|
||||
|
||||
mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo) ->
|
||||
|
@ -268,6 +290,19 @@ try_read_filemeta(Filepath, Info) ->
|
|||
mk_export_uri(RelFilepath) ->
|
||||
emqx_ft_storage_exporter_fs_api:mk_export_uri(node(), RelFilepath).
|
||||
|
||||
log_invalid_entry(Options, {_Type, RelFilepath, Fileinfo = #file_info{}, _Stack}) ->
|
||||
?SLOG(notice, "filesystem_object_unexpected", #{
|
||||
relpath => RelFilepath,
|
||||
fileinfo => Fileinfo,
|
||||
options => Options
|
||||
});
|
||||
log_invalid_entry(Options, {_Type, RelFilepath, {error, Reason}, _Stack}) ->
|
||||
?SLOG(warning, "filesystem_object_inaccessible", #{
|
||||
relpath => RelFilepath,
|
||||
reason => Reason,
|
||||
options => Options
|
||||
}).
|
||||
|
||||
-spec start_reader(options(), file:name(), _Caller :: pid()) ->
|
||||
{ok, reader()} | {error, enoent}.
|
||||
start_reader(Options, RelFilepath, CallerPid) ->
|
||||
|
@ -282,32 +317,112 @@ start_reader(Options, RelFilepath, CallerPid) ->
|
|||
|
||||
%%
|
||||
|
||||
-spec list(options()) ->
|
||||
{ok, [exportinfo(), ...]} | {error, [{node(), _Reason}]}.
|
||||
list(_Options) ->
|
||||
Nodes = mria_mnesia:running_nodes(),
|
||||
Replies = emqx_ft_storage_exporter_fs_proto_v1:list_exports(Nodes),
|
||||
{Results, Errors} = lists:foldl(
|
||||
fun
|
||||
({_Node, {ok, {ok, Files}}}, {Acc, Errors}) ->
|
||||
{Files ++ Acc, Errors};
|
||||
({Node, {ok, {error, _} = Error}}, {Acc, Errors}) ->
|
||||
{Acc, [{Node, Error} | Errors]};
|
||||
({Node, Error}, {Acc, Errors}) ->
|
||||
{Acc, [{Node, Error} | Errors]}
|
||||
end,
|
||||
{[], []},
|
||||
lists:zip(Nodes, Replies)
|
||||
),
|
||||
length(Errors) > 0 andalso
|
||||
?SLOG(warning, #{msg => "list_remote_exports_failed", errors => Errors}),
|
||||
case Results of
|
||||
[_ | _] ->
|
||||
{ok, Results};
|
||||
[] when Errors =:= [] ->
|
||||
{ok, Results};
|
||||
[] ->
|
||||
{error, Errors}
|
||||
-spec list(options(), query()) ->
|
||||
{ok, page(exportinfo())} | {error, [{node(), _Reason}]}.
|
||||
list(_Options, Query = #{transfer := _Transfer}) ->
|
||||
case list(Query) of
|
||||
#{items := Exports = [_ | _]} ->
|
||||
{ok, #{items => Exports}};
|
||||
#{items := [], errors := NodeErrors} ->
|
||||
{error, NodeErrors}
|
||||
end;
|
||||
list(_Options, Query) ->
|
||||
Result = list(Query),
|
||||
case Result of
|
||||
#{errors := NodeErrors} ->
|
||||
?SLOG(warning, "list_exports_errors", #{
|
||||
query => Query,
|
||||
errors => NodeErrors
|
||||
});
|
||||
#{} ->
|
||||
ok
|
||||
end,
|
||||
case Result of
|
||||
#{items := Exports, cursor := Cursor} ->
|
||||
{ok, #{items => lists:reverse(Exports), cursor => encode_cursor(Cursor)}};
|
||||
#{items := Exports} ->
|
||||
{ok, #{items => lists:reverse(Exports)}}
|
||||
end.
|
||||
|
||||
list(QueryIn) ->
|
||||
{Nodes, NodeQuery} = decode_query(QueryIn, lists:sort(mria_mnesia:running_nodes())),
|
||||
list_nodes(NodeQuery, Nodes, #{items => []}).
|
||||
|
||||
list_nodes(Query, Nodes = [Node | Rest], Acc) ->
|
||||
case emqx_ft_storage_exporter_fs_proto_v1:list_exports([Node], Query) of
|
||||
[{ok, Result}] ->
|
||||
list_accumulate(Result, Query, Nodes, Acc);
|
||||
[Failure] ->
|
||||
?SLOG(warning, #{
|
||||
msg => "list_remote_exports_failed",
|
||||
node => Node,
|
||||
query => Query,
|
||||
failure => Failure
|
||||
}),
|
||||
list_next(Query, Rest, Acc)
|
||||
end;
|
||||
list_nodes(_Query, [], Acc) ->
|
||||
Acc.
|
||||
|
||||
list_accumulate({ok, Exports}, Query, [Node | Rest], Acc = #{items := EAcc}) ->
|
||||
NExports = length(Exports),
|
||||
AccNext = Acc#{items := Exports ++ EAcc},
|
||||
case Query of
|
||||
#{limit := Limit} when NExports < Limit ->
|
||||
list_next(Query#{limit => Limit - NExports}, Rest, AccNext);
|
||||
#{limit := _} ->
|
||||
AccNext#{cursor => mk_cursor(Node, Exports)};
|
||||
#{} ->
|
||||
list_next(Query, Rest, AccNext)
|
||||
end;
|
||||
list_accumulate({error, Reason}, Query, [Node | Rest], Acc) ->
|
||||
EAcc = maps:get(errors, Acc, []),
|
||||
list_next(Query, Rest, Acc#{errors => [{Node, Reason} | EAcc]}).
|
||||
|
||||
list_next(Query, Nodes, Acc) ->
|
||||
list_nodes(maps:remove(following, Query), Nodes, Acc).
|
||||
|
||||
decode_query(Query = #{following := Cursor}, Nodes) ->
|
||||
{Node, NodeCursor} = decode_cursor(Cursor),
|
||||
{skip_query_nodes(Node, Nodes), Query#{following => NodeCursor}};
|
||||
decode_query(Query = #{}, Nodes) ->
|
||||
{Nodes, Query}.
|
||||
|
||||
skip_query_nodes(CNode, Nodes) ->
|
||||
lists:dropwhile(fun(N) -> N < CNode end, Nodes).
|
||||
|
||||
mk_cursor(Node, [_Last = #{transfer := Transfer, name := Name} | _]) ->
|
||||
{Node, #{transfer => Transfer, name => Name}}.
|
||||
|
||||
encode_cursor({Node, #{transfer := {ClientId, FileId}, name := Name}}) ->
|
||||
emqx_utils_json:encode(#{
|
||||
<<"n">> => Node,
|
||||
<<"cid">> => ClientId,
|
||||
<<"fid">> => FileId,
|
||||
<<"fn">> => unicode:characters_to_binary(Name)
|
||||
}).
|
||||
|
||||
decode_cursor(Cursor) ->
|
||||
try
|
||||
#{
|
||||
<<"n">> := NodeIn,
|
||||
<<"cid">> := ClientId,
|
||||
<<"fid">> := FileId,
|
||||
<<"fn">> := NameIn
|
||||
} = emqx_utils_json:decode(Cursor),
|
||||
true = is_binary(ClientId),
|
||||
true = is_binary(FileId),
|
||||
Node = binary_to_existing_atom(NodeIn),
|
||||
Name = unicode:characters_to_list(NameIn),
|
||||
true = is_list(Name),
|
||||
{Node, #{transfer => {ClientId, FileId}, name => Name}}
|
||||
catch
|
||||
error:{_, invalid_json} ->
|
||||
error({badarg, cursor});
|
||||
error:{badmatch, _} ->
|
||||
error({badarg, cursor});
|
||||
error:badarg ->
|
||||
error({badarg, cursor})
|
||||
end.
|
||||
|
||||
%%
|
||||
|
@ -352,9 +467,9 @@ mk_result_reldir(Transfer = {ClientId, FileId}) ->
|
|||
BucketRest/binary
|
||||
>> = binary:encode_hex(Hash),
|
||||
[
|
||||
Bucket1,
|
||||
Bucket2,
|
||||
BucketRest,
|
||||
binary_to_list(Bucket1),
|
||||
binary_to_list(Bucket2),
|
||||
binary_to_list(BucketRest),
|
||||
emqx_ft_fs_util:escape_filename(ClientId),
|
||||
emqx_ft_fs_util:escape_filename(FileId)
|
||||
].
|
||||
|
|
|
@ -21,15 +21,15 @@
|
|||
-module(emqx_ft_storage_exporter_fs_proxy).
|
||||
|
||||
-export([
|
||||
list_exports_local/0,
|
||||
list_exports_local/1,
|
||||
read_export_file_local/2
|
||||
]).
|
||||
|
||||
list_exports_local() ->
|
||||
list_exports_local(Query) ->
|
||||
emqx_ft_storage:with_storage_type(local, fun(Storage) ->
|
||||
case emqx_ft_storage_exporter:exporter(Storage) of
|
||||
{emqx_ft_storage_exporter_fs, Options} ->
|
||||
emqx_ft_storage_exporter_fs:list_local(Options)
|
||||
emqx_ft_storage_exporter_fs:list_local(Options, Query)
|
||||
% NOTE
|
||||
% This case clause is currently deemed unreachable by dialyzer.
|
||||
% InvalidExporter ->
|
||||
|
|
|
@ -45,7 +45,7 @@
|
|||
-export([get_subdir/2]).
|
||||
-export([get_subdir/3]).
|
||||
|
||||
-export([files/1]).
|
||||
-export([files/2]).
|
||||
|
||||
-export([on_config_update/2]).
|
||||
|
||||
|
@ -217,8 +217,8 @@ assemble(Storage, Transfer, Size) ->
|
|||
|
||||
%%
|
||||
|
||||
files(Storage) ->
|
||||
emqx_ft_storage_exporter:list(Storage).
|
||||
files(Storage, Query) ->
|
||||
emqx_ft_storage_exporter:list(Storage, Query).
|
||||
|
||||
%%
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
|
||||
-export([introduced_in/0]).
|
||||
|
||||
-export([list_exports/1]).
|
||||
-export([list_exports/2]).
|
||||
-export([read_export_file/3]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
@ -28,14 +28,17 @@
|
|||
introduced_in() ->
|
||||
"5.0.17".
|
||||
|
||||
-spec list_exports([node()]) ->
|
||||
emqx_rpc:erpc_multicall([emqx_ft_storage:file_info()]).
|
||||
list_exports(Nodes) ->
|
||||
-spec list_exports([node()], emqx_ft_storage:query(_LocalCursor)) ->
|
||||
emqx_rpc:erpc_multicall(
|
||||
{ok, [emqx_ft_storage:file_info()]}
|
||||
| {error, file:posix() | disabled | {invalid_storage_type, _}}
|
||||
).
|
||||
list_exports(Nodes, Query) ->
|
||||
erpc:multicall(
|
||||
Nodes,
|
||||
emqx_ft_storage_exporter_fs_proxy,
|
||||
list_exports_local,
|
||||
[]
|
||||
[Query]
|
||||
).
|
||||
|
||||
-spec read_export_file(node(), file:name(), pid()) ->
|
||||
|
|
|
@ -671,7 +671,7 @@ encode_meta(Meta) ->
|
|||
emqx_utils_json:encode(emqx_ft:encode_filemeta(Meta)).
|
||||
|
||||
list_files(ClientId) ->
|
||||
{ok, Files} = emqx_ft_storage:files(),
|
||||
{ok, #{items := Files}} = emqx_ft_storage:files(),
|
||||
[File || File = #{transfer := {CId, _}} <- Files, CId == ClientId].
|
||||
|
||||
read_export(#{path := AbsFilepath}) ->
|
||||
|
|
|
@ -47,17 +47,31 @@ end_per_testcase(_Case, _Config) ->
|
|||
%% Tests
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
t_list_ready_transfers(Config) ->
|
||||
t_list_files(Config) ->
|
||||
ClientId = client_id(Config),
|
||||
FileId = <<"f1">>,
|
||||
|
||||
ok = emqx_ft_test_helpers:upload_file(ClientId, <<"f1">>, "f1", <<"data">>),
|
||||
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>),
|
||||
|
||||
{ok, 200, #{<<"files">> := Files}} =
|
||||
request(get, uri(["file_transfer", "files"]), fun json/1),
|
||||
request_json(get, uri(["file_transfer", "files"])),
|
||||
|
||||
?assertInclude(
|
||||
#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>},
|
||||
Files
|
||||
?assertMatch(
|
||||
[#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
|
||||
[File || File = #{<<"clientid">> := CId} <- Files, CId == ClientId]
|
||||
),
|
||||
|
||||
{ok, 200, #{<<"files">> := FilesTransfer}} =
|
||||
request_json(get, uri(["file_transfer", "files", ClientId, FileId])),
|
||||
|
||||
?assertMatch(
|
||||
[#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
|
||||
FilesTransfer
|
||||
),
|
||||
|
||||
?assertMatch(
|
||||
{ok, 404, #{<<"code">> := <<"FILES_NOT_FOUND">>}},
|
||||
request_json(get, uri(["file_transfer", "files", ClientId, <<"no-such-file">>]))
|
||||
).
|
||||
|
||||
t_download_transfer(Config) ->
|
||||
|
@ -67,10 +81,9 @@ t_download_transfer(Config) ->
|
|||
|
||||
?assertMatch(
|
||||
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
|
||||
request(
|
||||
request_json(
|
||||
get,
|
||||
uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>}),
|
||||
fun json/1
|
||||
uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>})
|
||||
)
|
||||
),
|
||||
|
||||
|
@ -99,7 +112,7 @@ t_download_transfer(Config) ->
|
|||
),
|
||||
|
||||
{ok, 200, #{<<"files">> := [File]}} =
|
||||
request(get, uri(["file_transfer", "files"]), fun json/1),
|
||||
request_json(get, uri(["file_transfer", "files"])),
|
||||
|
||||
{ok, 200, Response} = request(get, host() ++ maps:get(<<"uri">>, File)),
|
||||
|
||||
|
@ -108,6 +121,58 @@ t_download_transfer(Config) ->
|
|||
Response
|
||||
).
|
||||
|
||||
t_list_files_paging(Config) ->
|
||||
ClientId = client_id(Config),
|
||||
NFiles = 20,
|
||||
Uploads = [{mk_file_id("file:", N), mk_file_name(N)} || N <- lists:seq(1, NFiles)],
|
||||
ok = lists:foreach(
|
||||
fun({FileId, Name}) ->
|
||||
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Name, <<"data">>)
|
||||
end,
|
||||
Uploads
|
||||
),
|
||||
|
||||
?assertMatch(
|
||||
{ok, 200, #{<<"files">> := [_, _, _], <<"cursor">> := _}},
|
||||
request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 3}))
|
||||
),
|
||||
|
||||
{ok, 200, #{<<"files">> := Files}} =
|
||||
request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100})),
|
||||
|
||||
?assert(length(Files) >= NFiles),
|
||||
|
||||
?assertNotMatch(
|
||||
{ok, 200, #{<<"cursor">> := _}},
|
||||
request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100}))
|
||||
),
|
||||
|
||||
?assertMatch(
|
||||
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
|
||||
request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0}))
|
||||
),
|
||||
|
||||
?assertMatch(
|
||||
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
|
||||
request_json(
|
||||
get,
|
||||
uri(["file_transfer", "files"]) ++ query(#{following => <<"whatsthat!?">>})
|
||||
)
|
||||
),
|
||||
|
||||
PageThrough = fun PageThrough(Query, Acc) ->
|
||||
case request_json(get, uri(["file_transfer", "files"]) ++ query(Query)) of
|
||||
{ok, 200, #{<<"files">> := FilesPage, <<"cursor">> := Cursor}} ->
|
||||
PageThrough(Query#{following => Cursor}, Acc ++ FilesPage);
|
||||
{ok, 200, #{<<"files">> := FilesPage}} ->
|
||||
Acc ++ FilesPage
|
||||
end
|
||||
end,
|
||||
|
||||
?assertEqual(Files, PageThrough(#{limit => 1}, [])),
|
||||
?assertEqual(Files, PageThrough(#{limit => 8}, [])),
|
||||
?assertEqual(Files, PageThrough(#{limit => NFiles}, [])).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helpers
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -115,13 +180,19 @@ t_download_transfer(Config) ->
|
|||
client_id(Config) ->
|
||||
atom_to_binary(?config(tc, Config), utf8).
|
||||
|
||||
mk_file_id(Prefix, N) ->
|
||||
iolist_to_binary([Prefix, integer_to_list(N)]).
|
||||
|
||||
mk_file_name(N) ->
|
||||
"file." ++ integer_to_list(N).
|
||||
|
||||
request(Method, Url) ->
|
||||
emqx_mgmt_api_test_util:request(Method, Url, []).
|
||||
|
||||
request(Method, Url, Decoder) when is_function(Decoder) ->
|
||||
request_json(Method, Url) ->
|
||||
case emqx_mgmt_api_test_util:request(Method, Url, []) of
|
||||
{ok, Code, Body} ->
|
||||
{ok, Code, Decoder(Body)};
|
||||
{ok, Code, json(Body)};
|
||||
Otherwise ->
|
||||
Otherwise
|
||||
end.
|
||||
|
@ -138,6 +209,8 @@ uri_encode(T) ->
|
|||
|
||||
to_list(A) when is_atom(A) ->
|
||||
atom_to_list(A);
|
||||
to_list(A) when is_integer(A) ->
|
||||
integer_to_list(A);
|
||||
to_list(B) when is_binary(B) ->
|
||||
binary_to_list(B);
|
||||
to_list(L) when is_list(L) ->
|
||||
|
|
|
@ -240,7 +240,7 @@ list_exports(Config) ->
|
|||
|
||||
list_exports(Config, Transfer) ->
|
||||
{emqx_ft_storage_exporter_fs, Options} = exporter(Config),
|
||||
emqx_ft_storage_exporter_fs:list_local(Options, Transfer).
|
||||
emqx_ft_storage_exporter_fs:list_local_transfer(Options, Transfer).
|
||||
|
||||
exporter(Config) ->
|
||||
emqx_ft_storage_exporter:exporter(storage(Config)).
|
||||
|
|
|
@ -87,5 +87,5 @@ storage(Config) ->
|
|||
emqx_ft_test_helpers:local_storage(Config).
|
||||
|
||||
list_files(Config) ->
|
||||
{ok, Files} = emqx_ft_storage_fs:files(storage(Config)),
|
||||
{ok, #{items := Files}} = emqx_ft_storage_fs:files(storage(Config), #{}),
|
||||
Files.
|
||||
|
|
|
@ -3,6 +3,9 @@ emqx_ft_api {
|
|||
file_list.desc:
|
||||
"""List all uploaded files."""
|
||||
|
||||
file_list_transfer.desc
|
||||
"""List a file uploaded during specified transfer, identified by client id and file id."""
|
||||
|
||||
}
|
||||
|
||||
emqx_ft_storage_exporter_fs_api {
|
||||
|
|
Loading…
Reference in New Issue