feat(ft): add API

This commit is contained in:
Ilya Averyanov 2023-02-06 01:49:06 +02:00
parent 92670bfe3d
commit 04e5378bda
7 changed files with 360 additions and 127 deletions

View File

@ -0,0 +1,25 @@
emqx_ft_api {
file_list {
desc {
en: "List all uploaded files."
zh: "列出所有上传的文件。"
}
label: {
en: "List all uploaded files"
zh: "列出所有上传的文件"
}
}
file_get {
desc {
en: "Get a file by its id."
zh: "根据文件 id 获取文件。"
}
label: {
en: "Get a file by its id"
zh: "根据文件 id 获取文件"
}
}
}

View File

@ -189,7 +189,7 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
%% We have new fin packet
ok ->
Callback = callback(FinPacketKey, FileId),
case assemble(transfer(Msg, FileId), Callback) of
case emqx_ft_storage:assemble(transfer(Msg, FileId), Callback) of
%% Assembling started, packet will be acked by the callback or the responder
{ok, _} ->
undefined;
@ -214,12 +214,6 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
undefined
end.
assemble(Transfer, Callback) ->
emqx_ft_storage:assemble(
Transfer,
Callback
).
callback({ChanPid, PacketId} = Key, _FileId) ->
fun(Result) ->
case emqx_ft_responder:unregister(Key) of

View File

@ -0,0 +1,163 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_api).
-behaviour(minirest_api).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
%% Swagger specs from hocon schema
-export([
api_spec/0,
paths/0,
schema/1,
namespace/0
]).
-export([
fields/1,
roots/0
]).
%% API callbacks
-export([
'/file_transfer/files'/2,
'/file_transfer/file'/2
]).
-import(hoconsc, [mk/2, ref/1, ref/2]).
namespace() -> "file_transfer".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
[
"/file_transfer/files",
"/file_transfer/file"
].
schema("/file_transfer/files") ->
#{
'operationId' => '/file_transfer/files',
get => #{
tags => [<<"file_transfer">>],
summary => <<"List all uploaded files">>,
description => ?DESC("file_list"),
responses => #{
200 => <<"Operation success">>,
503 => emqx_dashboard_swagger:error_codes(
['SERVICE_UNAVAILABLE'], <<"Service unavailable">>
)
}
}
};
schema("/file_transfer/file") ->
#{
'operationId' => '/file_transfer/file',
get => #{
tags => [<<"file_transfer">>],
summary => <<"Download a particular file">>,
description => ?DESC("file_get"),
parameters => [
ref(file_node),
ref(file_clientid),
ref(file_id)
],
responses => #{
200 => <<"Operation success">>,
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Not found">>),
503 => emqx_dashboard_swagger:error_codes(
['SERVICE_UNAVAILABLE'], <<"Service unavailable">>
)
}
}
}.
'/file_transfer/files'(get, #{}) ->
case emqx_ft_storage:ready_transfers() of
{ok, Transfers} ->
FormattedTransfers = lists:map(
fun({Id, Info}) ->
#{id => Id, info => format_file_info(Info)}
end,
Transfers
),
{200, #{<<"files">> => FormattedTransfers}};
{error, _} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
end.
'/file_transfer/file'(get, #{query_string := Query}) ->
case emqx_ft_storage:get_ready_transfer(Query) of
{ok, FileData} ->
{200, #{<<"content-type">> => <<"application/data">>}, FileData};
{error, enoent} ->
{404, error_msg('NOT_FOUND', <<"Not found">>)};
{error, _} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
end.
error_msg(Code, Msg) ->
#{code => Code, message => emqx_misc:readable_error_msg(Msg)}.
-spec fields(hocon_schema:name()) -> hocon_schema:fields().
fields(file_node) ->
Desc = <<"File Node">>,
Meta = #{
in => query, desc => Desc, example => <<"emqx@127.0.0.1">>, required => false
},
[{node, hoconsc:mk(binary(), Meta)}];
fields(file_clientid) ->
Desc = <<"File ClientId">>,
Meta = #{
in => query, desc => Desc, example => <<"client1">>, required => false
},
[{clientid, hoconsc:mk(binary(), Meta)}];
fields(file_id) ->
Desc = <<"File">>,
Meta = #{
in => query, desc => Desc, example => <<"file1">>, required => false
},
[{fileid, hoconsc:mk(binary(), Meta)}].
roots() ->
[
file_node,
file_clientid,
file_id
].
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
format_file_info(#{path := Path, size := Size, timestamp := Timestamp}) ->
#{
path => Path,
size => Size,
timestamp => format_datetime(Timestamp)
}.
format_datetime({{Year, Month, Day}, {Hour, Minute, Second}}) ->
iolist_to_binary(
io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w", [
Year, Month, Day, Hour, Minute, Second
])
).

View File

@ -20,24 +20,27 @@
[
store_filemeta/2,
store_segment/2,
assemble/2
assemble/2,
parse_id/1,
ready_transfers/0,
get_ready_transfer/1,
with_storage_type/3
]
).
-export([list_local/2]).
-export([pread_local/4]).
-export([local_transfers/0]).
-type offset() :: emqx_ft:offset().
-type transfer() :: emqx_ft:transfer().
-type storage() :: emqx_config:config().
-export_type([assemble_callback/0]).
-type assemble_callback() :: fun((ok | {error, term()}) -> any()).
-type ready_transfer_id() :: term().
-type ready_transfer_info() :: map().
-type ready_transfer_data() :: binary().
%%--------------------------------------------------------------------
%% Behaviour
%%--------------------------------------------------------------------
@ -48,6 +51,10 @@
ok | {error, term()}.
-callback assemble(storage(), emqx_ft:transfer(), assemble_callback()) ->
{ok, pid()} | {error, term()}.
-callback ready_transfers(storage()) ->
{ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}.
-callback get_ready_transfer(storage(), ready_transfer_id()) ->
{ok, ready_transfer_data()} | {error, term()}.
%%--------------------------------------------------------------------
%% API
@ -71,35 +78,46 @@ assemble(Transfer, Callback) ->
Mod = mod(),
Mod:assemble(storage(), Transfer, Callback).
-spec ready_transfers() -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}.
ready_transfers() ->
Mod = mod(),
Mod:ready_transfers(storage()).
-spec get_ready_transfer(ready_transfer_id()) -> {ok, ready_transfer_data()} | {error, term()}.
get_ready_transfer(ReadyTransferId) ->
Mod = mod(),
Mod:get_ready_transfer(storage(), ReadyTransferId).
-spec parse_id(map()) -> {ok, ready_transfer_id()} | {error, term()}.
parse_id(#{
<<"type">> := local, <<"node">> := NodeBin, <<"clientid">> := ClientId, <<"id">> := Id
}) ->
case emqx_misc:safe_to_existing_atom(NodeBin) of
{ok, Node} ->
{ok, {local, Node, ClientId, Id}};
{error, _} ->
{error, {invalid_node, NodeBin}}
end;
parse_id(#{}) ->
{error, invalid_file_id}.
-spec with_storage_type(atom(), atom(), list(term())) -> any().
with_storage_type(Type, Fun, Args) ->
Storage = storage(),
case Storage of
#{type := Type} ->
Mod = mod(Storage),
apply(Mod, Fun, [Storage | Args]);
_ ->
{error, {invalid_storage_type, Type}}
end.
%%--------------------------------------------------------------------
%% Local FS API
%%--------------------------------------------------------------------
-type filefrag() :: emqx_ft_storage_fs:filefrag().
-type transferinfo() :: emqx_ft_storage_fs:transferinfo().
-spec list_local(transfer(), fragment | result) ->
{ok, [filefrag()]} | {error, term()}.
list_local(Transfer, What) ->
with_local_storage(
fun(Mod, Storage) -> Mod:list(Storage, Transfer, What) end
).
-spec pread_local(transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
{ok, [filefrag()]} | {error, term()}.
pread_local(Transfer, Frag, Offset, Size) ->
with_local_storage(
fun(Mod, Storage) -> Mod:pread(Storage, Transfer, Frag, Offset, Size) end
).
-spec local_transfers() ->
{ok, node(), #{transfer() => transferinfo()}} | {error, term()}.
local_transfers() ->
with_local_storage(
fun(Mod, Storage) -> Mod:transfers(Storage) end
).
%%
storage() ->
emqx_config:get([file_transfer, storage]).
mod() ->
mod(storage()).
@ -110,14 +128,3 @@ mod(Storage) ->
emqx_ft_storage_fs
% emqx_ft_storage_dummy
end.
storage() ->
emqx_config:get([file_transfer, storage]).
with_local_storage(Fun) ->
case storage() of
#{type := local} = Storage ->
Fun(mod(Storage), Storage);
#{type := Type} ->
{error, {unsupported_storage_type, Type}}
end.

View File

@ -21,7 +21,9 @@
-export([
store_filemeta/3,
store_segment/3,
assemble/3
assemble/3,
ready_transfers/1,
get_ready_transfer/2
]).
store_filemeta(_Storage, _Transfer, _Meta) ->
@ -33,3 +35,9 @@ store_segment(_Storage, _Transfer, _Segment) ->
assemble(_Storage, _Transfer, Callback) ->
Pid = spawn(fun() -> Callback({error, not_implemented}) end),
{ok, Pid}.
ready_transfers(_Storage) ->
{ok, []}.
get_ready_transfer(_Storage, _Id) ->
{error, not_implemented}.

View File

@ -18,6 +18,8 @@
-behaviour(emqx_ft_storage).
-include_lib("emqx/include/logger.hrl").
-export([store_filemeta/3]).
-export([store_segment/3]).
-export([list/3]).
@ -26,6 +28,14 @@
-export([transfers/1]).
-export([pread_local/4]).
-export([list_local/2]).
-export([ready_transfers_local/0, ready_transfers_local/1]).
-export([get_ready_transfer_local/1, get_ready_transfer_local/2]).
-export([ready_transfers/1]).
-export([get_ready_transfer/2]).
-export([open_file/3]).
-export([complete/4]).
-export([write/2]).
@ -70,23 +80,8 @@
-define(MANIFEST, "MANIFEST.json").
-define(SEGMENT, "SEG").
-type root() :: file:name().
% -record(st, {
% root :: file:name()
% }).
%% TODO
-type storage() :: root().
%%
% -define(PROCREF(Root), {via, gproc, {n, l, {?MODULE, Root}}}).
% -spec start_link(root()) ->
% {ok, pid()} | {error, already_started}.
% start_link(Root) ->
% gen_server:start_link(?PROCREF(Root), ?MODULE, [], []).
-type storage() :: emqx_config:config().
%% Store manifest in the backing filesystem.
%% Atomic operation.
@ -178,7 +173,89 @@ pread(_Storage, _Transfer, Frag, Offset, Size) ->
assemble(Storage, Transfer, Callback) ->
emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback).
%%
-spec list_local(transfer(), fragment | result) ->
{ok, [filefrag()]} | {error, term()}.
list_local(Transfer, What) ->
emqx_ft_storage:with_storage_type(local, list, [Transfer, What]).
-spec pread_local(transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
{ok, [filefrag()]} | {error, term()}.
pread_local(Transfer, Frag, Offset, Size) ->
emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]).
get_ready_transfer(_Storage, ReadyTransferId) ->
case parse_ready_transfer_id(ReadyTransferId) of
{ok, {Node, Transfer}} ->
try
emqx_ft_storage_fs_proto_v1:get_ready_transfer(Node, Transfer)
catch
error:Error ->
{error, Error};
C:Error ->
{error, {C, Error}}
end;
{error, _} = Error ->
Error
end.
get_ready_transfer_local(Transfer) ->
emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [Transfer]).
get_ready_transfer_local(Storage, Transfer) ->
Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(result)),
case file:list_dir(Dirname) of
{ok, [Filename | _]} ->
file:read_file(filename:join([Dirname, Filename]));
{error, _} = Error ->
Error
end.
ready_transfers(_Storage) ->
Nodes = mria_mnesia:running_nodes(),
Results = emqx_ft_storage_fs_proto_v1:ready_transfers(Nodes),
{GoodResults, BadResults} = lists:partition(
fun
({ok, _}) -> true;
(_) -> false
end,
Results
),
?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}),
{ok, [File || {ok, Files} <- GoodResults, File <- Files]}.
ready_transfers_local() ->
emqx_ft_storage:with_storage_type(local, ready_transfers_local, []).
ready_transfers_local(Storage) ->
{ok, Transfers} = transfers(Storage),
lists:filtermap(
fun
({Transfer, #{status := complete, result := [Result | _]}}) ->
{true, {ready_transfer_id(Transfer), maps:without([fragment], Result)}};
(_) ->
false
end,
maps:to_list(Transfers)
).
ready_transfer_id({ClientId, FileId}) ->
#{
<<"node">> => atom_to_binary(node()),
<<"clientid">> => ClientId,
<<"fileid">> => FileId
}.
parse_ready_transfer_id(#{
<<"node">> := NodeBin, <<"clientid">> := ClientId, <<"fileid">> := FileId
}) ->
case emqx_misc:safe_to_existing_atom(NodeBin) of
{ok, Node} ->
{ok, {Node, {ClientId, FileId}}};
{error, _} ->
{error, {invalid_node, NodeBin}}
end;
parse_ready_transfer_id(#{}) ->
{error, invalid_file_id}.
-spec transfers(storage()) ->
{ok, #{transfer() => transferinfo()}}.
@ -291,41 +368,8 @@ verify_checksum(Ctx, #{checksum := {Algo, Digest}}) when Ctx /= undefined ->
verify_checksum(undefined, _) ->
ok.
%%
% -spec init(root()) -> {ok, storage()}.
% init(Root) ->
% % TODO: garbage_collect(...)
% {ok, Root}.
% %%
-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
% encode_filemeta(Meta) ->
% emqx_json:encode(
% ?PRELUDE(
% _Vsn = 1,
% maps:map(
% fun
% (name, Name) ->
% {<<"name">>, Name};
% (size, Size) ->
% {<<"size">>, Size};
% (checksum, {sha256, Hash}) ->
% {<<"checksum">>, <<"sha256:", (binary:encode_hex(Hash))/binary>>};
% (expire_at, ExpiresAt) ->
% {<<"expire_at">>, ExpiresAt};
% (segments_ttl, TTL) ->
% {<<"segments_ttl">>, TTL};
% (user_data, UserData) ->
% {<<"user_data">>, UserData}
% end,
% Meta
% )
% )
% ).
encode_filemeta(Meta) ->
% TODO: Looks like this should be hocon's responsibility.
Schema = emqx_ft_schema:schema(filemeta),
@ -337,21 +381,6 @@ decode_filemeta(Binary) ->
?PRELUDE(_Vsn = 1, Term) = emqx_json:decode(Binary, [return_maps]),
hocon_tconf:check_plain(Schema, Term, #{atom_key => true, required => false}).
% map_into(Fun, Into, Ks, Map) ->
% map_foldr(map_into_fn(Fun, Into), Into, Ks, Map).
% map_into_fn(Fun, L) when is_list(L) ->
% fun(K, V, Acc) -> [{K, Fun(K, V)} || Acc] end.
% map_foldr(_Fun, Acc, [], _) ->
% Acc;
% map_foldr(Fun, Acc, [K | Ks], Map) when is_map_key(K, Map) ->
% Fun(K, maps:get(K, Map), map_foldr(Fun, Acc, Ks, Map));
% map_foldr(Fun, Acc, [_ | Ks], Map) ->
% map_foldr(Fun, Acc, Ks, Map).
%%
mk_segment_filename({Offset, Content}) ->
lists:concat([?SEGMENT, ".", Offset, ".", byte_size(Content)]).

View File

@ -23,12 +23,12 @@
-export([list/3]).
-export([multilist/3]).
-export([pread/5]).
-export([transfers/1]).
-export([ready_transfers/1]).
-export([get_ready_transfer/2]).
-type offset() :: emqx_ft:offset().
-type transfer() :: emqx_ft:transfer().
-type filefrag() :: emqx_ft_storage_fs:filefrag().
-type transferinfo() :: emqx_ft_storage_fs:transferinfo().
-include_lib("emqx/include/bpapi.hrl").
@ -38,19 +38,26 @@ introduced_in() ->
-spec list(node(), transfer(), fragment | result) ->
{ok, [filefrag()]} | {error, term()}.
list(Node, Transfer, What) ->
erpc:call(Node, emqx_ft_storage, list_local, [Transfer, What]).
erpc:call(Node, emqx_ft_storage_fs, list_local, [Transfer, What]).
-spec multilist([node()], transfer(), fragment | result) ->
emqx_rpc:erpc_multicall({ok, [filefrag()]} | {error, term()}).
multilist(Nodes, Transfer, What) ->
erpc:multicall(Nodes, emqx_ft_storage, list_local, [Transfer, What]).
erpc:multicall(Nodes, emqx_ft_storage_fs, list_local, [Transfer, What]).
-spec pread(node(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
{ok, [filefrag()]} | {error, term()}.
pread(Node, Transfer, Frag, Offset, Size) ->
erpc:call(Node, emqx_ft_storage, pread_local, [Transfer, Frag, Offset, Size]).
erpc:call(Node, emqx_ft_storage_fs, pread_local, [Transfer, Frag, Offset, Size]).
-spec transfers([node()]) ->
emqx_rpc:erpc_multicall({ok, #{transfer() => transferinfo()}} | {error, term()}).
transfers(Nodes) ->
erpc:multicall(Nodes, emqx_ft_storage, local_transfers, []).
-spec ready_transfers([node()]) ->
{ok, [{emqx_ft_storage:ready_transfer_id(), emqx_ft_storage:ready_transfer_info()}]}
| {error, term()}.
ready_transfers(Nodes) ->
erpc:multicall(Nodes, emqx_ft_storage_fs, ready_transfers_local, []).
-spec get_ready_transfer(node(), emqx_ft_storage:ready_transfer_id()) ->
{ok, emqx_ft_storage:ready_transfer_data()}
| {error, term()}.
get_ready_transfer(Node, ReadyTransferId) ->
erpc:call(Node, emqx_ft_storage_fs, get_ready_transfer_local, [ReadyTransferId]).