From 0aefd4a8c7641635c852303fe923c249f134e699 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Mon, 6 Feb 2023 23:43:53 +0200 Subject: [PATCH] feat(ft): add streaming of file content when downloading --- apps/emqx_ft/src/emqx_ft_api.erl | 10 +- apps/emqx_ft/src/emqx_ft_storage.erl | 17 +-- apps/emqx_ft/src/emqx_ft_storage_fs.erl | 37 ++++-- apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl | 6 +- .../emqx_ft/src/emqx_ft_storage_fs_reader.erl | 125 ++++++++++++++++++ .../src/emqx_ft_storage_fs_reader_sup.erl | 44 ++++++ apps/emqx_ft/src/emqx_ft_sup.erl | 11 +- .../src/proto/emqx_ft_storage_fs_proto_v1.erl | 8 +- .../emqx_ft_storage_fs_reader_proto_v1.erl | 33 +++++ mix.exs | 2 +- rebar.config | 2 +- 11 files changed, 259 insertions(+), 36 deletions(-) create mode 100644 apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl create mode 100644 apps/emqx_ft/src/emqx_ft_storage_fs_reader_sup.erl create mode 100644 apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl diff --git a/apps/emqx_ft/src/emqx_ft_api.erl b/apps/emqx_ft/src/emqx_ft_api.erl index b2a822f36..ddc6e761a 100644 --- a/apps/emqx_ft/src/emqx_ft_api.erl +++ b/apps/emqx_ft/src/emqx_ft_api.erl @@ -107,10 +107,16 @@ schema("/file_transfer/file") -> '/file_transfer/file'(get, #{query_string := Query}) -> case emqx_ft_storage:get_ready_transfer(Query) of {ok, FileData} -> - {200, #{<<"content-type">> => <<"application/data">>}, FileData}; + {200, + #{ + <<"content-type">> => <<"application/data">>, + <<"content-disposition">> => <<"attachment">> + }, + FileData}; {error, enoent} -> {404, error_msg('NOT_FOUND', <<"Not found">>)}; - {error, _} -> + {error, Error} -> + ?SLOG(warning, #{msg => "get_ready_transfer_fail", error => Error}), {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} end. diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index e8f1d9c47..0dd9d7989 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -22,8 +22,6 @@ store_segment/2, assemble/2, - parse_id/1, - ready_transfers/0, get_ready_transfer/1, @@ -39,7 +37,7 @@ -type ready_transfer_id() :: term(). -type ready_transfer_info() :: map(). --type ready_transfer_data() :: binary(). +-type ready_transfer_data() :: binary() | qlc:query_handle(). %%-------------------------------------------------------------------- %% Behaviour @@ -88,19 +86,6 @@ get_ready_transfer(ReadyTransferId) -> Mod = mod(), Mod:get_ready_transfer(storage(), ReadyTransferId). --spec parse_id(map()) -> {ok, ready_transfer_id()} | {error, term()}. -parse_id(#{ - <<"type">> := local, <<"node">> := NodeBin, <<"clientid">> := ClientId, <<"id">> := Id -}) -> - case emqx_misc:safe_to_existing_atom(NodeBin) of - {ok, Node} -> - {ok, {local, Node, ClientId, Id}}; - {error, _} -> - {error, {invalid_node, NodeBin}} - end; -parse_id(#{}) -> - {error, invalid_file_id}. - -spec with_storage_type(atom(), atom(), list(term())) -> any(). with_storage_type(Type, Fun, Args) -> Storage = storage(), diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 088c316ae..a120a4067 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -29,7 +29,7 @@ -export([transfers/1]). -export([ready_transfers_local/1]). --export([get_ready_transfer_local/2]). +-export([get_ready_transfer_local/3]). -export([ready_transfers/1]). -export([get_ready_transfer/2]). @@ -175,22 +175,43 @@ get_ready_transfer(_Storage, ReadyTransferId) -> case parse_ready_transfer_id(ReadyTransferId) of {ok, {Node, Transfer}} -> try - emqx_ft_storage_fs_proto_v1:get_ready_transfer(Node, Transfer) + case emqx_ft_storage_fs_proto_v1:get_ready_transfer(Node, self(), Transfer) of + {ok, ReaderPid} -> + {ok, emqx_ft_storage_fs_reader:table(ReaderPid)}; + {error, _} = Error -> + Error + end catch - error:Error -> - {error, Error}; - C:Error -> - {error, {C, Error}} + error:Exc:Stacktrace -> + ?SLOG(warning, #{ + msg => "get_ready_transfer_error", + node => Node, + transfer => Transfer, + exception => Exc, + stacktrace => Stacktrace + }), + {error, Exc}; + C:Exc:Stacktrace -> + ?SLOG(warning, #{ + msg => "get_ready_transfer_fail", + class => C, + node => Node, + transfer => Transfer, + exception => Exc, + stacktrace => Stacktrace + }), + {error, {C, Exc}} end; {error, _} = Error -> Error end. -get_ready_transfer_local(Storage, Transfer) -> +get_ready_transfer_local(Storage, CallerPid, Transfer) -> Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(result)), case file:list_dir(Dirname) of {ok, [Filename | _]} -> - file:read_file(filename:join([Dirname, Filename])); + FullFilename = filename:join([Dirname, Filename]), + emqx_ft_storage_fs_reader:start_supervised(CallerPid, FullFilename); {error, _} = Error -> Error end. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl index 0c30f5567..7e19dd322 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl @@ -23,7 +23,7 @@ -export([ list_local/2, pread_local/4, - get_ready_transfer_local/1, + get_ready_transfer_local/2, ready_transfers_local/0 ]). @@ -33,8 +33,8 @@ list_local(Transfer, What) -> pread_local(Transfer, Frag, Offset, Size) -> emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]). -get_ready_transfer_local(Transfer) -> - emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [Transfer]). +get_ready_transfer_local(CallerPid, Transfer) -> + emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [CallerPid, Transfer]). ready_transfers_local() -> emqx_ft_storage:with_storage_type(local, ready_transfers_local, []). diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl new file mode 100644 index 000000000..a6307765e --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl @@ -0,0 +1,125 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_fs_reader). + +-behaviour(gen_server). + +-include_lib("emqx/include/logger.hrl"). + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-export([ + start_link/2, + start_link/3, + start_supervised/2, + start_supervised/3, + read/1 +]). + +-export([ + table/1 +]). + +-define(DEFAULT_CHUNK_SIZE, 1024). + +table(ReaderPid) -> + NextFun = fun NextFun(Pid) -> + try + case emqx_ft_storage_fs_reader_proto_v1:read(node(Pid), Pid) of + eof -> + []; + {ok, Data} -> + [Data | fun() -> NextFun(Pid) end]; + {error, Reason} -> + ?SLOG(warning, #{msg => "file_read_error", reason => Reason}), + [] + end + catch + Class:Error:Stacktrace -> + ?SLOG(warning, #{ + msg => "file_read_error", + class => Class, + reason => Error, + stacktrace => Stacktrace + }), + [] + end + end, + qlc:table(fun() -> NextFun(ReaderPid) end, []). + +start_link(CallerPid, Filename) -> + start_link(CallerPid, Filename, ?DEFAULT_CHUNK_SIZE). + +start_link(CallerPid, Filename, ChunkSize) -> + gen_server:start_link(?MODULE, [CallerPid, Filename, ChunkSize], []). + +start_supervised(CallerPid, Filename) -> + start_supervised(CallerPid, Filename, ?DEFAULT_CHUNK_SIZE). + +start_supervised(CallerPid, Filename, ChunkSize) -> + emqx_ft_storage_fs_reader_sup:start_child(CallerPid, Filename, ChunkSize). + +read(Pid) -> + gen_server:call(Pid, read). + +init([CallerPid, Filename, ChunkSize]) -> + true = link(CallerPid), + case file:open(Filename, [read, raw, binary]) of + {ok, File} -> + {ok, #{ + filename => Filename, + file => File, + chunk_size => ChunkSize + }}; + {error, Reason} -> + {stop, Reason} + end. + +handle_call(read, _From, #{file := File, chunk_size := ChunkSize} = State) -> + case file:read(File, ChunkSize) of + {ok, Data} -> + ?SLOG(warning, #{msg => "read", bytes => byte_size(Data)}), + {reply, {ok, Data}, State}; + eof -> + ?SLOG(warning, #{msg => "read", eof => true}), + {stop, normal, eof, State}; + {error, Reason} = Error -> + {stop, Reason, Error, State} + end; +handle_call(Msg, _From, State) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, State}. + +handle_info(Msg, State) -> + ?SLOG(warning, #{msg => "unexpected_message", info_msg => Msg}), + {noreply, State}. + +handle_cast(Msg, State) -> + ?SLOG(warning, #{msg => "unexpected_message", case_msg => Msg}), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_reader_sup.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_reader_sup.erl new file mode 100644 index 000000000..7435d7e1c --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_reader_sup.erl @@ -0,0 +1,44 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_fs_reader_sup). + +-behaviour(supervisor). + +-export([ + init/1, + start_link/0, + start_child/3 +]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_child(CallerPid, Filename, ChunkSize) -> + Childspec = #{ + id => {CallerPid, Filename}, + start => {emqx_ft_storage_fs_reader, start_link, [CallerPid, Filename, ChunkSize]}, + restart => temporary + }, + supervisor:start_child(?MODULE, Childspec). + +init(_) -> + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 1000 + }, + {ok, {SupFlags, []}}. diff --git a/apps/emqx_ft/src/emqx_ft_sup.erl b/apps/emqx_ft/src/emqx_ft_sup.erl index b4ce52edb..5c2025860 100644 --- a/apps/emqx_ft/src/emqx_ft_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_sup.erl @@ -43,6 +43,15 @@ init([]) -> modules => [emqx_ft_assembler_sup] }, + FileReaderSup = #{ + id => emqx_ft_storage_fs_reader_sup, + start => {emqx_ft_storage_fs_reader_sup, start_link, []}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [emqx_ft_storage_fs_reader_sup] + }, + Responder = #{ id => emqx_ft_responder, start => {emqx_ft_responder, start_link, []}, @@ -52,5 +61,5 @@ init([]) -> modules => [emqx_ft_responder] }, - ChildSpecs = [Responder, AssemblerSup], + ChildSpecs = [Responder, AssemblerSup, FileReaderSup], {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl index 2e3dc8632..082df9ac0 100644 --- a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl @@ -24,7 +24,7 @@ -export([multilist/3]). -export([pread/5]). -export([ready_transfers/1]). --export([get_ready_transfer/2]). +-export([get_ready_transfer/3]). -type offset() :: emqx_ft:offset(). -type transfer() :: emqx_ft:transfer(). @@ -56,9 +56,9 @@ pread(Node, Transfer, Frag, Offset, Size) -> ready_transfers(Nodes) -> erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, ready_transfers_local, []). --spec get_ready_transfer(node(), emqx_ft_storage:ready_transfer_id()) -> +-spec get_ready_transfer(node(), pid(), emqx_ft_storage:ready_transfer_id()) -> {ok, emqx_ft_storage:ready_transfer_data()} | {error, term()} | no_return(). -get_ready_transfer(Node, ReadyTransferId) -> - erpc:call(Node, emqx_ft_storage_fs_proxy, get_ready_transfer_local, [ReadyTransferId]). +get_ready_transfer(Node, CallerPid, ReadyTransferId) -> + erpc:call(Node, emqx_ft_storage_fs_proxy, get_ready_transfer_local, [CallerPid, ReadyTransferId]). diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl new file mode 100644 index 000000000..1bbb05471 --- /dev/null +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_fs_reader_proto_v1). + +-behaviour(emqx_bpapi). + +-export([introduced_in/0]). + +-export([read/2]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.17". + +-spec read(node(), pid()) -> + {ok, binary()} | eof | {error, term()} | no_return(). +read(Node, Pid) -> + erpc:call(Node, emqx_ft_storage_fs_reader, read, [Pid]). diff --git a/mix.exs b/mix.exs index 92024f48d..9a8c22e6b 100644 --- a/mix.exs +++ b/mix.exs @@ -58,7 +58,7 @@ defmodule EMQXUmbrella.MixProject do {:ekka, github: "emqx/ekka", tag: "0.14.6", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, - {:minirest, github: "emqx/minirest", tag: "1.3.8", override: true}, + {:minirest, github: "emqx/minirest", tag: "1.3.9", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.3", override: true}, {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, diff --git a/rebar.config b/rebar.config index f084f9827..0cc143e71 100644 --- a/rebar.config +++ b/rebar.config @@ -65,9 +65,9 @@ , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.6"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} - , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.8"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.3"}}} , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}} + , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.9"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.5"}}}