feat(ft-s3): add initial integration

This commit is contained in:
Ilya Averyanov 2023-03-30 22:05:21 +03:00
parent 31b441a46e
commit 818a5cacf2
11 changed files with 362 additions and 112 deletions

View File

@ -58,7 +58,7 @@ emqx_ft_schema {
local_storage_exporter_type { local_storage_exporter_type {
desc { desc {
en: "Type of the Exporter to use." en: "Exporter type for the exporter to the local file system"
zh: "" zh: ""
} }
label: { label: {
@ -67,6 +67,17 @@ emqx_ft_schema {
} }
} }
s3_exporter_type {
desc {
en: "Exporter type for the exporter to S3"
zh: ""
}
label: {
en: "S3 Exporter Type"
zh: ""
}
}
local_storage_exporter_root { local_storage_exporter_root {
desc { desc {
en: "File system path to keep uploaded files." en: "File system path to keep uploaded files."

View File

@ -336,7 +336,7 @@ transfer(Msg, FileId) ->
{clientid_to_binary(ClientId), FileId}. {clientid_to_binary(ClientId), FileId}.
on_complete(Op, {ChanPid, PacketId}, Transfer, Result) -> on_complete(Op, {ChanPid, PacketId}, Transfer, Result) ->
?SLOG(debug, #{ ?SLOG(warning, #{
msg => "on_complete", msg => "on_complete",
operation => Op, operation => Op,
packet_id => PacketId, packet_id => PacketId,

View File

@ -136,13 +136,12 @@ handle_event(
) -> ) ->
Filemeta = emqx_ft_assembly:filemeta(Asm), Filemeta = emqx_ft_assembly:filemeta(Asm),
Coverage = emqx_ft_assembly:coverage(Asm), Coverage = emqx_ft_assembly:coverage(Asm),
% TODO: better error handling case emqx_ft_storage_exporter:start_export(Storage, Transfer, Filemeta) of
{ok, Export} = emqx_ft_storage_exporter:start_export( {ok, Export} ->
Storage, {next_state, {assemble, Coverage}, St#{export => Export}, ?internal([])};
Transfer, {error, _} = Error ->
Filemeta {stop, {shutdown, Error}}
), end;
{next_state, {assemble, Coverage}, St#{export => Export}, ?internal([])};
handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export := Export}) -> handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export := Export}) ->
% TODO % TODO
% Currently, race is possible between getting segment info from the remote node and % Currently, race is possible between getting segment info from the remote node and
@ -150,8 +149,12 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export :=
% TODO: pipelining % TODO: pipelining
% TODO: better error handling % TODO: better error handling
{ok, Content} = pread(Node, Segment, St), {ok, Content} = pread(Node, Segment, St),
{ok, NExport} = emqx_ft_storage_exporter:write(Export, Content), case emqx_ft_storage_exporter:write(Export, Content) of
{next_state, {assemble, Rest}, St#{export := NExport}, ?internal([])}; {ok, NExport} ->
{next_state, {assemble, Rest}, St#{export := NExport}, ?internal([])};
{error, _} = Error ->
{stop, {shutdown, Error}, maps:remove(export, St)}
end;
handle_event(internal, _, {assemble, []}, St = #{}) -> handle_event(internal, _, {assemble, []}, St = #{}) ->
{next_state, complete, St, ?internal([])}; {next_state, complete, St, ?internal([])};
handle_event(internal, _, complete, St = #{export := Export}) -> handle_event(internal, _, complete, St = #{export := Export}) ->

View File

@ -16,7 +16,7 @@
-module(emqx_ft_schema). -module(emqx_ft_schema).
-behaviour(hocon_schema). % -behaviour(hocon_schema).
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
@ -35,7 +35,7 @@
-reflect_type([json_value/0]). -reflect_type([json_value/0]).
%% -import(hoconsc, [ref/1, ref/2, mk/2]).
namespace() -> file_transfer. namespace() -> file_transfer.
@ -46,84 +46,130 @@ roots() -> [file_transfer].
fields(file_transfer) -> fields(file_transfer) ->
[ [
{storage, #{ {storage,
type => hoconsc:union([ mk(
hoconsc:ref(?MODULE, local_storage) hoconsc:union([
]), ref(local_storage)
desc => ?DESC("storage") ]),
}} #{
required => true,
desc => ?DESC("storage")
}
)}
]; ];
fields(local_storage) -> fields(local_storage) ->
[ [
{type, #{ {type,
type => local, mk(
default => local, local,
required => false, #{
desc => ?DESC("local_type") default => local,
}}, required => false,
{segments, #{ desc => ?DESC("local_type")
type => ?REF(local_storage_segments), }
desc => ?DESC("local_storage_segments"), )},
required => false {segments,
}}, mk(
{exporter, #{ ref(local_storage_segments),
type => hoconsc:union([ #{
?REF(local_storage_exporter) desc => ?DESC("local_storage_segments"),
]), required => false
desc => ?DESC("local_storage_exporter"), }
required => true )},
}} {exporter,
mk(
hoconsc:union([
ref(local_storage_exporter),
ref(s3_exporter)
]),
#{
desc => ?DESC("local_storage_exporter"),
required => true
}
)}
]; ];
fields(local_storage_segments) -> fields(local_storage_segments) ->
[ [
{root, #{ {root,
type => binary(), mk(
desc => ?DESC("local_storage_segments_root"), binary(),
required => false #{
}}, desc => ?DESC("local_storage_segments_root"),
{gc, #{ required => false
type => ?REF(local_storage_segments_gc), }
desc => ?DESC("local_storage_segments_gc"), )},
required => false {gc,
}} mk(
ref(local_storage_segments_gc), #{
desc => ?DESC("local_storage_segments_gc"),
required => false
}
)}
]; ];
fields(local_storage_exporter) -> fields(local_storage_exporter) ->
[ [
{type, #{ {type,
type => local, mk(
default => local, local,
required => false, #{
desc => ?DESC("local_storage_exporter_type") default => local,
}}, required => false,
{root, #{ desc => ?DESC("local_storage_exporter_type")
type => binary(), }
desc => ?DESC("local_storage_exporter_root"), )},
required => false {root,
}} mk(
binary(),
#{
desc => ?DESC("local_storage_exporter_root"),
required => false
}
)}
]; ];
fields(s3_exporter) ->
[
{type,
mk(
s3,
#{
default => s3,
required => false,
desc => ?DESC("s3_exporter_type")
}
)}
] ++
emqx_s3_schema:fields(s3);
fields(local_storage_segments_gc) -> fields(local_storage_segments_gc) ->
[ [
{interval, #{ {interval,
type => emqx_schema:duration_ms(), mk(
desc => ?DESC("storage_gc_interval"), emqx_schema:duration_ms(),
required => false, #{
default => "1h" desc => ?DESC("storage_gc_interval"),
}}, required => false,
{maximum_segments_ttl, #{ default => "1h"
type => emqx_schema:duration_s(), }
desc => ?DESC("storage_gc_max_segments_ttl"), )},
required => false, {maximum_segments_ttl,
default => "24h" mk(
}}, emqx_schema:duration_s(),
{minimum_segments_ttl, #{ #{
type => emqx_schema:duration_s(), desc => ?DESC("storage_gc_max_segments_ttl"),
% desc => ?DESC("storage_gc_min_segments_ttl"), required => false,
required => false, default => "24h"
default => "5m", }
% NOTE )},
% This setting does not seem to be useful to an end-user. {minimum_segments_ttl,
hidden => true mk(
}} emqx_schema:duration_s(),
#{
required => false,
default => "5m",
% NOTE
% This setting does not seem to be useful to an end-user.
hidden => true
}
)}
]. ].
desc(file_transfer) -> desc(file_transfer) ->
@ -133,7 +179,9 @@ desc(local_storage) ->
desc(local_storage_segments) -> desc(local_storage_segments) ->
"File transfer local segments storage settings"; "File transfer local segments storage settings";
desc(local_storage_exporter) -> desc(local_storage_exporter) ->
"Exporter settings for the File transfer local storage backend"; "Local Exporter settings for the File transfer local storage backend";
desc(s3_exporter) ->
"S3 Exporter settings for the File transfer local storage backend";
desc(local_storage_segments_gc) -> desc(local_storage_segments_gc) ->
"Garbage collection settings for the File transfer local segments storage". "Garbage collection settings for the File transfer local segments storage".

View File

@ -34,42 +34,47 @@
-export([exporter/1]). -export([exporter/1]).
-export_type([options/0]).
-export_type([export/0]). -export_type([export/0]).
-type storage() :: emqx_ft_storage_fs:storage(). -type storage() :: emqx_ft_storage_fs:storage().
-type transfer() :: emqx_ft:transfer(). -type transfer() :: emqx_ft:transfer().
-type filemeta() :: emqx_ft:filemeta(). -type filemeta() :: emqx_ft:filemeta().
-type options() :: map(). -type exporter_conf() :: map().
-type export() :: term(). -type export_st() :: term().
-opaque export() :: {module(), export_st()}.
-callback start_export(options(), transfer(), filemeta()) -> -callback start_export(exporter_conf(), transfer(), filemeta()) ->
{ok, export()} | {error, _Reason}. {ok, export_st()} | {error, _Reason}.
-callback write(ExportSt :: export(), iodata()) -> %% Exporter must discard the export itself in case of error
{ok, ExportSt :: export()} | {error, _Reason}. -callback write(ExportSt :: export_st(), iodata()) ->
{ok, ExportSt :: export_st()} | {error, _Reason}.
-callback complete(ExportSt :: export()) -> -callback complete(ExportSt :: export_st()) ->
ok | {error, _Reason}. ok | {error, _Reason}.
-callback discard(ExportSt :: export()) -> -callback discard(ExportSt :: export_st()) ->
ok | {error, _Reason}. ok | {error, _Reason}.
-callback list(options()) -> -callback list(storage()) ->
{ok, [emqx_ft_storage:file_info()]} | {error, _Reason}. {ok, [emqx_ft_storage:file_info()]} | {error, _Reason}.
%% %%
-spec start_export(storage(), transfer(), filemeta()) ->
{ok, export()} | {error, _Reason}.
start_export(Storage, Transfer, Filemeta) -> start_export(Storage, Transfer, Filemeta) ->
{ExporterMod, Exporter} = exporter(Storage), {ExporterMod, ExporterConf} = exporter(Storage),
case ExporterMod:start_export(Exporter, Transfer, Filemeta) of case ExporterMod:start_export(ExporterConf, Transfer, Filemeta) of
{ok, ExportSt} -> {ok, ExportSt} ->
{ok, {ExporterMod, ExportSt}}; {ok, {ExporterMod, ExportSt}};
{error, _} = Error -> {error, _} = Error ->
Error Error
end. end.
-spec write(export(), iodata()) ->
{ok, export()} | {error, _Reason}.
write({ExporterMod, ExportSt}, Content) -> write({ExporterMod, ExportSt}, Content) ->
case ExporterMod:write(ExportSt, Content) of case ExporterMod:write(ExportSt, Content) of
{ok, ExportStNext} -> {ok, ExportStNext} ->
@ -78,23 +83,31 @@ write({ExporterMod, ExportSt}, Content) ->
Error Error
end. end.
-spec complete(export()) ->
ok | {error, _Reason}.
complete({ExporterMod, ExportSt}) -> complete({ExporterMod, ExportSt}) ->
ExporterMod:complete(ExportSt). ExporterMod:complete(ExportSt).
-spec discard(export()) ->
ok | {error, _Reason}.
discard({ExporterMod, ExportSt}) -> discard({ExporterMod, ExportSt}) ->
ExporterMod:discard(ExportSt). ExporterMod:discard(ExportSt).
%% -spec list(storage()) ->
{ok, [emqx_ft_storage:file_info()]} | {error, _Reason}.
list(Storage) -> list(Storage) ->
{ExporterMod, ExporterOpts} = exporter(Storage), {ExporterMod, ExporterOpts} = exporter(Storage),
ExporterMod:list(ExporterOpts). ExporterMod:list(ExporterOpts).
%%
-spec exporter(storage()) -> {module(), _ExporterOptions}. -spec exporter(storage()) -> {module(), _ExporterOptions}.
exporter(Storage) -> exporter(Storage) ->
case maps:get(exporter, Storage) of case maps:get(exporter, Storage) of
#{type := local} = Options -> #{type := local} = Options ->
{emqx_ft_storage_exporter_fs, Options} {emqx_ft_storage_exporter_fs, without_type(Options)};
#{type := s3} = Options ->
{emqx_ft_storage_exporter_s3, without_type(Options)}
end. end.
-spec without_type(exporter_conf()) -> exporter_conf().
without_type(#{type := _} = Options) ->
maps:without([type], Options).

View File

@ -20,21 +20,22 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
%% Exporter API %% Exporter API
-behaviour(emqx_ft_storage_exporter).
-export([start_export/3]). -export([start_export/3]).
-export([write/2]). -export([write/2]).
-export([complete/1]). -export([complete/1]).
-export([discard/1]). -export([discard/1]).
-export([list/1]).
%% Internal API for RPC
-export([list_local/1]). -export([list_local/1]).
-export([list_local/2]). -export([list_local/2]).
-export([start_reader/3]). -export([start_reader/3]).
-export([list/1]).
% TODO % TODO
% -export([list/2]). % -export([list/2]).
-export_type([export/0]).
-type options() :: _TODO. -type options() :: _TODO.
-type transfer() :: emqx_ft:transfer(). -type transfer() :: emqx_ft:transfer().
-type filemeta() :: emqx_ft:filemeta(). -type filemeta() :: emqx_ft:filemeta().
@ -49,7 +50,7 @@
-type file_error() :: emqx_ft_storage_fs:file_error(). -type file_error() :: emqx_ft_storage_fs:file_error().
-opaque export() :: #{ -type export() :: #{
path := file:name(), path := file:name(),
handle := io:device(), handle := io:device(),
result := file:name(), result := file:name(),
@ -116,6 +117,7 @@ write(Export = #{handle := Handle, hash := Ctx}, IoData) ->
ok -> ok ->
{ok, Export#{hash := update_checksum(Ctx, IoData)}}; {ok, Export#{hash := update_checksum(Ctx, IoData)}};
{error, _} = Error -> {error, _} = Error ->
_ = discard(Export),
Error Error
end. end.
@ -370,4 +372,4 @@ mk_transfer_hash(Transfer) ->
crypto:hash(?BUCKET_HASH, term_to_binary(Transfer)). crypto:hash(?BUCKET_HASH, term_to_binary(Transfer)).
get_storage_root(Options) -> get_storage_root(Options) ->
maps:get(root, Options, filename:join([emqx:data_dir(), "ft", "exports"])). maps:get(root, Options, filename:join([emqx:data_dir(), file_transfer, exports])).

View File

@ -0,0 +1,69 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage_exporter_s3).

-include_lib("emqx/include/logger.hrl").

%% Exporter API
%% Implements the `emqx_ft_storage_exporter' callback set (start_export/3,
%% write/2, complete/1, discard/1, list/1). Currently a stub: data written
%% through this exporter is discarded and `list/1' reports no exports.
-export([start_export/3]).
-export([write/2]).
-export([complete/1]).
-export([discard/1]).
-export([list/1]).

%% S3 profile configuration used to reach the bucket.
-type options() :: emqx_s3:profile_config().
-type transfer() :: emqx_ft:transfer().
-type filemeta() :: emqx_ft:filemeta().

%% Description of a single finished export, as returned by `list/1'.
-type exportinfo() :: #{
    transfer := transfer(),
    name := file:name(),
    uri := uri_string:uri_string(),
    timestamp := emqx_datetime:epoch_second(),
    size := _Bytes :: non_neg_integer(),
    meta => filemeta()
}.

%% Per-export state threaded through write/complete/discard.
%% `pid' is a placeholder for the future S3 uploader process.
-type export_st() :: #{
    pid := pid(),
    meta := filemeta()
}.

%% Begin an export of a single file transfer.
%% Returns `{ok, ExportSt}' so the caller can thread the state through
%% subsequent `write/2' calls, matching the exporter behaviour contract.
-spec start_export(options(), transfer(), filemeta()) ->
    {ok, export_st()} | {error, term()}.
start_export(_Options, _Transfer, Filemeta = #{name := Filename}) ->
    % NOTE: placeholder uploader process; a real implementation would start
    % an emqx_s3 multipart uploader here.
    Pid = spawn(fun() -> Filename end),
    {ok, #{meta => Filemeta, pid => Pid}}.

%% Append a chunk of file content to the export.
%% Stub: the data is accepted and dropped; state is returned unchanged.
-spec write(export_st(), iodata()) ->
    {ok, export_st()} | {error, term()}.
write(ExportSt, _IoData) ->
    {ok, ExportSt}.

%% Finalize a fully-written export.
-spec complete(export_st()) ->
    ok | {error, term()}.
complete(_ExportSt) ->
    ok.

%% Abort an export, releasing any resources it holds.
-spec discard(export_st()) ->
    ok.
discard(_ExportSt) ->
    ok.

%% List all exports known to this exporter.
%% Stub: always reports an empty list.
-spec list(options()) ->
    {ok, [exportinfo()]} | {error, term()}.
list(_Options) ->
    {ok, []}.

View File

@ -86,14 +86,18 @@
-define(MANIFEST, "MANIFEST.json"). -define(MANIFEST, "MANIFEST.json").
-define(SEGMENT, "SEG"). -define(SEGMENT, "SEG").
-type storage() :: #{ -type segments() :: #{
root => file:name(), root := file:name(),
exporter => exporter() gc := #{
interval := non_neg_integer(),
maximum_segments_ttl := non_neg_integer(),
minimum_segments_ttl := non_neg_integer()
}
}. }.
-type exporter() :: #{ -type storage() :: #{
type := local, segments := segments(),
root => file:name() exporter := emqx_ft_storage_exporter:exporter()
}. }.
-type file_error() :: -type file_error() ::

View File

@ -0,0 +1,76 @@
emqx_s3_schema {
access_key_id {
desc {
en: "The access key id of the S3 bucket."
}
label {
en: "Access Key ID"
}
}
secret_access_key {
desc {
en: "The secret access key of the S3 bucket."
}
label {
en: "Secret Access Key"
}
}
bucket {
desc {
en: "The name of the S3 bucket."
}
label {
en: "Bucket"
}
}
host {
desc {
en: "The host of the S3 endpoint."
}
label {
en: "S3 endpoint Host"
}
}
port {
desc {
en: "The port of the S3 endpoint."
}
label {
en: "S3 endpoint port"
}
}
min_part_size {
desc {
en: """The minimum part size for multipart uploads.
Uploaded data will be accumulated in memory until this size is reached."""
}
label {
en: "Minimum multipart upload part size"
}
}
max_part_size {
desc {
en: """The maximum part size for multipart uploads.
S3 uploader won't try to upload parts larger than this size."""
}
label {
en: "Maximum multipart upload part size"
}
}
acl {
desc {
en: "The ACL to use for the uploaded objects."
}
label {
en: "ACL"
}
}
transport_options {
desc {
en: "Options for the HTTP transport layer used by the S3 client."
}
label {
en: "Transport Options"
}
}
}

View File

@ -21,8 +21,32 @@
-type profile_id() :: term(). -type profile_id() :: term().
%% TODO: define fields -type acl() ::
-type profile_config() :: map(). private
| public_read
| public_read_write
| authenticated_read
| bucket_owner_read
| bucket_owner_full_control.
-type transport_options() :: #{
connect_timeout => pos_integer(),
enable_pipelining => pos_integer(),
headers => map(),
max_retries => pos_integer(),
pool_size => pos_integer(),
pool_type => atom(),
ssl => map()
}.
-type profile_config() :: #{
bucket := string(),
host := string(),
port := pos_integer(),
acl => acl(),
min_part_size => pos_integer(),
transport_options => transport_options()
}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API

View File

@ -7,7 +7,7 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, ref/1]). -import(hoconsc, [mk/2, ref/2]).
-export([roots/0, fields/1, namespace/0, tags/0]). -export([roots/0, fields/1, namespace/0, tags/0]).
@ -101,7 +101,7 @@ fields(s3) ->
)}, )},
{transport_options, {transport_options,
mk( mk(
ref(transport_options), ref(?MODULE, transport_options),
#{ #{
desc => ?DESC("transport_options"), desc => ?DESC("transport_options"),
required => false required => false