diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index a8aaf9fdd..dc71d5806 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -118,7 +118,8 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_pulsar_action_info, emqx_bridge_greptimedb_action_info, emqx_bridge_tdengine_action_info, - emqx_bridge_s3_action_info + emqx_bridge_s3_upload_action_info, + emqx_bridge_s3_aggreg_upload_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src index a6000067a..5cdf3fb82 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src @@ -10,9 +10,15 @@ emqx_s3 ]}, {env, [ - {emqx_action_info_modules, [emqx_bridge_s3_action_info]}, - {emqx_connector_info_modules, [emqx_bridge_s3_connector_info]} + {emqx_action_info_modules, [ + emqx_bridge_s3_upload_action_info, + emqx_bridge_s3_aggreg_upload_action_info + ]}, + {emqx_connector_info_modules, [ + emqx_bridge_s3_connector_info + ]} ]}, + {mod, {emqx_bridge_s3_app, []}}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl index 79cc560d2..49f033554 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl @@ -19,7 +19,6 @@ ]). -export([ - bridge_v2_examples/1, connector_examples/1 ]). @@ -39,58 +38,11 @@ fields(Field) when Field == "post_connector" -> emqx_connector_schema:api_fields(Field, ?CONNECTOR, fields(s3_connector_config)); -fields(Field) when - Field == "get_bridge_v2"; - Field == "put_bridge_v2"; - Field == "post_bridge_v2" --> - emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION)); -fields(action) -> - {?ACTION, - hoconsc:mk( - hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)), - #{ - desc => <<"S3 Action Config">>, - required => false - } - )}; fields("config_connector") -> emqx_connector_schema:common_fields() ++ fields(s3_connector_config); -fields(?ACTION) -> - emqx_bridge_v2_schema:make_producer_action_schema( - hoconsc:mk( - ?R_REF(s3_upload_parameters), - #{ - required => true, - desc => ?DESC(s3_upload) - } - ), - #{ - resource_opts_ref => ?R_REF(s3_action_resource_opts) - } - ); fields(s3_connector_config) -> emqx_s3_schema:fields(s3_client) ++ emqx_connector_schema:resource_opts_ref(?MODULE, s3_connector_resource_opts); -fields(s3_upload_parameters) -> - emqx_s3_schema:fields(s3_upload) ++ - [ - {content, - hoconsc:mk( - emqx_schema:template(), - #{ - required => false, - default => <<"${.}">>, - desc => ?DESC(s3_object_content) - } - )} - ]; -fields(s3_action_resource_opts) -> - UnsupportedOpts = [batch_size, batch_time], - lists:filter( - fun({N, _}) -> not lists:member(N, UnsupportedOpts) end, - emqx_bridge_v2_schema:action_resource_opts_fields() - ); fields(s3_connector_resource_opts) -> CommonOpts = emqx_connector_schema:common_resource_opts_subfields(), lists:filter( @@ -100,14 +52,6 @@ fields(s3_connector_resource_opts) -> desc("config_connector") -> ?DESC(config_connector); -desc(?ACTION) -> - ?DESC(s3_upload); -desc(s3_upload) -> - ?DESC(s3_upload); -desc(s3_upload_parameters) -> - ?DESC(s3_upload_parameters); -desc(s3_action_resource_opts) -> - ?DESC(emqx_resource_schema, resource_opts); desc(s3_connector_resource_opts) -> ?DESC(emqx_resource_schema, resource_opts); desc(_Name) -> @@ -115,54 +59,6 @@ desc(_Name) -> %% 
Examples

-bridge_v2_examples(Method) ->
-    [
-        #{
-            <<"s3">> => #{
-                summary => <<"S3 Simple Upload">>,
-                value => action_example(Method)
-            }
-        }
-    ].
-
-action_example(post) ->
-    maps:merge(
-        action_example(put),
-        #{
-            type => atom_to_binary(?ACTION),
-            name => <<"my_s3_action">>
-        }
-    );
-action_example(get) ->
-    maps:merge(
-        action_example(put),
-        #{
-            status => <<"connected">>,
-            node_status => [
-                #{
-                    node => <<"emqx@localhost">>,
-                    status => <<"connected">>
-                }
-            ]
-        }
-    );
-action_example(put) ->
-    #{
-        enable => true,
-        connector => <<"my_s3_connector">>,
-        description => <<"My action">>,
-        parameters => #{
-            bucket => <<"${clientid}">>,
-            key => <<"${topic}">>,
-            content => <<"${payload}">>,
-            acl => <<"public_read">>
-        },
-        resource_opts => #{
-            query_mode => <<"sync">>,
-            inflight_window => 10
-        }
-    }.
-
 connector_examples(Method) ->
     [
         #{
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl b/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl
index 6d500d056..62a80d260 100644
--- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl
@@ -5,7 +5,12 @@
 -ifndef(__EMQX_BRIDGE_S3_HRL__).
 -define(__EMQX_BRIDGE_S3_HRL__, true).
 
--define(ACTION, s3).
+%% Actions
+-define(ACTION_UPLOAD, s3).
+-define(BRIDGE_TYPE_UPLOAD, <<"s3">>).
+-define(ACTION_AGGREGATED_UPLOAD, s3_aggregated_upload).
+-define(BRIDGE_TYPE_AGGREGATED_UPLOAD, <<"s3_aggregated_upload">>).
+
 -define(CONNECTOR, s3).
 
 -endif.
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_buffer.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_buffer.erl
new file mode 100644
index 000000000..503b864a7
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_buffer.erl
@@ -0,0 +1,138 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% This module provides a pretty stupid interface for writing and reading
+%% Erlang terms to/from a file descriptor (i.e. an IO device), with the goal
+%% of being able to write and read terms in a streaming fashion, and to
+%% survive partial corruption.
+%%
+%% Layout of the file is as follows:
+%% ```
+%% ETF(Header { Metadata })
+%% ETF(Record1 ByteSize)
+%% ETF(Record1)
+%% ETF(Record2 ByteSize)
+%% ETF(Record2)
+%% ...
+%% ```
+%% ^ ETF = Erlang External Term Format (i.e. `erlang:term_to_binary/1`).
+-module(emqx_bridge_s3_aggreg_buffer).
+
+-export([
+    new_writer/2,
+    write/2,
+    takeover/1
+]).
+
+-export([
+    new_reader/1,
+    read/1
+]).
+
+-export_type([writer/0, reader/0]).
+
+-record(reader, {
+    fd :: file:io_device() | eof,
+    buffer :: binary(),
+    hasread = 0 :: non_neg_integer()
+}).
+
+-type writer() :: file:io_device().
+-type reader() :: #reader{}.
+
+%%
+
+-define(VSN, 1).
+-define(HEADER(MD), [?MODULE, ?VSN, MD]).
+
+-define(READAHEAD_BYTES, 64 * 4096).
+-define(SANE_TERM_SIZE, 256 * 1024 * 1024).
+
+%%
+
+-spec new_writer(file:io_device(), _Meta) -> writer().
+new_writer(FD, Meta) ->
+    %% TODO: Validate header is not too big?
+    Header = term_to_iovec(?HEADER(Meta)),
+    case file:write(FD, Header) of
+        ok ->
+            FD;
+        {error, Reason} ->
+            error({buffer_write_failed, Reason})
+    end.
+
+-spec write(_Term, writer()) -> ok | {error, file:posix()}.
+write(Term, FD) ->
+    IOData = term_to_iovec(Term),
+    Marker = term_to_binary(iolist_size(IOData)),
+    file:write(FD, [Marker | IOData]).
+
+%%
+
+-spec new_reader(file:io_device()) -> {_Meta, reader()}.
+new_reader(FD) ->
+    Reader0 = #reader{fd = FD, buffer = <<>>},
+    Reader1 = read_buffered(?READAHEAD_BYTES, Reader0),
+    case read_next_term(Reader1) of
+        {?HEADER(MD), Reader} ->
+            {MD, Reader};
+        {UnexpectedHeader, _Reader} ->
+            error({buffer_unexpected_header, UnexpectedHeader});
+        eof ->
+            error({buffer_incomplete, header})
+    end.
+
+-spec read(reader()) -> {_Term, reader()} | eof.
+read(Reader0) ->
+    case read_next_term(read_buffered(_LargeEnough = 16, Reader0)) of
+        {Size, Reader1} when is_integer(Size) andalso Size > 0 andalso Size < ?SANE_TERM_SIZE ->
+            case read_next_term(read_buffered(Size, Reader1)) of
+                {Term, Reader} ->
+                    {Term, Reader};
+                eof ->
+                    error({buffer_incomplete, Size})
+            end;
+        {UnexpectedSize, _Reader} ->
+            error({buffer_unexpected_record_size, UnexpectedSize});
+        eof ->
+            eof
+    end.
+
+-spec takeover(reader()) -> writer().
+takeover(#reader{fd = FD, hasread = HasRead}) ->
+    case file:position(FD, HasRead) of
+        {ok, HasRead} ->
+            case file:truncate(FD) of
+                ok ->
+                    FD;
+                {error, Reason} ->
+                    error({buffer_takeover_failed, Reason})
+            end;
+        {error, Reason} ->
+            error({buffer_takeover_failed, Reason})
+    end.
+
+read_next_term(#reader{fd = eof, buffer = <<>>}) ->
+    eof;
+read_next_term(Reader = #reader{buffer = Buffer, hasread = HasRead}) ->
+    {Term, UsedBytes} = erlang:binary_to_term(Buffer, [safe, used]),
+    BufferSize = byte_size(Buffer),
+    BufferLeft = binary:part(Buffer, UsedBytes, BufferSize - UsedBytes),
+    {Term, Reader#reader{buffer = BufferLeft, hasread = HasRead + UsedBytes}}.
+
+read_buffered(_Size, Reader = #reader{fd = eof}) ->
+    Reader;
+read_buffered(Size, Reader = #reader{fd = FD, buffer = Buffer0}) ->
+    BufferSize = byte_size(Buffer0),
+    ReadSize = erlang:max(Size, ?READAHEAD_BYTES),
+    case BufferSize < Size andalso file:read(FD, ReadSize) of
+        false ->
+            Reader;
+        {ok, Data} ->
+            Reader#reader{buffer = <<Buffer0/binary, Data/binary>>};
+        eof ->
+            Reader#reader{fd = eof};
+        {error, Reason} ->
+            error({buffer_read_failed, Reason})
+    end.
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl
new file mode 100644
index 000000000..33895d8c1
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl
@@ -0,0 +1,105 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% CSV container implementation for `emqx_bridge_s3_aggregator`.
+-module(emqx_bridge_s3_aggreg_csv).
+
+%% Container API
+-export([
+    new/1,
+    fill/2,
+    close/1
+]).
+
+-export_type([container/0]).
+
+-record(csv, {
+    columns :: [binary()] | undefined,
+    column_order :: [binary()],
+    %% A string or character that separates each field in a record from the next.
+    %% Default: ","
+    field_separator :: char() | iodata(),
+    %% A string or character that delimits boundaries of a record.
+    %% Default: "\n"
+    record_delimiter :: char() | iodata(),
+    quoting_mp :: _ReMP
+}).
+
+-type container() :: #csv{}.
+
+-type options() :: #{
+    %% Which columns have to be ordered first in the resulting CSV?
+    column_order => [column()]
+}.
+
+-type record() :: emqx_bridge_s3_aggregator:record().
+-type column() :: binary().
+
+%%
+
+-spec new(options()) -> container().
+new(Opts) ->
+    {ok, MP} = re:compile("[\\[\\],\\r\\n\"]", [unicode]),
+    #csv{
+        column_order = maps:get(column_order, Opts, []),
+        field_separator = $,,
+        record_delimiter = $\n,
+        quoting_mp = MP
+    }.
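The framing that `new_writer/2`, `write/2` and `read/1` implement above is simple enough to exercise in isolation. Below is a minimal sketch of the same size-prefixed ETF layout, assuming nothing beyond stdlib `file` and `erlang` calls; the module and function names are illustrative, not part of the bridge:

-module(etf_framing_sketch).
-export([roundtrip/1]).

%% Write a header term followed by one size-prefixed record, then decode
%% both back from the raw bytes, mirroring the on-disk layout above.
roundtrip(Filename) ->
    Record = #{<<"clientid">> => <<"c1">>},
    IOData = term_to_iovec(Record),
    {ok, FD} = file:open(Filename, [write, binary]),
    ok = file:write(FD, term_to_binary([header, 1, []])),
    ok = file:write(FD, [term_to_binary(iolist_size(IOData)) | IOData]),
    ok = file:close(FD),
    {ok, Bin} = file:read_file(Filename),
    %% The `used` option returns how many bytes a term occupied, which is
    %% what lets terms be decoded back-to-back from a single buffer.
    {[header, 1, []], N0} = binary_to_term(Bin, [used]),
    Rest0 = binary:part(Bin, N0, byte_size(Bin) - N0),
    {Size, N1} = binary_to_term(Rest0, [used]),
    Rest1 = binary:part(Rest0, N1, byte_size(Rest0) - N1),
    Record = binary_to_term(Rest1),
    Size = byte_size(Rest1),
    ok.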
+
+-spec fill([record()], container()) -> {iodata(), container()}.
+fill(Records = [Record | _], CSV0 = #csv{columns = undefined}) ->
+    Columns = mk_columns(Record, CSV0),
+    Header = emit_header(Columns, CSV0),
+    {Writes, CSV} = fill(Records, CSV0#csv{columns = Columns}),
+    {[Header | Writes], CSV};
+fill(Records, CSV) ->
+    Writes = [emit_row(R, CSV) || R <- Records],
+    {Writes, CSV}.
+
+-spec close(container()) -> iodata().
+close(#csv{}) ->
+    [].
+
+%%
+
+mk_columns(Record, #csv{column_order = ColumnOrder}) ->
+    Columns = [emqx_utils_conv:bin(C) || C <- lists:sort(maps:keys(Record))],
+    Unordered = Columns -- ColumnOrder,
+    ColumnOrder ++ Unordered.
+
+-spec emit_header([column()], container()) -> iodata().
+emit_header([C], #csv{record_delimiter = Delim}) ->
+    [C, Delim];
+emit_header([C | Rest], CSV = #csv{field_separator = Sep}) ->
+    [C, Sep | emit_header(Rest, CSV)];
+emit_header([], #csv{record_delimiter = Delim}) ->
+    [Delim].
+
+-spec emit_row(record(), container()) -> iodata().
+emit_row(Record, CSV = #csv{columns = Columns}) ->
+    emit_row(Record, Columns, CSV).
+
+emit_row(Record, [C], CSV = #csv{record_delimiter = Delim}) ->
+    [emit_cell(C, Record, CSV), Delim];
+emit_row(Record, [C | Rest], CSV = #csv{field_separator = Sep}) ->
+    [emit_cell(C, Record, CSV), Sep | emit_row(Record, Rest, CSV)];
+emit_row(#{}, [], #csv{record_delimiter = Delim}) ->
+    [Delim].
+
+emit_cell(Column, Record, CSV) ->
+    case emqx_template:lookup(Column, Record) of
+        {ok, Value} ->
+            encode_cell(emqx_template:to_string(Value), CSV);
+        {error, undefined} ->
+            _Empty = ""
+    end.
+
+encode_cell(V, #csv{quoting_mp = MP}) ->
+    case re:run(V, MP, []) of
+        nomatch ->
+            V;
+        _ ->
+            [$", re:replace(V, <<"\"">>, <<"\"\"">>, [global, unicode]), $"]
+    end.
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl
new file mode 100644
index 000000000..02099dbec
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl
@@ -0,0 +1,212 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% This module takes aggregated records from a buffer and delivers them to S3,
+%% wrapped in a configurable container (though currently there's only CSV).
+-module(emqx_bridge_s3_aggreg_delivery).
+
+-include_lib("snabbkaffe/include/trace.hrl").
+-include("emqx_bridge_s3_aggregator.hrl").
+
+-export([start_link/3]).
+
+%% Internal exports
+-export([
+    init/4,
+    loop/3
+]).
+
+-behaviour(emqx_template).
+-export([lookup/2]).
+
+%% Sys
+-export([
+    system_continue/3,
+    system_terminate/4,
+    format_status/2
+]).
+
+-record(delivery, {
+    name :: _Name,
+    container :: emqx_bridge_s3_aggreg_csv:container(),
+    reader :: emqx_bridge_s3_aggreg_buffer:reader(),
+    upload :: emqx_s3_upload:t(),
+    empty :: boolean()
+}).
+
+-type state() :: #delivery{}.
+
+%%
+
+start_link(Name, Buffer, Opts) ->
+    proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]).
+
+%%
+
+-spec init(pid(), _Name, buffer(), _Opts :: map()) -> no_return().
+init(Parent, Name, Buffer, Opts) ->
+    ?tp(s3_aggreg_delivery_started, #{action => Name, buffer => Buffer}),
+    Reader = open_buffer(Buffer),
+    Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}),
+    _ = erlang:process_flag(trap_exit, true),
+    ok = proc_lib:init_ack({ok, self()}),
+    loop(Delivery, Parent, []).
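The quoting rule in `encode_cell/2` is the usual RFC 4180 one, with one extra twist: the compiled pattern also forces quoting on `[` and `]`. Stripped of the container plumbing, the rule reduces to the following sketch (illustrative function name, plain `re` calls):

%% Emit a field verbatim unless it contains a separator, quote or line
%% break; otherwise wrap it in quotes and double any embedded quotes.
encode_field(Field) ->
    case re:run(Field, "[,\r\n\"]", [unicode]) of
        nomatch -> Field;
        _Found -> [$", re:replace(Field, "\"", "\"\"", [global, unicode]), $"]
    end.

%% encode_field(<<"plain">>) -> <<"plain">>
%% encode_field(<<"a,b">>)   -> iodata that flattens to "\"a,b\""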
+ +init_delivery(Name, Reader, Buffer, Opts = #{container := ContainerOpts}) -> + #delivery{ + name = Name, + container = mk_container(ContainerOpts), + reader = Reader, + upload = mk_upload(Buffer, Opts), + empty = true + }. + +open_buffer(#buffer{filename = Filename}) -> + case file:open(Filename, [read, binary, raw]) of + {ok, FD} -> + {_Meta, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(FD), + Reader; + {error, Reason} -> + error({buffer_open_failed, Reason}) + end. + +mk_container(#{type := csv, column_order := OrderOpt}) -> + %% TODO: Deduplicate? + ColumnOrder = lists:map(fun emqx_utils_conv:bin/1, OrderOpt), + emqx_bridge_s3_aggreg_csv:new(#{column_order => ColumnOrder}). + +mk_upload( + Buffer, + Opts = #{ + bucket := Bucket, + upload_options := UploadOpts, + client_config := Config, + uploader_config := UploaderConfig + } +) -> + Client = emqx_s3_client:create(Bucket, Config), + Key = mk_object_key(Buffer, Opts), + emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig). + +mk_object_key(Buffer, #{action := Name, key := Template}) -> + emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}). + +%% + +-spec loop(state(), pid(), [sys:debug_option()]) -> no_return(). +loop(Delivery, Parent, Debug) -> + %% NOTE: This function is mocked in tests. + receive + Msg -> handle_msg(Msg, Delivery, Parent, Debug) + after 0 -> + process_delivery(Delivery, Parent, Debug) + end. + +process_delivery(Delivery0 = #delivery{reader = Reader0}, Parent, Debug) -> + case emqx_bridge_s3_aggreg_buffer:read(Reader0) of + {Records = [#{} | _], Reader} -> + Delivery1 = Delivery0#delivery{reader = Reader}, + Delivery2 = process_append_records(Records, Delivery1), + Delivery = process_write(Delivery2), + loop(Delivery, Parent, Debug); + {[], Reader} -> + Delivery = Delivery0#delivery{reader = Reader}, + loop(Delivery, Parent, Debug); + eof -> + process_complete(Delivery0); + {Unexpected, _Reader} -> + exit({buffer_unexpected_record, Unexpected}) + end. + +process_append_records(Records, Delivery = #delivery{container = Container0, upload = Upload0}) -> + {Writes, Container} = emqx_bridge_s3_aggreg_csv:fill(Records, Container0), + {ok, Upload} = emqx_s3_upload:append(Writes, Upload0), + Delivery#delivery{ + container = Container, + upload = Upload, + empty = false + }. + +process_write(Delivery = #delivery{upload = Upload0}) -> + case emqx_s3_upload:write(Upload0) of + {ok, Upload} -> + Delivery#delivery{upload = Upload}; + {cont, Upload} -> + process_write(Delivery#delivery{upload = Upload}); + {error, Reason} -> + _ = emqx_s3_upload:abort(Upload0), + exit({upload_failed, Reason}) + end. + +process_complete(#delivery{name = Name, empty = true}) -> + ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => empty}), + exit({shutdown, {skipped, empty}}); +process_complete(#delivery{name = Name, container = Container, upload = Upload0}) -> + Trailer = emqx_bridge_s3_aggreg_csv:close(Container), + {ok, Upload} = emqx_s3_upload:append(Trailer, Upload0), + case emqx_s3_upload:complete(Upload) of + {ok, Completed} -> + ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => Completed}), + ok; + {error, Reason} -> + _ = emqx_s3_upload:abort(Upload), + exit({upload_failed, Reason}) + end. 
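`process_write/1` above is an instance of a small "pump" pattern: keep flushing while the uploader reports `cont` (a part was sent and more data is pending), stop on `ok`, and let the caller abort on error. The shape in isolation, with an injected write function (a sketch, not the `emqx_s3_upload` API):

%% Drive WriteFun until it reports that all buffered data is flushed.
pump(St0, WriteFun) ->
    case WriteFun(St0) of
        {ok, St} -> {ok, St};             %% accepted, nothing left to flush
        {cont, St} -> pump(St, WriteFun); %% one part written, keep going
        {error, Reason} -> {error, Reason}
    end.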
+
+%%
+handle_msg({system, From, Msg}, Delivery, Parent, Debug) ->
+    sys:handle_system_msg(Msg, From, Parent, ?MODULE, Debug, Delivery);
+handle_msg({'EXIT', Parent, Reason}, Delivery, Parent, Debug) ->
+    system_terminate(Reason, Parent, Debug, Delivery);
+handle_msg(_Msg, Delivery, Parent, Debug) ->
+    loop(Delivery, Parent, Debug).
+
+-spec system_continue(pid(), [sys:debug_option()], state()) -> no_return().
+system_continue(Parent, Debug, Delivery) ->
+    loop(Delivery, Parent, Debug).
+
+-spec system_terminate(_Reason, pid(), [sys:debug_option()], state()) -> _.
+system_terminate(_Reason, _Parent, _Debug, #delivery{upload = Upload}) ->
+    emqx_s3_upload:abort(Upload).
+
+-spec format_status(normal, Args :: [term()]) -> _StateFormatted.
+format_status(_Normal, [_PDict, _SysState, _Parent, _Debug, Delivery]) ->
+    Delivery#delivery{
+        upload = emqx_s3_upload:format(Delivery#delivery.upload)
+    }.
+
+%%
+
+-spec lookup(emqx_template:accessor(), {_Name, buffer()}) ->
+    {ok, integer() | string()} | {error, undefined}.
+lookup([<<"action">>], {Name, _Buffer}) ->
+    {ok, mk_fs_safe_string(Name)};
+lookup(Accessor, {_Name, Buffer = #buffer{}}) ->
+    lookup_buffer_var(Accessor, Buffer);
+lookup(_Accessor, _Context) ->
+    {error, undefined}.
+
+lookup_buffer_var([<<"datetime">>, Format], #buffer{since = Since}) ->
+    {ok, format_timestamp(Since, Format)};
+lookup_buffer_var([<<"datetime_until">>, Format], #buffer{until = Until}) ->
+    {ok, format_timestamp(Until, Format)};
+lookup_buffer_var([<<"sequence">>], #buffer{seq = Seq}) ->
+    {ok, Seq};
+lookup_buffer_var([<<"node">>], #buffer{}) ->
+    {ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
+lookup_buffer_var(_Binding, _Context) ->
+    {error, undefined}.
+
+format_timestamp(Timestamp, <<"rfc3339utc">>) ->
+    String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]),
+    mk_fs_safe_string(String);
+format_timestamp(Timestamp, <<"rfc3339">>) ->
+    String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]),
+    mk_fs_safe_string(String);
+format_timestamp(Timestamp, <<"unix">>) ->
+    Timestamp.
+
+mk_fs_safe_string(String) ->
+    unicode:characters_to_binary(string:replace(String, ":", "_", all)).
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl
new file mode 100644
index 000000000..02d43f4f0
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl
@@ -0,0 +1,275 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_aggreg_upload).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include("emqx_bridge_s3.hrl").
+
+-define(ACTION, ?ACTION_AGGREGATED_UPLOAD).
+
+-define(DEFAULT_BATCH_SIZE, 100).
+-define(DEFAULT_BATCH_TIME, <<"10ms">>).
+
+-behaviour(hocon_schema).
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+%% Interpreting options
+-export([
+    mk_key_template/1,
+    mk_upload_options/1
+]).
+
+%% emqx_bridge_v2_schema API
+-export([bridge_v2_examples/1]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `hocon_schema' API
+%%-------------------------------------------------------------------------------------------------
+
+namespace() ->
+    "bridge_s3".
+
+roots() ->
+    [].
+
+fields(Field) when
+    Field == "get_bridge_v2";
+    Field == "put_bridge_v2";
+    Field == "post_bridge_v2"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION));
+fields(action) ->
+    {?ACTION,
+        hoconsc:mk(
+            hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
+            #{
+                desc => <<"S3 Aggregated Upload Action Config">>,
+                required => false
+            }
+        )};
+fields(?ACTION) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        hoconsc:mk(
+            ?R_REF(s3_aggregated_upload_parameters),
+            #{
+                required => true,
+                desc => ?DESC(s3_aggregated_upload_parameters)
+            }
+        ),
+        #{
+            resource_opts_ref => ?R_REF(s3_aggreg_upload_resource_opts)
+        }
+    );
+fields(s3_aggregated_upload_parameters) ->
+    lists:append([
+        [
+            {container,
+                hoconsc:mk(
+                    %% TODO: Support selectors once there is more than one container.
+                    hoconsc:union(fun
+                        (all_union_members) -> [?REF(s3_aggregated_container_csv)];
+                        ({value, _Value}) -> [?REF(s3_aggregated_container_csv)]
+                    end),
+                    #{
+                        required => true,
+                        default => #{<<"type">> => <<"csv">>},
+                        desc => ?DESC(s3_aggregated_container)
+                    }
+                )},
+            {aggregation,
+                hoconsc:mk(
+                    ?REF(s3_aggregation),
+                    #{
+                        required => true,
+                        desc => ?DESC(s3_aggregation)
+                    }
+                )}
+        ],
+        emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [
+            {key, #{desc => ?DESC(s3_aggregated_upload_key)}}
+        ]),
+        emqx_s3_schema:fields(s3_uploader)
+    ]);
+fields(s3_aggregated_container_csv) ->
+    [
+        {type,
+            hoconsc:mk(
+                csv,
+                #{
+                    required => true,
+                    desc => ?DESC(s3_aggregated_container_csv)
+                }
+            )},
+        {column_order,
+            hoconsc:mk(
+                hoconsc:array(string()),
+                #{
+                    required => false,
+                    default => [],
+                    desc => ?DESC(s3_aggregated_container_csv_column_order)
+                }
+            )}
+    ];
+fields(s3_aggregation) ->
+    [
+        %% TODO: Needs bucketing? (e.g. messages falling in this 1h interval)
+        {time_interval,
+            hoconsc:mk(
+                emqx_schema:duration_s(),
+                #{
+                    required => false,
+                    default => <<"1h">>,
+                    desc => ?DESC(s3_aggregation_interval)
+                }
+            )},
+        {max_records,
+            hoconsc:mk(
+                pos_integer(),
+                #{
+                    required => false,
+                    default => <<"1000000">>,
+                    desc => ?DESC(s3_aggregation_max_records)
+                }
+            )}
+    ];
+fields(s3_aggreg_upload_resource_opts) ->
+    %% NOTE: This action should benefit from generous batching defaults.
+    emqx_bridge_v2_schema:action_resource_opts_fields([
+        {batch_size, #{default => ?DEFAULT_BATCH_SIZE}},
+        {batch_time, #{default => ?DEFAULT_BATCH_TIME}}
+    ]).
+
+desc(Name) when
+    Name == s3_aggregated_upload;
+    Name == s3_aggregated_upload_parameters;
+    Name == s3_aggregation;
+    Name == s3_aggregated_container_csv
+->
+    ?DESC(Name);
+desc(s3_aggreg_upload_resource_opts) ->
+    ?DESC(emqx_resource_schema, resource_opts);
+desc(_Name) ->
+    undefined.
+
+%% Interpreting options
+
+-spec mk_key_template(_Parameters :: map()) -> emqx_template:str().
+mk_key_template(#{key := Key}) ->
+    Template = emqx_template:parse(Key),
+    {_, BindingErrors} = emqx_template:render(Template, #{}),
+    {UsedBindings, _} = lists:unzip(BindingErrors),
+    SuffixTemplate = mk_suffix_template(UsedBindings),
+    case emqx_template:is_const(SuffixTemplate) of
+        true ->
+            Template;
+        false ->
+            Template ++ SuffixTemplate
+    end.
+
+mk_suffix_template(UsedBindings) ->
+    RequiredBindings = ["action", "node", "datetime.", "sequence"],
+    SuffixBindings = [
+        mk_default_binding(RB)
+     || RB <- RequiredBindings,
+        lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings)
+    ],
+    SuffixTemplate = [["/", B] || B <- SuffixBindings],
+    emqx_template:parse(SuffixTemplate).
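`mk_key_template/1` and `mk_suffix_template/1` above guarantee that every object key is unique per action, node, time window, and sequence number: a dry-run render collects the bindings the user's template already references, and any of the four required ones that are missing get appended as extra path segments. The selection rule in isolation (a sketch over plain lists; the function name is illustrative):

%% Return the required bindings that UsedBindings does not cover; the
%% prefix check is what lets "datetime.unix" satisfy "datetime.".
missing_bindings(UsedBindings) ->
    Required = ["action", "node", "datetime.", "sequence"],
    [RB || RB <- Required,
        lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings)].

%% missing_bindings(["topic"]) -> ["action", "node", "datetime.", "sequence"]
%% missing_bindings(["datetime.unix", "node"]) -> ["action", "sequence"]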
+
+mk_default_binding("datetime.") ->
+    "${datetime.rfc3339utc}";
+mk_default_binding(Binding) ->
+    "${" ++ Binding ++ "}".
+
+-spec mk_upload_options(_Parameters :: map()) -> emqx_s3_client:upload_options().
+mk_upload_options(Parameters) ->
+    Headers = mk_upload_headers(Parameters),
+    #{
+        headers => Headers,
+        acl => maps:get(acl, Parameters, undefined)
+    }.
+
+mk_upload_headers(Parameters = #{container := Container}) ->
+    Headers = normalize_headers(maps:get(headers, Parameters, #{})),
+    ContainerHeaders = mk_container_headers(Container),
+    maps:merge(ContainerHeaders, Headers).
+
+normalize_headers(Headers) ->
+    maps:fold(
+        fun(Header, Value, Acc) ->
+            maps:put(string:lowercase(emqx_utils_conv:str(Header)), Value, Acc)
+        end,
+        #{},
+        Headers
+    ).
+
+mk_container_headers(#{type := csv}) ->
+    #{"content-type" => "text/csv"};
+mk_container_headers(#{}) ->
+    #{}.
+
+%% Examples
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"s3_aggregated_upload">> => #{
+                summary => <<"S3 Aggregated Upload">>,
+                value => s3_action_example(Method)
+            }
+        }
+    ].
+
+s3_action_example(post) ->
+    maps:merge(
+        s3_action_example(put),
+        #{
+            type => atom_to_binary(?ACTION_AGGREGATED_UPLOAD),
+            name => <<"my_s3_action">>
+        }
+    );
+s3_action_example(get) ->
+    maps:merge(
+        s3_action_example(put),
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        }
+    );
+s3_action_example(put) ->
+    #{
+        enable => true,
+        connector => <<"my_s3_connector">>,
+        description => <<"My action">>,
+        parameters => #{
+            bucket => <<"mqtt-aggregated">>,
+            key => <<"${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv">>,
+            acl => <<"public_read">>,
+            aggregation => #{
+                time_interval => <<"15m">>,
+                max_records => 100_000
+            },
+            <<"container">> => #{
+                type => <<"csv">>,
+                column_order => [<<"clientid">>, <<"topic">>, <<"publish_received_at">>]
+            }
+        },
+        resource_opts => #{
+            health_check_interval => <<"10s">>,
+            query_mode => <<"async">>,
+            inflight_window => 100
+        }
+    }.
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_action_info.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_action_info.erl
new file mode 100644
index 000000000..b179073e5
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_action_info.erl
@@ -0,0 +1,21 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_aggreg_upload_action_info).
+
+-behaviour(emqx_action_info).
+
+-include("emqx_bridge_s3.hrl").
+
+-export([
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+action_type_name() -> ?ACTION_AGGREGATED_UPLOAD.
+
+connector_type_name() -> s3.
+
+schema_module() -> emqx_bridge_s3_aggreg_upload.
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_sup.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_sup.erl
new file mode 100644
index 000000000..973187b7e
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_sup.erl
@@ -0,0 +1,72 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_aggreg_upload_sup).
+
+-export([
+    start_link/3,
+    start_link_delivery_sup/2
+]).
+
+-export([
+    start_delivery/2,
+    start_delivery_proc/3
+]).
+
+-behaviour(supervisor).
+-export([init/1]).
+
+-define(SUPREF(NAME), {via, gproc, {n, l, {?MODULE, NAME}}}).
+
+%%
+
+start_link(Name, AggregOpts, DeliveryOpts) ->
+    supervisor:start_link(?MODULE, {root, Name, AggregOpts, DeliveryOpts}).
+
+start_link_delivery_sup(Name, DeliveryOpts) ->
+    supervisor:start_link(?SUPREF(Name), ?MODULE, {delivery, Name, DeliveryOpts}).
+
+%%
+
+start_delivery(Name, Buffer) ->
+    supervisor:start_child(?SUPREF(Name), [Buffer]).
+
+start_delivery_proc(Name, DeliveryOpts, Buffer) ->
+    emqx_bridge_s3_aggreg_delivery:start_link(Name, Buffer, DeliveryOpts).
+
+%%
+
+init({root, Name, AggregOpts, DeliveryOpts}) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 5
+    },
+    AggregatorChildSpec = #{
+        id => aggregator,
+        start => {emqx_bridge_s3_aggregator, start_link, [Name, AggregOpts]},
+        type => worker,
+        restart => permanent
+    },
+    DeliverySupChildSpec = #{
+        id => delivery_sup,
+        start => {?MODULE, start_link_delivery_sup, [Name, DeliveryOpts]},
+        type => supervisor,
+        restart => permanent
+    },
+    {ok, {SupFlags, [DeliverySupChildSpec, AggregatorChildSpec]}};
+init({delivery, Name, DeliveryOpts}) ->
+    SupFlags = #{
+        strategy => simple_one_for_one,
+        intensity => 100,
+        period => 5
+    },
+    ChildSpec = #{
+        id => delivery,
+        start => {?MODULE, start_delivery_proc, [Name, DeliveryOpts]},
+        type => worker,
+        restart => temporary,
+        shutdown => 1000
+    },
+    {ok, {SupFlags, [ChildSpec]}}.
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl
new file mode 100644
index 000000000..47ecdeb4a
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl
@@ -0,0 +1,486 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% This module manages buffers for aggregating records and offloads them
+%% to separate "delivery" processes when they are full or their time
+%% interval is over.
+-module(emqx_bridge_s3_aggregator).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
+
+-include("emqx_bridge_s3_aggregator.hrl").
+
+-export([
+    start_link/2,
+    push_records/3,
+    tick/2,
+    take_error/1
+]).
+
+-behaviour(gen_server).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
+
+-export_type([
+    record/0,
+    timestamp/0
+]).
+
+%% Record.
+-type record() :: #{binary() => _}.
+
+%% Unix timestamp, seconds since epoch.
+-type timestamp() :: _Seconds :: non_neg_integer().
+
+%%
+
+-define(VSN, 1).
+-define(SRVREF(NAME), {via, gproc, {n, l, {?MODULE, NAME}}}).
+
+%%
+
+start_link(Name, Opts) ->
+    gen_server:start_link(?SRVREF(Name), ?MODULE, mk_state(Name, Opts), []).
+
+push_records(Name, Timestamp, Records = [_ | _]) ->
+    %% FIXME: Error feedback.
+    case pick_buffer(Name, Timestamp) of
+        undefined ->
+            BufferNext = next_buffer(Name, Timestamp),
+            write_records_limited(Name, BufferNext, Records);
+        Buffer ->
+            write_records_limited(Name, Buffer, Records)
+    end;
+push_records(_Name, _Timestamp, []) ->
+    ok.
+
+tick(Name, Timestamp) ->
+    case pick_buffer(Name, Timestamp) of
+        #buffer{} ->
+            ok;
+        _Outdated ->
+            send_close_buffer(Name, Timestamp)
+    end.
+
+take_error(Name) ->
+    gen_server:call(?SRVREF(Name), take_error).
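A sketch of how a caller is expected to drive these three entry points (the wrapper function is illustrative; in the bridge the pushes come from queries and the `tick`/`take_error` calls from health checks):

%% Push a batch under the current time, close outdated buffers, and
%% drain at most one pending delivery error.
aggregator_roundtrip(Name, Records) ->
    Now = erlang:system_time(second),
    %% push_records/3 may also return {error, _} (see the FIXME above).
    ok = emqx_bridge_s3_aggregator:push_records(Name, Now, Records),
    ok = emqx_bridge_s3_aggregator:tick(Name, Now),
    case emqx_bridge_s3_aggregator:take_error(Name) of
        [Error] -> {error, Error};
        [] -> ok
    end.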
+ +%% + +write_records_limited(Name, Buffer = #buffer{max_records = undefined}, Records) -> + write_records(Name, Buffer, Records); +write_records_limited(Name, Buffer = #buffer{max_records = MaxRecords}, Records) -> + NR = length(Records), + case inc_num_records(Buffer, NR) of + NR -> + %% NOTE: Allow unconditionally if it's the first write. + write_records(Name, Buffer, Records); + NWritten when NWritten > MaxRecords -> + NextBuffer = rotate_buffer(Name, Buffer), + write_records_limited(Name, NextBuffer, Records); + _ -> + write_records(Name, Buffer, Records) + end. + +write_records(Name, Buffer = #buffer{fd = Writer}, Records) -> + case emqx_bridge_s3_aggreg_buffer:write(Records, Writer) of + ok -> + ?tp(s3_aggreg_records_written, #{action => Name, records => Records}), + ok; + {error, terminated} -> + BufferNext = rotate_buffer(Name, Buffer), + write_records(Name, BufferNext, Records); + {error, _} = Error -> + Error + end. + +inc_num_records(#buffer{cnt_records = Counter}, Size) -> + inc_counter(Counter, Size). + +next_buffer(Name, Timestamp) -> + gen_server:call(?SRVREF(Name), {next_buffer, Timestamp}). + +rotate_buffer(Name, #buffer{fd = FD}) -> + gen_server:call(?SRVREF(Name), {rotate_buffer, FD}). + +send_close_buffer(Name, Timestamp) -> + gen_server:cast(?SRVREF(Name), {close_buffer, Timestamp}). + +%% + +-record(st, { + name :: _Name, + tab :: ets:tid() | undefined, + buffer :: buffer() | undefined, + queued :: buffer() | undefined, + deliveries = #{} :: #{reference() => buffer()}, + errors = queue:new() :: queue:queue(_Error), + interval :: emqx_schema:duration_s(), + max_records :: pos_integer(), + work_dir :: file:filename() +}). + +-type state() :: #st{}. + +mk_state(Name, Opts) -> + Interval = maps:get(time_interval, Opts), + MaxRecords = maps:get(max_records, Opts), + WorkDir = maps:get(work_dir, Opts), + ok = ensure_workdir(WorkDir), + #st{ + name = Name, + interval = Interval, + max_records = MaxRecords, + work_dir = WorkDir + }. + +ensure_workdir(WorkDir) -> + %% NOTE + %% Writing MANIFEST as a means to ensure the work directory is writable. It's not + %% (yet) read back because there's only one version of the implementation. + ok = filelib:ensure_path(WorkDir), + ok = write_manifest(WorkDir). + +write_manifest(WorkDir) -> + Manifest = #{<<"version">> => ?VSN}, + file:write_file(filename:join(WorkDir, "MANIFEST"), hocon_pp:do(Manifest, #{})). + +%% + +-spec init(state()) -> {ok, state()}. +init(St0 = #st{name = Name}) -> + _ = erlang:process_flag(trap_exit, true), + St1 = St0#st{tab = create_tab(Name)}, + St = recover(St1), + _ = announce_current_buffer(St), + {ok, St}. + +handle_call({next_buffer, Timestamp}, _From, St0) -> + St = #st{buffer = Buffer} = handle_next_buffer(Timestamp, St0), + {reply, Buffer, St, 0}; +handle_call({rotate_buffer, FD}, _From, St0) -> + St = #st{buffer = Buffer} = handle_rotate_buffer(FD, St0), + {reply, Buffer, St, 0}; +handle_call(take_error, _From, St0) -> + {MaybeError, St} = handle_take_error(St0), + {reply, MaybeError, St}. + +handle_cast({close_buffer, Timestamp}, St) -> + {noreply, handle_close_buffer(Timestamp, St)}; +handle_cast(_Cast, St) -> + {noreply, St}. 
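The counter handshake above deserves a note: `inc_counter/2` goes through `ets:update_counter/3`, which is atomic, so concurrent writers can bump the shared record count without a round-trip through the gen_server; a writer only calls back into the owner (`rotate_buffer/2`) once its post-increment total exceeds `max_records`, and duplicate rotation requests are reconciled by matching the file descriptor (see `handle_rotate_buffer/2` below). A standalone demonstration of the atomic-bump part (table and names are illustrative):

rotation_threshold_demo() ->
    Tab = ets:new(demo_counters, [public, set]),
    true = ets:insert(Tab, {records, 0}),
    MaxRecords = 250,
    %% update_counter/3 returns the post-increment total, so each of the
    %% five simulated writers observes a distinct running count.
    Totals = [ets:update_counter(Tab, records, {2, 100}) || _ <- lists:seq(1, 5)],
    %% Totals =:= [100, 200, 300, 400, 500]
    [T || T <- Totals, T > MaxRecords].
    %% => [300, 400, 500]: these writers would each request a rotation;
    %% the owner honors the first and ignores the rest via the fd check.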
+ +handle_info(timeout, St) -> + {noreply, handle_queued_buffer(St)}; +handle_info({'DOWN', MRef, _, Pid, Reason}, St0 = #st{name = Name, deliveries = Ds0}) -> + case maps:take(MRef, Ds0) of + {Buffer, Ds} -> + St = St0#st{deliveries = Ds}, + {noreply, handle_delivery_exit(Buffer, Reason, St)}; + error -> + ?SLOG(notice, #{ + msg => "unexpected_down_signal", + action => Name, + pid => Pid, + reason => Reason + }), + {noreply, St0} + end; +handle_info(_Msg, St) -> + {noreply, St}. + +terminate(_Reason, #st{name = Name}) -> + cleanup_tab(Name). + +%% + +handle_next_buffer(Timestamp, St = #st{buffer = #buffer{until = Until}}) when Timestamp < Until -> + St; +handle_next_buffer(Timestamp, St0 = #st{buffer = Buffer = #buffer{since = PrevSince}}) -> + BufferClosed = close_buffer(Buffer), + St = enqueue_closed_buffer(BufferClosed, St0), + handle_next_buffer(Timestamp, PrevSince, St); +handle_next_buffer(Timestamp, St = #st{buffer = undefined}) -> + handle_next_buffer(Timestamp, Timestamp, St). + +handle_next_buffer(Timestamp, PrevSince, St0) -> + NextBuffer = allocate_next_buffer(Timestamp, PrevSince, St0), + St = St0#st{buffer = NextBuffer}, + _ = announce_current_buffer(St), + St. + +handle_rotate_buffer( + FD, + St0 = #st{buffer = Buffer = #buffer{since = Since, seq = Seq, fd = FD}} +) -> + BufferClosed = close_buffer(Buffer), + NextBuffer = allocate_buffer(Since, Seq + 1, St0), + St = enqueue_closed_buffer(BufferClosed, St0#st{buffer = NextBuffer}), + _ = announce_current_buffer(St), + St; +handle_rotate_buffer(_ClosedFD, St) -> + St. + +enqueue_closed_buffer(Buffer, St = #st{queued = undefined}) -> + St#st{queued = Buffer}; +enqueue_closed_buffer(Buffer, St0) -> + %% NOTE: Should never really happen unless interval / max records are too tight. + St = handle_queued_buffer(St0), + St#st{queued = Buffer}. + +handle_queued_buffer(St = #st{queued = undefined}) -> + St; +handle_queued_buffer(St = #st{queued = Buffer}) -> + enqueue_delivery(Buffer, St#st{queued = undefined}). + +allocate_next_buffer(Timestamp, PrevSince, St = #st{interval = Interval}) -> + Since = compute_since(Timestamp, PrevSince, Interval), + allocate_buffer(Since, 0, St). + +compute_since(Timestamp, PrevSince, Interval) -> + Timestamp - (Timestamp - PrevSince) rem Interval. + +allocate_buffer(Since, Seq, St = #st{name = Name}) -> + Buffer = #buffer{filename = Filename, cnt_records = Counter} = mk_buffer(Since, Seq, St), + {ok, FD} = file:open(Filename, [write, binary]), + Writer = emqx_bridge_s3_aggreg_buffer:new_writer(FD, _Meta = []), + _ = add_counter(Counter), + ?tp(s3_aggreg_buffer_allocated, #{action => Name, filename => Filename}), + Buffer#buffer{fd = Writer}. + +recover_buffer(Buffer = #buffer{filename = Filename, cnt_records = Counter}) -> + {ok, FD} = file:open(Filename, [read, write, binary]), + case recover_buffer_writer(FD, Filename) of + {ok, Writer, NWritten} -> + _ = add_counter(Counter, NWritten), + Buffer#buffer{fd = Writer}; + {error, Reason} -> + ?SLOG(warning, #{ + msg => "existing_buffer_recovery_failed", + filename => Filename, + reason => Reason, + details => "Buffer is corrupted beyond repair, will be discarded." + }), + _ = file:close(FD), + _ = file:delete(Filename), + undefined + end. + +recover_buffer_writer(FD, Filename) -> + try emqx_bridge_s3_aggreg_buffer:new_reader(FD) of + {_Meta, Reader} -> recover_buffer_writer(FD, Filename, Reader, 0) + catch + error:Reason -> + {error, Reason} + end. 
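`compute_since/3` is worth a worked example: a new buffer's start is snapped to the interval grid anchored at the previous buffer, not at the wall clock, so windows stay aligned even after idle periods longer than one interval:

%% Worked example (illustrative values): a record arrives at T = 5400,
%% the previous buffer opened at 1000, and the interval is 3600 s.
%%
%%   compute_since(5400, 1000, 3600)
%%     = 5400 - ((5400 - 1000) rem 3600)
%%     = 5400 - 800
%%     = 4600              %% = 1000 + 3600, the next aligned window
%%
%% The buffer allocated from it covers [4600, 4600 + 3600) = [4600, 8200).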
+ +recover_buffer_writer(FD, Filename, Reader0, NWritten) -> + try emqx_bridge_s3_aggreg_buffer:read(Reader0) of + {Records, Reader} when is_list(Records) -> + recover_buffer_writer(FD, Filename, Reader, NWritten + length(Records)); + {Unexpected, _Reader} -> + %% Buffer is corrupted, should be discarded. + {error, {buffer_unexpected_record, Unexpected}}; + eof -> + %% Buffer is fine, continue writing at the end. + {ok, FD, NWritten} + catch + error:Reason -> + %% Buffer is truncated or corrupted somewhere in the middle. + %% Continue writing after the last valid record. + ?SLOG(warning, #{ + msg => "existing_buffer_recovered_partially", + filename => Filename, + reason => Reason, + details => + "Buffer is truncated or corrupted somewhere in the middle. " + "Corrupted records will be discarded." + }), + Writer = emqx_bridge_s3_aggreg_buffer:takeover(Reader0), + {ok, Writer, NWritten} + end. + +mk_buffer( + Since, + Seq, + #st{tab = Tab, interval = Interval, max_records = MaxRecords, work_dir = WorkDir} +) -> + Name = mk_filename(Since, Seq), + Counter = {Tab, {Since, Seq}}, + #buffer{ + since = Since, + until = Since + Interval, + seq = Seq, + filename = filename:join(WorkDir, Name), + max_records = MaxRecords, + cnt_records = Counter + }. + +handle_close_buffer( + Timestamp, + St0 = #st{buffer = Buffer = #buffer{until = Until}} +) when Timestamp >= Until -> + St = St0#st{buffer = undefined}, + _ = announce_current_buffer(St), + enqueue_delivery(close_buffer(Buffer), St); +handle_close_buffer(_Timestamp, St = #st{buffer = undefined}) -> + St. + +close_buffer(Buffer = #buffer{fd = FD}) -> + ok = file:close(FD), + Buffer#buffer{fd = undefined}. + +discard_buffer(#buffer{filename = Filename, cnt_records = Counter}) -> + %% NOTE: Hopefully, no process is touching this counter anymore. + _ = del_counter(Counter), + file:delete(Filename). + +pick_buffer(Name, Timestamp) -> + case lookup_current_buffer(Name) of + #buffer{until = Until} = Buffer when Timestamp < Until -> + Buffer; + #buffer{since = Since} when Timestamp < Since -> + %% TODO: Support timestamps going back. + error({invalid_timestamp, Timestamp}); + _Outdated -> + undefined + end. + +announce_current_buffer(#st{tab = Tab, buffer = Buffer}) -> + ets:insert(Tab, {buffer, Buffer}). + +lookup_current_buffer(Name) -> + ets:lookup_element(lookup_tab(Name), buffer, 2). + +%% + +enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) -> + {ok, Pid} = emqx_bridge_s3_aggreg_upload_sup:start_delivery(Name, Buffer), + MRef = erlang:monitor(process, Pid), + St#st{deliveries = Ds#{MRef => Buffer}}. + +handle_delivery_exit(Buffer, Normal, St = #st{name = Name}) when + Normal == normal; Normal == noproc +-> + ?SLOG(debug, #{ + msg => "aggregated_buffer_delivery_completed", + action => Name, + buffer => Buffer#buffer.filename + }), + ok = discard_buffer(Buffer), + St; +handle_delivery_exit(Buffer, {shutdown, {skipped, Reason}}, St = #st{name = Name}) -> + ?SLOG(info, #{ + msg => "aggregated_buffer_delivery_skipped", + action => Name, + buffer => {Buffer#buffer.since, Buffer#buffer.seq}, + reason => Reason + }), + ok = discard_buffer(Buffer), + St; +handle_delivery_exit(Buffer, Error, St = #st{name = Name}) -> + ?SLOG(error, #{ + msg => "aggregated_buffer_delivery_failed", + action => Name, + buffer => {Buffer#buffer.since, Buffer#buffer.seq}, + filename => Buffer#buffer.filename, + reason => Error + }), + %% TODO: Retries? + enqueue_status_error(Error, St). 
+ +enqueue_status_error({upload_failed, Error}, St = #st{errors = QErrors}) -> + %% TODO + %% This code feels too specific, errors probably need classification. + St#st{errors = queue:in(Error, QErrors)}; +enqueue_status_error(_AnotherError, St) -> + St. + +handle_take_error(St = #st{errors = QErrors0}) -> + case queue:out(QErrors0) of + {{value, Error}, QErrors} -> + {[Error], St#st{errors = QErrors}}; + {empty, QErrors} -> + {[], St#st{errors = QErrors}} + end. + +%% + +recover(St0 = #st{work_dir = WorkDir}) -> + {ok, Filenames} = file:list_dir(WorkDir), + ExistingBuffers = lists:flatmap(fun(FN) -> read_existing_file(FN, St0) end, Filenames), + case lists:reverse(lists:keysort(#buffer.since, ExistingBuffers)) of + [Buffer | ClosedBuffers] -> + St = lists:foldl(fun enqueue_delivery/2, St0, ClosedBuffers), + St#st{buffer = recover_buffer(Buffer)}; + [] -> + St0 + end. + +read_existing_file("MANIFEST", _St) -> + []; +read_existing_file(Name, St) -> + case parse_filename(Name) of + {Since, Seq} -> + [read_existing_buffer(Since, Seq, Name, St)]; + error -> + %% TODO: log? + [] + end. + +read_existing_buffer(Since, Seq, Name, St = #st{work_dir = WorkDir}) -> + Filename = filename:join(WorkDir, Name), + Buffer = mk_buffer(Since, Seq, St), + Buffer#buffer{filename = Filename}. + +%% + +mk_filename(Since, Seq) -> + "T" ++ integer_to_list(Since) ++ "_" ++ pad_number(Seq, 4). + +parse_filename(Filename) -> + case re:run(Filename, "^T(\\d+)_(\\d+)$", [{capture, all_but_first, list}]) of + {match, [Since, Seq]} -> + {list_to_integer(Since), list_to_integer(Seq)}; + nomatch -> + error + end. + +%% + +add_counter({Tab, Counter}) -> + add_counter({Tab, Counter}, 0). + +add_counter({Tab, Counter}, N) -> + ets:insert(Tab, {Counter, N}). + +inc_counter({Tab, Counter}, Size) -> + ets:update_counter(Tab, Counter, {2, Size}). + +del_counter({Tab, Counter}) -> + ets:delete(Tab, Counter). + +%% + +create_tab(Name) -> + Tab = ets:new(?MODULE, [public, set, {write_concurrency, auto}]), + ok = persistent_term:put({?MODULE, Name}, Tab), + Tab. + +lookup_tab(Name) -> + persistent_term:get({?MODULE, Name}). + +cleanup_tab(Name) -> + persistent_term:erase({?MODULE, Name}). + +%% + +pad_number(I, L) -> + string:pad(integer_to_list(I), L, leading, $0). diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.hrl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.hrl new file mode 100644 index 000000000..7ac62c6b5 --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.hrl @@ -0,0 +1,15 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-record(buffer, { + since :: emqx_bridge_s3_aggregator:timestamp(), + until :: emqx_bridge_s3_aggregator:timestamp(), + seq :: non_neg_integer(), + filename :: file:filename(), + fd :: file:io_device() | undefined, + max_records :: pos_integer() | undefined, + cnt_records :: {ets:tab(), _Counter} | undefined +}). + +-type buffer() :: #buffer{}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl new file mode 100644 index 000000000..e5b77f9d6 --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl @@ -0,0 +1,16 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. 
+%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_app). + +-behaviour(application). +-export([start/2, stop/1]). + +%% + +start(_StartType, _StartArgs) -> + emqx_bridge_s3_sup:start_link(). + +stop(_State) -> + ok. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 5d3ed19f8..d135a087a 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -7,6 +7,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include("emqx_bridge_s3.hrl"). -behaviour(emqx_resource). -export([ @@ -17,7 +18,7 @@ on_remove_channel/3, on_get_channels/1, on_query/3, - % on_batch_query/3, + on_batch_query/3, on_get_status/2, on_get_channel_status/3 ]). @@ -31,12 +32,31 @@ }. -type channel_config() :: #{ - parameters := #{ - bucket := string(), - key := string(), - content := string(), - acl => emqx_s3:acl() - } + bridge_type := binary(), + parameters := s3_upload_parameters() | s3_aggregated_upload_parameters() +}. + +-type s3_upload_parameters() :: #{ + bucket := string(), + key := string(), + content := string(), + acl => emqx_s3:acl() +}. + +-type s3_aggregated_upload_parameters() :: #{ + bucket := string(), + key := string(), + acl => emqx_s3:acl(), + aggregation => #{ + time_interval := emqx_schema:duration_s(), + max_records := pos_integer() + }, + container := #{ + type := csv, + column_order => [string()] + }, + min_part_size := emqx_schema:bytesize(), + max_part_size := emqx_schema:bytesize() }. -type channel_state() :: #{ @@ -123,12 +143,13 @@ on_get_status(_InstId, State = #{client_config := Config}) -> -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) -> {ok, state()} | {error, _Reason}. on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) -> - ChannelState = init_channel_state(Config), + ChannelState = start_channel(State, Config), {ok, State#{channels => Channels#{ChannelId => ChannelState}}}. -spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) -> {ok, state()}. on_remove_channel(_InstId, State = #{channels := Channels}, ChannelId) -> + ok = stop_channel(maps:get(ChannelId, Channels, undefined)), {ok, State#{channels => maps:remove(ChannelId, Channels)}}. -spec on_get_channels(_InstanceId :: resource_id()) -> @@ -138,27 +159,121 @@ on_get_channels(InstId) -> -spec on_get_channel_status(_InstanceId :: resource_id(), channel_id(), state()) -> channel_status(). -on_get_channel_status(_InstId, ChannelId, #{channels := Channels}) -> +on_get_channel_status(_InstId, ChannelId, State = #{channels := Channels}) -> case maps:get(ChannelId, Channels, undefined) of - _ChannelState = #{} -> - %% TODO - %% Since bucket name may be templated, we can't really provide any - %% additional information regarding the channel health. - ?status_connected; + ChannelState = #{} -> + channel_status(ChannelState, State); undefined -> ?status_disconnected end. 
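For concreteness, a channel_config() value matching the aggregated branch of the type above might look as follows once parsed (a hand-written illustration, not output captured from a broker; key and bucket values are made up):

#{
    bridge_type => <<"s3_aggregated_upload">>,
    bridge_name => <<"my_action">>,
    parameters => #{
        bucket => "mqtt-aggregated",
        key => "${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv",
        acl => public_read,
        aggregation => #{time_interval => 3600, max_records => 1000000},
        container => #{type => csv, column_order => []},
        min_part_size => 5242880,
        max_part_size => 5368709120
    }
}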
-init_channel_state(#{parameters := Parameters}) -> +start_channel(_State, #{ + bridge_type := ?BRIDGE_TYPE_UPLOAD, + parameters := Parameters = #{ + bucket := Bucket, + key := Key, + content := Content + } +}) -> #{ - bucket => emqx_template:parse(maps:get(bucket, Parameters)), - key => emqx_template:parse(maps:get(key, Parameters)), - content => emqx_template:parse(maps:get(content, Parameters)), - upload_options => #{ - acl => maps:get(acl, Parameters, undefined) - } + type => ?ACTION_UPLOAD, + bucket => emqx_template:parse(Bucket), + key => emqx_template:parse(Key), + content => emqx_template:parse(Content), + upload_options => upload_options(Parameters) + }; +start_channel(State, #{ + bridge_type := Type = ?BRIDGE_TYPE_AGGREGATED_UPLOAD, + bridge_name := Name, + parameters := Parameters = #{ + aggregation := #{ + time_interval := TimeInterval, + max_records := MaxRecords + }, + container := Container, + bucket := Bucket + } +}) -> + AggregOpts = #{ + time_interval => TimeInterval, + max_records => MaxRecords, + work_dir => work_dir(Type, Name) + }, + DeliveryOpts = #{ + bucket => Bucket, + key => emqx_bridge_s3_aggreg_upload:mk_key_template(Parameters), + container => Container, + upload_options => emqx_bridge_s3_aggreg_upload:mk_upload_options(Parameters), + client_config => maps:get(client_config, State), + uploader_config => maps:with([min_part_size, max_part_size], Parameters) + }, + _ = emqx_bridge_s3_sup:delete_child({Type, Name}), + {ok, SupPid} = emqx_bridge_s3_sup:start_child(#{ + id => {Type, Name}, + start => {emqx_bridge_s3_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]}, + type => supervisor, + restart => permanent + }), + #{ + type => ?ACTION_AGGREGATED_UPLOAD, + name => Name, + bucket => Bucket, + supervisor => SupPid, + on_stop => fun() -> emqx_bridge_s3_sup:delete_child({Type, Name}) end }. +upload_options(Parameters) -> + #{acl => maps:get(acl, Parameters, undefined)}. + +work_dir(Type, Name) -> + filename:join([emqx:data_dir(), bridge, Type, Name]). + +stop_channel(#{on_stop := OnStop}) -> + OnStop(); +stop_channel(_ChannelState) -> + ok. + +channel_status(#{type := ?ACTION_UPLOAD}, _State) -> + %% TODO + %% Since bucket name may be templated, we can't really provide any additional + %% information regarding the channel health. + ?status_connected; +channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) -> + %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded. + Timestamp = erlang:system_time(second), + ok = emqx_bridge_s3_aggregator:tick(Name, Timestamp), + ok = check_bucket_accessible(Bucket, State), + ok = check_aggreg_upload_errors(Name), + ?status_connected. + +check_bucket_accessible(Bucket, #{client_config := Config}) -> + case emqx_s3_client:aws_config(Config) of + {error, Reason} -> + throw({unhealthy_target, Reason}); + AWSConfig -> + try erlcloud_s3:list_objects(Bucket, [{max_keys, 1}], AWSConfig) of + Props when is_list(Props) -> + ok + catch + error:{aws_error, {http_error, 404, _, _Reason}} -> + throw({unhealthy_target, "Bucket does not exist"}); + error:{aws_error, {socket_error, Reason}} -> + throw({unhealthy_target, emqx_utils:format(Reason)}) + end + end. + +check_aggreg_upload_errors(Name) -> + case emqx_bridge_s3_aggregator:take_error(Name) of + [Error] -> + %% TODO + %% This approach means that, for example, 3 upload failures will cause + %% the channel to be marked as unhealthy for 3 consecutive health checks. 
+ ErrorMessage = emqx_utils:format(Error), + throw({unhealthy_target, ErrorMessage}); + [] -> + ok + end. + %% Queries -type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}. @@ -167,8 +282,21 @@ init_channel_state(#{parameters := Parameters}) -> {ok, _Result} | {error, _Reason}. on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) -> case maps:get(Tag, Channels, undefined) of - ChannelState = #{} -> + ChannelState = #{type := ?ACTION_UPLOAD} -> run_simple_upload(InstId, Tag, Data, ChannelState, Config); + ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} -> + run_aggregated_upload(InstId, [Data], ChannelState); + undefined -> + {error, {unrecoverable_error, {invalid_message_tag, Tag}}} + end. + +-spec on_batch_query(_InstanceId :: resource_id(), [query()], state()) -> + {ok, _Result} | {error, _Reason}. +on_batch_query(InstId, [{Tag, Data0} | Rest], #{channels := Channels}) -> + case maps:get(Tag, Channels, undefined) of + ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} -> + Records = [Data0 | [Data || {_, Data} <- Rest]], + run_aggregated_upload(InstId, Records, ChannelState); undefined -> {error, {unrecoverable_error, {invalid_message_tag, Tag}}} end. @@ -206,6 +334,16 @@ run_simple_upload( {error, map_error(Reason)} end. +run_aggregated_upload(InstId, Records, #{name := Name}) -> + Timestamp = erlang:system_time(second), + case emqx_bridge_s3_aggregator:push_records(Name, Timestamp, Records) of + ok -> + ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}), + ok; + {error, Reason} -> + {error, {unrecoverable_error, Reason}} + end. + map_error({socket_error, _} = Reason) -> {recoverable_error, Reason}; map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 -> diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector_info.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector_info.erl index be3c29bae..560334610 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector_info.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector_info.erl @@ -20,7 +20,7 @@ type_name() -> s3. bridge_types() -> - [s3]. + [s3, s3_aggregated_upload]. resource_callback_module() -> emqx_bridge_s3_connector. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl new file mode 100644 index 000000000..230711a76 --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl @@ -0,0 +1,42 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_sup). + +-export([ + start_link/0, + start_child/1, + delete_child/1 +]). + +-behaviour(supervisor). +-export([init/1]). + +-define(SUPREF, ?MODULE). + +%% + +start_link() -> + supervisor:start_link({local, ?SUPREF}, ?MODULE, root). + +start_child(ChildSpec) -> + supervisor:start_child(?SUPREF, ChildSpec). + +delete_child(ChildId) -> + case supervisor:terminate_child(?SUPREF, ChildId) of + ok -> + supervisor:delete_child(?SUPREF, ChildId); + Error -> + Error + end. + +%% + +init(root) -> + SupFlags = #{ + strategy => one_for_one, + intensity => 1, + period => 1 + }, + {ok, {SupFlags, []}}. 
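This small supervisor is what makes aggregated channels restartable by id: `on_add_channel` installs one `{Type, Name}` child per action (first deleting any stale one), and `on_remove_channel` tears it down through the `on_stop` closure. In sketch form (bindings and macros as in `start_channel/2` above; this fragment assumes the hrl is included):

%% Illustrative: the add/remove lifecycle of one aggregated channel.
Id = {?BRIDGE_TYPE_AGGREGATED_UPLOAD, Name},
_ = emqx_bridge_s3_sup:delete_child(Id),
{ok, _SupPid} = emqx_bridge_s3_sup:start_child(#{
    id => Id,
    start => {emqx_bridge_s3_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]},
    type => supervisor,
    restart => permanent
}),
%% ...and later, on channel removal:
ok = emqx_bridge_s3_sup:delete_child(Id).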
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl new file mode 100644 index 000000000..44e2360b8 --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl @@ -0,0 +1,143 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_upload). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include("emqx_bridge_s3.hrl"). + +-define(ACTION, ?ACTION_UPLOAD). + +-behaviour(hocon_schema). +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-export([ + bridge_v2_examples/1 +]). + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> + "bridge_s3". + +roots() -> + []. + +fields(Field) when + Field == "get_bridge_v2"; + Field == "put_bridge_v2"; + Field == "post_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION)); +fields(action) -> + {?ACTION, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)), + #{ + desc => <<"S3 Upload Action Config">>, + required => false + } + )}; +fields(?ACTION) -> + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + ?R_REF(s3_upload_parameters), + #{ + required => true, + desc => ?DESC(s3_upload) + } + ), + #{ + resource_opts_ref => ?R_REF(s3_action_resource_opts) + } + ); +fields(s3_upload_parameters) -> + emqx_s3_schema:fields(s3_upload) ++ + [ + {content, + hoconsc:mk( + emqx_schema:template(), + #{ + required => false, + default => <<"${.}">>, + desc => ?DESC(s3_object_content) + } + )} + ]; +fields(s3_action_resource_opts) -> + UnsupportedOpts = [batch_size, batch_time], + lists:filter( + fun({N, _}) -> not lists:member(N, UnsupportedOpts) end, + emqx_bridge_v2_schema:action_resource_opts_fields() + ). + +desc(s3) -> + ?DESC(s3_upload); +desc(Name) when + Name == s3_upload; + Name == s3_upload_parameters +-> + ?DESC(Name); +desc(s3_action_resource_opts) -> + ?DESC(emqx_resource_schema, resource_opts); +desc(_Name) -> + undefined. + +%% Examples + +bridge_v2_examples(Method) -> + [ + #{ + <<"s3">> => #{ + summary => <<"S3 Simple Upload">>, + value => s3_upload_action_example(Method) + } + } + ]. + +s3_upload_action_example(post) -> + maps:merge( + s3_upload_action_example(put), + #{ + type => atom_to_binary(?ACTION_UPLOAD), + name => <<"my_s3_action">> + } + ); +s3_upload_action_example(get) -> + maps:merge( + s3_upload_action_example(put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +s3_upload_action_example(put) -> + #{ + enable => true, + connector => <<"my_s3_connector">>, + description => <<"My action">>, + parameters => #{ + bucket => <<"${clientid}">>, + key => <<"${topic}">>, + content => <<"${payload}">>, + acl => <<"public_read">> + }, + resource_opts => #{ + query_mode => <<"sync">>, + inflight_window => 10 + } + }. 
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_action_info.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload_action_info.erl similarity index 69% rename from apps/emqx_bridge_s3/src/emqx_bridge_s3_action_info.erl rename to apps/emqx_bridge_s3/src/emqx_bridge_s3_upload_action_info.erl index 646173bf4..15a76ae7c 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_action_info.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload_action_info.erl @@ -2,18 +2,20 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_s3_action_info). +-module(emqx_bridge_s3_upload_action_info). -behaviour(emqx_action_info). +-include("emqx_bridge_s3.hrl"). + -export([ action_type_name/0, connector_type_name/0, schema_module/0 ]). -action_type_name() -> s3. +action_type_name() -> ?ACTION_UPLOAD. connector_type_name() -> s3. -schema_module() -> emqx_bridge_s3. +schema_module() -> emqx_bridge_s3_upload. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl index 738f9186f..322666b1f 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl @@ -11,8 +11,6 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/test_macros.hrl"). --import(emqx_utils_conv, [bin/1]). - %% See `emqx_bridge_s3.hrl`. -define(BRIDGE_TYPE, <<"s3">>). -define(CONNECTOR_TYPE, <<"s3">>). @@ -79,67 +77,56 @@ end_per_testcase(_TestCase, _Config) -> connector_config(Name, _Config) -> BaseConf = emqx_s3_test_helpers:base_raw_config(tcp), - parse_and_check_config(<<"connectors">>, ?CONNECTOR_TYPE, Name, #{ - <<"enable">> => true, - <<"description">> => <<"S3 Connector">>, - <<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)), - <<"port">> => maps:get(<<"port">>, BaseConf), - <<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf), - <<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf), - <<"transport_options">> => #{ - <<"headers">> => #{ - <<"content-type">> => <> + emqx_bridge_s3_test_helpers:parse_and_check_config( + <<"connectors">>, ?CONNECTOR_TYPE, Name, #{ + <<"enable">> => true, + <<"description">> => <<"S3 Connector">>, + <<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)), + <<"port">> => maps:get(<<"port">>, BaseConf), + <<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf), + <<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf), + <<"transport_options">> => #{ + <<"headers">> => #{ + <<"content-type">> => <> + }, + <<"connect_timeout">> => <<"500ms">>, + <<"request_timeout">> => <<"1s">>, + <<"pool_size">> => 4, + <<"max_retries">> => 0, + <<"enable_pipelining">> => 1 }, - <<"connect_timeout">> => <<"500ms">>, - <<"request_timeout">> => <<"1s">>, - <<"pool_size">> => 4, - <<"max_retries">> => 0, - <<"enable_pipelining">> => 1 - }, - <<"resource_opts">> => #{ - <<"health_check_interval">> => <<"5s">>, - <<"start_timeout">> => <<"5s">> + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"5s">>, + <<"start_timeout">> => <<"5s">> + } } - }). + ). 
action_config(Name, ConnectorId) -> - parse_and_check_config(<<"actions">>, ?BRIDGE_TYPE, Name, #{ - <<"enable">> => true, - <<"connector">> => ConnectorId, - <<"parameters">> => #{ - <<"bucket">> => <<"${clientid}">>, - <<"key">> => <<"${topic}">>, - <<"content">> => <<"${payload}">>, - <<"acl">> => <<"public_read">> - }, - <<"resource_opts">> => #{ - <<"buffer_mode">> => <<"memory_only">>, - <<"buffer_seg_bytes">> => <<"10MB">>, - <<"health_check_interval">> => <<"3s">>, - <<"inflight_window">> => 40, - <<"max_buffer_bytes">> => <<"256MB">>, - <<"metrics_flush_interval">> => <<"1s">>, - <<"query_mode">> => <<"sync">>, - <<"request_ttl">> => <<"60s">>, - <<"resume_interval">> => <<"3s">>, - <<"worker_pool_size">> => <<"4">> + emqx_bridge_s3_test_helpers:parse_and_check_config( + <<"actions">>, ?BRIDGE_TYPE, Name, #{ + <<"enable">> => true, + <<"connector">> => ConnectorId, + <<"parameters">> => #{ + <<"bucket">> => <<"${clientid}">>, + <<"key">> => <<"${topic}">>, + <<"content">> => <<"${payload}">>, + <<"acl">> => <<"public_read">> + }, + <<"resource_opts">> => #{ + <<"buffer_mode">> => <<"memory_only">>, + <<"buffer_seg_bytes">> => <<"10MB">>, + <<"health_check_interval">> => <<"3s">>, + <<"inflight_window">> => 40, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"metrics_flush_interval">> => <<"1s">>, + <<"query_mode">> => <<"sync">>, + <<"request_ttl">> => <<"60s">>, + <<"resume_interval">> => <<"3s">>, + <<"worker_pool_size">> => <<"4">> + } } - }). - -parse_and_check_config(Root, Type, Name, ConfigIn) -> - Schema = - case Root of - <<"connectors">> -> emqx_connector_schema; - <<"actions">> -> emqx_bridge_v2_schema - end, - #{Root := #{Type := #{Name := Config}}} = - hocon_tconf:check_plain( - Schema, - #{Root => #{Type => #{Name => ConfigIn}}}, - #{required => false, atom_key => false} - ), - ct:pal("parsed config: ~p", [Config]), - ConfigIn. + ). t_start_stop(Config) -> emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped). @@ -190,7 +177,7 @@ t_sync_query(Config) -> ok = erlcloud_s3:create_bucket(Bucket, AwsConfig), ok = emqx_bridge_v2_testlib:t_sync_query( Config, - fun() -> mk_message(Bucket, Topic, Payload) end, + fun() -> emqx_bridge_s3_test_helpers:mk_message_event(Bucket, Topic, Payload) end, fun(Res) -> ?assertMatch(ok, Res) end, s3_bridge_connector_upload_ok ), @@ -224,15 +211,10 @@ t_query_retry_recoverable(Config) -> heal_failure, [timeout, ?PROXY_NAME, ProxyHost, ProxyPort] ), - Message = mk_message(Bucket, Topic, Payload), + Message = emqx_bridge_s3_test_helpers:mk_message_event(Bucket, Topic, Payload), %% Verify that the message is sent eventually. ok = emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{}), ?assertMatch( #{content := Payload}, maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig)) ). - -mk_message(ClientId, Topic, Payload) -> - Message = emqx_message:make(bin(ClientId), bin(Topic), Payload), - {Event, _} = emqx_rule_events:eventmsg_publish(Message), - Event. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_buffer_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_buffer_SUITE.erl new file mode 100644 index 000000000..199e070d3 --- /dev/null +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_buffer_SUITE.erl @@ -0,0 +1,181 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_aggreg_buffer_SUITE). 
+ +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +%% CT Setup + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + WorkDir = emqx_cth_suite:work_dir(Config), + ok = filelib:ensure_path(WorkDir), + [{work_dir, WorkDir} | Config]. + +end_per_suite(_Config) -> + ok. + +%% Testcases + +t_write_read_cycle(Config) -> + Filename = mk_filename(?FUNCTION_NAME, Config), + Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}}, + {ok, WFD} = file:open(Filename, [write, binary]), + Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata), + Terms = [ + [], + [[[[[[[[]]]]]]]], + 123, + lists:seq(1, 100), + lists:seq(1, 1000), + lists:seq(1, 10000), + lists:seq(1, 100000), + #{<<"id">> => 123456789, <<"ts">> => <<"2028-02-29T12:34:56Z">>, <<"gauge">> => 42.42}, + {<<"text/plain">>, _Huge = rand:bytes(1048576)}, + {<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})} + ], + ok = lists:foreach( + fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end, + Terms + ), + ok = file:close(WFD), + {ok, RFD} = file:open(Filename, [read, binary, raw]), + {MetadataRead, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD), + ?assertEqual(Metadata, MetadataRead), + TermsRead = read_until_eof(Reader), + ?assertEqual(Terms, TermsRead). + +t_read_empty(Config) -> + Filename = mk_filename(?FUNCTION_NAME, Config), + {ok, WFD} = file:open(Filename, [write, binary]), + ok = file:close(WFD), + {ok, RFD} = file:open(Filename, [read, binary]), + ?assertError( + {buffer_incomplete, header}, + emqx_bridge_s3_aggreg_buffer:new_reader(RFD) + ). + +t_read_garbage(Config) -> + Filename = mk_filename(?FUNCTION_NAME, Config), + {ok, WFD} = file:open(Filename, [write, binary]), + ok = file:write(WFD, rand:bytes(1048576)), + ok = file:close(WFD), + {ok, RFD} = file:open(Filename, [read, binary]), + ?assertError( + badarg, + emqx_bridge_s3_aggreg_buffer:new_reader(RFD) + ). 
+ +t_read_truncated(Config) -> + Filename = mk_filename(?FUNCTION_NAME, Config), + {ok, WFD} = file:open(Filename, [write, binary]), + Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}}, + Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata), + Terms = [ + [[[[[[[[[[[]]]]]]]]]]], + lists:seq(1, 100000), + #{<<"id">> => 123456789, <<"ts">> => <<"2029-02-30T12:34:56Z">>, <<"gauge">> => 42.42}, + {<<"text/plain">>, _Huge = rand:bytes(1048576)} + ], + LastTerm = + {<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})}, + ok = lists:foreach( + fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end, + Terms + ), + {ok, WPos} = file:position(WFD, cur), + ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(LastTerm, Writer)), + ok = file:close(WFD), + ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos + 1), + {ok, RFD1} = file:open(Filename, [read, binary]), + {Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD1), + {ReadTerms1, Reader1} = read_terms(length(Terms), Reader0), + ?assertEqual(Terms, ReadTerms1), + ?assertError( + badarg, + emqx_bridge_s3_aggreg_buffer:read(Reader1) + ), + ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2), + {ok, RFD2} = file:open(Filename, [read, binary]), + {Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD2), + {ReadTerms2, Reader3} = read_terms(_FitsInto = 3, Reader2), + ?assertEqual(lists:sublist(Terms, 3), ReadTerms2), + ?assertError( + badarg, + emqx_bridge_s3_aggreg_buffer:read(Reader3) + ). + +t_read_truncated_takeover_write(Config) -> + Filename = mk_filename(?FUNCTION_NAME, Config), + {ok, WFD} = file:open(Filename, [write, binary]), + Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}}, + Writer1 = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata), + Terms1 = [ + [[[[[[[[[[[]]]]]]]]]]], + lists:seq(1, 10000), + lists:duplicate(1000, ?FUNCTION_NAME), + {<<"text/plain">>, _Huge = rand:bytes(1048576)} + ], + Terms2 = [ + {<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})}, + {<<"application/x-octet-stream">>, rand:bytes(102400)} + ], + ok = lists:foreach( + fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer1)) end, + Terms1 + ), + {ok, WPos} = file:position(WFD, cur), + ok = file:close(WFD), + ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2), + {ok, RWFD} = file:open(Filename, [read, write, binary]), + {Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RWFD), + {ReadTerms1, Reader1} = read_terms(_Survived = 3, Reader0), + ?assertEqual( + lists:sublist(Terms1, 3), + ReadTerms1 + ), + ?assertError( + badarg, + emqx_bridge_s3_aggreg_buffer:read(Reader1) + ), + Writer2 = emqx_bridge_s3_aggreg_buffer:takeover(Reader1), + ok = lists:foreach( + fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer2)) end, + Terms2 + ), + ok = file:close(RWFD), + {ok, RFD} = file:open(Filename, [read, binary]), + {Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD), + ReadTerms2 = read_until_eof(Reader2), + ?assertEqual( + lists:sublist(Terms1, 3) ++ Terms2, + ReadTerms2 + ). + +%% + +mk_filename(Name, Config) -> + filename:join(?config(work_dir, Config), Name). + +read_terms(0, Reader) -> + {[], Reader}; +read_terms(N, Reader0) -> + {Term, Reader1} = emqx_bridge_s3_aggreg_buffer:read(Reader0), + {Terms, Reader} = read_terms(N - 1, Reader1), + {[Term | Terms], Reader}. 
+ +read_until_eof(Reader0) -> + case emqx_bridge_s3_aggreg_buffer:read(Reader0) of + {Term, Reader} -> + [Term | read_until_eof(Reader)]; + eof -> + [] + end. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_csv_tests.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_csv_tests.erl new file mode 100644 index 000000000..6da70c6fe --- /dev/null +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_csv_tests.erl @@ -0,0 +1,72 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_aggreg_csv_tests). + +-include_lib("eunit/include/eunit.hrl"). + +encoding_test() -> + CSV = emqx_bridge_s3_aggreg_csv:new(#{}), + ?assertEqual( + "A,B,Ç\n" + "1.2345,string,0.0\n" + "0.3333333333,\"[]\",-0.0\n" + "111111,🫠,0.0\n" + "111.111,\"\"\"quoted\"\"\",\"line\r\nbreak\"\n" + "222.222,,\n", + fill_close(CSV, [ + [ + #{<<"A">> => 1.2345, <<"B">> => "string", <<"Ç"/utf8>> => +0.0}, + #{<<"A">> => 1 / 3, <<"B">> => "[]", <<"Ç"/utf8>> => -0.0}, + #{<<"A">> => 111111, <<"B">> => "🫠", <<"Ç"/utf8>> => 0.0}, + #{<<"A">> => 111.111, <<"B">> => "\"quoted\"", <<"Ç"/utf8>> => "line\r\nbreak"}, + #{<<"A">> => 222.222, <<"B">> => "", <<"Ç"/utf8>> => undefined} + ] + ]) + ). + +column_order_test() -> + Order = [<<"ID">>, <<"TS">>], + CSV = emqx_bridge_s3_aggreg_csv:new(#{column_order => Order}), + ?assertEqual( + "ID,TS,A,B,D\n" + "1,2024-01-01,12.34,str,\"[]\"\n" + "2,2024-01-02,23.45,ing,\n" + "3,,45,,'\n" + "4,2024-01-04,,,\n", + fill_close(CSV, [ + [ + #{ + <<"A">> => 12.34, + <<"B">> => "str", + <<"ID">> => 1, + <<"TS">> => "2024-01-01", + <<"D">> => <<"[]">> + }, + #{ + <<"TS">> => "2024-01-02", + <<"C">> => <<"null">>, + <<"ID">> => 2, + <<"A">> => 23.45, + <<"B">> => "ing" + } + ], + [ + #{<<"A">> => 45, <<"D">> => <<"'">>, <<"ID">> => 3}, + #{<<"ID">> => 4, <<"TS">> => "2024-01-04"} + ] + ]) + ). + +fill_close(CSV, LRecords) -> + string(fill_close_(CSV, LRecords)). + +fill_close_(CSV0, [Records | LRest]) -> + {Writes, CSV} = emqx_bridge_s3_aggreg_csv:fill(Records, CSV0), + [Writes | fill_close_(CSV, LRest)]; +fill_close_(CSV, []) -> + [emqx_bridge_s3_aggreg_csv:close(CSV)]. + +string(Writes) -> + unicode:characters_to_list(Writes). diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl new file mode 100644 index 000000000..6577b45ed --- /dev/null +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -0,0 +1,465 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_aggreg_upload_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/test_macros.hrl"). + +-import(emqx_utils_conv, [bin/1]). + +%% See `emqx_bridge_s3.hrl`. +-define(BRIDGE_TYPE, <<"s3_aggregated_upload">>). +-define(CONNECTOR_TYPE, <<"s3">>). + +-define(PROXY_NAME, "minio_tcp"). + +-define(CONF_TIME_INTERVAL, 4000). +-define(CONF_MAX_RECORDS, 100). +-define(CONF_COLUMN_ORDER, ?CONF_COLUMN_ORDER([])). +-define(CONF_COLUMN_ORDER(T), [ + <<"publish_received_at">>, + <<"clientid">>, + <<"topic">>, + <<"payload">>, + <<"empty">> + | T +]). 
+ +-define(LIMIT_TOLERANCE, 1.1). + +%% CT Setup + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + % Setup toxiproxy + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + _ = emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge_s3, + emqx_bridge, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _} = emqx_common_test_http:create_default_app(), + [ + {apps, Apps}, + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {proxy_name, ?PROXY_NAME} + | Config + ]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). + +%% Testcases + +init_per_testcase(TestCase, Config) -> + ct:timetrap(timer:seconds(15)), + ok = snabbkaffe:start_trace(), + TS = erlang:system_time(), + Name = iolist_to_binary(io_lib:format("~s-~p", [TestCase, TS])), + Bucket = unicode:characters_to_list(string:replace(Name, "_", "-", all)), + ConnectorConfig = connector_config(Name, Config), + ActionConfig = action_config(Name, Name, Bucket), + ok = emqx_bridge_s3_test_helpers:create_bucket(Bucket), + [ + {connector_type, ?CONNECTOR_TYPE}, + {connector_name, Name}, + {connector_config, ConnectorConfig}, + {bridge_type, ?BRIDGE_TYPE}, + {bridge_name, Name}, + {bridge_config, ActionConfig}, + {s3_bucket, Bucket} + | Config + ]. + +end_per_testcase(_TestCase, _Config) -> + ok = snabbkaffe:stop(), + ok. + +connector_config(Name, _Config) -> + BaseConf = emqx_s3_test_helpers:base_raw_config(tcp), + emqx_bridge_s3_test_helpers:parse_and_check_config( + <<"connectors">>, ?CONNECTOR_TYPE, Name, #{ + <<"enable">> => true, + <<"description">> => <<"S3 Connector">>, + <<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)), + <<"port">> => maps:get(<<"port">>, BaseConf), + <<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf), + <<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf), + <<"transport_options">> => #{ + <<"connect_timeout">> => <<"500ms">>, + <<"request_timeout">> => <<"1s">>, + <<"pool_size">> => 4, + <<"max_retries">> => 0 + }, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"1s">> + } + } + ). + +action_config(Name, ConnectorId, Bucket) -> + emqx_bridge_s3_test_helpers:parse_and_check_config( + <<"actions">>, ?BRIDGE_TYPE, Name, #{ + <<"enable">> => true, + <<"connector">> => ConnectorId, + <<"parameters">> => #{ + <<"bucket">> => unicode:characters_to_binary(Bucket), + <<"key">> => <<"${action}/${node}/${datetime.rfc3339}">>, + <<"acl">> => <<"public_read">>, + <<"headers">> => #{ + <<"X-AMZ-Meta-Version">> => <<"42">> + }, + <<"aggregation">> => #{ + <<"time_interval">> => <<"4s">>, + <<"max_records">> => ?CONF_MAX_RECORDS + }, + <<"container">> => #{ + <<"type">> => <<"csv">>, + <<"column_order">> => ?CONF_COLUMN_ORDER + } + }, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"1s">>, + <<"max_buffer_bytes">> => <<"64MB">>, + <<"query_mode">> => <<"async">>, + <<"worker_pool_size">> => 4 + } + } + ). + +t_start_stop(Config) -> + emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped). + +t_create_via_http(Config) -> + emqx_bridge_v2_testlib:t_create_via_http(Config). + +t_on_get_status(Config) -> + emqx_bridge_v2_testlib:t_on_get_status(Config, #{}). 
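Note for the CSV assertions below: since ?CONF_COLUMN_ORDER/1 takes the column tail as an argument, the expected header row can be computed directly. An illustrative helper, not used by the suite, reflecting the documented rule that event fields outside the configured order follow in lexicographical order:

    expected_csv_header(ExtraFields) ->
        %% Explicitly ordered columns first, remaining event fields sorted
        %% lexicographically after them.
        ?CONF_COLUMN_ORDER(lists:sort(ExtraFields)).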
+ +t_aggreg_upload(Config) -> + Bucket = ?config(s3_bucket, Config), + BridgeName = ?config(bridge_name, Config), + BridgeNameString = unicode:characters_to_list(BridgeName), + NodeString = atom_to_list(node()), + %% Create a bridge with the sample configuration. + ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), + %% Prepare some sample messages that look like Rule SQL productions. + MessageEvents = lists:map(fun mk_message_event/1, [ + {<<"C1">>, T1 = <<"a/b/c">>, P1 = <<"{\"hello\":\"world\"}">>}, + {<<"C2">>, T2 = <<"foo/bar">>, P2 = <<"baz">>}, + {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>} + ]), + ok = send_messages(BridgeName, MessageEvents), + %% Wait until the delivery is completed. + ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + %% Check the uploaded objects. + _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), + ?assertMatch( + [BridgeNameString, NodeString, _Datetime, _Seq = "0"], + string:split(Key, "/", all) + ), + Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), + ?assertMatch( + #{content_type := "text/csv", "x-amz-meta-version" := "42"}, + Upload + ), + %% Verify that column order is respected. + ?assertMatch( + {ok, [ + ?CONF_COLUMN_ORDER(_), + [TS, <<"C1">>, T1, P1, <<>> | _], + [TS, <<"C2">>, T2, P2, <<>> | _], + [TS, <<"C3">>, T3, P3, <<>> | _] + ]}, + erl_csv:decode(Content) + ). + +t_aggreg_upload_rule(Config) -> + Bucket = ?config(s3_bucket, Config), + BridgeName = ?config(bridge_name, Config), + ClientID = emqx_utils_conv:bin(?FUNCTION_NAME), + %% Create a bridge with the sample configuration and a simple SQL rule. + ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), + ?assertMatch( + {ok, _Rule}, + emqx_bridge_v2_testlib:create_rule_and_action_http(?BRIDGE_TYPE, <<>>, Config, #{ + sql => << + "SELECT" + " *," + " strlen(payload) as psize," + " unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at" + " FROM 's3/#'" + >> + }) + ), + ok = lists:foreach(fun emqx:publish/1, [ + emqx_message:make(?FUNCTION_NAME, T1 = <<"s3/m1">>, P1 = <<"[HELLO]">>), + emqx_message:make(?FUNCTION_NAME, T2 = <<"s3/m2">>, P2 = <<"[WORLD]">>), + emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>), + emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>) + ]), + ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + %% Check the uploaded objects. + _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), + _CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key), + %% Verify that column order is respected and event fields are preserved. + ?assertMatch(?CONF_COLUMN_ORDER(_), Header), + ?assertEqual( + [<<"event">>, <<"qos">>, <<"psize">>], + [C || C <- [<<"event">>, <<"qos">>, <<"psize">>], lists:member(C, Header)] + ), + %% Verify that all the matching messages are present. + ?assertMatch( + [ + [_TS1, ClientID, T1, P1 | _], + [_TS2, ClientID, T2, P2 | _], + [_TS3, ClientID, T3, P3 | _] + ], + Rows + ), + %% Verify that timestamp column now has RFC3339 format. + [_Row = [TS1 | _] | _Rest] = Rows, + ?assert( + is_integer(emqx_rule_funcs:rfc3339_to_unix_ts(TS1, millisecond)), + TS1 + ). + +t_aggreg_upload_restart(Config) -> + %% NOTE + %% This test verifies that the bridge will reuse existing aggregation buffer + %% after a restart. 
+ Bucket = ?config(s3_bucket, Config), + BridgeName = ?config(bridge_name, Config), + %% Create a bridge with the sample configuration. + ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), + %% Send some sample messages that look like Rule SQL productions. + MessageEvents = lists:map(fun mk_message_event/1, [ + {<<"C1">>, T1 = <<"a/b/c">>, P1 = <<"{\"hello\":\"world\"}">>}, + {<<"C2">>, T2 = <<"foo/bar">>, P2 = <<"baz">>}, + {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>} + ]), + ok = send_messages(BridgeName, MessageEvents), + {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}), + %% Restart the bridge. + {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), + {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), + %% Send some more messages. + ok = send_messages(BridgeName, MessageEvents), + {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}), + %% Wait until the delivery is completed. + {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + %% Check there's still only one upload. + _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), + _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), + ?assertMatch( + {ok, [ + _Header = [_ | _], + [TS1, <<"C1">>, T1, P1 | _], + [TS1, <<"C2">>, T2, P2 | _], + [TS1, <<"C3">>, T3, P3 | _], + [TS2, <<"C1">>, T1, P1 | _], + [TS2, <<"C2">>, T2, P2 | _], + [TS2, <<"C3">>, T3, P3 | _] + ]}, + erl_csv:decode(Content) + ). + +t_aggreg_upload_restart_corrupted(Config) -> + %% NOTE + %% This test verifies that the bridge can recover from a buffer file corruption, + %% and does so while preserving uncompromised data. + Bucket = ?config(s3_bucket, Config), + BridgeName = ?config(bridge_name, Config), + BatchSize = ?CONF_MAX_RECORDS div 2, + %% Create a bridge with the sample configuration. + ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), + %% Send some sample messages that look like Rule SQL productions. + Messages1 = [ + {integer_to_binary(N), <<"a/b/c">>, <<"{\"hello\":\"world\"}">>} + || N <- lists:seq(1, BatchSize) + ], + %% Ensure that they span multiple batch queries. + ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1), + {ok, _} = ?block_until( + #{?snk_kind := s3_aggreg_records_written, action := BridgeName}, + infinity, + 0 + ), + %% Find out the buffer file. + {ok, #{filename := Filename}} = ?block_until( + #{?snk_kind := s3_aggreg_buffer_allocated, action := BridgeName} + ), + %% Stop the bridge, corrupt the buffer file, and restart the bridge. + {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), + BufferFileSize = filelib:file_size(Filename), + ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, BufferFileSize div 2), + {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), + %% Send some more messages. + Messages2 = [ + {integer_to_binary(N), <<"c/d/e">>, <<"{\"hello\":\"world\"}">>} + || N <- lists:seq(1, BatchSize) + ], + ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0), + %% Wait until the delivery is completed. + {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + %% Check that upload contains part of the first batch and all of the second batch. 
+ _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), + CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key), + NRows = length(Rows), + ?assert( + NRows > BatchSize, + CSV + ), + ?assertEqual( + lists:sublist(Messages1, NRows - BatchSize) ++ Messages2, + [{ClientID, Topic, Payload} || [_TS, ClientID, Topic, Payload | _] <- Rows], + CSV + ). + +t_aggreg_pending_upload_restart(Config) -> + %% NOTE + %% This test verifies that the bridge will finish uploading a buffer file after + %% a restart. + Bucket = ?config(s3_bucket, Config), + BridgeName = ?config(bridge_name, Config), + %% Create a bridge with the sample configuration. + ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), + %% Send few large messages that will require multipart upload. + %% Ensure that they span multiple batch queries. + Payload = iolist_to_binary(lists:duplicate(128 * 1024, "PAYLOAD!")), + Messages = [{integer_to_binary(N), <<"a/b/c">>, Payload} || N <- lists:seq(1, 10)], + ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages), 10), + %% Wait until the multipart upload is started. + {ok, #{key := ObjectKey}} = + ?block_until(#{?snk_kind := s3_client_multipart_started, bucket := Bucket}), + %% Stop the bridge. + {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), + %% Verify that pending uploads have been gracefully aborted. + %% NOTE: Minio does not support multipart upload listing w/o prefix. + ?assertEqual( + [], + emqx_bridge_s3_test_helpers:list_pending_uploads(Bucket, ObjectKey) + ), + %% Restart the bridge. + {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), + %% Wait until the delivery is completed. + {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + %% Check that delivery contains all the messages. + _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), + [_Header | Rows] = fetch_parse_csv(Bucket, Key), + ?assertEqual( + Messages, + [{CID, Topic, PL} || [_TS, CID, Topic, PL | _] <- Rows] + ). + +t_aggreg_next_rotate(Config) -> + %% NOTE + %% This is essentially a stress test that tries to verify that buffer rotation + %% and windowing work correctly under high rate, high concurrency conditions. + Bucket = ?config(s3_bucket, Config), + BridgeName = ?config(bridge_name, Config), + NSenders = 4, + %% Create a bridge with the sample configuration. + ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), + %% Start separate processes to send messages. + Senders = [ + spawn_link(fun() -> run_message_sender(BridgeName, N) end) + || N <- lists:seq(1, NSenders) + ], + %% Give them some time to send messages so that rotation and windowing will happen. + ok = timer:sleep(round(?CONF_TIME_INTERVAL * 1.5)), + %% Stop the senders. + _ = [Sender ! {stop, self()} || Sender <- Senders], + NSent = receive_sender_reports(Senders), + %% Wait for the last delivery to complete. + ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)), + ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}, infinity, 0), + %% There should be at least 2 time windows of aggregated records. + Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)], + DTs = [DT || K <- Uploads, [_Action, _Node, DT | _] <- [string:split(K, "/", all)]], + ?assert( + ordsets:size(ordsets:from_list(DTs)) > 1, + Uploads + ), + %% Uploads should not contain more than max allowed records. 
+ CSVs = [{K, fetch_parse_csv(Bucket, K)} || K <- Uploads], + NRecords = [{K, length(CSV) - 1} || {K, CSV} <- CSVs], + ?assertEqual( + [], + [{K, NR} || {K, NR} <- NRecords, NR > ?CONF_MAX_RECORDS * ?LIMIT_TOLERANCE] + ), + %% No message should be lost. + ?assertEqual( + NSent, + lists:sum([NR || {_, NR} <- NRecords]) + ). + +run_message_sender(BridgeName, N) -> + ClientID = integer_to_binary(N), + Topic = <<"a/b/c/", ClientID/binary>>, + run_message_sender(BridgeName, N, ClientID, Topic, N, 0). + +run_message_sender(BridgeName, N, ClientID, Topic, Delay, NSent) -> + Payload = integer_to_binary(N * 1_000_000 + NSent), + Message = emqx_bridge_s3_test_helpers:mk_message_event(ClientID, Topic, Payload), + _ = send_message(BridgeName, Message), + receive + {stop, From} -> + From ! {sent, self(), NSent + 1} + after Delay -> + run_message_sender(BridgeName, N, ClientID, Topic, Delay, NSent + 1) + end. + +receive_sender_reports([Sender | Rest]) -> + receive + {sent, Sender, NSent} -> NSent + receive_sender_reports(Rest) + end; +receive_sender_reports([]) -> + 0. + +%% + +mk_message_event({ClientID, Topic, Payload}) -> + emqx_bridge_s3_test_helpers:mk_message_event(ClientID, Topic, Payload). + +send_messages(BridgeName, MessageEvents) -> + lists:foreach( + fun(M) -> send_message(BridgeName, M) end, + MessageEvents + ). + +send_messages_delayed(BridgeName, MessageEvents, Delay) -> + lists:foreach( + fun(M) -> + send_message(BridgeName, M), + timer:sleep(Delay) + end, + MessageEvents + ). + +send_message(BridgeName, Message) -> + ?assertEqual(ok, emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{})). + +fetch_parse_csv(Bucket, Key) -> + #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), + {ok, CSV} = erl_csv:decode(Content), + CSV. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl new file mode 100644 index 000000000..21729369b --- /dev/null +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl @@ -0,0 +1,58 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_test_helpers). + +-compile(nowarn_export_all). +-compile(export_all). + +-import(emqx_utils_conv, [bin/1]). + +parse_and_check_config(Root, Type, Name, Config) -> + Schema = + case Root of + <<"connectors">> -> emqx_connector_schema; + <<"actions">> -> emqx_bridge_v2_schema + end, + #{Root := #{Type := #{Name := _ConfigParsed}}} = + hocon_tconf:check_plain( + Schema, + #{Root => #{Type => #{Name => Config}}}, + #{required => false, atom_key => false} + ), + Config. + +mk_message_event(ClientId, Topic, Payload) -> + Message = emqx_message:make(bin(ClientId), bin(Topic), Payload), + {Event, _} = emqx_rule_events:eventmsg_publish(Message), + emqx_utils_maps:binary_key_map(Event). + +create_bucket(Bucket) -> + AwsConfig = emqx_s3_test_helpers:aws_config(tcp), + erlcloud_s3:create_bucket(Bucket, AwsConfig). + +list_objects(Bucket) -> + AwsConfig = emqx_s3_test_helpers:aws_config(tcp), + Response = erlcloud_s3:list_objects(Bucket, AwsConfig), + false = proplists:get_value(is_truncated, Response), + Contents = proplists:get_value(contents, Response), + lists:map(fun maps:from_list/1, Contents). 
+ +get_object(Bucket, Key) -> + AwsConfig = emqx_s3_test_helpers:aws_config(tcp), + maps:from_list(erlcloud_s3:get_object(Bucket, Key, AwsConfig)). + +list_pending_uploads(Bucket, Key) -> + AwsConfig = emqx_s3_test_helpers:aws_config(tcp), + {ok, Props} = erlcloud_s3:list_multipart_uploads(Bucket, [{prefix, Key}], [], AwsConfig), + Uploads = proplists:get_value(uploads, Props), + lists:map(fun maps:from_list/1, Uploads). + +%% File utilities + +truncate_at(Filename, Pos) -> + {ok, FD} = file:open(Filename, [read, write, binary]), + {ok, Pos} = file:position(FD, Pos), + ok = file:truncate(FD), + ok = file:close(FD). diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl index a415cf8d4..58029ae85 100644 --- a/apps/emqx_s3/src/emqx_s3_client.erl +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -6,6 +6,7 @@ -include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). -include_lib("erlcloud/include/erlcloud_aws.hrl"). -export([ @@ -133,7 +134,13 @@ start_multipart( Headers = join_headers(BaseHeaders, maps:get(headers, UploadOpts, undefined)), case erlcloud_s3:start_multipart(Bucket, ECKey, ECOpts, Headers, AwsConfig) of {ok, Props} -> - {ok, response_property('uploadId', Props)}; + UploadId = response_property('uploadId', Props), + ?tp(s3_client_multipart_started, #{ + bucket => Bucket, + key => Key, + upload_id => UploadId + }), + {ok, UploadId}; {error, Reason} -> ?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}), {error, Reason} @@ -177,6 +184,11 @@ complete_multipart( ) of ok -> + ?tp(s3_client_multipart_completed, #{ + bucket => Bucket, + key => Key, + upload_id => UploadId + }), ok; {error, Reason} -> ?SLOG(debug, #{msg => "complete_multipart_fail", key => Key, reason => Reason}), @@ -193,6 +205,11 @@ abort_multipart( ) -> case erlcloud_s3:abort_multipart(Bucket, erlcloud_key(Key), UploadId, [], Headers, AwsConfig) of ok -> + ?tp(s3_client_multipart_aborted, #{ + bucket => Bucket, + key => Key, + upload_id => UploadId + }), ok; {error, Reason} -> ?SLOG(debug, #{msg => "abort_multipart_fail", key => Key, reason => Reason}), diff --git a/apps/emqx_s3/src/emqx_s3_profile_conf.erl b/apps/emqx_s3/src/emqx_s3_profile_conf.erl index 67f9d33b3..7bb5d87f1 100644 --- a/apps/emqx_s3/src/emqx_s3_profile_conf.erl +++ b/apps/emqx_s3/src/emqx_s3_profile_conf.erl @@ -53,7 +53,7 @@ emqx_s3_client:bucket(), emqx_s3_client:config(), emqx_s3_client:upload_options(), - emqx_s3_uploader:config() + emqx_s3_upload:config() }. -define(DEFAULT_CALL_TIMEOUT, 5000). diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl index 1199948d0..1fa6d31cd 100644 --- a/apps/emqx_s3/src/emqx_s3_schema.erl +++ b/apps/emqx_s3/src/emqx_s3_schema.erl @@ -102,6 +102,14 @@ fields(s3_upload) -> desc => ?DESC("acl"), required => false } + )}, + {headers, + hoconsc:mk( + map(), + #{ + required => false, + desc => ?DESC("upload_headers") + } )} ]; fields(s3_uploader) -> diff --git a/apps/emqx_s3/src/emqx_s3_upload.erl b/apps/emqx_s3/src/emqx_s3_upload.erl new file mode 100644 index 000000000..565c6b8bc --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3_upload.erl @@ -0,0 +1,217 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_upload). + +-include_lib("emqx/include/types.hrl"). 
+ +-export([ + new/4, + append/2, + write/1, + complete/1, + abort/1 +]). + +-export([format/1]). + +-export_type([t/0, config/0]). + +-type config() :: #{ + min_part_size => pos_integer(), + max_part_size => pos_integer() +}. + +-type t() :: #{ + started := boolean(), + client := emqx_s3_client:client(), + key := emqx_s3_client:key(), + upload_opts := emqx_s3_client:upload_options(), + buffer := iodata(), + buffer_size := non_neg_integer(), + min_part_size := pos_integer(), + max_part_size := pos_integer(), + upload_id := undefined | emqx_s3_client:upload_id(), + etags := [emqx_s3_client:etag()], + part_number := emqx_s3_client:part_number() +}. + +%% 5MB +-define(DEFAULT_MIN_PART_SIZE, 5242880). +%% 5GB +-define(DEFAULT_MAX_PART_SIZE, 5368709120). + +%% + +-spec new( + emqx_s3_client:client(), + emqx_s3_client:key(), + emqx_s3_client:upload_options(), + config() +) -> + t(). +new(Client, Key, UploadOpts, Config) -> + #{ + started => false, + client => Client, + key => Key, + upload_opts => UploadOpts, + buffer => [], + buffer_size => 0, + min_part_size => maps:get(min_part_size, Config, ?DEFAULT_MIN_PART_SIZE), + max_part_size => maps:get(max_part_size, Config, ?DEFAULT_MAX_PART_SIZE), + upload_id => undefined, + etags => [], + part_number => 1 + }. + +-spec append(iodata(), t()) -> {ok, t()} | {error, term()}. +append(WriteData, #{buffer := Buffer, buffer_size := BufferSize} = Upload) -> + case is_valid_part(WriteData, Upload) of + true -> + {ok, Upload#{ + buffer => [Buffer, WriteData], + buffer_size => BufferSize + iolist_size(WriteData) + }}; + false -> + {error, {too_large, iolist_size(WriteData)}} + end. + +-spec write(t()) -> {ok, t()} | {cont, t()} | {error, term()}. +write(U0 = #{started := false}) -> + case maybe_start_upload(U0) of + not_started -> + {ok, U0}; + {started, U1} -> + {cont, U1#{started := true}}; + {error, _} = Error -> + Error + end; +write(U0 = #{started := true}) -> + maybe_upload_part(U0). + +-spec complete(t()) -> {ok, t()} | {error, term()}. +complete( + #{ + started := true, + client := Client, + key := Key, + upload_id := UploadId + } = U0 +) -> + case upload_part(U0) of + {ok, #{etags := ETagsRev} = U1} -> + ETags = lists:reverse(ETagsRev), + case emqx_s3_client:complete_multipart(Client, Key, UploadId, ETags) of + ok -> + {ok, U1}; + {error, _} = Error -> + Error + end; + {error, _} = Error -> + Error + end; +complete(#{started := false} = Upload) -> + put_object(Upload). + +-spec abort(t()) -> ok_or_error(term()). +abort(#{ + started := true, + client := Client, + key := Key, + upload_id := UploadId +}) -> + case emqx_s3_client:abort_multipart(Client, Key, UploadId) of + ok -> + ok; + {error, _} = Error -> + Error + end; +abort(#{started := false}) -> + ok. + +%%-------------------------------------------------------------------- + +-spec format(t()) -> map(). +format(Upload = #{client := Client}) -> + Upload#{ + client => emqx_s3_client:format(Client), + buffer => [<<"...">>] + }. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +-spec maybe_start_upload(t()) -> not_started | {started, t()} | {error, term()}. +maybe_start_upload(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) -> + case BufferSize >= MinPartSize of + true -> + start_upload(Data); + false -> + not_started + end. + +-spec start_upload(t()) -> {started, t()} | {error, term()}. 
+start_upload(#{client := Client, key := Key, upload_opts := UploadOpts} = Data) -> + case emqx_s3_client:start_multipart(Client, Key, UploadOpts) of + {ok, UploadId} -> + NewData = Data#{upload_id => UploadId}, + {started, NewData}; + {error, _} = Error -> + Error + end. + +-spec maybe_upload_part(t()) -> ok_or_error(t(), term()). +maybe_upload_part(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) -> + case BufferSize >= MinPartSize of + true -> + upload_part(Data); + false -> + {ok, Data} + end. + +-spec upload_part(t()) -> ok_or_error(t(), term()). +upload_part(#{buffer_size := 0} = Upload) -> + {ok, Upload}; +upload_part( + #{ + client := Client, + key := Key, + upload_id := UploadId, + buffer := Buffer, + part_number := PartNumber, + etags := ETags + } = Upload +) -> + case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, Buffer) of + {ok, ETag} -> + {ok, Upload#{ + buffer => [], + buffer_size => 0, + part_number => PartNumber + 1, + etags => [{PartNumber, ETag} | ETags] + }}; + {error, _} = Error -> + Error + end. + +-spec put_object(t()) -> ok_or_error(t(), term()). +put_object( + #{ + client := Client, + key := Key, + upload_opts := UploadOpts, + buffer := Buffer + } = Upload +) -> + case emqx_s3_client:put_object(Client, Key, UploadOpts, Buffer) of + ok -> + {ok, Upload}; + {error, _} = Error -> + Error + end. + +is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) -> + BufferSize + iolist_size(WriteData) =< MaxPartSize. diff --git a/apps/emqx_s3/src/emqx_s3_uploader.erl b/apps/emqx_s3/src/emqx_s3_uploader.erl index 160ecbfef..99a89eb92 100644 --- a/apps/emqx_s3/src/emqx_s3_uploader.erl +++ b/apps/emqx_s3/src/emqx_s3_uploader.erl @@ -6,7 +6,7 @@ -include_lib("emqx/include/types.hrl"). --behaviour(gen_statem). +-behaviour(gen_server). -export([ start_link/3, @@ -25,46 +25,23 @@ -export([ init/1, - callback_mode/0, - handle_event/4, - terminate/3, - code_change/4, - format_status/1, - format_status/2 + handle_call/3, + handle_cast/2, + terminate/2, + format_status/1 ]). --export_type([config/0]). - --type config() :: #{ - min_part_size => pos_integer(), - max_part_size => pos_integer() -}. - -type data() :: #{ profile_id => emqx_s3:profile_id(), - client := emqx_s3_client:client(), - key := emqx_s3_client:key(), - upload_opts := emqx_s3_client:upload_options(), - buffer := iodata(), - buffer_size := non_neg_integer(), - min_part_size := pos_integer(), - max_part_size := pos_integer(), - upload_id := undefined | emqx_s3_client:upload_id(), - etags := [emqx_s3_client:etag()], - part_number := emqx_s3_client:part_number() + upload := emqx_s3_upload:t() | aborted }. -%% 5MB --define(DEFAULT_MIN_PART_SIZE, 5242880). -%% 5GB --define(DEFAULT_MAX_PART_SIZE, 5368709120). - -define(DEFAULT_TIMEOUT, 30000). -spec start_link(emqx_s3:profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) -> - gen_statem:start_ret(). + gen_server:start_ret(). start_link(ProfileId, Key, UploadOpts) when is_list(Key) -> - gen_statem:start_link(?MODULE, {profile, ProfileId, Key, UploadOpts}, []). + gen_server:start_link(?MODULE, {profile, ProfileId, Key, UploadOpts}, []). -spec write(pid(), iodata()) -> ok_or_error(term()). write(Pid, WriteData) -> @@ -72,7 +49,7 @@ write(Pid, WriteData) -> -spec write(pid(), iodata(), timeout()) -> ok_or_error(term()). write(Pid, WriteData, Timeout) -> - gen_statem:call(Pid, {write, wrap(WriteData)}, Timeout). + gen_server:call(Pid, {write, wrap(WriteData)}, Timeout). 
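The buffering and multipart bookkeeping that used to live inside this process is now the pure, state-passing emqx_s3_upload API introduced above. A minimal driving sketch, assuming Client, Key and Opts have already been created:

    U0 = emqx_s3_upload:new(Client, Key, Opts, #{}),
    {ok, U1} = emqx_s3_upload:append(<<"chunk-1">>, U0),
    U3 =
        case emqx_s3_upload:write(U1) of
            %% Still buffered: accumulated data is below min_part_size.
            {ok, U2} ->
                U2;
            %% Multipart upload just started: write again to flush the
            %% pending part, mirroring handle_write/1 below.
            {cont, U2} ->
                {ok, UNext} = emqx_s3_upload:write(U2),
                UNext
        end,
    %% Uploads any remaining data; falls back to a plain put_object
    %% if the multipart upload was never started.
    {ok, _Done} = emqx_s3_upload:complete(U3).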
-spec complete(pid()) -> ok_or_error(term()). complete(Pid) -> @@ -80,7 +57,7 @@ complete(Pid) -> -spec complete(pid(), timeout()) -> ok_or_error(term()). complete(Pid, Timeout) -> - gen_statem:call(Pid, complete, Timeout). + gen_server:call(Pid, complete, Timeout). -spec abort(pid()) -> ok_or_error(term()). abort(Pid) -> @@ -88,7 +65,7 @@ abort(Pid) -> -spec abort(pid(), timeout()) -> ok_or_error(term()). abort(Pid, Timeout) -> - gen_statem:call(Pid, abort, Timeout). + gen_server:call(Pid, abort, Timeout). -spec shutdown(pid()) -> ok. shutdown(Pid) -> @@ -99,231 +76,73 @@ shutdown(Pid) -> %% gen_statem callbacks %%-------------------------------------------------------------------- -callback_mode() -> handle_event_function. - init({profile, ProfileId, Key, UploadOpts}) -> + _ = process_flag(trap_exit, true), {Bucket, ClientConfig, BaseOpts, UploaderConfig} = emqx_s3_profile_conf:checkout_config(ProfileId), - Upload = #{ - profile_id => ProfileId, - client => client(Bucket, ClientConfig), - key => Key, - upload_opts => maps:merge(BaseOpts, UploadOpts) - }, - init({upload, UploaderConfig, Upload}); -init({upload, Config, Upload}) -> - process_flag(trap_exit, true), - {ok, upload_not_started, Upload#{ - buffer => [], - buffer_size => 0, - min_part_size => maps:get(min_part_size, Config, ?DEFAULT_MIN_PART_SIZE), - max_part_size => maps:get(max_part_size, Config, ?DEFAULT_MAX_PART_SIZE), - upload_id => undefined, - etags => [], - part_number => 1 - }}. + Upload = emqx_s3_upload:new( + client(Bucket, ClientConfig), + Key, + maps:merge(BaseOpts, UploadOpts), + UploaderConfig + ), + {ok, #{profile_id => ProfileId, upload => Upload}}. -handle_event({call, From}, {write, WriteDataWrapped}, State, Data0) -> +-spec handle_call(_Call, gen_server:from(), data()) -> + {reply, _Result, data()} | {stop, _Reason, _Result, data()}. +handle_call({write, WriteDataWrapped}, _From, St0 = #{upload := U0}) -> WriteData = unwrap(WriteDataWrapped), - case is_valid_part(WriteData, Data0) of - true -> - handle_write(State, From, WriteData, Data0); - false -> - {keep_state_and_data, {reply, From, {error, {too_large, iolist_size(WriteData)}}}} + case emqx_s3_upload:append(WriteData, U0) of + {ok, U1} -> + handle_write(St0#{upload := U1}); + {error, _} = Error -> + {reply, Error, St0} end; -handle_event({call, From}, complete, upload_not_started, Data0) -> - case put_object(Data0) of +handle_call(complete, _From, St0 = #{upload := U0}) -> + case emqx_s3_upload:complete(U0) of + {ok, U1} -> + {stop, normal, ok, St0#{upload := U1}}; + {error, _} = Error -> + {stop, Error, Error, St0} + end; +handle_call(abort, _From, St = #{upload := Upload}) -> + case emqx_s3_upload:abort(Upload) of ok -> - {stop_and_reply, normal, {reply, From, ok}}; + {stop, normal, ok, St}; {error, _} = Error -> - {stop_and_reply, Error, {reply, From, Error}, Data0} - end; -handle_event({call, From}, complete, upload_started, Data0) -> - case complete_upload(Data0) of - {ok, Data1} -> - {stop_and_reply, normal, {reply, From, ok}, Data1}; - {error, _} = Error -> - {stop_and_reply, Error, {reply, From, Error}, Data0} - end; -handle_event({call, From}, abort, upload_not_started, _Data) -> - {stop_and_reply, normal, {reply, From, ok}}; -handle_event({call, From}, abort, upload_started, Data0) -> - case abort_upload(Data0) of - ok -> - {stop_and_reply, normal, {reply, From, ok}}; - {error, _} = Error -> - {stop_and_reply, Error, {reply, From, Error}, Data0} + {stop, Error, Error, St} end. 
-handle_write(upload_not_started, From, WriteData, Data0) -> - Data1 = append_buffer(Data0, WriteData), - case maybe_start_upload(Data1) of - not_started -> - {keep_state, Data1, {reply, From, ok}}; - {started, Data2} -> - case upload_part(Data2) of - {ok, Data3} -> - {next_state, upload_started, Data3, {reply, From, ok}}; - {error, _} = Error -> - {stop_and_reply, Error, {reply, From, Error}, Data2} - end; +handle_write(St = #{upload := U0}) -> + case emqx_s3_upload:write(U0) of + {ok, U1} -> + {reply, ok, St#{upload := U1}}; + {cont, U1} -> + handle_write(St#{upload := U1}); {error, _} = Error -> - {stop_and_reply, Error, {reply, From, Error}, Data1} - end; -handle_write(upload_started, From, WriteData, Data0) -> - Data1 = append_buffer(Data0, WriteData), - case maybe_upload_part(Data1) of - {ok, Data2} -> - {keep_state, Data2, {reply, From, ok}}; - {error, _} = Error -> - {stop_and_reply, Error, {reply, From, Error}, Data1} + {stop, Error, Error, St} end. -terminate(Reason, _State, #{client := Client, upload_id := UploadId, key := Key}) when - (UploadId =/= undefined) andalso (Reason =/= normal) --> - emqx_s3_client:abort_multipart(Client, Key, UploadId); -terminate(_Reason, _State, _Data) -> - ok. +-spec handle_cast(_Cast, data()) -> {noreply, data()}. +handle_cast(_Cast, St) -> + {noreply, St}. -code_change(_OldVsn, StateName, State, _Extra) -> - {ok, StateName, State}. +-spec terminate(_Reason, data()) -> ok. +terminate(normal, _St) -> + ok; +terminate({shutdown, _}, _St) -> + ok; +terminate(_Reason, #{upload := Upload}) -> + emqx_s3_upload:abort(Upload). -format_status(#{data := #{client := Client} = Data} = Status) -> - Status#{ - data => Data#{ - client => emqx_s3_client:format(Client), - buffer => [<<"...">>] - } - }. - -format_status(_Opt, [PDict, State, #{client := Client} = Data]) -> - #{ - data => Data#{ - client => emqx_s3_client:format(Client), - buffer => [<<"...">>] - }, - state => State, - pdict => PDict - }. +format_status(#{state := State = #{upload := Upload}} = Status) -> + StateRedacted = State#{upload := emqx_s3_upload:format(Upload)}, + Status#{state := StateRedacted}. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- --spec maybe_start_upload(data()) -> not_started | {started, data()} | {error, term()}. -maybe_start_upload(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) -> - case BufferSize >= MinPartSize of - true -> - start_upload(Data); - false -> - not_started - end. - --spec start_upload(data()) -> {started, data()} | {error, term()}. -start_upload(#{client := Client, key := Key, upload_opts := UploadOpts} = Data) -> - case emqx_s3_client:start_multipart(Client, Key, UploadOpts) of - {ok, UploadId} -> - NewData = Data#{upload_id => UploadId}, - {started, NewData}; - {error, _} = Error -> - Error - end. - --spec maybe_upload_part(data()) -> ok_or_error(data(), term()). -maybe_upload_part(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) -> - case BufferSize >= MinPartSize of - true -> - upload_part(Data); - false -> - {ok, Data} - end. - --spec upload_part(data()) -> ok_or_error(data(), term()). 
-upload_part(#{buffer_size := 0} = Data) -> - {ok, Data}; -upload_part( - #{ - client := Client, - key := Key, - upload_id := UploadId, - buffer := Buffer, - part_number := PartNumber, - etags := ETags - } = Data -) -> - case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, Buffer) of - {ok, ETag} -> - NewData = Data#{ - buffer => [], - buffer_size => 0, - part_number => PartNumber + 1, - etags => [{PartNumber, ETag} | ETags] - }, - {ok, NewData}; - {error, _} = Error -> - Error - end. - --spec complete_upload(data()) -> ok_or_error(data(), term()). -complete_upload( - #{ - client := Client, - key := Key, - upload_id := UploadId - } = Data0 -) -> - case upload_part(Data0) of - {ok, #{etags := ETagsRev} = Data1} -> - ETags = lists:reverse(ETagsRev), - case emqx_s3_client:complete_multipart(Client, Key, UploadId, ETags) of - ok -> - {ok, Data1}; - {error, _} = Error -> - Error - end; - {error, _} = Error -> - Error - end. - --spec abort_upload(data()) -> ok_or_error(term()). -abort_upload( - #{ - client := Client, - key := Key, - upload_id := UploadId - } -) -> - case emqx_s3_client:abort_multipart(Client, Key, UploadId) of - ok -> - ok; - {error, _} = Error -> - Error - end. - --spec put_object(data()) -> ok_or_error(term()). -put_object( - #{ - client := Client, - key := Key, - upload_opts := UploadOpts, - buffer := Buffer - } -) -> - case emqx_s3_client:put_object(Client, Key, UploadOpts, Buffer) of - ok -> - ok; - {error, _} = Error -> - Error - end. - --spec append_buffer(data(), iodata()) -> data(). -append_buffer(#{buffer := Buffer, buffer_size := BufferSize} = Data, WriteData) -> - Data#{ - buffer => [Buffer, WriteData], - buffer_size => BufferSize + iolist_size(WriteData) - }. - -compile({inline, [wrap/1, unwrap/1]}). wrap(Data) -> fun() -> Data end. @@ -331,8 +150,5 @@ wrap(Data) -> unwrap(WrappedData) -> WrappedData(). -is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) -> - BufferSize + iolist_size(WriteData) =< MaxPartSize. - client(Bucket, Config) -> emqx_s3_client:create(Bucket, Config). diff --git a/rel/i18n/emqx_bridge_s3.hocon b/rel/i18n/emqx_bridge_s3.hocon index fe10313e0..531ba9823 100644 --- a/rel/i18n/emqx_bridge_s3.hocon +++ b/rel/i18n/emqx_bridge_s3.hocon @@ -5,19 +5,4 @@ config_connector.label: config_connector.desc: """Configuration for a connector to S3 API compatible storage service.""" -s3_upload.label: -"""S3 Simple Upload""" -s3_upload.desc: -"""Action to upload a single object to the S3 service.""" - -s3_upload_parameters.label: -"""S3 Upload action parameters""" -s3_upload_parameters.desc: -"""Set of parameters for the upload action. Action supports templates in S3 bucket name, object key and object content.""" - -s3_object_content.label: -"""S3 Object Content""" -s3_object_content.desc: -"""Content of the S3 object being uploaded. 
Supports templates."""
-
 }
diff --git a/rel/i18n/emqx_bridge_s3_aggreg_upload.hocon b/rel/i18n/emqx_bridge_s3_aggreg_upload.hocon
new file mode 100644
index 000000000..07239a32d
--- /dev/null
+++ b/rel/i18n/emqx_bridge_s3_aggreg_upload.hocon
@@ -0,0 +1,64 @@
+emqx_bridge_s3_aggreg_upload {
+
+s3_aggregated_upload.label:
+"""S3 Aggregated Upload"""
+s3_aggregated_upload.desc:
+"""Action that enables time-based aggregation of incoming events and uploading them to the S3 service as a single object."""
+
+s3_aggregated_upload_parameters.label:
+"""S3 Aggregated Upload action parameters"""
+s3_aggregated_upload_parameters.desc:
+"""Set of parameters for the aggregated upload action."""
+
+s3_aggregation.label:
+"""Aggregation parameters"""
+s3_aggregation.desc:
+"""Set of parameters governing the aggregation process."""
+
+s3_aggregation_interval.label:
+"""Time interval"""
+s3_aggregation_interval.desc:
+"""Amount of time events will be aggregated in a single object before uploading."""
+
+s3_aggregation_max_records.label:
+"""Maximum number of records"""
+s3_aggregation_max_records.desc:
+"""Number of records (events) allowed in each aggregated object. Each aggregated upload will contain no more than that number of events, but may contain fewer.
+If the event rate is high enough, there may be more than one aggregated upload during the same time interval. These uploads will have different but consecutive sequence numbers, which become part of the S3 object key."""
+
+s3_aggregated_container.label:
+"""Container for aggregated events"""
+s3_aggregated_container.desc:
+"""Settings governing the file format of an upload containing aggregated events."""
+
+s3_aggregated_container_csv.label:
+"""CSV container"""
+s3_aggregated_container_csv.desc:
+"""Records (events) will be aggregated and uploaded as a CSV file."""
+
+s3_aggregated_container_csv_column_order.label:
+"""CSV column order"""
+s3_aggregated_container_csv_column_order.desc:
+"""Event fields that will be ordered first as columns in the resulting CSV file.
+Regardless of this setting, the resulting CSV will contain all the fields of the aggregated events; columns not explicitly mentioned here will be ordered after the listed ones, in lexicographical order."""
+
+s3_aggregated_upload_key.label:
+"""S3 object key template"""
+s3_aggregated_upload_key.desc:
+"""Template for the S3 object key of an aggregated upload.
+Template may contain placeholders for the following variables:
+- ${action}: Name of the action (required).
+- ${node}: Name of the EMQX node performing the upload (required).
+- ${datetime.{format}}: Date and time when the aggregation started, formatted according to {format} (for example, rfc3339).
+- ${datetime_until.{format}}: Date and time when the aggregation ended, with the same formatting options.
+- ${sequence}: Sequence number of the aggregated upload within the same time interval (required).
+
+All other placeholders are considered invalid. Note that placeholders marked as required will be added as a path suffix to the S3 object key if they are missing from the template."""
+}
diff --git a/rel/i18n/emqx_bridge_s3_upload.hocon b/rel/i18n/emqx_bridge_s3_upload.hocon
new file mode 100644
index 000000000..7d08cfaa5
--- /dev/null
+++ b/rel/i18n/emqx_bridge_s3_upload.hocon
@@ -0,0 +1,18 @@
+emqx_bridge_s3_upload {
+
+s3_upload.label:
+"""S3 Simple Upload"""
+s3_upload.desc:
+"""Action to upload a single object to the S3 service."""
+
+s3_upload_parameters.label:
+"""S3 Upload action parameters"""
+s3_upload_parameters.desc:
+"""Set of parameters for the upload action. The action supports templates in the S3 bucket name, object key and object content."""
+
+s3_object_content.label:
+"""S3 Object Content"""
+s3_object_content.desc:
+"""Content of the S3 object being uploaded. Supports templates."""
+
+}
diff --git a/rel/i18n/emqx_s3_schema.hocon b/rel/i18n/emqx_s3_schema.hocon
index 44f4bbc56..b8b963539 100644
--- a/rel/i18n/emqx_s3_schema.hocon
+++ b/rel/i18n/emqx_s3_schema.hocon
@@ -18,6 +18,13 @@ key.desc:
 key.label:
 """Object Key"""
 
+upload_headers.label:
+"""S3 object HTTP headers"""
+
+upload_headers.desc:
+"""HTTP headers to include in the S3 object upload request.
+Useful to specify content type, content encoding, etc. of the S3 object.""" + host.desc: """The host of the S3 endpoint."""
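Taken together, the documented fields compose into an aggregated-upload action config. A sketch in the same raw-map shape the test suites above feed to emqx_bridge_s3_test_helpers:parse_and_check_config/4 (names and values are illustrative, not defaults):

    AggregatedAction = #{
        <<"enable">> => true,
        <<"connector">> => <<"my_s3_connector">>,
        <<"parameters">> => #{
            <<"bucket">> => <<"emqx-archive">>,
            %% ${sequence} is a required placeholder: if omitted from the
            %% template, it is appended to the key as a path suffix.
            <<"key">> => <<"${action}/${node}/${datetime.rfc3339}/${sequence}">>,
            <<"headers">> => #{<<"content-type">> => <<"text/csv">>},
            <<"aggregation">> => #{
                <<"time_interval">> => <<"1h">>,
                <<"max_records">> => 100000
            },
            <<"container">> => #{<<"type">> => <<"csv">>}
        }
    }.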