Merge pull request #12934 from keynslug/feat/EMQX-12204/aggreg-s3-bridge

feat(s3-bridge): implement aggregated upload action
Andrew Mayorov 2024-04-30 13:50:19 +02:00 committed by GitHub
commit 279169105b
32 changed files with 2921 additions and 458 deletions


@ -118,7 +118,8 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_pulsar_action_info,
         emqx_bridge_greptimedb_action_info,
         emqx_bridge_tdengine_action_info,
-        emqx_bridge_s3_action_info
+        emqx_bridge_s3_upload_action_info,
+        emqx_bridge_s3_aggreg_upload_action_info
     ].
 -else.
 hard_coded_action_info_modules_ee() ->


@ -10,9 +10,15 @@
         emqx_s3
     ]},
     {env, [
-        {emqx_action_info_modules, [emqx_bridge_s3_action_info]},
-        {emqx_connector_info_modules, [emqx_bridge_s3_connector_info]}
+        {emqx_action_info_modules, [
+            emqx_bridge_s3_upload_action_info,
+            emqx_bridge_s3_aggreg_upload_action_info
+        ]},
+        {emqx_connector_info_modules, [
+            emqx_bridge_s3_connector_info
+        ]}
     ]},
+    {mod, {emqx_bridge_s3_app, []}},
     {modules, []},
     {links, []}
 ]}.


@ -19,7 +19,6 @@
 ]).
 -export([
-    bridge_v2_examples/1,
     connector_examples/1
 ]).
@ -39,58 +38,11 @@ fields(Field) when
     Field == "post_connector"
 ->
     emqx_connector_schema:api_fields(Field, ?CONNECTOR, fields(s3_connector_config));
-fields(Field) when
-    Field == "get_bridge_v2";
-    Field == "put_bridge_v2";
-    Field == "post_bridge_v2"
-->
-    emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION));
-fields(action) ->
-    {?ACTION,
-        hoconsc:mk(
-            hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
-            #{
-                desc => <<"S3 Action Config">>,
-                required => false
-            }
-        )};
 fields("config_connector") ->
     emqx_connector_schema:common_fields() ++ fields(s3_connector_config);
-fields(?ACTION) ->
-    emqx_bridge_v2_schema:make_producer_action_schema(
-        hoconsc:mk(
-            ?R_REF(s3_upload_parameters),
-            #{
-                required => true,
-                desc => ?DESC(s3_upload)
-            }
-        ),
-        #{
-            resource_opts_ref => ?R_REF(s3_action_resource_opts)
-        }
-    );
 fields(s3_connector_config) ->
     emqx_s3_schema:fields(s3_client) ++
         emqx_connector_schema:resource_opts_ref(?MODULE, s3_connector_resource_opts);
-fields(s3_upload_parameters) ->
-    emqx_s3_schema:fields(s3_upload) ++
-        [
-            {content,
-                hoconsc:mk(
-                    emqx_schema:template(),
-                    #{
-                        required => false,
-                        default => <<"${.}">>,
-                        desc => ?DESC(s3_object_content)
-                    }
-                )}
-        ];
-fields(s3_action_resource_opts) ->
-    UnsupportedOpts = [batch_size, batch_time],
-    lists:filter(
-        fun({N, _}) -> not lists:member(N, UnsupportedOpts) end,
-        emqx_bridge_v2_schema:action_resource_opts_fields()
-    );
 fields(s3_connector_resource_opts) ->
     CommonOpts = emqx_connector_schema:common_resource_opts_subfields(),
     lists:filter(
@ -100,14 +52,6 @@ fields(s3_connector_resource_opts) ->
 desc("config_connector") ->
     ?DESC(config_connector);
-desc(?ACTION) ->
-    ?DESC(s3_upload);
-desc(s3_upload) ->
-    ?DESC(s3_upload);
-desc(s3_upload_parameters) ->
-    ?DESC(s3_upload_parameters);
-desc(s3_action_resource_opts) ->
-    ?DESC(emqx_resource_schema, resource_opts);
 desc(s3_connector_resource_opts) ->
     ?DESC(emqx_resource_schema, resource_opts);
 desc(_Name) ->
@ -115,54 +59,6 @@ desc(_Name) ->
 %% Examples
 
-bridge_v2_examples(Method) ->
-    [
-        #{
-            <<"s3">> => #{
-                summary => <<"S3 Simple Upload">>,
-                value => action_example(Method)
-            }
-        }
-    ].
-
-action_example(post) ->
-    maps:merge(
-        action_example(put),
-        #{
-            type => atom_to_binary(?ACTION),
-            name => <<"my_s3_action">>
-        }
-    );
-action_example(get) ->
-    maps:merge(
-        action_example(put),
-        #{
-            status => <<"connected">>,
-            node_status => [
-                #{
-                    node => <<"emqx@localhost">>,
-                    status => <<"connected">>
-                }
-            ]
-        }
-    );
-action_example(put) ->
-    #{
-        enable => true,
-        connector => <<"my_s3_connector">>,
-        description => <<"My action">>,
-        parameters => #{
-            bucket => <<"${clientid}">>,
-            key => <<"${topic}">>,
-            content => <<"${payload}">>,
-            acl => <<"public_read">>
-        },
-        resource_opts => #{
-            query_mode => <<"sync">>,
-            inflight_window => 10
-        }
-    }.
-
 connector_examples(Method) ->
     [
         #{


@ -5,7 +5,12 @@
 -ifndef(__EMQX_BRIDGE_S3_HRL__).
 -define(__EMQX_BRIDGE_S3_HRL__, true).
 
--define(ACTION, s3).
+%% Actions
+-define(ACTION_UPLOAD, s3).
+-define(BRIDGE_TYPE_UPLOAD, <<"s3">>).
+
+-define(ACTION_AGGREGATED_UPLOAD, s3_aggregated_upload).
+-define(BRIDGE_TYPE_AGGREGATED_UPLOAD, <<"s3_aggregated_upload">>).
 -define(CONNECTOR, s3).
 
 -endif.


@ -0,0 +1,138 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% This module provides a deliberately simple interface for writing and reading
%% Erlang terms to/from a file descriptor (i.e. an IO device), with the goal of
%% writing and reading terms in a streaming fashion while surviving partial
%% corruption.
%%
%% Layout of the file is as follows:
%% ```
%% ETF(Header { Metadata })
%% ETF(Record1 ByteSize)
%% ETF(Record1)
%% ETF(Record2 ByteSize)
%% ETF(Record2)
%% ...
%% ```
%% ^ ETF = Erlang External Term Format (i.e. `erlang:term_to_binary/1`).
-module(emqx_bridge_s3_aggreg_buffer).
-export([
new_writer/2,
write/2,
takeover/1
]).
-export([
new_reader/1,
read/1
]).
-export_type([writer/0, reader/0]).
-record(reader, {
fd :: file:io_device() | eof,
buffer :: binary(),
hasread = 0 :: non_neg_integer()
}).
-type writer() :: file:io_device().
-type reader() :: #reader{}.
%%
-define(VSN, 1).
-define(HEADER(MD), [?MODULE, ?VSN, MD]).
-define(READAHEAD_BYTES, 64 * 4096).
-define(SANE_TERM_SIZE, 256 * 1024 * 1024).
%%
-spec new_writer(file:io_device(), _Meta) -> writer().
new_writer(FD, Meta) ->
%% TODO: Validate header is not too big?
Header = term_to_iovec(?HEADER(Meta)),
case file:write(FD, Header) of
ok ->
FD;
{error, Reason} ->
error({buffer_write_failed, Reason})
end.
-spec write(_Term, writer()) -> ok | {error, file:posix()}.
write(Term, FD) ->
IOData = term_to_iovec(Term),
Marker = term_to_binary(iolist_size(IOData)),
file:write(FD, [Marker | IOData]).
%%
-spec new_reader(file:io_device()) -> {_Meta, reader()}.
new_reader(FD) ->
Reader0 = #reader{fd = FD, buffer = <<>>},
Reader1 = read_buffered(?READAHEAD_BYTES, Reader0),
case read_next_term(Reader1) of
{?HEADER(MD), Reader} ->
{MD, Reader};
{UnexpectedHeader, _Reader} ->
error({buffer_unexpected_header, UnexpectedHeader});
eof ->
error({buffer_incomplete, header})
end.
-spec read(reader()) -> {_Term, reader()} | eof.
read(Reader0) ->
case read_next_term(read_buffered(_LargeEnough = 16, Reader0)) of
{Size, Reader1} when is_integer(Size) andalso Size > 0 andalso Size < ?SANE_TERM_SIZE ->
case read_next_term(read_buffered(Size, Reader1)) of
{Term, Reader} ->
{Term, Reader};
eof ->
error({buffer_incomplete, Size})
end;
{UnexpectedSize, _Reader} ->
error({buffer_unexpected_record_size, UnexpectedSize});
eof ->
eof
end.
-spec takeover(reader()) -> writer().
takeover(#reader{fd = FD, hasread = HasRead}) ->
case file:position(FD, HasRead) of
{ok, HasRead} ->
case file:truncate(FD) of
ok ->
FD;
{error, Reason} ->
error({buffer_takeover_failed, Reason})
end;
{error, Reason} ->
error({buffer_takeover_failed, Reason})
end.
read_next_term(#reader{fd = eof, buffer = <<>>}) ->
eof;
read_next_term(Reader = #reader{buffer = Buffer, hasread = HasRead}) ->
{Term, UsedBytes} = erlang:binary_to_term(Buffer, [safe, used]),
BufferSize = byte_size(Buffer),
BufferLeft = binary:part(Buffer, UsedBytes, BufferSize - UsedBytes),
{Term, Reader#reader{buffer = BufferLeft, hasread = HasRead + UsedBytes}}.
read_buffered(_Size, Reader = #reader{fd = eof}) ->
Reader;
read_buffered(Size, Reader = #reader{fd = FD, buffer = Buffer0}) ->
BufferSize = byte_size(Buffer0),
ReadSize = erlang:max(Size, ?READAHEAD_BYTES),
case BufferSize < Size andalso file:read(FD, ReadSize) of
false ->
Reader;
{ok, Data} ->
Reader#reader{buffer = <<Buffer0/binary, Data/binary>>};
eof ->
Reader#reader{fd = eof};
{error, Reason} ->
error({buffer_read_failed, Reason})
end.
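
A minimal usage sketch of the buffer format above, assuming a scratch file; the function name, file name, metadata and record are illustrative and not part of this change:

buffer_roundtrip_example(Filename) ->
    %% Write the header plus one record, then read both back.
    {ok, WFD} = file:open(Filename, [write, binary]),
    Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, []),
    ok = emqx_bridge_s3_aggreg_buffer:write(#{<<"clientid">> => <<"c1">>}, Writer),
    ok = file:close(WFD),
    {ok, RFD} = file:open(Filename, [read, binary, raw]),
    {[], Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD),
    {#{<<"clientid">> := <<"c1">>}, Reader} = emqx_bridge_s3_aggreg_buffer:read(Reader0),
    eof = emqx_bridge_s3_aggreg_buffer:read(Reader),
    file:close(RFD).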


@ -0,0 +1,105 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% CSV container implementation for `emqx_bridge_s3_aggregator`.
-module(emqx_bridge_s3_aggreg_csv).
%% Container API
-export([
new/1,
fill/2,
close/1
]).
-export_type([container/0]).
-record(csv, {
columns :: [binary()] | undefined,
column_order :: [binary()],
%% A string or character that separates each field in a record from the next.
%% Default: ","
field_separator :: char() | iodata(),
%% A string or character that delimits boundaries of a record.
%% Default: "\n"
record_delimiter :: char() | iodata(),
quoting_mp :: _ReMP
}).
-type container() :: #csv{}.
-type options() :: #{
%% Which columns should come first in the resulting CSV.
column_order => [column()]
}.
-type record() :: emqx_bridge_s3_aggregator:record().
-type column() :: binary().
%%
-spec new(options()) -> container().
new(Opts) ->
{ok, MP} = re:compile("[\\[\\],\\r\\n\"]", [unicode]),
#csv{
column_order = maps:get(column_order, Opts, []),
field_separator = $,,
record_delimiter = $\n,
quoting_mp = MP
}.
-spec fill([record()], container()) -> {iodata(), container()}.
fill(Records = [Record | _], CSV0 = #csv{columns = undefined}) ->
Columns = mk_columns(Record, CSV0),
Header = emit_header(Columns, CSV0),
{Writes, CSV} = fill(Records, CSV0#csv{columns = Columns}),
{[Header | Writes], CSV};
fill(Records, CSV) ->
Writes = [emit_row(R, CSV) || R <- Records],
{Writes, CSV}.
-spec close(container()) -> iodata().
close(#csv{}) ->
[].
%%
mk_columns(Record, #csv{column_order = ColumnOrder}) ->
Columns = [emqx_utils_conv:bin(C) || C <- lists:sort(maps:keys(Record))],
Unordered = Columns -- ColumnOrder,
ColumnOrder ++ Unordered.
-spec emit_header([column()], container()) -> iodata().
emit_header([C], #csv{record_delimiter = Delim}) ->
[C, Delim];
emit_header([C | Rest], CSV = #csv{field_separator = Sep}) ->
[C, Sep | emit_header(Rest, CSV)];
emit_header([], #csv{record_delimiter = Delim}) ->
[Delim].
-spec emit_row(record(), container()) -> iodata().
emit_row(Record, CSV = #csv{columns = Columns}) ->
emit_row(Record, Columns, CSV).
emit_row(Record, [C], CSV = #csv{record_delimiter = Delim}) ->
[emit_cell(C, Record, CSV), Delim];
emit_row(Record, [C | Rest], CSV = #csv{field_separator = Sep}) ->
[emit_cell(C, Record, CSV), Sep | emit_row(Record, Rest, CSV)];
emit_row(#{}, [], #csv{record_delimiter = Delim}) ->
[Delim].
emit_cell(Column, Record, CSV) ->
case emqx_template:lookup(Column, Record) of
{ok, Value} ->
encode_cell(emqx_template:to_string(Value), CSV);
{error, undefined} ->
_Empty = ""
end.
encode_cell(V, #csv{quoting_mp = MP}) ->
case re:run(V, MP, []) of
nomatch ->
V;
_ ->
[$", re:replace(V, <<"\"">>, <<"\"\"">>, [global, unicode]), $"]
end.
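
A hedged sketch of how fill/2 behaves, using made-up records: the first call emits the header row (honoring column_order), and cells containing separators or quotes are quoted RFC-4180 style:

csv_fill_example() ->
    CSV0 = emqx_bridge_s3_aggreg_csv:new(#{column_order => [<<"clientid">>]}),
    Records = [
        #{<<"clientid">> => <<"c1">>, <<"payload">> => <<"hello">>},
        #{<<"clientid">> => <<"c2">>, <<"payload">> => <<"a,\"b\"">>}
    ],
    {Writes, _CSV} = emqx_bridge_s3_aggreg_csv:fill(Records, CSV0),
    %% iolist_to_binary(Writes) is expected to yield:
    %%   clientid,payload\n
    %%   c1,hello\n
    %%   c2,"a,""b"""\n
    iolist_to_binary(Writes).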


@ -0,0 +1,212 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% This module takes aggregated records from a buffer and delivers them to S3,
%% wrapped in a configurable container (currently only CSV is implemented).
-module(emqx_bridge_s3_aggreg_delivery).
-include_lib("snabbkaffe/include/trace.hrl").
-include("emqx_bridge_s3_aggregator.hrl").
-export([start_link/3]).
%% Internal exports
-export([
init/4,
loop/3
]).
-behaviour(emqx_template).
-export([lookup/2]).
%% Sys
-export([
system_continue/3,
system_terminate/4,
format_status/2
]).
-record(delivery, {
name :: _Name,
container :: emqx_bridge_s3_aggreg_csv:container(),
reader :: emqx_bridge_s3_aggreg_buffer:reader(),
upload :: emqx_s3_upload:t(),
empty :: boolean()
}).
-type state() :: #delivery{}.
%%
start_link(Name, Buffer, Opts) ->
proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]).
%%
-spec init(pid(), _Name, buffer(), _Opts :: map()) -> no_return().
init(Parent, Name, Buffer, Opts) ->
?tp(s3_aggreg_delivery_started, #{action => Name, buffer => Buffer}),
Reader = open_buffer(Buffer),
Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}),
_ = erlang:process_flag(trap_exit, true),
ok = proc_lib:init_ack({ok, self()}),
loop(Delivery, Parent, []).
init_delivery(Name, Reader, Buffer, Opts = #{container := ContainerOpts}) ->
#delivery{
name = Name,
container = mk_container(ContainerOpts),
reader = Reader,
upload = mk_upload(Buffer, Opts),
empty = true
}.
open_buffer(#buffer{filename = Filename}) ->
case file:open(Filename, [read, binary, raw]) of
{ok, FD} ->
{_Meta, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(FD),
Reader;
{error, Reason} ->
error({buffer_open_failed, Reason})
end.
mk_container(#{type := csv, column_order := OrderOpt}) ->
%% TODO: Deduplicate?
ColumnOrder = lists:map(fun emqx_utils_conv:bin/1, OrderOpt),
emqx_bridge_s3_aggreg_csv:new(#{column_order => ColumnOrder}).
mk_upload(
Buffer,
Opts = #{
bucket := Bucket,
upload_options := UploadOpts,
client_config := Config,
uploader_config := UploaderConfig
}
) ->
Client = emqx_s3_client:create(Bucket, Config),
Key = mk_object_key(Buffer, Opts),
emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
mk_object_key(Buffer, #{action := Name, key := Template}) ->
emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}).
%%
-spec loop(state(), pid(), [sys:debug_option()]) -> no_return().
loop(Delivery, Parent, Debug) ->
%% NOTE: This function is mocked in tests.
receive
Msg -> handle_msg(Msg, Delivery, Parent, Debug)
after 0 ->
process_delivery(Delivery, Parent, Debug)
end.
process_delivery(Delivery0 = #delivery{reader = Reader0}, Parent, Debug) ->
case emqx_bridge_s3_aggreg_buffer:read(Reader0) of
{Records = [#{} | _], Reader} ->
Delivery1 = Delivery0#delivery{reader = Reader},
Delivery2 = process_append_records(Records, Delivery1),
Delivery = process_write(Delivery2),
loop(Delivery, Parent, Debug);
{[], Reader} ->
Delivery = Delivery0#delivery{reader = Reader},
loop(Delivery, Parent, Debug);
eof ->
process_complete(Delivery0);
{Unexpected, _Reader} ->
exit({buffer_unexpected_record, Unexpected})
end.
process_append_records(Records, Delivery = #delivery{container = Container0, upload = Upload0}) ->
{Writes, Container} = emqx_bridge_s3_aggreg_csv:fill(Records, Container0),
{ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
Delivery#delivery{
container = Container,
upload = Upload,
empty = false
}.
process_write(Delivery = #delivery{upload = Upload0}) ->
case emqx_s3_upload:write(Upload0) of
{ok, Upload} ->
Delivery#delivery{upload = Upload};
{cont, Upload} ->
process_write(Delivery#delivery{upload = Upload});
{error, Reason} ->
_ = emqx_s3_upload:abort(Upload0),
exit({upload_failed, Reason})
end.
process_complete(#delivery{name = Name, empty = true}) ->
?tp(s3_aggreg_delivery_completed, #{action => Name, upload => empty}),
exit({shutdown, {skipped, empty}});
process_complete(#delivery{name = Name, container = Container, upload = Upload0}) ->
Trailer = emqx_bridge_s3_aggreg_csv:close(Container),
{ok, Upload} = emqx_s3_upload:append(Trailer, Upload0),
case emqx_s3_upload:complete(Upload) of
{ok, Completed} ->
?tp(s3_aggreg_delivery_completed, #{action => Name, upload => Completed}),
ok;
{error, Reason} ->
_ = emqx_s3_upload:abort(Upload),
exit({upload_failed, Reason})
end.
%%
handle_msg({system, From, Msg}, Delivery, Parent, Debug) ->
sys:handle_system_msg(Msg, From, Parent, ?MODULE, Debug, Delivery);
handle_msg({'EXIT', Parent, Reason}, Delivery, Parent, Debug) ->
system_terminate(Reason, Parent, Debug, Delivery);
handle_msg(_Msg, Delivery, Parent, Debug) ->
loop(Delivery, Parent, Debug).
-spec system_continue(pid(), [sys:debug_option()], state()) -> no_return().
system_continue(Parent, Debug, Delivery) ->
loop(Delivery, Parent, Debug).
-spec system_terminate(_Reason, pid(), [sys:debug_option()], state()) -> _.
system_terminate(_Reason, _Parent, _Debug, #delivery{upload = Upload}) ->
emqx_s3_upload:abort(Upload).
-spec format_status(normal, Args :: [term()]) -> _StateFormatted.
format_status(_Normal, [_PDict, _SysState, _Parent, _Debug, Delivery]) ->
Delivery#delivery{
upload = emqx_s3_upload:format(Delivery#delivery.upload)
}.
%%
-spec lookup(emqx_template:accessor(), {_Name, buffer()}) ->
{ok, integer() | string()} | {error, undefined}.
lookup([<<"action">>], {Name, _Buffer}) ->
{ok, mk_fs_safe_string(Name)};
lookup(Accessor, {_Name, Buffer = #buffer{}}) ->
lookup_buffer_var(Accessor, Buffer);
lookup(_Accessor, _Context) ->
{error, undefined}.
lookup_buffer_var([<<"datetime">>, Format], #buffer{since = Since}) ->
{ok, format_timestamp(Since, Format)};
lookup_buffer_var([<<"datetime_until">>, Format], #buffer{until = Until}) ->
{ok, format_timestamp(Until, Format)};
lookup_buffer_var([<<"sequence">>], #buffer{seq = Seq}) ->
{ok, Seq};
lookup_buffer_var([<<"node">>], #buffer{}) ->
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
lookup_buffer_var(_Binding, _Context) ->
{error, undefined}.
format_timestamp(Timestamp, <<"rfc3339utc">>) ->
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]),
mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"rfc3339">>) ->
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]),
mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"unix">>) ->
Timestamp.
mk_fs_safe_string(String) ->
unicode:characters_to_binary(string:replace(String, ":", "_", all)).
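
%% Illustrative example of the bindings above: with an action named <<"a1">>,
%% node 'emqx@localhost' and a buffer where since = 1714485600 and seq = 3,
%% the key template "${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv"
%% renders to "a1/emqx@localhost/2024-04-30T14_00_00Z_N3.csv".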


@ -0,0 +1,275 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_aggreg_upload).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include("emqx_bridge_s3.hrl").
-define(ACTION, ?ACTION_AGGREGATED_UPLOAD).
-define(DEFAULT_BATCH_SIZE, 100).
-define(DEFAULT_BATCH_TIME, <<"10ms">>).
-behaviour(hocon_schema).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% Interpreting options
-export([
mk_key_template/1,
mk_upload_options/1
]).
%% emqx_bridge_v2_schema API
-export([bridge_v2_examples/1]).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
"bridge_s3".
roots() ->
[].
fields(Field) when
Field == "get_bridge_v2";
Field == "put_bridge_v2";
Field == "post_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION));
fields(action) ->
{?ACTION,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
#{
desc => <<"S3 Aggregated Upload Action Config">>,
required => false
}
)};
fields(?ACTION) ->
emqx_bridge_v2_schema:make_producer_action_schema(
hoconsc:mk(
?R_REF(s3_aggregated_upload_parameters),
#{
required => true,
desc => ?DESC(s3_aggregated_upload_parameters)
}
),
#{
resource_opts_ref => ?R_REF(s3_aggreg_upload_resource_opts)
}
);
fields(s3_aggregated_upload_parameters) ->
lists:append([
[
{container,
hoconsc:mk(
%% TODO: Support selectors once there is more than one container.
hoconsc:union(fun
(all_union_members) -> [?REF(s3_aggregated_container_csv)];
({value, _Value}) -> [?REF(s3_aggregated_container_csv)]
end),
#{
required => true,
default => #{<<"type">> => <<"csv">>},
desc => ?DESC(s3_aggregated_container)
}
)},
{aggregation,
hoconsc:mk(
?REF(s3_aggregation),
#{
required => true,
desc => ?DESC(s3_aggregation)
}
)}
],
emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [
{key, #{desc => ?DESC(s3_aggregated_upload_key)}}
]),
emqx_s3_schema:fields(s3_uploader)
]);
fields(s3_aggregated_container_csv) ->
[
{type,
hoconsc:mk(
csv,
#{
required => true,
desc => ?DESC(s3_aggregated_container_csv)
}
)},
{column_order,
hoconsc:mk(
hoconsc:array(string()),
#{
required => false,
default => [],
desc => ?DESC(s3_aggregated_container_csv_column_order)
}
)}
];
fields(s3_aggregation) ->
[
%% TODO: Needs bucketing? (e.g. messages falling in this 1h interval)
{time_interval,
hoconsc:mk(
emqx_schema:duration_s(),
#{
required => false,
default => <<"1h">>,
desc => ?DESC(s3_aggregation_interval)
}
)},
{max_records,
hoconsc:mk(
pos_integer(),
#{
required => false,
default => <<"1000000">>,
desc => ?DESC(s3_aggregation_max_records)
}
)}
];
fields(s3_aggreg_upload_resource_opts) ->
%% NOTE: This action should benefit from generous batching defaults.
emqx_bridge_v2_schema:action_resource_opts_fields([
{batch_size, #{default => ?DEFAULT_BATCH_SIZE}},
{batch_time, #{default => ?DEFAULT_BATCH_TIME}}
]).
desc(Name) when
Name == s3_aggregated_upload;
Name == s3_aggregated_upload_parameters;
Name == s3_aggregation;
Name == s3_aggregated_container_csv
->
?DESC(Name);
desc(s3_aggreg_upload_resource_opts) ->
?DESC(emqx_resource_schema, resource_opts);
desc(_Name) ->
undefined.
%% Interpreting options
-spec mk_key_template(_Parameters :: map()) -> emqx_template:str().
mk_key_template(#{key := Key}) ->
Template = emqx_template:parse(Key),
{_, BindingErrors} = emqx_template:render(Template, #{}),
{UsedBindings, _} = lists:unzip(BindingErrors),
SuffixTemplate = mk_suffix_template(UsedBindings),
case emqx_template:is_const(SuffixTemplate) of
true ->
Template;
false ->
Template ++ SuffixTemplate
end.
mk_suffix_template(UsedBindings) ->
RequiredBindings = ["action", "node", "datetime.", "sequence"],
SuffixBindings = [
mk_default_binding(RB)
|| RB <- RequiredBindings,
lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings)
],
SuffixTemplate = [["/", B] || B <- SuffixBindings],
emqx_template:parse(SuffixTemplate).
mk_default_binding("datetime.") ->
"${datetime.rfc3339utc}";
mk_default_binding(Binding) ->
"${" ++ Binding ++ "}".
-spec mk_upload_options(_Parameters :: map()) -> emqx_s3_client:upload_options().
mk_upload_options(Parameters) ->
Headers = mk_upload_headers(Parameters),
#{
headers => Headers,
acl => maps:get(acl, Parameters, undefined)
}.
mk_upload_headers(Parameters = #{container := Container}) ->
Headers = normalize_headers(maps:get(headers, Parameters, #{})),
ContainerHeaders = mk_container_headers(Container),
maps:merge(ContainerHeaders, Headers).
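%% Illustrative example: with a csv container and user headers
%% #{'Content-Type' => "text/plain"}, normalization yields
%% #{"content-type" => "text/plain"}, which overrides the container default of
%% "text/csv" in the merge above.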
normalize_headers(Headers) ->
maps:fold(
fun(Header, Value, Acc) ->
maps:put(string:lowercase(emqx_utils_conv:str(Header)), Value, Acc)
end,
#{},
Headers
).
mk_container_headers(#{type := csv}) ->
#{"content-type" => "text/csv"};
mk_container_headers(#{}) ->
#{}.
%% Examples
bridge_v2_examples(Method) ->
[
#{
<<"s3_aggregated_upload">> => #{
summary => <<"S3 Aggregated Upload">>,
value => s3_action_example(Method)
}
}
].
s3_action_example(post) ->
maps:merge(
s3_action_example(put),
#{
type => atom_to_binary(?ACTION_UPLOAD),
name => <<"my_s3_action">>
}
);
s3_action_example(get) ->
maps:merge(
s3_action_example(put),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
s3_action_example(put) ->
#{
enable => true,
connector => <<"my_s3_connector">>,
description => <<"My action">>,
parameters => #{
bucket => <<"mqtt-aggregated">>,
key => <<"${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv">>,
acl => <<"public_read">>,
aggregation => #{
time_interval => <<"15m">>,
max_records => 100_000
},
<<"container">> => #{
type => <<"csv">>,
column_order => [<<"clientid">>, <<"topic">>, <<"publish_received_at">>]
}
},
resource_opts => #{
health_check_interval => <<"10s">>,
query_mode => <<"async">>,
inflight_window => 100
}
}.


@ -0,0 +1,21 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_aggreg_upload_action_info).
-behaviour(emqx_action_info).
-include("emqx_bridge_s3.hrl").
-export([
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
action_type_name() -> ?ACTION_AGGREGATED_UPLOAD.
connector_type_name() -> s3.
schema_module() -> emqx_bridge_s3_aggreg_upload.


@ -0,0 +1,72 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_aggreg_upload_sup).
-export([
start_link/3,
start_link_delivery_sup/2
]).
-export([
start_delivery/2,
start_delivery_proc/3
]).
-behaviour(supervisor).
-export([init/1]).
-define(SUPREF(NAME), {via, gproc, {n, l, {?MODULE, NAME}}}).
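%% NOTE: The per-action delivery supervisor is registered via gproc so the
%% aggregator can start delivery processes by action name without holding a pid.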
%%
start_link(Name, AggregOpts, DeliveryOpts) ->
supervisor:start_link(?MODULE, {root, Name, AggregOpts, DeliveryOpts}).
start_link_delivery_sup(Name, DeliveryOpts) ->
supervisor:start_link(?SUPREF(Name), ?MODULE, {delivery, Name, DeliveryOpts}).
%%
start_delivery(Name, Buffer) ->
supervisor:start_child(?SUPREF(Name), [Buffer]).
start_delivery_proc(Name, DeliveryOpts, Buffer) ->
emqx_bridge_s3_aggreg_delivery:start_link(Name, Buffer, DeliveryOpts).
%%
init({root, Name, AggregOpts, DeliveryOpts}) ->
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 5
},
AggregatorChildSpec = #{
id => aggregator,
start => {emqx_bridge_s3_aggregator, start_link, [Name, AggregOpts]},
type => worker,
restart => permanent
},
DeliverySupChildSpec = #{
id => delivery_sup,
start => {?MODULE, start_link_delivery_sup, [Name, DeliveryOpts]},
type => supervisor,
restart => permanent
},
{ok, {SupFlags, [DeliverySupChildSpec, AggregatorChildSpec]}};
init({delivery, Name, DeliveryOpts}) ->
SupFlags = #{
strategy => simple_one_for_one,
intensity => 100,
period => 5
},
ChildSpec = #{
id => delivery,
start => {?MODULE, start_delivery_proc, [Name, DeliveryOpts]},
type => worker,
restart => temporary,
shutdown => 1000
},
{ok, {SupFlags, [ChildSpec]}}.


@ -0,0 +1,486 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% This module manages buffers for aggregating records and offloads them
%% to separate "delivery" processes when they are full or their time
%% interval has elapsed.
-module(emqx_bridge_s3_aggregator).
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include("emqx_bridge_s3_aggregator.hrl").
-export([
start_link/2,
push_records/3,
tick/2,
take_error/1
]).
-behaviour(gen_server).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
]).
-export_type([
record/0,
timestamp/0
]).
%% Record.
-type record() :: #{binary() => _}.
%% Unix timestamp, seconds since epoch.
-type timestamp() :: _Seconds :: non_neg_integer().
%%
-define(VSN, 1).
-define(SRVREF(NAME), {via, gproc, {n, l, {?MODULE, NAME}}}).
%%
start_link(Name, Opts) ->
gen_server:start_link(?SRVREF(Name), ?MODULE, mk_state(Name, Opts), []).
push_records(Name, Timestamp, Records = [_ | _]) ->
%% FIXME: Error feedback.
case pick_buffer(Name, Timestamp) of
undefined ->
BufferNext = next_buffer(Name, Timestamp),
write_records_limited(Name, BufferNext, Records);
Buffer ->
write_records_limited(Name, Buffer, Records)
end;
push_records(_Name, _Timestamp, []) ->
ok.
tick(Name, Timestamp) ->
case pick_buffer(Name, Timestamp) of
#buffer{} ->
ok;
_Outdated ->
send_close_buffer(Name, Timestamp)
end.
take_error(Name) ->
gen_server:call(?SRVREF(Name), take_error).
%%
write_records_limited(Name, Buffer = #buffer{max_records = undefined}, Records) ->
write_records(Name, Buffer, Records);
write_records_limited(Name, Buffer = #buffer{max_records = MaxRecords}, Records) ->
NR = length(Records),
case inc_num_records(Buffer, NR) of
NR ->
%% NOTE: Allow unconditionally if it's the first write.
write_records(Name, Buffer, Records);
NWritten when NWritten > MaxRecords ->
NextBuffer = rotate_buffer(Name, Buffer),
write_records_limited(Name, NextBuffer, Records);
_ ->
write_records(Name, Buffer, Records)
end.
write_records(Name, Buffer = #buffer{fd = Writer}, Records) ->
case emqx_bridge_s3_aggreg_buffer:write(Records, Writer) of
ok ->
?tp(s3_aggreg_records_written, #{action => Name, records => Records}),
ok;
{error, terminated} ->
BufferNext = rotate_buffer(Name, Buffer),
write_records(Name, BufferNext, Records);
{error, _} = Error ->
Error
end.
inc_num_records(#buffer{cnt_records = Counter}, Size) ->
inc_counter(Counter, Size).
next_buffer(Name, Timestamp) ->
gen_server:call(?SRVREF(Name), {next_buffer, Timestamp}).
rotate_buffer(Name, #buffer{fd = FD}) ->
gen_server:call(?SRVREF(Name), {rotate_buffer, FD}).
send_close_buffer(Name, Timestamp) ->
gen_server:cast(?SRVREF(Name), {close_buffer, Timestamp}).
%%
-record(st, {
name :: _Name,
tab :: ets:tid() | undefined,
buffer :: buffer() | undefined,
queued :: buffer() | undefined,
deliveries = #{} :: #{reference() => buffer()},
errors = queue:new() :: queue:queue(_Error),
interval :: emqx_schema:duration_s(),
max_records :: pos_integer(),
work_dir :: file:filename()
}).
-type state() :: #st{}.
mk_state(Name, Opts) ->
Interval = maps:get(time_interval, Opts),
MaxRecords = maps:get(max_records, Opts),
WorkDir = maps:get(work_dir, Opts),
ok = ensure_workdir(WorkDir),
#st{
name = Name,
interval = Interval,
max_records = MaxRecords,
work_dir = WorkDir
}.
ensure_workdir(WorkDir) ->
%% NOTE
%% Writing MANIFEST as a means to ensure the work directory is writable. It's not
%% (yet) read back because there's only one version of the implementation.
ok = filelib:ensure_path(WorkDir),
ok = write_manifest(WorkDir).
write_manifest(WorkDir) ->
Manifest = #{<<"version">> => ?VSN},
file:write_file(filename:join(WorkDir, "MANIFEST"), hocon_pp:do(Manifest, #{})).
%%
-spec init(state()) -> {ok, state()}.
init(St0 = #st{name = Name}) ->
_ = erlang:process_flag(trap_exit, true),
St1 = St0#st{tab = create_tab(Name)},
St = recover(St1),
_ = announce_current_buffer(St),
{ok, St}.
handle_call({next_buffer, Timestamp}, _From, St0) ->
St = #st{buffer = Buffer} = handle_next_buffer(Timestamp, St0),
{reply, Buffer, St, 0};
handle_call({rotate_buffer, FD}, _From, St0) ->
St = #st{buffer = Buffer} = handle_rotate_buffer(FD, St0),
{reply, Buffer, St, 0};
handle_call(take_error, _From, St0) ->
{MaybeError, St} = handle_take_error(St0),
{reply, MaybeError, St}.
handle_cast({close_buffer, Timestamp}, St) ->
{noreply, handle_close_buffer(Timestamp, St)};
handle_cast(_Cast, St) ->
{noreply, St}.
handle_info(timeout, St) ->
{noreply, handle_queued_buffer(St)};
handle_info({'DOWN', MRef, _, Pid, Reason}, St0 = #st{name = Name, deliveries = Ds0}) ->
case maps:take(MRef, Ds0) of
{Buffer, Ds} ->
St = St0#st{deliveries = Ds},
{noreply, handle_delivery_exit(Buffer, Reason, St)};
error ->
?SLOG(notice, #{
msg => "unexpected_down_signal",
action => Name,
pid => Pid,
reason => Reason
}),
{noreply, St0}
end;
handle_info(_Msg, St) ->
{noreply, St}.
terminate(_Reason, #st{name = Name}) ->
cleanup_tab(Name).
%%
handle_next_buffer(Timestamp, St = #st{buffer = #buffer{until = Until}}) when Timestamp < Until ->
St;
handle_next_buffer(Timestamp, St0 = #st{buffer = Buffer = #buffer{since = PrevSince}}) ->
BufferClosed = close_buffer(Buffer),
St = enqueue_closed_buffer(BufferClosed, St0),
handle_next_buffer(Timestamp, PrevSince, St);
handle_next_buffer(Timestamp, St = #st{buffer = undefined}) ->
handle_next_buffer(Timestamp, Timestamp, St).
handle_next_buffer(Timestamp, PrevSince, St0) ->
NextBuffer = allocate_next_buffer(Timestamp, PrevSince, St0),
St = St0#st{buffer = NextBuffer},
_ = announce_current_buffer(St),
St.
handle_rotate_buffer(
FD,
St0 = #st{buffer = Buffer = #buffer{since = Since, seq = Seq, fd = FD}}
) ->
BufferClosed = close_buffer(Buffer),
NextBuffer = allocate_buffer(Since, Seq + 1, St0),
St = enqueue_closed_buffer(BufferClosed, St0#st{buffer = NextBuffer}),
_ = announce_current_buffer(St),
St;
handle_rotate_buffer(_ClosedFD, St) ->
St.
enqueue_closed_buffer(Buffer, St = #st{queued = undefined}) ->
St#st{queued = Buffer};
enqueue_closed_buffer(Buffer, St0) ->
%% NOTE: Should never really happen unless interval / max records are too tight.
St = handle_queued_buffer(St0),
St#st{queued = Buffer}.
handle_queued_buffer(St = #st{queued = undefined}) ->
St;
handle_queued_buffer(St = #st{queued = Buffer}) ->
enqueue_delivery(Buffer, St#st{queued = undefined}).
allocate_next_buffer(Timestamp, PrevSince, St = #st{interval = Interval}) ->
Since = compute_since(Timestamp, PrevSince, Interval),
allocate_buffer(Since, 0, St).
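%% Illustrative example: with PrevSince = 1000, Interval = 3600 and
%% Timestamp = 5000, (5000 - 1000) rem 3600 = 400, so the new buffer starts at
%% Since = 4600, i.e. the interval-aligned window that contains Timestamp.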
compute_since(Timestamp, PrevSince, Interval) ->
Timestamp - (Timestamp - PrevSince) rem Interval.
allocate_buffer(Since, Seq, St = #st{name = Name}) ->
Buffer = #buffer{filename = Filename, cnt_records = Counter} = mk_buffer(Since, Seq, St),
{ok, FD} = file:open(Filename, [write, binary]),
Writer = emqx_bridge_s3_aggreg_buffer:new_writer(FD, _Meta = []),
_ = add_counter(Counter),
?tp(s3_aggreg_buffer_allocated, #{action => Name, filename => Filename}),
Buffer#buffer{fd = Writer}.
recover_buffer(Buffer = #buffer{filename = Filename, cnt_records = Counter}) ->
{ok, FD} = file:open(Filename, [read, write, binary]),
case recover_buffer_writer(FD, Filename) of
{ok, Writer, NWritten} ->
_ = add_counter(Counter, NWritten),
Buffer#buffer{fd = Writer};
{error, Reason} ->
?SLOG(warning, #{
msg => "existing_buffer_recovery_failed",
filename => Filename,
reason => Reason,
details => "Buffer is corrupted beyond repair, will be discarded."
}),
_ = file:close(FD),
_ = file:delete(Filename),
undefined
end.
recover_buffer_writer(FD, Filename) ->
try emqx_bridge_s3_aggreg_buffer:new_reader(FD) of
{_Meta, Reader} -> recover_buffer_writer(FD, Filename, Reader, 0)
catch
error:Reason ->
{error, Reason}
end.
recover_buffer_writer(FD, Filename, Reader0, NWritten) ->
try emqx_bridge_s3_aggreg_buffer:read(Reader0) of
{Records, Reader} when is_list(Records) ->
recover_buffer_writer(FD, Filename, Reader, NWritten + length(Records));
{Unexpected, _Reader} ->
%% Buffer is corrupted, should be discarded.
{error, {buffer_unexpected_record, Unexpected}};
eof ->
%% Buffer is fine, continue writing at the end.
{ok, FD, NWritten}
catch
error:Reason ->
%% Buffer is truncated or corrupted somewhere in the middle.
%% Continue writing after the last valid record.
?SLOG(warning, #{
msg => "existing_buffer_recovered_partially",
filename => Filename,
reason => Reason,
details =>
"Buffer is truncated or corrupted somewhere in the middle. "
"Corrupted records will be discarded."
}),
Writer = emqx_bridge_s3_aggreg_buffer:takeover(Reader0),
{ok, Writer, NWritten}
end.
mk_buffer(
Since,
Seq,
#st{tab = Tab, interval = Interval, max_records = MaxRecords, work_dir = WorkDir}
) ->
Name = mk_filename(Since, Seq),
Counter = {Tab, {Since, Seq}},
#buffer{
since = Since,
until = Since + Interval,
seq = Seq,
filename = filename:join(WorkDir, Name),
max_records = MaxRecords,
cnt_records = Counter
}.
handle_close_buffer(
Timestamp,
St0 = #st{buffer = Buffer = #buffer{until = Until}}
) when Timestamp >= Until ->
St = St0#st{buffer = undefined},
_ = announce_current_buffer(St),
enqueue_delivery(close_buffer(Buffer), St);
handle_close_buffer(_Timestamp, St = #st{buffer = undefined}) ->
St.
close_buffer(Buffer = #buffer{fd = FD}) ->
ok = file:close(FD),
Buffer#buffer{fd = undefined}.
discard_buffer(#buffer{filename = Filename, cnt_records = Counter}) ->
%% NOTE: Hopefully, no process is touching this counter anymore.
_ = del_counter(Counter),
file:delete(Filename).
pick_buffer(Name, Timestamp) ->
case lookup_current_buffer(Name) of
#buffer{until = Until} = Buffer when Timestamp < Until ->
Buffer;
#buffer{since = Since} when Timestamp < Since ->
%% TODO: Support timestamps going back.
error({invalid_timestamp, Timestamp});
_Outdated ->
undefined
end.
announce_current_buffer(#st{tab = Tab, buffer = Buffer}) ->
ets:insert(Tab, {buffer, Buffer}).
lookup_current_buffer(Name) ->
ets:lookup_element(lookup_tab(Name), buffer, 2).
%%
enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) ->
{ok, Pid} = emqx_bridge_s3_aggreg_upload_sup:start_delivery(Name, Buffer),
MRef = erlang:monitor(process, Pid),
St#st{deliveries = Ds#{MRef => Buffer}}.
handle_delivery_exit(Buffer, Normal, St = #st{name = Name}) when
Normal == normal; Normal == noproc
->
?SLOG(debug, #{
msg => "aggregated_buffer_delivery_completed",
action => Name,
buffer => Buffer#buffer.filename
}),
ok = discard_buffer(Buffer),
St;
handle_delivery_exit(Buffer, {shutdown, {skipped, Reason}}, St = #st{name = Name}) ->
?SLOG(info, #{
msg => "aggregated_buffer_delivery_skipped",
action => Name,
buffer => {Buffer#buffer.since, Buffer#buffer.seq},
reason => Reason
}),
ok = discard_buffer(Buffer),
St;
handle_delivery_exit(Buffer, Error, St = #st{name = Name}) ->
?SLOG(error, #{
msg => "aggregated_buffer_delivery_failed",
action => Name,
buffer => {Buffer#buffer.since, Buffer#buffer.seq},
filename => Buffer#buffer.filename,
reason => Error
}),
%% TODO: Retries?
enqueue_status_error(Error, St).
enqueue_status_error({upload_failed, Error}, St = #st{errors = QErrors}) ->
%% TODO
%% This code feels too specific, errors probably need classification.
St#st{errors = queue:in(Error, QErrors)};
enqueue_status_error(_AnotherError, St) ->
St.
handle_take_error(St = #st{errors = QErrors0}) ->
case queue:out(QErrors0) of
{{value, Error}, QErrors} ->
{[Error], St#st{errors = QErrors}};
{empty, QErrors} ->
{[], St#st{errors = QErrors}}
end.
%%
recover(St0 = #st{work_dir = WorkDir}) ->
{ok, Filenames} = file:list_dir(WorkDir),
ExistingBuffers = lists:flatmap(fun(FN) -> read_existing_file(FN, St0) end, Filenames),
case lists:reverse(lists:keysort(#buffer.since, ExistingBuffers)) of
[Buffer | ClosedBuffers] ->
St = lists:foldl(fun enqueue_delivery/2, St0, ClosedBuffers),
St#st{buffer = recover_buffer(Buffer)};
[] ->
St0
end.
read_existing_file("MANIFEST", _St) ->
[];
read_existing_file(Name, St) ->
case parse_filename(Name) of
{Since, Seq} ->
[read_existing_buffer(Since, Seq, Name, St)];
error ->
%% TODO: log?
[]
end.
read_existing_buffer(Since, Seq, Name, St = #st{work_dir = WorkDir}) ->
Filename = filename:join(WorkDir, Name),
Buffer = mk_buffer(Since, Seq, St),
Buffer#buffer{filename = Filename}.
%%
mk_filename(Since, Seq) ->
"T" ++ integer_to_list(Since) ++ "_" ++ pad_number(Seq, 4).
parse_filename(Filename) ->
case re:run(Filename, "^T(\\d+)_(\\d+)$", [{capture, all_but_first, list}]) of
{match, [Since, Seq]} ->
{list_to_integer(Since), list_to_integer(Seq)};
nomatch ->
error
end.
%%
add_counter({Tab, Counter}) ->
add_counter({Tab, Counter}, 0).
add_counter({Tab, Counter}, N) ->
ets:insert(Tab, {Counter, N}).
inc_counter({Tab, Counter}, Size) ->
ets:update_counter(Tab, Counter, {2, Size}).
del_counter({Tab, Counter}) ->
ets:delete(Tab, Counter).
%%
create_tab(Name) ->
Tab = ets:new(?MODULE, [public, set, {write_concurrency, auto}]),
ok = persistent_term:put({?MODULE, Name}, Tab),
Tab.
lookup_tab(Name) ->
persistent_term:get({?MODULE, Name}).
cleanup_tab(Name) ->
persistent_term:erase({?MODULE, Name}).
%%
pad_number(I, L) ->
string:pad(integer_to_list(I), L, leading, $0).


@ -0,0 +1,15 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-record(buffer, {
since :: emqx_bridge_s3_aggregator:timestamp(),
until :: emqx_bridge_s3_aggregator:timestamp(),
seq :: non_neg_integer(),
filename :: file:filename(),
fd :: file:io_device() | undefined,
max_records :: pos_integer() | undefined,
cnt_records :: {ets:tab(), _Counter} | undefined
}).
-type buffer() :: #buffer{}.


@ -0,0 +1,16 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_app).
-behaviour(application).
-export([start/2, stop/1]).
%%
start(_StartType, _StartArgs) ->
emqx_bridge_s3_sup:start_link().
stop(_State) ->
ok.


@ -7,6 +7,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
+-include("emqx_bridge_s3.hrl").
 
 -behaviour(emqx_resource).
 -export([
@ -17,7 +18,7 @@
     on_remove_channel/3,
     on_get_channels/1,
     on_query/3,
-    % on_batch_query/3,
+    on_batch_query/3,
     on_get_status/2,
     on_get_channel_status/3
 ]).
@ -31,12 +32,31 @@
 }.
 
 -type channel_config() :: #{
-    parameters := #{
-        bucket := string(),
-        key := string(),
-        content := string(),
-        acl => emqx_s3:acl()
-    }
+    bridge_type := binary(),
+    parameters := s3_upload_parameters() | s3_aggregated_upload_parameters()
+}.
+
+-type s3_upload_parameters() :: #{
+    bucket := string(),
+    key := string(),
+    content := string(),
+    acl => emqx_s3:acl()
+}.
+
+-type s3_aggregated_upload_parameters() :: #{
+    bucket := string(),
+    key := string(),
+    acl => emqx_s3:acl(),
+    aggregation => #{
+        time_interval := emqx_schema:duration_s(),
+        max_records := pos_integer()
+    },
+    container := #{
+        type := csv,
+        column_order => [string()]
+    },
+    min_part_size := emqx_schema:bytesize(),
+    max_part_size := emqx_schema:bytesize()
 }.
 
 -type channel_state() :: #{
@ -123,12 +143,13 @@ on_get_status(_InstId, State = #{client_config := Config}) ->
 -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) ->
     {ok, state()} | {error, _Reason}.
 on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) ->
-    ChannelState = init_channel_state(Config),
+    ChannelState = start_channel(State, Config),
     {ok, State#{channels => Channels#{ChannelId => ChannelState}}}.
 
 -spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) ->
     {ok, state()}.
 on_remove_channel(_InstId, State = #{channels := Channels}, ChannelId) ->
+    ok = stop_channel(maps:get(ChannelId, Channels, undefined)),
     {ok, State#{channels => maps:remove(ChannelId, Channels)}}.
 
 -spec on_get_channels(_InstanceId :: resource_id()) ->
@ -138,27 +159,121 @@ on_get_channels(InstId) ->
 -spec on_get_channel_status(_InstanceId :: resource_id(), channel_id(), state()) ->
     channel_status().
-on_get_channel_status(_InstId, ChannelId, #{channels := Channels}) ->
+on_get_channel_status(_InstId, ChannelId, State = #{channels := Channels}) ->
     case maps:get(ChannelId, Channels, undefined) of
-        _ChannelState = #{} ->
-            %% TODO
-            %% Since bucket name may be templated, we can't really provide any
-            %% additional information regarding the channel health.
-            ?status_connected;
+        ChannelState = #{} ->
+            channel_status(ChannelState, State);
         undefined ->
             ?status_disconnected
     end.
 
-init_channel_state(#{parameters := Parameters}) ->
-    #{
-        bucket => emqx_template:parse(maps:get(bucket, Parameters)),
-        key => emqx_template:parse(maps:get(key, Parameters)),
-        content => emqx_template:parse(maps:get(content, Parameters)),
-        upload_options => #{
-            acl => maps:get(acl, Parameters, undefined)
-        }
-    }.
+start_channel(_State, #{
+    bridge_type := ?BRIDGE_TYPE_UPLOAD,
+    parameters := Parameters = #{
+        bucket := Bucket,
+        key := Key,
+        content := Content
+    }
+}) ->
+    #{
+        type => ?ACTION_UPLOAD,
+        bucket => emqx_template:parse(Bucket),
+        key => emqx_template:parse(Key),
+        content => emqx_template:parse(Content),
+        upload_options => upload_options(Parameters)
+    };
+start_channel(State, #{
+    bridge_type := Type = ?BRIDGE_TYPE_AGGREGATED_UPLOAD,
+    bridge_name := Name,
+    parameters := Parameters = #{
+        aggregation := #{
+            time_interval := TimeInterval,
+            max_records := MaxRecords
+        },
+        container := Container,
+        bucket := Bucket
+    }
+}) ->
+    AggregOpts = #{
+        time_interval => TimeInterval,
+        max_records => MaxRecords,
+        work_dir => work_dir(Type, Name)
+    },
+    DeliveryOpts = #{
+        bucket => Bucket,
+        key => emqx_bridge_s3_aggreg_upload:mk_key_template(Parameters),
+        container => Container,
+        upload_options => emqx_bridge_s3_aggreg_upload:mk_upload_options(Parameters),
+        client_config => maps:get(client_config, State),
+        uploader_config => maps:with([min_part_size, max_part_size], Parameters)
+    },
+    _ = emqx_bridge_s3_sup:delete_child({Type, Name}),
+    {ok, SupPid} = emqx_bridge_s3_sup:start_child(#{
+        id => {Type, Name},
+        start => {emqx_bridge_s3_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]},
+        type => supervisor,
+        restart => permanent
+    }),
+    #{
+        type => ?ACTION_AGGREGATED_UPLOAD,
+        name => Name,
+        bucket => Bucket,
+        supervisor => SupPid,
+        on_stop => fun() -> emqx_bridge_s3_sup:delete_child({Type, Name}) end
+    }.
+
+upload_options(Parameters) ->
+    #{acl => maps:get(acl, Parameters, undefined)}.
+
+work_dir(Type, Name) ->
+    filename:join([emqx:data_dir(), bridge, Type, Name]).
+
+stop_channel(#{on_stop := OnStop}) ->
+    OnStop();
+stop_channel(_ChannelState) ->
+    ok.
+
+channel_status(#{type := ?ACTION_UPLOAD}, _State) ->
+    %% TODO
+    %% Since bucket name may be templated, we can't really provide any additional
+    %% information regarding the channel health.
+    ?status_connected;
+channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) ->
+    %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
+    Timestamp = erlang:system_time(second),
+    ok = emqx_bridge_s3_aggregator:tick(Name, Timestamp),
+    ok = check_bucket_accessible(Bucket, State),
+    ok = check_aggreg_upload_errors(Name),
+    ?status_connected.
+
+check_bucket_accessible(Bucket, #{client_config := Config}) ->
+    case emqx_s3_client:aws_config(Config) of
+        {error, Reason} ->
+            throw({unhealthy_target, Reason});
+        AWSConfig ->
+            try erlcloud_s3:list_objects(Bucket, [{max_keys, 1}], AWSConfig) of
+                Props when is_list(Props) ->
+                    ok
+            catch
+                error:{aws_error, {http_error, 404, _, _Reason}} ->
+                    throw({unhealthy_target, "Bucket does not exist"});
+                error:{aws_error, {socket_error, Reason}} ->
+                    throw({unhealthy_target, emqx_utils:format(Reason)})
+            end
+    end.
+
+check_aggreg_upload_errors(Name) ->
+    case emqx_bridge_s3_aggregator:take_error(Name) of
+        [Error] ->
+            %% TODO
+            %% This approach means that, for example, 3 upload failures will cause
+            %% the channel to be marked as unhealthy for 3 consecutive health checks.
+            ErrorMessage = emqx_utils:format(Error),
+            throw({unhealthy_target, ErrorMessage});
+        [] ->
+            ok
+    end.
 
 %% Queries
 
 -type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}.
@ -167,8 +282,21 @@ init_channel_state(#{parameters := Parameters}) ->
     {ok, _Result} | {error, _Reason}.
 on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) ->
     case maps:get(Tag, Channels, undefined) of
-        ChannelState = #{} ->
+        ChannelState = #{type := ?ACTION_UPLOAD} ->
             run_simple_upload(InstId, Tag, Data, ChannelState, Config);
+        ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} ->
+            run_aggregated_upload(InstId, [Data], ChannelState);
+        undefined ->
+            {error, {unrecoverable_error, {invalid_message_tag, Tag}}}
+    end.
+
+-spec on_batch_query(_InstanceId :: resource_id(), [query()], state()) ->
+    {ok, _Result} | {error, _Reason}.
+on_batch_query(InstId, [{Tag, Data0} | Rest], #{channels := Channels}) ->
+    case maps:get(Tag, Channels, undefined) of
+        ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} ->
+            Records = [Data0 | [Data || {_, Data} <- Rest]],
+            run_aggregated_upload(InstId, Records, ChannelState);
         undefined ->
             {error, {unrecoverable_error, {invalid_message_tag, Tag}}}
     end.
@ -206,6 +334,16 @@ run_simple_upload(
             {error, map_error(Reason)}
     end.
 
+run_aggregated_upload(InstId, Records, #{name := Name}) ->
+    Timestamp = erlang:system_time(second),
+    case emqx_bridge_s3_aggregator:push_records(Name, Timestamp, Records) of
+        ok ->
+            ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}),
+            ok;
+        {error, Reason} ->
+            {error, {unrecoverable_error, Reason}}
+    end.
+
 map_error({socket_error, _} = Reason) ->
     {recoverable_error, Reason};
 map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 ->


@ -20,7 +20,7 @@ type_name() ->
     s3.
 
 bridge_types() ->
-    [s3].
+    [s3, s3_aggregated_upload].
 
 resource_callback_module() ->
     emqx_bridge_s3_connector.


@ -0,0 +1,42 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_sup).
-export([
start_link/0,
start_child/1,
delete_child/1
]).
-behaviour(supervisor).
-export([init/1]).
-define(SUPREF, ?MODULE).
%%
start_link() ->
supervisor:start_link({local, ?SUPREF}, ?MODULE, root).
start_child(ChildSpec) ->
supervisor:start_child(?SUPREF, ChildSpec).
delete_child(ChildId) ->
case supervisor:terminate_child(?SUPREF, ChildId) of
ok ->
supervisor:delete_child(?SUPREF, ChildId);
Error ->
Error
end.
%%
init(root) ->
SupFlags = #{
strategy => one_for_one,
intensity => 1,
period => 1
},
{ok, {SupFlags, []}}.


@ -0,0 +1,143 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_upload).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include("emqx_bridge_s3.hrl").
-define(ACTION, ?ACTION_UPLOAD).
-behaviour(hocon_schema).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
-export([
bridge_v2_examples/1
]).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
"bridge_s3".
roots() ->
[].
fields(Field) when
Field == "get_bridge_v2";
Field == "put_bridge_v2";
Field == "post_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION));
fields(action) ->
{?ACTION,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
#{
desc => <<"S3 Upload Action Config">>,
required => false
}
)};
fields(?ACTION) ->
emqx_bridge_v2_schema:make_producer_action_schema(
hoconsc:mk(
?R_REF(s3_upload_parameters),
#{
required => true,
desc => ?DESC(s3_upload)
}
),
#{
resource_opts_ref => ?R_REF(s3_action_resource_opts)
}
);
fields(s3_upload_parameters) ->
emqx_s3_schema:fields(s3_upload) ++
[
{content,
hoconsc:mk(
emqx_schema:template(),
#{
required => false,
default => <<"${.}">>,
desc => ?DESC(s3_object_content)
}
)}
];
fields(s3_action_resource_opts) ->
UnsupportedOpts = [batch_size, batch_time],
lists:filter(
fun({N, _}) -> not lists:member(N, UnsupportedOpts) end,
emqx_bridge_v2_schema:action_resource_opts_fields()
).
desc(s3) ->
?DESC(s3_upload);
desc(Name) when
Name == s3_upload;
Name == s3_upload_parameters
->
?DESC(Name);
desc(s3_action_resource_opts) ->
?DESC(emqx_resource_schema, resource_opts);
desc(_Name) ->
undefined.
%% Examples
bridge_v2_examples(Method) ->
[
#{
<<"s3">> => #{
summary => <<"S3 Simple Upload">>,
value => s3_upload_action_example(Method)
}
}
].
s3_upload_action_example(post) ->
maps:merge(
s3_upload_action_example(put),
#{
type => atom_to_binary(?ACTION_UPLOAD),
name => <<"my_s3_action">>
}
);
s3_upload_action_example(get) ->
maps:merge(
s3_upload_action_example(put),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
s3_upload_action_example(put) ->
#{
enable => true,
connector => <<"my_s3_connector">>,
description => <<"My action">>,
parameters => #{
bucket => <<"${clientid}">>,
key => <<"${topic}">>,
content => <<"${payload}">>,
acl => <<"public_read">>
},
resource_opts => #{
query_mode => <<"sync">>,
inflight_window => 10
}
}.


@ -2,18 +2,20 @@
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_bridge_s3_action_info).
+-module(emqx_bridge_s3_upload_action_info).
 
 -behaviour(emqx_action_info).
 
+-include("emqx_bridge_s3.hrl").
+
 -export([
     action_type_name/0,
     connector_type_name/0,
     schema_module/0
 ]).
 
-action_type_name() -> s3.
+action_type_name() -> ?ACTION_UPLOAD.
 
 connector_type_name() -> s3.
 
-schema_module() -> emqx_bridge_s3.
+schema_module() -> emqx_bridge_s3_upload.


@ -11,8 +11,6 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/test_macros.hrl").
 
--import(emqx_utils_conv, [bin/1]).
-
 %% See `emqx_bridge_s3.hrl`.
 -define(BRIDGE_TYPE, <<"s3">>).
 -define(CONNECTOR_TYPE, <<"s3">>).
@ -79,67 +77,56 @@ end_per_testcase(_TestCase, _Config) ->
 connector_config(Name, _Config) ->
     BaseConf = emqx_s3_test_helpers:base_raw_config(tcp),
-    parse_and_check_config(<<"connectors">>, ?CONNECTOR_TYPE, Name, #{
+    emqx_bridge_s3_test_helpers:parse_and_check_config(
+        <<"connectors">>, ?CONNECTOR_TYPE, Name, #{
         <<"enable">> => true,
         <<"description">> => <<"S3 Connector">>,
         <<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)),
         <<"port">> => maps:get(<<"port">>, BaseConf),
         <<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
         <<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
         <<"transport_options">> => #{
             <<"headers">> => #{
                 <<"content-type">> => <<?CONTENT_TYPE>>
             },
             <<"connect_timeout">> => <<"500ms">>,
             <<"request_timeout">> => <<"1s">>,
             <<"pool_size">> => 4,
             <<"max_retries">> => 0,
             <<"enable_pipelining">> => 1
         },
         <<"resource_opts">> => #{
             <<"health_check_interval">> => <<"5s">>,
             <<"start_timeout">> => <<"5s">>
         }
-    }).
+        }
+    ).
 
 action_config(Name, ConnectorId) ->
-    parse_and_check_config(<<"actions">>, ?BRIDGE_TYPE, Name, #{
+    emqx_bridge_s3_test_helpers:parse_and_check_config(
+        <<"actions">>, ?BRIDGE_TYPE, Name, #{
         <<"enable">> => true,
         <<"connector">> => ConnectorId,
         <<"parameters">> => #{
             <<"bucket">> => <<"${clientid}">>,
             <<"key">> => <<"${topic}">>,
             <<"content">> => <<"${payload}">>,
             <<"acl">> => <<"public_read">>
         },
         <<"resource_opts">> => #{
             <<"buffer_mode">> => <<"memory_only">>,
             <<"buffer_seg_bytes">> => <<"10MB">>,
             <<"health_check_interval">> => <<"3s">>,
             <<"inflight_window">> => 40,
             <<"max_buffer_bytes">> => <<"256MB">>,
             <<"metrics_flush_interval">> => <<"1s">>,
             <<"query_mode">> => <<"sync">>,
             <<"request_ttl">> => <<"60s">>,
             <<"resume_interval">> => <<"3s">>,
             <<"worker_pool_size">> => <<"4">>
         }
-    }).
-
-parse_and_check_config(Root, Type, Name, ConfigIn) ->
-    Schema =
-        case Root of
-            <<"connectors">> -> emqx_connector_schema;
-            <<"actions">> -> emqx_bridge_v2_schema
-        end,
-    #{Root := #{Type := #{Name := Config}}} =
-        hocon_tconf:check_plain(
-            Schema,
-            #{Root => #{Type => #{Name => ConfigIn}}},
-            #{required => false, atom_key => false}
-        ),
-    ct:pal("parsed config: ~p", [Config]),
-    ConfigIn.
+        }
+    ).
 
 t_start_stop(Config) ->
     emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
@ -190,7 +177,7 @@ t_sync_query(Config) ->
ok = erlcloud_s3:create_bucket(Bucket, AwsConfig), ok = erlcloud_s3:create_bucket(Bucket, AwsConfig),
ok = emqx_bridge_v2_testlib:t_sync_query( ok = emqx_bridge_v2_testlib:t_sync_query(
Config, Config,
fun() -> mk_message(Bucket, Topic, Payload) end, fun() -> emqx_bridge_s3_test_helpers:mk_message_event(Bucket, Topic, Payload) end,
fun(Res) -> ?assertMatch(ok, Res) end, fun(Res) -> ?assertMatch(ok, Res) end,
s3_bridge_connector_upload_ok s3_bridge_connector_upload_ok
), ),
@ -224,15 +211,10 @@ t_query_retry_recoverable(Config) ->
heal_failure, heal_failure,
[timeout, ?PROXY_NAME, ProxyHost, ProxyPort] [timeout, ?PROXY_NAME, ProxyHost, ProxyPort]
), ),
Message = mk_message(Bucket, Topic, Payload), Message = emqx_bridge_s3_test_helpers:mk_message_event(Bucket, Topic, Payload),
%% Verify that the message is sent eventually. %% Verify that the message is sent eventually.
ok = emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{}), ok = emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{}),
?assertMatch( ?assertMatch(
#{content := Payload}, #{content := Payload},
maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig)) maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
). ).
mk_message(ClientId, Topic, Payload) ->
Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
{Event, _} = emqx_rule_events:eventmsg_publish(Message),
Event.

View File

@ -0,0 +1,181 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_aggreg_buffer_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
%% CT Setup
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
WorkDir = emqx_cth_suite:work_dir(Config),
ok = filelib:ensure_path(WorkDir),
[{work_dir, WorkDir} | Config].
end_per_suite(_Config) ->
ok.
%% Testcases
t_write_read_cycle(Config) ->
Filename = mk_filename(?FUNCTION_NAME, Config),
Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}},
{ok, WFD} = file:open(Filename, [write, binary]),
Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata),
Terms = [
[],
[[[[[[[[]]]]]]]],
123,
lists:seq(1, 100),
lists:seq(1, 1000),
lists:seq(1, 10000),
lists:seq(1, 100000),
#{<<"id">> => 123456789, <<"ts">> => <<"2028-02-29T12:34:56Z">>, <<"gauge">> => 42.42},
{<<"text/plain">>, _Huge = rand:bytes(1048576)},
{<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})}
],
ok = lists:foreach(
fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end,
Terms
),
ok = file:close(WFD),
{ok, RFD} = file:open(Filename, [read, binary, raw]),
{MetadataRead, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD),
?assertEqual(Metadata, MetadataRead),
TermsRead = read_until_eof(Reader),
?assertEqual(Terms, TermsRead).
t_read_empty(Config) ->
Filename = mk_filename(?FUNCTION_NAME, Config),
{ok, WFD} = file:open(Filename, [write, binary]),
ok = file:close(WFD),
{ok, RFD} = file:open(Filename, [read, binary]),
?assertError(
{buffer_incomplete, header},
emqx_bridge_s3_aggreg_buffer:new_reader(RFD)
).
t_read_garbage(Config) ->
Filename = mk_filename(?FUNCTION_NAME, Config),
{ok, WFD} = file:open(Filename, [write, binary]),
ok = file:write(WFD, rand:bytes(1048576)),
ok = file:close(WFD),
{ok, RFD} = file:open(Filename, [read, binary]),
?assertError(
badarg,
emqx_bridge_s3_aggreg_buffer:new_reader(RFD)
).
t_read_truncated(Config) ->
Filename = mk_filename(?FUNCTION_NAME, Config),
{ok, WFD} = file:open(Filename, [write, binary]),
Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}},
Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata),
Terms = [
[[[[[[[[[[[]]]]]]]]]]],
lists:seq(1, 100000),
#{<<"id">> => 123456789, <<"ts">> => <<"2029-02-30T12:34:56Z">>, <<"gauge">> => 42.42},
{<<"text/plain">>, _Huge = rand:bytes(1048576)}
],
LastTerm =
{<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})},
ok = lists:foreach(
fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end,
Terms
),
{ok, WPos} = file:position(WFD, cur),
?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(LastTerm, Writer)),
ok = file:close(WFD),
ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos + 1),
{ok, RFD1} = file:open(Filename, [read, binary]),
{Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD1),
{ReadTerms1, Reader1} = read_terms(length(Terms), Reader0),
?assertEqual(Terms, ReadTerms1),
?assertError(
badarg,
emqx_bridge_s3_aggreg_buffer:read(Reader1)
),
ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2),
{ok, RFD2} = file:open(Filename, [read, binary]),
{Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD2),
{ReadTerms2, Reader3} = read_terms(_FitsInto = 3, Reader2),
?assertEqual(lists:sublist(Terms, 3), ReadTerms2),
?assertError(
badarg,
emqx_bridge_s3_aggreg_buffer:read(Reader3)
).
t_read_truncated_takeover_write(Config) ->
Filename = mk_filename(?FUNCTION_NAME, Config),
{ok, WFD} = file:open(Filename, [write, binary]),
Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}},
Writer1 = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata),
Terms1 = [
[[[[[[[[[[[]]]]]]]]]]],
lists:seq(1, 10000),
lists:duplicate(1000, ?FUNCTION_NAME),
{<<"text/plain">>, _Huge = rand:bytes(1048576)}
],
Terms2 = [
{<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})},
{<<"application/x-octet-stream">>, rand:bytes(102400)}
],
ok = lists:foreach(
fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer1)) end,
Terms1
),
{ok, WPos} = file:position(WFD, cur),
ok = file:close(WFD),
ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2),
{ok, RWFD} = file:open(Filename, [read, write, binary]),
{Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RWFD),
{ReadTerms1, Reader1} = read_terms(_Survived = 3, Reader0),
?assertEqual(
lists:sublist(Terms1, 3),
ReadTerms1
),
?assertError(
badarg,
emqx_bridge_s3_aggreg_buffer:read(Reader1)
),
Writer2 = emqx_bridge_s3_aggreg_buffer:takeover(Reader1),
ok = lists:foreach(
fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer2)) end,
Terms2
),
ok = file:close(RWFD),
{ok, RFD} = file:open(Filename, [read, binary]),
{Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD),
ReadTerms2 = read_until_eof(Reader2),
?assertEqual(
lists:sublist(Terms1, 3) ++ Terms2,
ReadTerms2
).
%%
mk_filename(Name, Config) ->
filename:join(?config(work_dir, Config), Name).
read_terms(0, Reader) ->
{[], Reader};
read_terms(N, Reader0) ->
{Term, Reader1} = emqx_bridge_s3_aggreg_buffer:read(Reader0),
{Terms, Reader} = read_terms(N - 1, Reader1),
{[Term | Terms], Reader}.
read_until_eof(Reader0) ->
case emqx_bridge_s3_aggreg_buffer:read(Reader0) of
{Term, Reader} ->
[Term | read_until_eof(Reader)];
eof ->
[]
end.
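
The suite above pins down the on-disk contract of the aggregation buffer: a writer is opened over a file descriptor together with a metadata term, terms are appended one at a time, and a reader replays the metadata and terms until `eof`, raising `badarg` on a corrupted or truncated tail, after which `takeover/1` lets a writer resume appending at the last readable position. A minimal sketch of one write/read cycle, mirroring the calls exercised above (file name and record are illustrative, not part of the changeset):

    %% Illustrative sketch only; names are hypothetical, calls mirror the suite above.
    sketch_write_read(Filename) ->
        {ok, WFD} = file:open(Filename, [write, binary]),
        Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, _Metadata = #{origin => sketch}),
        ok = emqx_bridge_s3_aggreg_buffer:write(#{<<"clientid">> => <<"C1">>}, Writer),
        ok = file:close(WFD),
        {ok, RFD} = file:open(Filename, [read, binary]),
        {#{origin := sketch}, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD),
        case emqx_bridge_s3_aggreg_buffer:read(Reader) of
            {Term, _Next} -> {ok, Term};
            eof -> eof
        end.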

View File

@ -0,0 +1,72 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_aggreg_csv_tests).
-include_lib("eunit/include/eunit.hrl").
encoding_test() ->
CSV = emqx_bridge_s3_aggreg_csv:new(#{}),
?assertEqual(
"A,B,Ç\n"
"1.2345,string,0.0\n"
"0.3333333333,\"[]\",-0.0\n"
"111111,🫠,0.0\n"
"111.111,\"\"\"quoted\"\"\",\"line\r\nbreak\"\n"
"222.222,,\n",
fill_close(CSV, [
[
#{<<"A">> => 1.2345, <<"B">> => "string", <<"Ç"/utf8>> => +0.0},
#{<<"A">> => 1 / 3, <<"B">> => "[]", <<"Ç"/utf8>> => -0.0},
#{<<"A">> => 111111, <<"B">> => "🫠", <<"Ç"/utf8>> => 0.0},
#{<<"A">> => 111.111, <<"B">> => "\"quoted\"", <<"Ç"/utf8>> => "line\r\nbreak"},
#{<<"A">> => 222.222, <<"B">> => "", <<"Ç"/utf8>> => undefined}
]
])
).
column_order_test() ->
Order = [<<"ID">>, <<"TS">>],
CSV = emqx_bridge_s3_aggreg_csv:new(#{column_order => Order}),
?assertEqual(
"ID,TS,A,B,D\n"
"1,2024-01-01,12.34,str,\"[]\"\n"
"2,2024-01-02,23.45,ing,\n"
"3,,45,,'\n"
"4,2024-01-04,,,\n",
fill_close(CSV, [
[
#{
<<"A">> => 12.34,
<<"B">> => "str",
<<"ID">> => 1,
<<"TS">> => "2024-01-01",
<<"D">> => <<"[]">>
},
#{
<<"TS">> => "2024-01-02",
<<"C">> => <<"null">>,
<<"ID">> => 2,
<<"A">> => 23.45,
<<"B">> => "ing"
}
],
[
#{<<"A">> => 45, <<"D">> => <<"'">>, <<"ID">> => 3},
#{<<"ID">> => 4, <<"TS">> => "2024-01-04"}
]
])
).
fill_close(CSV, LRecords) ->
string(fill_close_(CSV, LRecords)).
fill_close_(CSV0, [Records | LRest]) ->
{Writes, CSV} = emqx_bridge_s3_aggreg_csv:fill(Records, CSV0),
[Writes | fill_close_(CSV, LRest)];
fill_close_(CSV, []) ->
[emqx_bridge_s3_aggreg_csv:close(CSV)].
string(Writes) ->
unicode:characters_to_list(Writes).

View File

@ -0,0 +1,465 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_aggreg_upload_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/test_macros.hrl").
-import(emqx_utils_conv, [bin/1]).
%% See `emqx_bridge_s3.hrl`.
-define(BRIDGE_TYPE, <<"s3_aggregated_upload">>).
-define(CONNECTOR_TYPE, <<"s3">>).
-define(PROXY_NAME, "minio_tcp").
-define(CONF_TIME_INTERVAL, 4000).
-define(CONF_MAX_RECORDS, 100).
-define(CONF_COLUMN_ORDER, ?CONF_COLUMN_ORDER([])).
-define(CONF_COLUMN_ORDER(T), [
<<"publish_received_at">>,
<<"clientid">>,
<<"topic">>,
<<"payload">>,
<<"empty">>
| T
]).
-define(LIMIT_TOLERANCE, 1.1).
%% CT Setup
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
% Setup toxiproxy
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
_ = emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge_s3,
emqx_bridge,
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
[
{apps, Apps},
{proxy_host, ProxyHost},
{proxy_port, ProxyPort},
{proxy_name, ?PROXY_NAME}
| Config
].
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)).
%% Testcases
init_per_testcase(TestCase, Config) ->
ct:timetrap(timer:seconds(15)),
ok = snabbkaffe:start_trace(),
TS = erlang:system_time(),
Name = iolist_to_binary(io_lib:format("~s-~p", [TestCase, TS])),
Bucket = unicode:characters_to_list(string:replace(Name, "_", "-", all)),
ConnectorConfig = connector_config(Name, Config),
ActionConfig = action_config(Name, Name, Bucket),
ok = emqx_bridge_s3_test_helpers:create_bucket(Bucket),
[
{connector_type, ?CONNECTOR_TYPE},
{connector_name, Name},
{connector_config, ConnectorConfig},
{bridge_type, ?BRIDGE_TYPE},
{bridge_name, Name},
{bridge_config, ActionConfig},
{s3_bucket, Bucket}
| Config
].
end_per_testcase(_TestCase, _Config) ->
ok = snabbkaffe:stop(),
ok.
connector_config(Name, _Config) ->
BaseConf = emqx_s3_test_helpers:base_raw_config(tcp),
emqx_bridge_s3_test_helpers:parse_and_check_config(
<<"connectors">>, ?CONNECTOR_TYPE, Name, #{
<<"enable">> => true,
<<"description">> => <<"S3 Connector">>,
<<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)),
<<"port">> => maps:get(<<"port">>, BaseConf),
<<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
<<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
<<"transport_options">> => #{
<<"connect_timeout">> => <<"500ms">>,
<<"request_timeout">> => <<"1s">>,
<<"pool_size">> => 4,
<<"max_retries">> => 0
},
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"1s">>
}
}
).
action_config(Name, ConnectorId, Bucket) ->
emqx_bridge_s3_test_helpers:parse_and_check_config(
<<"actions">>, ?BRIDGE_TYPE, Name, #{
<<"enable">> => true,
<<"connector">> => ConnectorId,
<<"parameters">> => #{
<<"bucket">> => unicode:characters_to_binary(Bucket),
<<"key">> => <<"${action}/${node}/${datetime.rfc3339}">>,
<<"acl">> => <<"public_read">>,
<<"headers">> => #{
<<"X-AMZ-Meta-Version">> => <<"42">>
},
<<"aggregation">> => #{
<<"time_interval">> => <<"4s">>,
<<"max_records">> => ?CONF_MAX_RECORDS
},
<<"container">> => #{
<<"type">> => <<"csv">>,
<<"column_order">> => ?CONF_COLUMN_ORDER
}
},
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"1s">>,
<<"max_buffer_bytes">> => <<"64MB">>,
<<"query_mode">> => <<"async">>,
<<"worker_pool_size">> => 4
}
}
).
t_start_stop(Config) ->
emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
t_create_via_http(Config) ->
emqx_bridge_v2_testlib:t_create_via_http(Config).
t_on_get_status(Config) ->
emqx_bridge_v2_testlib:t_on_get_status(Config, #{}).
t_aggreg_upload(Config) ->
Bucket = ?config(s3_bucket, Config),
BridgeName = ?config(bridge_name, Config),
BridgeNameString = unicode:characters_to_list(BridgeName),
NodeString = atom_to_list(node()),
%% Create a bridge with the sample configuration.
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
%% Prepare some sample messages that look like Rule SQL productions.
MessageEvents = lists:map(fun mk_message_event/1, [
{<<"C1">>, T1 = <<"a/b/c">>, P1 = <<"{\"hello\":\"world\"}">>},
{<<"C2">>, T2 = <<"foo/bar">>, P2 = <<"baz">>},
{<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>}
]),
ok = send_messages(BridgeName, MessageEvents),
%% Wait until the delivery is completed.
?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
%% Check the uploaded objects.
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
?assertMatch(
[BridgeNameString, NodeString, _Datetime, _Seq = "0"],
string:split(Key, "/", all)
),
Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
?assertMatch(
#{content_type := "text/csv", "x-amz-meta-version" := "42"},
Upload
),
%% Verify that column order is respected.
?assertMatch(
{ok, [
?CONF_COLUMN_ORDER(_),
[TS, <<"C1">>, T1, P1, <<>> | _],
[TS, <<"C2">>, T2, P2, <<>> | _],
[TS, <<"C3">>, T3, P3, <<>> | _]
]},
erl_csv:decode(Content)
).
t_aggreg_upload_rule(Config) ->
Bucket = ?config(s3_bucket, Config),
BridgeName = ?config(bridge_name, Config),
ClientID = emqx_utils_conv:bin(?FUNCTION_NAME),
%% Create a bridge with the sample configuration and a simple SQL rule.
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
?assertMatch(
{ok, _Rule},
emqx_bridge_v2_testlib:create_rule_and_action_http(?BRIDGE_TYPE, <<>>, Config, #{
sql => <<
"SELECT"
" *,"
" strlen(payload) as psize,"
" unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at"
" FROM 's3/#'"
>>
})
),
ok = lists:foreach(fun emqx:publish/1, [
emqx_message:make(?FUNCTION_NAME, T1 = <<"s3/m1">>, P1 = <<"[HELLO]">>),
emqx_message:make(?FUNCTION_NAME, T2 = <<"s3/m2">>, P2 = <<"[WORLD]">>),
emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>),
emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>)
]),
?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
%% Check the uploaded objects.
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
_CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key),
%% Verify that column order is respected and event fields are preserved.
?assertMatch(?CONF_COLUMN_ORDER(_), Header),
?assertEqual(
[<<"event">>, <<"qos">>, <<"psize">>],
[C || C <- [<<"event">>, <<"qos">>, <<"psize">>], lists:member(C, Header)]
),
%% Verify that all the matching messages are present.
?assertMatch(
[
[_TS1, ClientID, T1, P1 | _],
[_TS2, ClientID, T2, P2 | _],
[_TS3, ClientID, T3, P3 | _]
],
Rows
),
%% Verify that timestamp column now has RFC3339 format.
[_Row = [TS1 | _] | _Rest] = Rows,
?assert(
is_integer(emqx_rule_funcs:rfc3339_to_unix_ts(TS1, millisecond)),
TS1
).
t_aggreg_upload_restart(Config) ->
%% NOTE
%% This test verifies that the bridge will reuse existing aggregation buffer
%% after a restart.
Bucket = ?config(s3_bucket, Config),
BridgeName = ?config(bridge_name, Config),
%% Create a bridge with the sample configuration.
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
%% Send some sample messages that look like Rule SQL productions.
MessageEvents = lists:map(fun mk_message_event/1, [
{<<"C1">>, T1 = <<"a/b/c">>, P1 = <<"{\"hello\":\"world\"}">>},
{<<"C2">>, T2 = <<"foo/bar">>, P2 = <<"baz">>},
{<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>}
]),
ok = send_messages(BridgeName, MessageEvents),
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}),
%% Restart the bridge.
{ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
{ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
%% Send some more messages.
ok = send_messages(BridgeName, MessageEvents),
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}),
%% Wait until the delivery is completed.
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
%% Check there's still only one upload.
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
_Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
?assertMatch(
{ok, [
_Header = [_ | _],
[TS1, <<"C1">>, T1, P1 | _],
[TS1, <<"C2">>, T2, P2 | _],
[TS1, <<"C3">>, T3, P3 | _],
[TS2, <<"C1">>, T1, P1 | _],
[TS2, <<"C2">>, T2, P2 | _],
[TS2, <<"C3">>, T3, P3 | _]
]},
erl_csv:decode(Content)
).
t_aggreg_upload_restart_corrupted(Config) ->
%% NOTE
%% This test verifies that the bridge can recover from a buffer file corruption,
%% and does so while preserving uncompromised data.
Bucket = ?config(s3_bucket, Config),
BridgeName = ?config(bridge_name, Config),
BatchSize = ?CONF_MAX_RECORDS div 2,
%% Create a bridge with the sample configuration.
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
%% Send some sample messages that look like Rule SQL productions.
Messages1 = [
{integer_to_binary(N), <<"a/b/c">>, <<"{\"hello\":\"world\"}">>}
|| N <- lists:seq(1, BatchSize)
],
%% Ensure that they span multiple batch queries.
ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1),
{ok, _} = ?block_until(
#{?snk_kind := s3_aggreg_records_written, action := BridgeName},
infinity,
0
),
%% Find out the buffer file.
{ok, #{filename := Filename}} = ?block_until(
#{?snk_kind := s3_aggreg_buffer_allocated, action := BridgeName}
),
%% Stop the bridge, corrupt the buffer file, and restart the bridge.
{ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
BufferFileSize = filelib:file_size(Filename),
ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, BufferFileSize div 2),
{ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
%% Send some more messages.
Messages2 = [
{integer_to_binary(N), <<"c/d/e">>, <<"{\"hello\":\"world\"}">>}
|| N <- lists:seq(1, BatchSize)
],
ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0),
%% Wait until the delivery is completed.
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
%% Check that upload contains part of the first batch and all of the second batch.
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key),
NRows = length(Rows),
?assert(
NRows > BatchSize,
CSV
),
?assertEqual(
lists:sublist(Messages1, NRows - BatchSize) ++ Messages2,
[{ClientID, Topic, Payload} || [_TS, ClientID, Topic, Payload | _] <- Rows],
CSV
).
t_aggreg_pending_upload_restart(Config) ->
%% NOTE
%% This test verifies that the bridge will finish uploading a buffer file after
%% a restart.
Bucket = ?config(s3_bucket, Config),
BridgeName = ?config(bridge_name, Config),
%% Create a bridge with the sample configuration.
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
%% Send few large messages that will require multipart upload.
%% Ensure that they span multiple batch queries.
Payload = iolist_to_binary(lists:duplicate(128 * 1024, "PAYLOAD!")),
Messages = [{integer_to_binary(N), <<"a/b/c">>, Payload} || N <- lists:seq(1, 10)],
ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages), 10),
%% Wait until the multipart upload is started.
{ok, #{key := ObjectKey}} =
?block_until(#{?snk_kind := s3_client_multipart_started, bucket := Bucket}),
%% Stop the bridge.
{ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
%% Verify that pending uploads have been gracefully aborted.
%% NOTE: Minio does not support multipart upload listing w/o prefix.
?assertEqual(
[],
emqx_bridge_s3_test_helpers:list_pending_uploads(Bucket, ObjectKey)
),
%% Restart the bridge.
{ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
%% Wait until the delivery is completed.
{ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
%% Check that delivery contains all the messages.
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
[_Header | Rows] = fetch_parse_csv(Bucket, Key),
?assertEqual(
Messages,
[{CID, Topic, PL} || [_TS, CID, Topic, PL | _] <- Rows]
).
t_aggreg_next_rotate(Config) ->
%% NOTE
%% This is essentially a stress test that tries to verify that buffer rotation
%% and windowing work correctly under high rate, high concurrency conditions.
Bucket = ?config(s3_bucket, Config),
BridgeName = ?config(bridge_name, Config),
NSenders = 4,
%% Create a bridge with the sample configuration.
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
%% Start separate processes to send messages.
Senders = [
spawn_link(fun() -> run_message_sender(BridgeName, N) end)
|| N <- lists:seq(1, NSenders)
],
%% Give them some time to send messages so that rotation and windowing will happen.
ok = timer:sleep(round(?CONF_TIME_INTERVAL * 1.5)),
%% Stop the senders.
_ = [Sender ! {stop, self()} || Sender <- Senders],
NSent = receive_sender_reports(Senders),
%% Wait for the last delivery to complete.
ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)),
?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}, infinity, 0),
%% There should be at least 2 time windows of aggregated records.
Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)],
DTs = [DT || K <- Uploads, [_Action, _Node, DT | _] <- [string:split(K, "/", all)]],
?assert(
ordsets:size(ordsets:from_list(DTs)) > 1,
Uploads
),
%% Uploads should not contain more than max allowed records.
CSVs = [{K, fetch_parse_csv(Bucket, K)} || K <- Uploads],
NRecords = [{K, length(CSV) - 1} || {K, CSV} <- CSVs],
?assertEqual(
[],
[{K, NR} || {K, NR} <- NRecords, NR > ?CONF_MAX_RECORDS * ?LIMIT_TOLERANCE]
),
%% No message should be lost.
?assertEqual(
NSent,
lists:sum([NR || {_, NR} <- NRecords])
).
run_message_sender(BridgeName, N) ->
ClientID = integer_to_binary(N),
Topic = <<"a/b/c/", ClientID/binary>>,
run_message_sender(BridgeName, N, ClientID, Topic, N, 0).
run_message_sender(BridgeName, N, ClientID, Topic, Delay, NSent) ->
Payload = integer_to_binary(N * 1_000_000 + NSent),
Message = emqx_bridge_s3_test_helpers:mk_message_event(ClientID, Topic, Payload),
_ = send_message(BridgeName, Message),
receive
{stop, From} ->
From ! {sent, self(), NSent + 1}
after Delay ->
run_message_sender(BridgeName, N, ClientID, Topic, Delay, NSent + 1)
end.
receive_sender_reports([Sender | Rest]) ->
receive
{sent, Sender, NSent} -> NSent + receive_sender_reports(Rest)
end;
receive_sender_reports([]) ->
0.
%%
mk_message_event({ClientID, Topic, Payload}) ->
emqx_bridge_s3_test_helpers:mk_message_event(ClientID, Topic, Payload).
send_messages(BridgeName, MessageEvents) ->
lists:foreach(
fun(M) -> send_message(BridgeName, M) end,
MessageEvents
).
send_messages_delayed(BridgeName, MessageEvents, Delay) ->
lists:foreach(
fun(M) ->
send_message(BridgeName, M),
timer:sleep(Delay)
end,
MessageEvents
).
send_message(BridgeName, Message) ->
?assertEqual(ok, emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{})).
fetch_parse_csv(Bucket, Key) ->
#{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
{ok, CSV} = erl_csv:decode(Content),
CSV.

View File

@ -0,0 +1,58 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_test_helpers).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_utils_conv, [bin/1]).
parse_and_check_config(Root, Type, Name, Config) ->
Schema =
case Root of
<<"connectors">> -> emqx_connector_schema;
<<"actions">> -> emqx_bridge_v2_schema
end,
#{Root := #{Type := #{Name := _ConfigParsed}}} =
hocon_tconf:check_plain(
Schema,
#{Root => #{Type => #{Name => Config}}},
#{required => false, atom_key => false}
),
Config.
mk_message_event(ClientId, Topic, Payload) ->
Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
{Event, _} = emqx_rule_events:eventmsg_publish(Message),
emqx_utils_maps:binary_key_map(Event).
create_bucket(Bucket) ->
AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
erlcloud_s3:create_bucket(Bucket, AwsConfig).
list_objects(Bucket) ->
AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
Response = erlcloud_s3:list_objects(Bucket, AwsConfig),
false = proplists:get_value(is_truncated, Response),
Contents = proplists:get_value(contents, Response),
lists:map(fun maps:from_list/1, Contents).
get_object(Bucket, Key) ->
AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
maps:from_list(erlcloud_s3:get_object(Bucket, Key, AwsConfig)).
list_pending_uploads(Bucket, Key) ->
AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
{ok, Props} = erlcloud_s3:list_multipart_uploads(Bucket, [{prefix, Key}], [], AwsConfig),
Uploads = proplists:get_value(uploads, Props),
lists:map(fun maps:from_list/1, Uploads).
%% File utilities
truncate_at(Filename, Pos) ->
{ok, FD} = file:open(Filename, [read, write, binary]),
{ok, Pos} = file:position(FD, Pos),
ok = file:truncate(FD),
ok = file:close(FD).

View File

@ -6,6 +6,7 @@
-include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include_lib("erlcloud/include/erlcloud_aws.hrl"). -include_lib("erlcloud/include/erlcloud_aws.hrl").
-export([ -export([
@ -133,7 +134,13 @@ start_multipart(
Headers = join_headers(BaseHeaders, maps:get(headers, UploadOpts, undefined)), Headers = join_headers(BaseHeaders, maps:get(headers, UploadOpts, undefined)),
case erlcloud_s3:start_multipart(Bucket, ECKey, ECOpts, Headers, AwsConfig) of case erlcloud_s3:start_multipart(Bucket, ECKey, ECOpts, Headers, AwsConfig) of
{ok, Props} -> {ok, Props} ->
{ok, response_property('uploadId', Props)}; UploadId = response_property('uploadId', Props),
?tp(s3_client_multipart_started, #{
bucket => Bucket,
key => Key,
upload_id => UploadId
}),
{ok, UploadId};
{error, Reason} -> {error, Reason} ->
?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}), ?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}),
{error, Reason} {error, Reason}
@ -177,6 +184,11 @@ complete_multipart(
) )
of of
ok -> ok ->
?tp(s3_client_multipart_completed, #{
bucket => Bucket,
key => Key,
upload_id => UploadId
}),
ok; ok;
{error, Reason} -> {error, Reason} ->
?SLOG(debug, #{msg => "complete_multipart_fail", key => Key, reason => Reason}), ?SLOG(debug, #{msg => "complete_multipart_fail", key => Key, reason => Reason}),
@ -193,6 +205,11 @@ abort_multipart(
) -> ) ->
case erlcloud_s3:abort_multipart(Bucket, erlcloud_key(Key), UploadId, [], Headers, AwsConfig) of case erlcloud_s3:abort_multipart(Bucket, erlcloud_key(Key), UploadId, [], Headers, AwsConfig) of
ok -> ok ->
?tp(s3_client_multipart_aborted, #{
bucket => Bucket,
key => Key,
upload_id => UploadId
}),
ok; ok;
{error, Reason} -> {error, Reason} ->
?SLOG(debug, #{msg => "abort_multipart_fail", key => Key, reason => Reason}), ?SLOG(debug, #{msg => "abort_multipart_fail", key => Key, reason => Reason}),

View File

@ -53,7 +53,7 @@
emqx_s3_client:bucket(), emqx_s3_client:bucket(),
emqx_s3_client:config(), emqx_s3_client:config(),
emqx_s3_client:upload_options(), emqx_s3_client:upload_options(),
emqx_s3_uploader:config() emqx_s3_upload:config()
}. }.
-define(DEFAULT_CALL_TIMEOUT, 5000). -define(DEFAULT_CALL_TIMEOUT, 5000).

View File

@ -102,6 +102,14 @@ fields(s3_upload) ->
desc => ?DESC("acl"), desc => ?DESC("acl"),
required => false required => false
} }
)},
{headers,
hoconsc:mk(
map(),
#{
required => false,
desc => ?DESC("upload_headers")
}
)} )}
]; ];
fields(s3_uploader) -> fields(s3_uploader) ->

View File

@ -0,0 +1,217 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_s3_upload).
-include_lib("emqx/include/types.hrl").
-export([
new/4,
append/2,
write/1,
complete/1,
abort/1
]).
-export([format/1]).
-export_type([t/0, config/0]).
-type config() :: #{
min_part_size => pos_integer(),
max_part_size => pos_integer()
}.
-type t() :: #{
started := boolean(),
client := emqx_s3_client:client(),
key := emqx_s3_client:key(),
upload_opts := emqx_s3_client:upload_options(),
buffer := iodata(),
buffer_size := non_neg_integer(),
min_part_size := pos_integer(),
max_part_size := pos_integer(),
upload_id := undefined | emqx_s3_client:upload_id(),
etags := [emqx_s3_client:etag()],
part_number := emqx_s3_client:part_number()
}.
%% 5MB
-define(DEFAULT_MIN_PART_SIZE, 5242880).
%% 5GB
-define(DEFAULT_MAX_PART_SIZE, 5368709120).
%%
-spec new(
emqx_s3_client:client(),
emqx_s3_client:key(),
emqx_s3_client:upload_options(),
config()
) ->
t().
new(Client, Key, UploadOpts, Config) ->
#{
started => false,
client => Client,
key => Key,
upload_opts => UploadOpts,
buffer => [],
buffer_size => 0,
min_part_size => maps:get(min_part_size, Config, ?DEFAULT_MIN_PART_SIZE),
max_part_size => maps:get(max_part_size, Config, ?DEFAULT_MAX_PART_SIZE),
upload_id => undefined,
etags => [],
part_number => 1
}.
-spec append(iodata(), t()) -> {ok, t()} | {error, term()}.
append(WriteData, #{buffer := Buffer, buffer_size := BufferSize} = Upload) ->
case is_valid_part(WriteData, Upload) of
true ->
{ok, Upload#{
buffer => [Buffer, WriteData],
buffer_size => BufferSize + iolist_size(WriteData)
}};
false ->
{error, {too_large, iolist_size(WriteData)}}
end.
-spec write(t()) -> {ok, t()} | {cont, t()} | {error, term()}.
write(U0 = #{started := false}) ->
case maybe_start_upload(U0) of
not_started ->
{ok, U0};
{started, U1} ->
{cont, U1#{started := true}};
{error, _} = Error ->
Error
end;
write(U0 = #{started := true}) ->
maybe_upload_part(U0).
-spec complete(t()) -> {ok, t()} | {error, term()}.
complete(
#{
started := true,
client := Client,
key := Key,
upload_id := UploadId
} = U0
) ->
case upload_part(U0) of
{ok, #{etags := ETagsRev} = U1} ->
ETags = lists:reverse(ETagsRev),
case emqx_s3_client:complete_multipart(Client, Key, UploadId, ETags) of
ok ->
{ok, U1};
{error, _} = Error ->
Error
end;
{error, _} = Error ->
Error
end;
complete(#{started := false} = Upload) ->
put_object(Upload).
-spec abort(t()) -> ok_or_error(term()).
abort(#{
started := true,
client := Client,
key := Key,
upload_id := UploadId
}) ->
case emqx_s3_client:abort_multipart(Client, Key, UploadId) of
ok ->
ok;
{error, _} = Error ->
Error
end;
abort(#{started := false}) ->
ok.
%%--------------------------------------------------------------------
-spec format(t()) -> map().
format(Upload = #{client := Client}) ->
Upload#{
client => emqx_s3_client:format(Client),
buffer => [<<"...">>]
}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
-spec maybe_start_upload(t()) -> not_started | {started, t()} | {error, term()}.
maybe_start_upload(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) ->
case BufferSize >= MinPartSize of
true ->
start_upload(Data);
false ->
not_started
end.
-spec start_upload(t()) -> {started, t()} | {error, term()}.
start_upload(#{client := Client, key := Key, upload_opts := UploadOpts} = Data) ->
case emqx_s3_client:start_multipart(Client, Key, UploadOpts) of
{ok, UploadId} ->
NewData = Data#{upload_id => UploadId},
{started, NewData};
{error, _} = Error ->
Error
end.
-spec maybe_upload_part(t()) -> ok_or_error(t(), term()).
maybe_upload_part(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) ->
case BufferSize >= MinPartSize of
true ->
upload_part(Data);
false ->
{ok, Data}
end.
-spec upload_part(t()) -> ok_or_error(t(), term()).
upload_part(#{buffer_size := 0} = Upload) ->
{ok, Upload};
upload_part(
#{
client := Client,
key := Key,
upload_id := UploadId,
buffer := Buffer,
part_number := PartNumber,
etags := ETags
} = Upload
) ->
case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, Buffer) of
{ok, ETag} ->
{ok, Upload#{
buffer => [],
buffer_size => 0,
part_number => PartNumber + 1,
etags => [{PartNumber, ETag} | ETags]
}};
{error, _} = Error ->
Error
end.
-spec put_object(t()) -> ok_or_error(t(), term()).
put_object(
#{
client := Client,
key := Key,
upload_opts := UploadOpts,
buffer := Buffer
} = Upload
) ->
case emqx_s3_client:put_object(Client, Key, UploadOpts, Buffer) of
ok ->
{ok, Upload};
{error, _} = Error ->
Error
end.
is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) ->
BufferSize + iolist_size(WriteData) =< MaxPartSize.
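
Extracting this logic into the pure `emqx_s3_upload` state machine makes it usable outside the uploader process: `append/2` buffers data and rejects writes that would exceed `max_part_size`, `write/1` keeps buffering until `min_part_size` is reached, returns `{cont, _}` right after the multipart upload is started so the caller can flush the first part, and uploads subsequent parts as they fill up; `complete/1` finishes the multipart upload or falls back to a single `put_object` if multipart was never started. A sketch of a driving loop over this API (illustrative only; the actual caller is the reworked `emqx_s3_uploader` process shown further below):

    %% Illustrative driver, not part of the changeset.
    upload_chunks(Client, Key, UploadOpts, Config, Chunks) ->
        Upload0 = emqx_s3_upload:new(Client, Key, UploadOpts, Config),
        case append_chunks(Chunks, Upload0) of
            {ok, Upload} -> emqx_s3_upload:complete(Upload);
            {error, _} = Error -> Error
        end.

    append_chunks([Chunk | Rest], Upload0) ->
        case emqx_s3_upload:append(Chunk, Upload0) of
            {ok, Upload1} -> write_loop(Rest, Upload1);
            {error, _} = Error -> Error
        end;
    append_chunks([], Upload) ->
        {ok, Upload}.

    %% Mirrors `handle_write/1` below: `{cont, _}` means the multipart upload
    %% has just been started and `write/1` must be called again to flush.
    write_loop(Rest, Upload0) ->
        case emqx_s3_upload:write(Upload0) of
            {ok, Upload1} -> append_chunks(Rest, Upload1);
            {cont, Upload1} -> write_loop(Rest, Upload1);
            {error, _} = Error -> Error
        end.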

View File

@ -6,7 +6,7 @@
-include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/types.hrl").
-behaviour(gen_statem). -behaviour(gen_server).
-export([ -export([
start_link/3, start_link/3,
@ -25,46 +25,23 @@
-export([ -export([
init/1, init/1,
callback_mode/0, handle_call/3,
handle_event/4, handle_cast/2,
terminate/3, terminate/2,
code_change/4, format_status/1
format_status/1,
format_status/2
]). ]).
-export_type([config/0]).
-type config() :: #{
min_part_size => pos_integer(),
max_part_size => pos_integer()
}.
-type data() :: #{ -type data() :: #{
profile_id => emqx_s3:profile_id(), profile_id => emqx_s3:profile_id(),
client := emqx_s3_client:client(), upload := emqx_s3_upload:t() | aborted
key := emqx_s3_client:key(),
upload_opts := emqx_s3_client:upload_options(),
buffer := iodata(),
buffer_size := non_neg_integer(),
min_part_size := pos_integer(),
max_part_size := pos_integer(),
upload_id := undefined | emqx_s3_client:upload_id(),
etags := [emqx_s3_client:etag()],
part_number := emqx_s3_client:part_number()
}. }.
%% 5MB
-define(DEFAULT_MIN_PART_SIZE, 5242880).
%% 5GB
-define(DEFAULT_MAX_PART_SIZE, 5368709120).
-define(DEFAULT_TIMEOUT, 30000). -define(DEFAULT_TIMEOUT, 30000).
-spec start_link(emqx_s3:profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) -> -spec start_link(emqx_s3:profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) ->
gen_statem:start_ret(). gen_server:start_ret().
start_link(ProfileId, Key, UploadOpts) when is_list(Key) -> start_link(ProfileId, Key, UploadOpts) when is_list(Key) ->
gen_statem:start_link(?MODULE, {profile, ProfileId, Key, UploadOpts}, []). gen_server:start_link(?MODULE, {profile, ProfileId, Key, UploadOpts}, []).
-spec write(pid(), iodata()) -> ok_or_error(term()). -spec write(pid(), iodata()) -> ok_or_error(term()).
write(Pid, WriteData) -> write(Pid, WriteData) ->
@ -72,7 +49,7 @@ write(Pid, WriteData) ->
-spec write(pid(), iodata(), timeout()) -> ok_or_error(term()). -spec write(pid(), iodata(), timeout()) -> ok_or_error(term()).
write(Pid, WriteData, Timeout) -> write(Pid, WriteData, Timeout) ->
gen_statem:call(Pid, {write, wrap(WriteData)}, Timeout). gen_server:call(Pid, {write, wrap(WriteData)}, Timeout).
-spec complete(pid()) -> ok_or_error(term()). -spec complete(pid()) -> ok_or_error(term()).
complete(Pid) -> complete(Pid) ->
@ -80,7 +57,7 @@ complete(Pid) ->
-spec complete(pid(), timeout()) -> ok_or_error(term()). -spec complete(pid(), timeout()) -> ok_or_error(term()).
complete(Pid, Timeout) -> complete(Pid, Timeout) ->
gen_statem:call(Pid, complete, Timeout). gen_server:call(Pid, complete, Timeout).
-spec abort(pid()) -> ok_or_error(term()). -spec abort(pid()) -> ok_or_error(term()).
abort(Pid) -> abort(Pid) ->
@ -88,7 +65,7 @@ abort(Pid) ->
-spec abort(pid(), timeout()) -> ok_or_error(term()). -spec abort(pid(), timeout()) -> ok_or_error(term()).
abort(Pid, Timeout) -> abort(Pid, Timeout) ->
gen_statem:call(Pid, abort, Timeout). gen_server:call(Pid, abort, Timeout).
-spec shutdown(pid()) -> ok. -spec shutdown(pid()) -> ok.
shutdown(Pid) -> shutdown(Pid) ->
@ -99,231 +76,73 @@ shutdown(Pid) ->
%% gen_statem callbacks %% gen_statem callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
callback_mode() -> handle_event_function.
init({profile, ProfileId, Key, UploadOpts}) -> init({profile, ProfileId, Key, UploadOpts}) ->
_ = process_flag(trap_exit, true),
{Bucket, ClientConfig, BaseOpts, UploaderConfig} = {Bucket, ClientConfig, BaseOpts, UploaderConfig} =
emqx_s3_profile_conf:checkout_config(ProfileId), emqx_s3_profile_conf:checkout_config(ProfileId),
Upload = #{ Upload = emqx_s3_upload:new(
profile_id => ProfileId, client(Bucket, ClientConfig),
client => client(Bucket, ClientConfig), Key,
key => Key, maps:merge(BaseOpts, UploadOpts),
upload_opts => maps:merge(BaseOpts, UploadOpts) UploaderConfig
}, ),
init({upload, UploaderConfig, Upload}); {ok, #{profile_id => ProfileId, upload => Upload}}.
init({upload, Config, Upload}) ->
process_flag(trap_exit, true),
{ok, upload_not_started, Upload#{
buffer => [],
buffer_size => 0,
min_part_size => maps:get(min_part_size, Config, ?DEFAULT_MIN_PART_SIZE),
max_part_size => maps:get(max_part_size, Config, ?DEFAULT_MAX_PART_SIZE),
upload_id => undefined,
etags => [],
part_number => 1
}}.
handle_event({call, From}, {write, WriteDataWrapped}, State, Data0) -> -spec handle_call(_Call, gen_server:from(), data()) ->
{reply, _Result, data()} | {stop, _Reason, _Result, data()}.
handle_call({write, WriteDataWrapped}, _From, St0 = #{upload := U0}) ->
WriteData = unwrap(WriteDataWrapped), WriteData = unwrap(WriteDataWrapped),
case is_valid_part(WriteData, Data0) of case emqx_s3_upload:append(WriteData, U0) of
true -> {ok, U1} ->
handle_write(State, From, WriteData, Data0); handle_write(St0#{upload := U1});
false -> {error, _} = Error ->
{keep_state_and_data, {reply, From, {error, {too_large, iolist_size(WriteData)}}}} {reply, Error, St0}
end; end;
handle_event({call, From}, complete, upload_not_started, Data0) -> handle_call(complete, _From, St0 = #{upload := U0}) ->
case put_object(Data0) of case emqx_s3_upload:complete(U0) of
{ok, U1} ->
{stop, normal, ok, St0#{upload := U1}};
{error, _} = Error ->
{stop, Error, Error, St0}
end;
handle_call(abort, _From, St = #{upload := Upload}) ->
case emqx_s3_upload:abort(Upload) of
ok -> ok ->
{stop_and_reply, normal, {reply, From, ok}}; {stop, normal, ok, St};
{error, _} = Error -> {error, _} = Error ->
{stop_and_reply, Error, {reply, From, Error}, Data0} {stop, Error, Error, St}
end;
handle_event({call, From}, complete, upload_started, Data0) ->
case complete_upload(Data0) of
{ok, Data1} ->
{stop_and_reply, normal, {reply, From, ok}, Data1};
{error, _} = Error ->
{stop_and_reply, Error, {reply, From, Error}, Data0}
end;
handle_event({call, From}, abort, upload_not_started, _Data) ->
{stop_and_reply, normal, {reply, From, ok}};
handle_event({call, From}, abort, upload_started, Data0) ->
case abort_upload(Data0) of
ok ->
{stop_and_reply, normal, {reply, From, ok}};
{error, _} = Error ->
{stop_and_reply, Error, {reply, From, Error}, Data0}
end. end.
handle_write(upload_not_started, From, WriteData, Data0) -> handle_write(St = #{upload := U0}) ->
Data1 = append_buffer(Data0, WriteData), case emqx_s3_upload:write(U0) of
case maybe_start_upload(Data1) of {ok, U1} ->
not_started -> {reply, ok, St#{upload := U1}};
{keep_state, Data1, {reply, From, ok}}; {cont, U1} ->
{started, Data2} -> handle_write(St#{upload := U1});
case upload_part(Data2) of
{ok, Data3} ->
{next_state, upload_started, Data3, {reply, From, ok}};
{error, _} = Error ->
{stop_and_reply, Error, {reply, From, Error}, Data2}
end;
{error, _} = Error -> {error, _} = Error ->
{stop_and_reply, Error, {reply, From, Error}, Data1} {stop, Error, Error, St}
end;
handle_write(upload_started, From, WriteData, Data0) ->
Data1 = append_buffer(Data0, WriteData),
case maybe_upload_part(Data1) of
{ok, Data2} ->
{keep_state, Data2, {reply, From, ok}};
{error, _} = Error ->
{stop_and_reply, Error, {reply, From, Error}, Data1}
end. end.
terminate(Reason, _State, #{client := Client, upload_id := UploadId, key := Key}) when -spec handle_cast(_Cast, data()) -> {noreply, data()}.
(UploadId =/= undefined) andalso (Reason =/= normal) handle_cast(_Cast, St) ->
-> {noreply, St}.
emqx_s3_client:abort_multipart(Client, Key, UploadId);
terminate(_Reason, _State, _Data) ->
ok.
code_change(_OldVsn, StateName, State, _Extra) -> -spec terminate(_Reason, data()) -> ok.
{ok, StateName, State}. terminate(normal, _St) ->
ok;
terminate({shutdown, _}, _St) ->
ok;
terminate(_Reason, #{upload := Upload}) ->
emqx_s3_upload:abort(Upload).
format_status(#{data := #{client := Client} = Data} = Status) -> format_status(#{state := State = #{upload := Upload}} = Status) ->
Status#{ StateRedacted = State#{upload := emqx_s3_upload:format(Upload)},
data => Data#{ Status#{state := StateRedacted}.
client => emqx_s3_client:format(Client),
buffer => [<<"...">>]
}
}.
format_status(_Opt, [PDict, State, #{client := Client} = Data]) ->
#{
data => Data#{
client => emqx_s3_client:format(Client),
buffer => [<<"...">>]
},
state => State,
pdict => PDict
}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec maybe_start_upload(data()) -> not_started | {started, data()} | {error, term()}.
maybe_start_upload(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) ->
case BufferSize >= MinPartSize of
true ->
start_upload(Data);
false ->
not_started
end.
-spec start_upload(data()) -> {started, data()} | {error, term()}.
start_upload(#{client := Client, key := Key, upload_opts := UploadOpts} = Data) ->
case emqx_s3_client:start_multipart(Client, Key, UploadOpts) of
{ok, UploadId} ->
NewData = Data#{upload_id => UploadId},
{started, NewData};
{error, _} = Error ->
Error
end.
-spec maybe_upload_part(data()) -> ok_or_error(data(), term()).
maybe_upload_part(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) ->
case BufferSize >= MinPartSize of
true ->
upload_part(Data);
false ->
{ok, Data}
end.
-spec upload_part(data()) -> ok_or_error(data(), term()).
upload_part(#{buffer_size := 0} = Data) ->
{ok, Data};
upload_part(
#{
client := Client,
key := Key,
upload_id := UploadId,
buffer := Buffer,
part_number := PartNumber,
etags := ETags
} = Data
) ->
case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, Buffer) of
{ok, ETag} ->
NewData = Data#{
buffer => [],
buffer_size => 0,
part_number => PartNumber + 1,
etags => [{PartNumber, ETag} | ETags]
},
{ok, NewData};
{error, _} = Error ->
Error
end.
-spec complete_upload(data()) -> ok_or_error(data(), term()).
complete_upload(
#{
client := Client,
key := Key,
upload_id := UploadId
} = Data0
) ->
case upload_part(Data0) of
{ok, #{etags := ETagsRev} = Data1} ->
ETags = lists:reverse(ETagsRev),
case emqx_s3_client:complete_multipart(Client, Key, UploadId, ETags) of
ok ->
{ok, Data1};
{error, _} = Error ->
Error
end;
{error, _} = Error ->
Error
end.
-spec abort_upload(data()) -> ok_or_error(term()).
abort_upload(
#{
client := Client,
key := Key,
upload_id := UploadId
}
) ->
case emqx_s3_client:abort_multipart(Client, Key, UploadId) of
ok ->
ok;
{error, _} = Error ->
Error
end.
-spec put_object(data()) -> ok_or_error(term()).
put_object(
#{
client := Client,
key := Key,
upload_opts := UploadOpts,
buffer := Buffer
}
) ->
case emqx_s3_client:put_object(Client, Key, UploadOpts, Buffer) of
ok ->
ok;
{error, _} = Error ->
Error
end.
-spec append_buffer(data(), iodata()) -> data().
append_buffer(#{buffer := Buffer, buffer_size := BufferSize} = Data, WriteData) ->
Data#{
buffer => [Buffer, WriteData],
buffer_size => BufferSize + iolist_size(WriteData)
}.
-compile({inline, [wrap/1, unwrap/1]}). -compile({inline, [wrap/1, unwrap/1]}).
wrap(Data) -> wrap(Data) ->
fun() -> Data end. fun() -> Data end.
@ -331,8 +150,5 @@ wrap(Data) ->
unwrap(WrappedData) -> unwrap(WrappedData) ->
WrappedData(). WrappedData().
is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) ->
BufferSize + iolist_size(WriteData) =< MaxPartSize.
client(Bucket, Config) -> client(Bucket, Config) ->
emqx_s3_client:create(Bucket, Config). emqx_s3_client:create(Bucket, Config).

View File

@ -5,19 +5,4 @@ config_connector.label:
config_connector.desc: config_connector.desc:
"""Configuration for a connector to S3 API compatible storage service.""" """Configuration for a connector to S3 API compatible storage service."""
s3_upload.label:
"""S3 Simple Upload"""
s3_upload.desc:
"""Action to upload a single object to the S3 service."""
s3_upload_parameters.label:
"""S3 Upload action parameters"""
s3_upload_parameters.desc:
"""Set of parameters for the upload action. Action supports templates in S3 bucket name, object key and object content."""
s3_object_content.label:
"""S3 Object Content"""
s3_object_content.desc:
"""Content of the S3 object being uploaded. Supports templates."""
} }

View File

@ -0,0 +1,64 @@
emqx_bridge_s3_aggreg_upload {
s3_aggregated_upload.label:
"""S3 Aggregated Upload"""
s3_aggregated_upload.desc:
"""Action that enables time-based aggregation of incoming events and uploading them to the S3 service as a single object."""
s3_aggregated_upload_parameters.label:
"""S3 Aggregated Upload action parameters"""
s3_aggregated_upload_parameters.desc:
"""Set of parameters for the aggregated upload action."""
s3_aggregation.label:
"""Aggregation parameters"""
s3_aggregation.desc:
"""Set of parameters governing the aggregation process."""
s3_aggregation_interval.label:
"""Time interval"""
s3_aggregation_interval.desc:
"""Amount of time events will be aggregated in a single object before uploading."""
s3_aggregation_max_records.label:
"""Maximum number of records"""
s3_aggregation_max_records.desc:
"""Number of records (events) allowed per each aggregated object. Each aggregated upload will contain no more than that number of events, but may contain less.<br/>
If event rate is high enough, there obviously may be more than one aggregated upload during the same time interval. These uploads will have different, but consecutive sequence numbers, which will be a part of S3 object key."""
s3_aggregated_container.label:
"""Container for aggregated events"""
s3_aggregated_container.desc:
"""Settings governing the file format of an upload containing aggregated events."""
s3_aggregated_container_csv.label:
"""CSV container"""
s3_aggregated_container_csv.desc:
"""Records (events) will be aggregated and uploaded as a CSV file."""
s3_aggregated_container_csv_column_order.label:
"""CSV column order"""
s3_aggregated_container_csv_column_order.desc:
"""Event fields that will be ordered first as columns in the resulting CSV file.<br/>
Regardless of this setting, the resulting CSV will contain all fields of the aggregated events; columns not listed here are placed after the listed ones, in lexicographical order."""
s3_aggregated_upload_key.label:
"""S3 object key template"""
s3_aggregated_upload_key.desc:
"""Template for the S3 object key of an aggregated upload.<br/>
Template may contain placeholders for the following variables:
<ul>
<li><code>${action}</code>: name of the action (required).<li/>
<li><code>${node}</code>: name of the EMQX node conducting the upload (required).<li/>
<li><code>${datetime.{format}}</code>: date and time when aggregation started, formatted according to the <code>{format}</code> string (required):
<ul>
<li><code>${datetime.rfc3339utc}</code>: RFC3339-formatted date and time in UTC,<li/>
<li><code>${datetime.rfc3339}</code>: RFC3339-formatted date and time in local timezone,<li/>
<li><code>${datetime.unix}</code>: Unix timestamp.<li/>
</ul>
<li/>
<li><code>${datetime_until.{format}}</code>: date and time when aggregation ended, with the same formatting options.<li/>
<li><code>${sequence}</code>: sequence number of the aggregated upload within the same time interval (required).<li/>
</ul>
All other placeholders are considered invalid. Note that placeholders marked as required will be added as a path suffix to the S3 object key if they are missing from the template."""
}
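
To make the placeholder rules above concrete, here is a sketch of the action `parameters` in the same shape the test suite above uses, together with a hypothetical object key they could produce (bucket name, interval, datetime and node name are illustrative only):

    <<"parameters">> => #{
        <<"bucket">> => <<"mqtt-aggregated">>,
        <<"key">> => <<"${action}/${node}/${datetime.rfc3339utc}">>,
        <<"aggregation">> => #{
            <<"time_interval">> => <<"1h">>,
            <<"max_records">> => 100000
        },
        <<"container">> => #{
            <<"type">> => <<"csv">>,
            <<"column_order">> => [<<"clientid">>, <<"topic">>, <<"payload">>]
        }
    }

Since `${sequence}` is required but absent from this template, it is appended as a path suffix, so an upload could end up under a key such as `my-aggreg-action/emqx@127.0.0.1/2024-04-30T11:50:19Z/0`.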

View File

@ -0,0 +1,18 @@
emqx_bridge_s3_upload {
s3_upload.label:
"""S3 Simple Upload"""
s3_upload.desc:
"""Action to upload a single object to the S3 service."""
s3_upload_parameters.label:
"""S3 Upload action parameters"""
s3_upload_parameters.desc:
"""Set of parameters for the upload action. Action supports templates in S3 bucket name, object key and object content."""
s3_object_content.label:
"""S3 Object Content"""
s3_object_content.desc:
"""Content of the S3 object being uploaded. Supports templates."""
}

View File

@ -18,6 +18,13 @@ key.desc:
key.label: key.label:
"""Object Key""" """Object Key"""
upload_headers.label:
"""S3 object HTTP headers"""
upload_headers.desc:
"""HTTP headers to include in the S3 object upload request.<br/>
Useful for specifying the content type, content encoding, and other metadata of the S3 object."""
host.desc: host.desc:
"""The host of the S3 endpoint.""" """The host of the S3 endpoint."""