From 04e5378bdaa320dec5d87489f81b16e2d1d316d3 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Mon, 6 Feb 2023 01:49:06 +0200 Subject: [PATCH] feat(ft): add API --- apps/emqx_ft/i18n/emqx_ft_api_i18n.conf | 25 +++ apps/emqx_ft/src/emqx_ft.erl | 8 +- apps/emqx_ft/src/emqx_ft_api.erl | 163 ++++++++++++++++++ apps/emqx_ft/src/emqx_ft_storage.erl | 97 ++++++----- apps/emqx_ft/src/emqx_ft_storage_dummy.erl | 10 +- apps/emqx_ft/src/emqx_ft_storage_fs.erl | 159 ++++++++++------- .../src/proto/emqx_ft_storage_fs_proto_v1.erl | 25 ++- 7 files changed, 360 insertions(+), 127 deletions(-) create mode 100644 apps/emqx_ft/i18n/emqx_ft_api_i18n.conf create mode 100644 apps/emqx_ft/src/emqx_ft_api.erl diff --git a/apps/emqx_ft/i18n/emqx_ft_api_i18n.conf b/apps/emqx_ft/i18n/emqx_ft_api_i18n.conf new file mode 100644 index 000000000..0bda935f8 --- /dev/null +++ b/apps/emqx_ft/i18n/emqx_ft_api_i18n.conf @@ -0,0 +1,25 @@ +emqx_ft_api { + + file_list { + desc { + en: "List all uploaded files." + zh: "列出所有上传的文件。" + } + label: { + en: "List all uploaded files" + zh: "列出所有上传的文件" + } + } + + file_get { + desc { + en: "Get a file by its id." + zh: "根据文件 id 获取文件。" + } + label: { + en: "Get a file by its id" + zh: "根据文件 id 获取文件" + } + } + +} diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index ce3a7a97d..f54119440 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -189,7 +189,7 @@ on_fin(PacketId, Msg, FileId, Checksum) -> %% We have new fin packet ok -> Callback = callback(FinPacketKey, FileId), - case assemble(transfer(Msg, FileId), Callback) of + case emqx_ft_storage:assemble(transfer(Msg, FileId), Callback) of %% Assembling started, packet will be acked by the callback or the responder {ok, _} -> undefined; @@ -214,12 +214,6 @@ on_fin(PacketId, Msg, FileId, Checksum) -> undefined end. -assemble(Transfer, Callback) -> - emqx_ft_storage:assemble( - Transfer, - Callback - ). - callback({ChanPid, PacketId} = Key, _FileId) -> fun(Result) -> case emqx_ft_responder:unregister(Key) of diff --git a/apps/emqx_ft/src/emqx_ft_api.erl b/apps/emqx_ft/src/emqx_ft_api.erl new file mode 100644 index 000000000..b2a822f36 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_api.erl @@ -0,0 +1,163 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ft_api). + +-behaviour(minirest_api). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% Swagger specs from hocon schema +-export([ + api_spec/0, + paths/0, + schema/1, + namespace/0 +]). + +-export([ + fields/1, + roots/0 +]). + +%% API callbacks +-export([ + '/file_transfer/files'/2, + '/file_transfer/file'/2 +]). + +-import(hoconsc, [mk/2, ref/1, ref/2]). + +namespace() -> "file_transfer". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + +paths() -> + [ + "/file_transfer/files", + "/file_transfer/file" + ]. + +schema("/file_transfer/files") -> + #{ + 'operationId' => '/file_transfer/files', + get => #{ + tags => [<<"file_transfer">>], + summary => <<"List all uploaded files">>, + description => ?DESC("file_list"), + responses => #{ + 200 => <<"Operation success">>, + 503 => emqx_dashboard_swagger:error_codes( + ['SERVICE_UNAVAILABLE'], <<"Service unavailable">> + ) + } + } + }; +schema("/file_transfer/file") -> + #{ + 'operationId' => '/file_transfer/file', + get => #{ + tags => [<<"file_transfer">>], + summary => <<"Download a particular file">>, + description => ?DESC("file_get"), + parameters => [ + ref(file_node), + ref(file_clientid), + ref(file_id) + ], + responses => #{ + 200 => <<"Operation success">>, + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Not found">>), + 503 => emqx_dashboard_swagger:error_codes( + ['SERVICE_UNAVAILABLE'], <<"Service unavailable">> + ) + } + } + }. + +'/file_transfer/files'(get, #{}) -> + case emqx_ft_storage:ready_transfers() of + {ok, Transfers} -> + FormattedTransfers = lists:map( + fun({Id, Info}) -> + #{id => Id, info => format_file_info(Info)} + end, + Transfers + ), + {200, #{<<"files">> => FormattedTransfers}}; + {error, _} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} + end. + +'/file_transfer/file'(get, #{query_string := Query}) -> + case emqx_ft_storage:get_ready_transfer(Query) of + {ok, FileData} -> + {200, #{<<"content-type">> => <<"application/data">>}, FileData}; + {error, enoent} -> + {404, error_msg('NOT_FOUND', <<"Not found">>)}; + {error, _} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} + end. + +error_msg(Code, Msg) -> + #{code => Code, message => emqx_misc:readable_error_msg(Msg)}. + +-spec fields(hocon_schema:name()) -> hocon_schema:fields(). +fields(file_node) -> + Desc = <<"File Node">>, + Meta = #{ + in => query, desc => Desc, example => <<"emqx@127.0.0.1">>, required => false + }, + [{node, hoconsc:mk(binary(), Meta)}]; +fields(file_clientid) -> + Desc = <<"File ClientId">>, + Meta = #{ + in => query, desc => Desc, example => <<"client1">>, required => false + }, + [{clientid, hoconsc:mk(binary(), Meta)}]; +fields(file_id) -> + Desc = <<"File">>, + Meta = #{ + in => query, desc => Desc, example => <<"file1">>, required => false + }, + [{fileid, hoconsc:mk(binary(), Meta)}]. + +roots() -> + [ + file_node, + file_clientid, + file_id + ]. + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +format_file_info(#{path := Path, size := Size, timestamp := Timestamp}) -> + #{ + path => Path, + size => Size, + timestamp => format_datetime(Timestamp) + }. + +format_datetime({{Year, Month, Day}, {Hour, Minute, Second}}) -> + iolist_to_binary( + io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w", [ + Year, Month, Day, Hour, Minute, Second + ]) + ). diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 819656551..e8f1d9c47 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -20,24 +20,27 @@ [ store_filemeta/2, store_segment/2, - assemble/2 + assemble/2, + + parse_id/1, + + ready_transfers/0, + get_ready_transfer/1, + + with_storage_type/3 ] ). --export([list_local/2]). --export([pread_local/4]). - --export([local_transfers/0]). - --type offset() :: emqx_ft:offset(). --type transfer() :: emqx_ft:transfer(). - -type storage() :: emqx_config:config(). -export_type([assemble_callback/0]). -type assemble_callback() :: fun((ok | {error, term()}) -> any()). +-type ready_transfer_id() :: term(). +-type ready_transfer_info() :: map(). +-type ready_transfer_data() :: binary(). + %%-------------------------------------------------------------------- %% Behaviour %%-------------------------------------------------------------------- @@ -48,6 +51,10 @@ ok | {error, term()}. -callback assemble(storage(), emqx_ft:transfer(), assemble_callback()) -> {ok, pid()} | {error, term()}. +-callback ready_transfers(storage()) -> + {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}. +-callback get_ready_transfer(storage(), ready_transfer_id()) -> + {ok, ready_transfer_data()} | {error, term()}. %%-------------------------------------------------------------------- %% API @@ -71,35 +78,46 @@ assemble(Transfer, Callback) -> Mod = mod(), Mod:assemble(storage(), Transfer, Callback). +-spec ready_transfers() -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}. +ready_transfers() -> + Mod = mod(), + Mod:ready_transfers(storage()). + +-spec get_ready_transfer(ready_transfer_id()) -> {ok, ready_transfer_data()} | {error, term()}. +get_ready_transfer(ReadyTransferId) -> + Mod = mod(), + Mod:get_ready_transfer(storage(), ReadyTransferId). + +-spec parse_id(map()) -> {ok, ready_transfer_id()} | {error, term()}. +parse_id(#{ + <<"type">> := local, <<"node">> := NodeBin, <<"clientid">> := ClientId, <<"id">> := Id +}) -> + case emqx_misc:safe_to_existing_atom(NodeBin) of + {ok, Node} -> + {ok, {local, Node, ClientId, Id}}; + {error, _} -> + {error, {invalid_node, NodeBin}} + end; +parse_id(#{}) -> + {error, invalid_file_id}. + +-spec with_storage_type(atom(), atom(), list(term())) -> any(). +with_storage_type(Type, Fun, Args) -> + Storage = storage(), + case Storage of + #{type := Type} -> + Mod = mod(Storage), + apply(Mod, Fun, [Storage | Args]); + _ -> + {error, {invalid_storage_type, Type}} + end. + %%-------------------------------------------------------------------- %% Local FS API %%-------------------------------------------------------------------- --type filefrag() :: emqx_ft_storage_fs:filefrag(). --type transferinfo() :: emqx_ft_storage_fs:transferinfo(). - --spec list_local(transfer(), fragment | result) -> - {ok, [filefrag()]} | {error, term()}. -list_local(Transfer, What) -> - with_local_storage( - fun(Mod, Storage) -> Mod:list(Storage, Transfer, What) end - ). - --spec pread_local(transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> - {ok, [filefrag()]} | {error, term()}. -pread_local(Transfer, Frag, Offset, Size) -> - with_local_storage( - fun(Mod, Storage) -> Mod:pread(Storage, Transfer, Frag, Offset, Size) end - ). - --spec local_transfers() -> - {ok, node(), #{transfer() => transferinfo()}} | {error, term()}. -local_transfers() -> - with_local_storage( - fun(Mod, Storage) -> Mod:transfers(Storage) end - ). - -%% +storage() -> + emqx_config:get([file_transfer, storage]). mod() -> mod(storage()). @@ -110,14 +128,3 @@ mod(Storage) -> emqx_ft_storage_fs % emqx_ft_storage_dummy end. - -storage() -> - emqx_config:get([file_transfer, storage]). - -with_local_storage(Fun) -> - case storage() of - #{type := local} = Storage -> - Fun(mod(Storage), Storage); - #{type := Type} -> - {error, {unsupported_storage_type, Type}} - end. diff --git a/apps/emqx_ft/src/emqx_ft_storage_dummy.erl b/apps/emqx_ft/src/emqx_ft_storage_dummy.erl index d486c0c29..4ed8ba487 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_dummy.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_dummy.erl @@ -21,7 +21,9 @@ -export([ store_filemeta/3, store_segment/3, - assemble/3 + assemble/3, + ready_transfers/1, + get_ready_transfer/2 ]). store_filemeta(_Storage, _Transfer, _Meta) -> @@ -33,3 +35,9 @@ store_segment(_Storage, _Transfer, _Segment) -> assemble(_Storage, _Transfer, Callback) -> Pid = spawn(fun() -> Callback({error, not_implemented}) end), {ok, Pid}. + +ready_transfers(_Storage) -> + {ok, []}. + +get_ready_transfer(_Storage, _Id) -> + {error, not_implemented}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 4afbc5276..37a433433 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -18,6 +18,8 @@ -behaviour(emqx_ft_storage). +-include_lib("emqx/include/logger.hrl"). + -export([store_filemeta/3]). -export([store_segment/3]). -export([list/3]). @@ -26,6 +28,14 @@ -export([transfers/1]). +-export([pread_local/4]). +-export([list_local/2]). +-export([ready_transfers_local/0, ready_transfers_local/1]). +-export([get_ready_transfer_local/1, get_ready_transfer_local/2]). + +-export([ready_transfers/1]). +-export([get_ready_transfer/2]). + -export([open_file/3]). -export([complete/4]). -export([write/2]). @@ -70,23 +80,8 @@ -define(MANIFEST, "MANIFEST.json"). -define(SEGMENT, "SEG"). --type root() :: file:name(). - -% -record(st, { -% root :: file:name() -% }). - %% TODO --type storage() :: root(). - -%% - -% -define(PROCREF(Root), {via, gproc, {n, l, {?MODULE, Root}}}). - -% -spec start_link(root()) -> -% {ok, pid()} | {error, already_started}. -% start_link(Root) -> -% gen_server:start_link(?PROCREF(Root), ?MODULE, [], []). +-type storage() :: emqx_config:config(). %% Store manifest in the backing filesystem. %% Atomic operation. @@ -178,7 +173,89 @@ pread(_Storage, _Transfer, Frag, Offset, Size) -> assemble(Storage, Transfer, Callback) -> emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback). -%% +-spec list_local(transfer(), fragment | result) -> + {ok, [filefrag()]} | {error, term()}. +list_local(Transfer, What) -> + emqx_ft_storage:with_storage_type(local, list, [Transfer, What]). + +-spec pread_local(transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> + {ok, [filefrag()]} | {error, term()}. +pread_local(Transfer, Frag, Offset, Size) -> + emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]). + +get_ready_transfer(_Storage, ReadyTransferId) -> + case parse_ready_transfer_id(ReadyTransferId) of + {ok, {Node, Transfer}} -> + try + emqx_ft_storage_fs_proto_v1:get_ready_transfer(Node, Transfer) + catch + error:Error -> + {error, Error}; + C:Error -> + {error, {C, Error}} + end; + {error, _} = Error -> + Error + end. + +get_ready_transfer_local(Transfer) -> + emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [Transfer]). + +get_ready_transfer_local(Storage, Transfer) -> + Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(result)), + case file:list_dir(Dirname) of + {ok, [Filename | _]} -> + file:read_file(filename:join([Dirname, Filename])); + {error, _} = Error -> + Error + end. + +ready_transfers(_Storage) -> + Nodes = mria_mnesia:running_nodes(), + Results = emqx_ft_storage_fs_proto_v1:ready_transfers(Nodes), + {GoodResults, BadResults} = lists:partition( + fun + ({ok, _}) -> true; + (_) -> false + end, + Results + ), + ?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}), + {ok, [File || {ok, Files} <- GoodResults, File <- Files]}. + +ready_transfers_local() -> + emqx_ft_storage:with_storage_type(local, ready_transfers_local, []). + +ready_transfers_local(Storage) -> + {ok, Transfers} = transfers(Storage), + lists:filtermap( + fun + ({Transfer, #{status := complete, result := [Result | _]}}) -> + {true, {ready_transfer_id(Transfer), maps:without([fragment], Result)}}; + (_) -> + false + end, + maps:to_list(Transfers) + ). + +ready_transfer_id({ClientId, FileId}) -> + #{ + <<"node">> => atom_to_binary(node()), + <<"clientid">> => ClientId, + <<"fileid">> => FileId + }. + +parse_ready_transfer_id(#{ + <<"node">> := NodeBin, <<"clientid">> := ClientId, <<"fileid">> := FileId +}) -> + case emqx_misc:safe_to_existing_atom(NodeBin) of + {ok, Node} -> + {ok, {Node, {ClientId, FileId}}}; + {error, _} -> + {error, {invalid_node, NodeBin}} + end; +parse_ready_transfer_id(#{}) -> + {error, invalid_file_id}. -spec transfers(storage()) -> {ok, #{transfer() => transferinfo()}}. @@ -291,41 +368,8 @@ verify_checksum(Ctx, #{checksum := {Algo, Digest}}) when Ctx /= undefined -> verify_checksum(undefined, _) -> ok. -%% - -% -spec init(root()) -> {ok, storage()}. -% init(Root) -> -% % TODO: garbage_collect(...) -% {ok, Root}. - -% %% - -define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]). -% encode_filemeta(Meta) -> -% emqx_json:encode( -% ?PRELUDE( -% _Vsn = 1, -% maps:map( -% fun -% (name, Name) -> -% {<<"name">>, Name}; -% (size, Size) -> -% {<<"size">>, Size}; -% (checksum, {sha256, Hash}) -> -% {<<"checksum">>, <<"sha256:", (binary:encode_hex(Hash))/binary>>}; -% (expire_at, ExpiresAt) -> -% {<<"expire_at">>, ExpiresAt}; -% (segments_ttl, TTL) -> -% {<<"segments_ttl">>, TTL}; -% (user_data, UserData) -> -% {<<"user_data">>, UserData} -% end, -% Meta -% ) -% ) -% ). - encode_filemeta(Meta) -> % TODO: Looks like this should be hocon's responsibility. Schema = emqx_ft_schema:schema(filemeta), @@ -337,21 +381,6 @@ decode_filemeta(Binary) -> ?PRELUDE(_Vsn = 1, Term) = emqx_json:decode(Binary, [return_maps]), hocon_tconf:check_plain(Schema, Term, #{atom_key => true, required => false}). -% map_into(Fun, Into, Ks, Map) -> -% map_foldr(map_into_fn(Fun, Into), Into, Ks, Map). - -% map_into_fn(Fun, L) when is_list(L) -> -% fun(K, V, Acc) -> [{K, Fun(K, V)} || Acc] end. - -% map_foldr(_Fun, Acc, [], _) -> -% Acc; -% map_foldr(Fun, Acc, [K | Ks], Map) when is_map_key(K, Map) -> -% Fun(K, maps:get(K, Map), map_foldr(Fun, Acc, Ks, Map)); -% map_foldr(Fun, Acc, [_ | Ks], Map) -> -% map_foldr(Fun, Acc, Ks, Map). - -%% - mk_segment_filename({Offset, Content}) -> lists:concat([?SEGMENT, ".", Offset, ".", byte_size(Content)]). diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl index 45dd93ab8..4f354be63 100644 --- a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl @@ -23,12 +23,12 @@ -export([list/3]). -export([multilist/3]). -export([pread/5]). --export([transfers/1]). +-export([ready_transfers/1]). +-export([get_ready_transfer/2]). -type offset() :: emqx_ft:offset(). -type transfer() :: emqx_ft:transfer(). -type filefrag() :: emqx_ft_storage_fs:filefrag(). --type transferinfo() :: emqx_ft_storage_fs:transferinfo(). -include_lib("emqx/include/bpapi.hrl"). @@ -38,19 +38,26 @@ introduced_in() -> -spec list(node(), transfer(), fragment | result) -> {ok, [filefrag()]} | {error, term()}. list(Node, Transfer, What) -> - erpc:call(Node, emqx_ft_storage, list_local, [Transfer, What]). + erpc:call(Node, emqx_ft_storage_fs, list_local, [Transfer, What]). -spec multilist([node()], transfer(), fragment | result) -> emqx_rpc:erpc_multicall({ok, [filefrag()]} | {error, term()}). multilist(Nodes, Transfer, What) -> - erpc:multicall(Nodes, emqx_ft_storage, list_local, [Transfer, What]). + erpc:multicall(Nodes, emqx_ft_storage_fs, list_local, [Transfer, What]). -spec pread(node(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> {ok, [filefrag()]} | {error, term()}. pread(Node, Transfer, Frag, Offset, Size) -> - erpc:call(Node, emqx_ft_storage, pread_local, [Transfer, Frag, Offset, Size]). + erpc:call(Node, emqx_ft_storage_fs, pread_local, [Transfer, Frag, Offset, Size]). --spec transfers([node()]) -> - emqx_rpc:erpc_multicall({ok, #{transfer() => transferinfo()}} | {error, term()}). -transfers(Nodes) -> - erpc:multicall(Nodes, emqx_ft_storage, local_transfers, []). +-spec ready_transfers([node()]) -> + {ok, [{emqx_ft_storage:ready_transfer_id(), emqx_ft_storage:ready_transfer_info()}]} + | {error, term()}. +ready_transfers(Nodes) -> + erpc:multicall(Nodes, emqx_ft_storage_fs, ready_transfers_local, []). + +-spec get_ready_transfer(node(), emqx_ft_storage:ready_transfer_id()) -> + {ok, emqx_ft_storage:ready_transfer_data()} + | {error, term()}. +get_ready_transfer(Node, ReadyTransferId) -> + erpc:call(Node, emqx_ft_storage_fs, get_ready_transfer_local, [ReadyTransferId]).