diff --git a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf index 941611424..c33ea447e 100644 --- a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf +++ b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf @@ -58,7 +58,7 @@ emqx_ft_schema { local_storage_exporter_type { desc { - en: "Type of the Exporter to use." + en: "Exporter type for the exporter to the local file system" zh: "" } label: { @@ -67,6 +67,17 @@ emqx_ft_schema { } } + s3_exporter_type { + desc { + en: "Exporter type for the exporter to S3" + zh: "" + } + label: { + en: "S3 Exporter Type" + zh: "" + } + } + local_storage_exporter_root { desc { en: "File system path to keep uploaded files." diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index b7c8f0eac..e3f6cbe1c 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -336,7 +336,7 @@ transfer(Msg, FileId) -> {clientid_to_binary(ClientId), FileId}. on_complete(Op, {ChanPid, PacketId}, Transfer, Result) -> - ?SLOG(debug, #{ + ?SLOG(warning, #{ msg => "on_complete", operation => Op, packet_id => PacketId, diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 17fed012a..3a352cd10 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -136,13 +136,12 @@ handle_event( ) -> Filemeta = emqx_ft_assembly:filemeta(Asm), Coverage = emqx_ft_assembly:coverage(Asm), - % TODO: better error handling - {ok, Export} = emqx_ft_storage_exporter:start_export( - Storage, - Transfer, - Filemeta - ), - {next_state, {assemble, Coverage}, St#{export => Export}, ?internal([])}; + case emqx_ft_storage_exporter:start_export(Storage, Transfer, Filemeta) of + {ok, Export} -> + {next_state, {assemble, Coverage}, St#{export => Export}, ?internal([])}; + {error, _} = Error -> + {stop, {shutdown, Error}} + end; handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export := Export}) -> % TODO % Currently, race is possible between getting segment info from the remote node and @@ -150,8 +149,12 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export := % TODO: pipelining % TODO: better error handling {ok, Content} = pread(Node, Segment, St), - {ok, NExport} = emqx_ft_storage_exporter:write(Export, Content), - {next_state, {assemble, Rest}, St#{export := NExport}, ?internal([])}; + case emqx_ft_storage_exporter:write(Export, Content) of + {ok, NExport} -> + {next_state, {assemble, Rest}, St#{export := NExport}, ?internal([])}; + {error, _} = Error -> + {stop, {shutdown, Error}, maps:remove(export, St)} + end; handle_event(internal, _, {assemble, []}, St = #{}) -> {next_state, complete, St, ?internal([])}; handle_event(internal, _, complete, St = #{export := Export}) -> diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index 2abbe4c45..df28de218 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -16,7 +16,7 @@ -module(emqx_ft_schema). --behaviour(hocon_schema). +% -behaviour(hocon_schema). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("typerefl/include/types.hrl"). @@ -35,7 +35,7 @@ -reflect_type([json_value/0]). -%% +-import(hoconsc, [ref/1, ref/2, mk/2]). namespace() -> file_transfer. @@ -46,84 +46,130 @@ roots() -> [file_transfer]. fields(file_transfer) -> [ - {storage, #{ - type => hoconsc:union([ - hoconsc:ref(?MODULE, local_storage) - ]), - desc => ?DESC("storage") - }} + {storage, + mk( + hoconsc:union([ + ref(local_storage) + ]), + #{ + required => true, + desc => ?DESC("storage") + } + )} ]; fields(local_storage) -> [ - {type, #{ - type => local, - default => local, - required => false, - desc => ?DESC("local_type") - }}, - {segments, #{ - type => ?REF(local_storage_segments), - desc => ?DESC("local_storage_segments"), - required => false - }}, - {exporter, #{ - type => hoconsc:union([ - ?REF(local_storage_exporter) - ]), - desc => ?DESC("local_storage_exporter"), - required => true - }} + {type, + mk( + local, + #{ + default => local, + required => false, + desc => ?DESC("local_type") + } + )}, + {segments, + mk( + ref(local_storage_segments), + #{ + desc => ?DESC("local_storage_segments"), + required => false + } + )}, + {exporter, + mk( + hoconsc:union([ + ref(local_storage_exporter), + ref(s3_exporter) + ]), + #{ + desc => ?DESC("local_storage_exporter"), + required => true + } + )} ]; fields(local_storage_segments) -> [ - {root, #{ - type => binary(), - desc => ?DESC("local_storage_segments_root"), - required => false - }}, - {gc, #{ - type => ?REF(local_storage_segments_gc), - desc => ?DESC("local_storage_segments_gc"), - required => false - }} + {root, + mk( + binary(), + #{ + desc => ?DESC("local_storage_segments_root"), + required => false + } + )}, + {gc, + mk( + ref(local_storage_segments_gc), #{ + desc => ?DESC("local_storage_segments_gc"), + required => false + } + )} ]; fields(local_storage_exporter) -> [ - {type, #{ - type => local, - default => local, - required => false, - desc => ?DESC("local_storage_exporter_type") - }}, - {root, #{ - type => binary(), - desc => ?DESC("local_storage_exporter_root"), - required => false - }} + {type, + mk( + local, + #{ + default => local, + required => false, + desc => ?DESC("local_storage_exporter_type") + } + )}, + {root, + mk( + binary(), + #{ + desc => ?DESC("local_storage_exporter_root"), + required => false + } + )} ]; +fields(s3_exporter) -> + [ + {type, + mk( + s3, + #{ + default => s3, + required => false, + desc => ?DESC("s3_exporter_type") + } + )} + ] ++ + emqx_s3_schema:fields(s3); fields(local_storage_segments_gc) -> [ - {interval, #{ - type => emqx_schema:duration_ms(), - desc => ?DESC("storage_gc_interval"), - required => false, - default => "1h" - }}, - {maximum_segments_ttl, #{ - type => emqx_schema:duration_s(), - desc => ?DESC("storage_gc_max_segments_ttl"), - required => false, - default => "24h" - }}, - {minimum_segments_ttl, #{ - type => emqx_schema:duration_s(), - % desc => ?DESC("storage_gc_min_segments_ttl"), - required => false, - default => "5m", - % NOTE - % This setting does not seem to be useful to an end-user. - hidden => true - }} + {interval, + mk( + emqx_schema:duration_ms(), + #{ + desc => ?DESC("storage_gc_interval"), + required => false, + default => "1h" + } + )}, + {maximum_segments_ttl, + mk( + emqx_schema:duration_s(), + #{ + desc => ?DESC("storage_gc_max_segments_ttl"), + required => false, + default => "24h" + } + )}, + {minimum_segments_ttl, + mk( + emqx_schema:duration_s(), + #{ + required => false, + default => "5m", + % NOTE + % This setting does not seem to be useful to an end-user. + hidden => true + } + )} ]. desc(file_transfer) -> @@ -133,7 +179,9 @@ desc(local_storage) -> desc(local_storage_segments) -> "File transfer local segments storage settings"; desc(local_storage_exporter) -> - "Exporter settings for the File transfer local storage backend"; + "Local Exporter settings for the File transfer local storage backend"; +desc(s3_exporter) -> + "S3 Exporter settings for the File transfer local storage backend"; desc(local_storage_segments_gc) -> "Garbage collection settings for the File transfer local segments storage". diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl index a7d1cd2e2..297762d28 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl @@ -34,42 +34,47 @@ -export([exporter/1]). --export_type([options/0]). -export_type([export/0]). -type storage() :: emxt_ft_storage_fs:storage(). -type transfer() :: emqx_ft:transfer(). -type filemeta() :: emqx_ft:filemeta(). --type options() :: map(). --type export() :: term(). +-type exporter_conf() :: map(). +-type export_st() :: term(). +-opaque export() :: {module(), export_st()}. --callback start_export(options(), transfer(), filemeta()) -> - {ok, export()} | {error, _Reason}. +-callback start_export(exporter_conf(), transfer(), filemeta()) -> + {ok, export_st()} | {error, _Reason}. --callback write(ExportSt :: export(), iodata()) -> - {ok, ExportSt :: export()} | {error, _Reason}. +%% Exprter must discard the export itself in case of error +-callback write(ExportSt :: export_st(), iodata()) -> + {ok, ExportSt :: export_st()} | {error, _Reason}. --callback complete(ExportSt :: export()) -> +-callback complete(ExportSt :: export_st()) -> ok | {error, _Reason}. --callback discard(ExportSt :: export()) -> +-callback discard(ExportSt :: export_st()) -> ok | {error, _Reason}. --callback list(options()) -> +-callback list(storage()) -> {ok, [emqx_ft_storage:file_info()]} | {error, _Reason}. %% +-spec start_export(storage(), transfer(), filemeta()) -> + {ok, export()} | {error, _Reason}. start_export(Storage, Transfer, Filemeta) -> - {ExporterMod, Exporter} = exporter(Storage), - case ExporterMod:start_export(Exporter, Transfer, Filemeta) of + {ExporterMod, ExporterConf} = exporter(Storage), + case ExporterMod:start_export(ExporterConf, Transfer, Filemeta) of {ok, ExportSt} -> {ok, {ExporterMod, ExportSt}}; {error, _} = Error -> Error end. +-spec write(export(), iodata()) -> + {ok, export()} | {error, _Reason}. write({ExporterMod, ExportSt}, Content) -> case ExporterMod:write(ExportSt, Content) of {ok, ExportStNext} -> @@ -78,23 +83,31 @@ write({ExporterMod, ExportSt}, Content) -> Error end. +-spec complete(export()) -> + ok | {error, _Reason}. complete({ExporterMod, ExportSt}) -> ExporterMod:complete(ExportSt). +-spec discard(export()) -> + ok | {error, _Reason}. discard({ExporterMod, ExportSt}) -> ExporterMod:discard(ExportSt). -%% - +-spec list(storage()) -> + {ok, [emqx_ft_storage:file_info()]} | {error, _Reason}. list(Storage) -> {ExporterMod, ExporterOpts} = exporter(Storage), ExporterMod:list(ExporterOpts). -%% - -spec exporter(storage()) -> {module(), _ExporterOptions}. exporter(Storage) -> case maps:get(exporter, Storage) of #{type := local} = Options -> - {emqx_ft_storage_exporter_fs, Options} + {emqx_ft_storage_exporter_fs, without_type(Options)}; + #{type := s3} = Options -> + {emqx_ft_storage_exporter_s3, without_type(Options)} end. + +-spec without_type(exporter_conf()) -> exporter_conf(). +without_type(#{type := _} = Options) -> + maps:without([type], Options). diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl index 647d84124..64c0e325a 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl @@ -20,21 +20,22 @@ -include_lib("emqx/include/logger.hrl"). %% Exporter API +-behaviour(emqx_ft_storage_exporter). + -export([start_export/3]). -export([write/2]). -export([complete/1]). -export([discard/1]). +-export([list/1]). +%% Internal API for RPC -export([list_local/1]). -export([list_local/2]). -export([start_reader/3]). --export([list/1]). % TODO % -export([list/2]). --export_type([export/0]). - -type options() :: _TODO. -type transfer() :: emqx_ft:transfer(). -type filemeta() :: emqx_ft:filemeta(). @@ -49,7 +50,7 @@ -type file_error() :: emqx_ft_storage_fs:file_error(). --opaque export() :: #{ +-type export() :: #{ path := file:name(), handle := io:device(), result := file:name(), @@ -116,6 +117,7 @@ write(Export = #{handle := Handle, hash := Ctx}, IoData) -> ok -> {ok, Export#{hash := update_checksum(Ctx, IoData)}}; {error, _} = Error -> + _ = discard(Export), Error end. @@ -370,4 +372,4 @@ mk_transfer_hash(Transfer) -> crypto:hash(?BUCKET_HASH, term_to_binary(Transfer)). get_storage_root(Options) -> - maps:get(root, Options, filename:join([emqx:data_dir(), "ft", "exports"])). + maps:get(root, Options, filename:join([emqx:data_dir(), file_transfer, exports])). diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl new file mode 100644 index 000000000..300f0dc85 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl @@ -0,0 +1,69 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_exporter_s3). + +-include_lib("emqx/include/logger.hrl"). + +%% Exporter API +-export([start_export/3]). +-export([write/2]). +-export([complete/1]). +-export([discard/1]). +-export([list/1]). + +-type options() :: emqx_s3:profile_config(). +-type transfer() :: emqx_ft:transfer(). +-type filemeta() :: emqx_ft:filemeta(). +-type exportinfo() :: #{ + transfer := transfer(), + name := file:name(), + uri := uri_string:uri_string(), + timestamp := emqx_datetime:epoch_second(), + size := _Bytes :: non_neg_integer(), + meta => filemeta() +}. + +-type export_st() :: #{ + pid := pid(), + meta := filemeta() +}. + +-spec start_export(options(), transfer(), filemeta()) -> + {ok, export_st()} | {error, term()}. +start_export(_Options, _Transfer, Filemeta = #{name := Filename}) -> + Pid = spawn(fun() -> Filename end), + #{meta => Filemeta, pid => Pid}. + +-spec write(export_st(), iodata()) -> + {ok, export_st()} | {error, term()}. +write(ExportSt, _IoData) -> + {ok, ExportSt}. + +-spec complete(export_st()) -> + ok | {error, term()}. +complete(_ExportSt) -> + ok. + +-spec discard(export_st()) -> + ok. +discard(_ExportSt) -> + ok. + +-spec list(options()) -> + {ok, [exportinfo()]} | {error, term()}. +list(_Options) -> + {ok, []}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 4e5cc9236..d871d1f32 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -86,14 +86,18 @@ -define(MANIFEST, "MANIFEST.json"). -define(SEGMENT, "SEG"). --type storage() :: #{ - root => file:name(), - exporter => exporter() +-type segments() :: #{ + root := file:name(), + gc := #{ + interval := non_neg_integer(), + maximum_segments_ttl := non_neg_integer(), + minimum_segments_ttl := non_neg_integer() + } }. --type exporter() :: #{ - type := local, - root => file:name() +-type storage() :: #{ + segments := segments(), + exporter := emqx_ft_storage_exporter:exporter() }. -type file_error() :: diff --git a/apps/emqx_s3/i18n/emqx_s3_schema_i18n.conf b/apps/emqx_s3/i18n/emqx_s3_schema_i18n.conf new file mode 100644 index 000000000..4e0870bae --- /dev/null +++ b/apps/emqx_s3/i18n/emqx_s3_schema_i18n.conf @@ -0,0 +1,76 @@ +emqx_s3_schema { + access_key_id { + desc { + en: "The access key id of the S3 bucket." + } + label { + en: "Access Key ID" + } + } + secret_access_key { + desc { + en: "The secret access key of the S3 bucket." + } + label { + en: "Secret Access Key" + } + } + bucket { + desc { + en: "The name of the S3 bucket." + } + label { + en: "Bucket" + } + } + host { + desc { + en: "The host of the S3 endpoint." + } + label { + en: "S3 endpoint Host" + } + } + port { + desc { + en: "The port of the S3 endpoint." + } + label { + en: "S3 endpoint port" + } + } + min_part_size { + desc { + en: """The minimum part size for multipart uploads. +Uploaded data will be accumulated in memory until this size is reached.""" + } + label { + en: "Minimum multipart upload part size" + } + } + max_part_size { + desc { + en: """The maximum part size for multipart uploads. +S3 uploader won't try to upload parts larger than this size.""" + } + label { + en: "Maximum multipart upload part size" + } + } + acl { + desc { + en: "The ACL to use for the uploaded objects." + } + label { + en: "ACL" + } + } + transport_options { + desc { + en: "Options for the HTTP transport layer used by the S3 client." + } + label { + en: "Transport Options" + } + } +} diff --git a/apps/emqx_s3/src/emqx_s3.erl b/apps/emqx_s3/src/emqx_s3.erl index 6d2577dca..7c78de979 100644 --- a/apps/emqx_s3/src/emqx_s3.erl +++ b/apps/emqx_s3/src/emqx_s3.erl @@ -21,8 +21,32 @@ -type profile_id() :: term(). -%% TODO: define fields --type profile_config() :: map(). +-type acl() :: + private + | public_read + | public_read_write + | authenticated_read + | bucket_owner_read + | bucket_owner_full_control. + +-type transport_options() :: #{ + connect_timeout => pos_integer(), + enable_pipelining => pos_integer(), + headers => map(), + max_retries => pos_integer(), + pool_size => pos_integer(), + pool_type => atom(), + ssl => map() +}. + +-type profile_config() :: #{ + bucket := string(), + host := string(), + port := pos_integer(), + acl => acl(), + min_part_size => pos_integer(), + transport_options => transport_options() +}. %%-------------------------------------------------------------------- %% API diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl index ceb0d1dd4..5d76e7120 100644 --- a/apps/emqx_s3/src/emqx_s3_schema.erl +++ b/apps/emqx_s3/src/emqx_s3_schema.erl @@ -7,7 +7,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --import(hoconsc, [mk/2, ref/1]). +-import(hoconsc, [mk/2, ref/2]). -export([roots/0, fields/1, namespace/0, tags/0]). @@ -101,7 +101,7 @@ fields(s3) -> )}, {transport_options, mk( - ref(transport_options), + ref(?MODULE, transport_options), #{ desc => ?DESC("transport_options"), required => false