diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index a8aaf9fdd..dc71d5806 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -118,7 +118,8 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_pulsar_action_info, emqx_bridge_greptimedb_action_info, emqx_bridge_tdengine_action_info, - emqx_bridge_s3_action_info + emqx_bridge_s3_upload_action_info, + emqx_bridge_s3_aggreg_upload_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src index a6000067a..5cdf3fb82 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src @@ -10,9 +10,15 @@ emqx_s3 ]}, {env, [ - {emqx_action_info_modules, [emqx_bridge_s3_action_info]}, - {emqx_connector_info_modules, [emqx_bridge_s3_connector_info]} + {emqx_action_info_modules, [ + emqx_bridge_s3_upload_action_info, + emqx_bridge_s3_aggreg_upload_action_info + ]}, + {emqx_connector_info_modules, [ + emqx_bridge_s3_connector_info + ]} ]}, + {mod, {emqx_bridge_s3_app, []}}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl index 79cc560d2..49f033554 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl @@ -19,7 +19,6 @@ ]). -export([ - bridge_v2_examples/1, connector_examples/1 ]). @@ -39,58 +38,11 @@ fields(Field) when Field == "post_connector" -> emqx_connector_schema:api_fields(Field, ?CONNECTOR, fields(s3_connector_config)); -fields(Field) when - Field == "get_bridge_v2"; - Field == "put_bridge_v2"; - Field == "post_bridge_v2" --> - emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION)); -fields(action) -> - {?ACTION, - hoconsc:mk( - hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)), - #{ - desc => <<"S3 Action Config">>, - required => false - } - )}; fields("config_connector") -> emqx_connector_schema:common_fields() ++ fields(s3_connector_config); -fields(?ACTION) -> - emqx_bridge_v2_schema:make_producer_action_schema( - hoconsc:mk( - ?R_REF(s3_upload_parameters), - #{ - required => true, - desc => ?DESC(s3_upload) - } - ), - #{ - resource_opts_ref => ?R_REF(s3_action_resource_opts) - } - ); fields(s3_connector_config) -> emqx_s3_schema:fields(s3_client) ++ emqx_connector_schema:resource_opts_ref(?MODULE, s3_connector_resource_opts); -fields(s3_upload_parameters) -> - emqx_s3_schema:fields(s3_upload) ++ - [ - {content, - hoconsc:mk( - emqx_schema:template(), - #{ - required => false, - default => <<"${.}">>, - desc => ?DESC(s3_object_content) - } - )} - ]; -fields(s3_action_resource_opts) -> - UnsupportedOpts = [batch_size, batch_time], - lists:filter( - fun({N, _}) -> not lists:member(N, UnsupportedOpts) end, - emqx_bridge_v2_schema:action_resource_opts_fields() - ); fields(s3_connector_resource_opts) -> CommonOpts = emqx_connector_schema:common_resource_opts_subfields(), lists:filter( @@ -100,14 +52,6 @@ fields(s3_connector_resource_opts) -> desc("config_connector") -> ?DESC(config_connector); -desc(?ACTION) -> - ?DESC(s3_upload); -desc(s3_upload) -> - ?DESC(s3_upload); -desc(s3_upload_parameters) -> - ?DESC(s3_upload_parameters); -desc(s3_action_resource_opts) -> - ?DESC(emqx_resource_schema, resource_opts); desc(s3_connector_resource_opts) -> ?DESC(emqx_resource_schema, resource_opts); desc(_Name) -> @@ -115,54 +59,6 @@ desc(_Name) -> %% 
Examples
 
-bridge_v2_examples(Method) ->
-    [
-        #{
-            <<"s3">> => #{
-                summary => <<"S3 Simple Upload">>,
-                value => action_example(Method)
-            }
-        }
-    ].
-
-action_example(post) ->
-    maps:merge(
-        action_example(put),
-        #{
-            type => atom_to_binary(?ACTION),
-            name => <<"my_s3_action">>
-        }
-    );
-action_example(get) ->
-    maps:merge(
-        action_example(put),
-        #{
-            status => <<"connected">>,
-            node_status => [
-                #{
-                    node => <<"emqx@localhost">>,
-                    status => <<"connected">>
-                }
-            ]
-        }
-    );
-action_example(put) ->
-    #{
-        enable => true,
-        connector => <<"my_s3_connector">>,
-        description => <<"My action">>,
-        parameters => #{
-            bucket => <<"${clientid}">>,
-            key => <<"${topic}">>,
-            content => <<"${payload}">>,
-            acl => <<"public_read">>
-        },
-        resource_opts => #{
-            query_mode => <<"sync">>,
-            inflight_window => 10
-        }
-    }.
-
 connector_examples(Method) ->
     [
         #{
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl b/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl
index 6d500d056..62a80d260 100644
--- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl
@@ -5,7 +5,12 @@
 -ifndef(__EMQX_BRIDGE_S3_HRL__).
 -define(__EMQX_BRIDGE_S3_HRL__, true).
 
--define(ACTION, s3).
+%% Actions
+-define(ACTION_UPLOAD, s3).
+-define(BRIDGE_TYPE_UPLOAD, <<"s3">>).
+-define(ACTION_AGGREGATED_UPLOAD, s3_aggregated_upload).
+-define(BRIDGE_TYPE_AGGREGATED_UPLOAD, <<"s3_aggregated_upload">>).
+
 -define(CONNECTOR, s3).
 
 -endif.
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_buffer.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_buffer.erl
new file mode 100644
index 000000000..503b864a7
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_buffer.erl
@@ -0,0 +1,138 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% This module provides a pretty simple interface for writing and reading
+%% Erlang terms to/from a file descriptor (i.e. IO device), with a goal
+%% of being able to write and read terms in a streaming fashion, and to
+%% survive partial corruption.
+%%
+%% Layout of the file is as follows:
+%% ```
+%% ETF(Header { Metadata })
+%% ETF(Record1 ByteSize)
+%% ETF(Record1)
+%% ETF(Record2 ByteSize)
+%% ETF(Record2)
+%% ...
+%% ```
+%% ^ ETF = Erlang External Term Format (i.e. `erlang:term_to_binary/1`).
+-module(emqx_bridge_s3_aggreg_buffer).
+
+-export([
+    new_writer/2,
+    write/2,
+    takeover/1
+]).
+
+-export([
+    new_reader/1,
+    read/1
+]).
+
+-export_type([writer/0, reader/0]).
+
+-record(reader, {
+    fd :: file:io_device() | eof,
+    buffer :: binary(),
+    hasread = 0 :: non_neg_integer()
+}).
+
+-type writer() :: file:io_device().
+-type reader() :: #reader{}.
+
+%%
+
+-define(VSN, 1).
+-define(HEADER(MD), [?MODULE, ?VSN, MD]).
+
+-define(READAHEAD_BYTES, 64 * 4096).
+-define(SANE_TERM_SIZE, 256 * 1024 * 1024).
+
+%%
+
+-spec new_writer(file:io_device(), _Meta) -> writer().
+new_writer(FD, Meta) ->
+    %% TODO: Validate header is not too big?
+    Header = term_to_iovec(?HEADER(Meta)),
+    case file:write(FD, Header) of
+        ok ->
+            FD;
+        {error, Reason} ->
+            error({buffer_write_failed, Reason})
+    end.
+
+-spec write(_Term, writer()) -> ok | {error, file:posix()}.
+write(Term, FD) ->
+    IOData = term_to_iovec(Term),
+    Marker = term_to_binary(iolist_size(IOData)),
+    file:write(FD, [Marker | IOData]).
+
+%%
+
+-spec new_reader(file:io_device()) -> {_Meta, reader()}.
+new_reader(FD) ->
+    Reader0 = #reader{fd = FD, buffer = <<>>},
+    Reader1 = read_buffered(?READAHEAD_BYTES, Reader0),
+    case read_next_term(Reader1) of
+        {?HEADER(MD), Reader} ->
+            {MD, Reader};
+        {UnexpectedHeader, _Reader} ->
+            error({buffer_unexpected_header, UnexpectedHeader});
+        eof ->
+            error({buffer_incomplete, header})
+    end.
+
+-spec read(reader()) -> {_Term, reader()} | eof.
+read(Reader0) ->
+    case read_next_term(read_buffered(_LargeEnough = 16, Reader0)) of
+        {Size, Reader1} when is_integer(Size) andalso Size > 0 andalso Size < ?SANE_TERM_SIZE ->
+            case read_next_term(read_buffered(Size, Reader1)) of
+                {Term, Reader} ->
+                    {Term, Reader};
+                eof ->
+                    error({buffer_incomplete, Size})
+            end;
+        {UnexpectedSize, _Reader} ->
+            error({buffer_unexpected_record_size, UnexpectedSize});
+        eof ->
+            eof
+    end.
+
+-spec takeover(reader()) -> writer().
+takeover(#reader{fd = FD, hasread = HasRead}) ->
+    case file:position(FD, HasRead) of
+        {ok, HasRead} ->
+            case file:truncate(FD) of
+                ok ->
+                    FD;
+                {error, Reason} ->
+                    error({buffer_takeover_failed, Reason})
+            end;
+        {error, Reason} ->
+            error({buffer_takeover_failed, Reason})
+    end.
+
+read_next_term(#reader{fd = eof, buffer = <<>>}) ->
+    eof;
+read_next_term(Reader = #reader{buffer = Buffer, hasread = HasRead}) ->
+    {Term, UsedBytes} = erlang:binary_to_term(Buffer, [safe, used]),
+    BufferSize = byte_size(Buffer),
+    BufferLeft = binary:part(Buffer, UsedBytes, BufferSize - UsedBytes),
+    {Term, Reader#reader{buffer = BufferLeft, hasread = HasRead + UsedBytes}}.
+
+read_buffered(_Size, Reader = #reader{fd = eof}) ->
+    Reader;
+read_buffered(Size, Reader = #reader{fd = FD, buffer = Buffer0}) ->
+    BufferSize = byte_size(Buffer0),
+    ReadSize = erlang:max(Size, ?READAHEAD_BYTES),
+    case BufferSize < Size andalso file:read(FD, ReadSize) of
+        false ->
+            Reader;
+        {ok, Data} ->
+            Reader#reader{buffer = <<Buffer0/binary, Data/binary>>};
+        eof ->
+            Reader#reader{fd = eof};
+        {error, Reason} ->
+            error({buffer_read_failed, Reason})
+    end.
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl
new file mode 100644
index 000000000..31ce1fcc7
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl
@@ -0,0 +1,98 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% CSV container implementation for `emqx_bridge_s3_aggregator`.
+-module(emqx_bridge_s3_aggreg_csv).
+
+%% Container API
+-export([
+    new/1,
+    fill/2,
+    close/1
+]).
+
+-export_type([container/0]).
+
+-record(csv, {
+    columns :: [binary()] | undefined,
+    order :: [binary()],
+    separator :: char() | iodata(),
+    delimiter :: char() | iodata(),
+    quoting_mp :: _ReMP
+}).
+
+-type container() :: #csv{}.
+-type options() :: #{column_order => [column()]}.
+
+-type record() :: emqx_bridge_s3_aggregator:record().
+-type column() :: binary().
+
+%%
+
+-spec new(options()) -> container().
+new(Opts) ->
+    {ok, MP} = re:compile("[\\[\\],\\r\\n\"]", [unicode]),
+    #csv{
+        order = maps:get(column_order, Opts, []),
+        separator = $,,
+        delimiter = $\n,
+        quoting_mp = MP
+    }.
+
+-spec fill([record()], container()) -> {iodata(), container()}.
+fill(Records = [Record | _], CSV0 = #csv{columns = undefined}) ->
+    Columns = mk_columns(Record, CSV0),
+    Header = emit_header(Columns, CSV0),
+    {Writes, CSV} = fill(Records, CSV0#csv{columns = Columns}),
+    {[Header | Writes], CSV};
+fill(Records, CSV) ->
+    Writes = [emit_row(R, CSV) || R <- Records],
+    {Writes, CSV}.
+
+-spec close(container()) -> iodata().
+close(#csv{}) ->
+    [].
+
+%%
+
+mk_columns(Record, #csv{order = ColumnOrder}) ->
+    Columns = lists:sort(maps:keys(Record)),
+    OrderedFirst = [CO || CO <- ColumnOrder, lists:member(CO, Columns)],
+    Unordered = Columns -- ColumnOrder,
+    OrderedFirst ++ Unordered.
+
+-spec emit_header([column()], container()) -> iodata().
+emit_header([C], #csv{delimiter = Delim}) ->
+    [C, Delim];
+emit_header([C | Rest], CSV = #csv{separator = Sep}) ->
+    [C, Sep | emit_header(Rest, CSV)];
+emit_header([], #csv{delimiter = Delim}) ->
+    [Delim].
+
+-spec emit_row(record(), container()) -> iodata().
+emit_row(Record, CSV = #csv{columns = Columns}) ->
+    emit_row(Record, Columns, CSV).
+
+emit_row(Record, [C], CSV = #csv{delimiter = Delim}) ->
+    [emit_cell(C, Record, CSV), Delim];
+emit_row(Record, [C | Rest], CSV = #csv{separator = Sep}) ->
+    [emit_cell(C, Record, CSV), Sep | emit_row(Record, Rest, CSV)];
+emit_row(#{}, [], #csv{delimiter = Delim}) ->
+    [Delim].
+
+emit_cell(Column, Record, CSV) ->
+    case maps:get(Column, Record, undefined) of
+        undefined ->
+            _Empty = "";
+        Value ->
+            encode_cell(emqx_template:to_string(Value), CSV)
+    end.
+
+encode_cell(V, #csv{quoting_mp = MP}) ->
+    case re:run(V, MP, []) of
+        nomatch ->
+            V;
+        _ ->
+            [$", re:replace(V, <<"\"">>, <<"\"\"">>, [global, unicode]), $"]
+    end.
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl
new file mode 100644
index 000000000..c548226ab
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_delivery.erl
@@ -0,0 +1,162 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% This module takes aggregated records from a buffer and delivers them to S3,
+%% wrapped in a configurable container (though currently there's only CSV).
+-module(emqx_bridge_s3_aggreg_delivery).
+
+-include_lib("snabbkaffe/include/trace.hrl").
+-include("emqx_bridge_s3_aggregator.hrl").
+
+-export([start_link/3]).
+
+%% Internal exports
+-export([run_delivery/3]).
+
+-behaviour(emqx_template).
+-export([lookup/2]).
+
+-record(delivery, {
+    name :: _Name,
+    container :: emqx_bridge_s3_aggreg_csv:container(),
+    reader :: emqx_bridge_s3_aggreg_buffer:reader(),
+    upload :: emqx_s3_upload:t(),
+    empty :: boolean()
+}).
+
+%%
+
+start_link(Name, Buffer, Opts) ->
+    proc_lib:start_link(?MODULE, run_delivery, [Name, Buffer, Opts]).
+
+%%
+
+run_delivery(Name, Buffer, Opts) ->
+    ?tp(s3_aggreg_delivery_started, #{action => Name, buffer => Buffer}),
+    Reader = open_buffer(Buffer),
+    Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}),
+    ok = proc_lib:init_ack({ok, self()}),
+    loop_deliver(Delivery).
+
+init_delivery(Name, Reader, Buffer, Opts = #{container := ContainerOpts}) ->
+    #delivery{
+        name = Name,
+        container = mk_container(ContainerOpts),
+        reader = Reader,
+        upload = mk_upload(Buffer, Opts),
+        empty = true
+    }.
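+
+%% For reference, a minimal sketch of the buffer round-trip that the loop
+%% below consumes (illustrative only; the file path is hypothetical):
+%%
+%%   {ok, WFD} = file:open("/tmp/example.buf", [write, binary]),
+%%   Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, _Meta = []),
+%%   ok = emqx_bridge_s3_aggreg_buffer:write([#{<<"clientid">> => <<"c1">>}], Writer),
+%%   ok = file:close(WFD),
+%%   {ok, RFD} = file:open("/tmp/example.buf", [read, binary, raw]),
+%%   {[], Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD),
+%%   {[#{<<"clientid">> := <<"c1">>}], Reader1} = emqx_bridge_s3_aggreg_buffer:read(Reader0),
+%%   eof = emqx_bridge_s3_aggreg_buffer:read(Reader1).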
+ +loop_deliver(Delivery = #delivery{reader = Reader0}) -> + case emqx_bridge_s3_aggreg_buffer:read(Reader0) of + {Records = [#{} | _], Reader} -> + loop_deliver_records(Records, Delivery#delivery{reader = Reader}); + {[], Reader} -> + loop_deliver(Delivery#delivery{reader = Reader}); + eof -> + complete_delivery(Delivery); + {Unexpected, _Reader} -> + exit({buffer_unexpected_record, Unexpected}) + end. + +loop_deliver_records(Records, Delivery = #delivery{container = Container0, upload = Upload0}) -> + {Writes, Container} = emqx_bridge_s3_aggreg_csv:fill(Records, Container0), + {ok, Upload} = emqx_s3_upload:append(Writes, Upload0), + loop_deliver_upload(Delivery#delivery{ + container = Container, + upload = Upload, + empty = false + }). + +loop_deliver_upload(Delivery = #delivery{upload = Upload0}) -> + case emqx_s3_upload:write(Upload0) of + {ok, Upload} -> + loop_deliver(Delivery#delivery{upload = Upload}); + {cont, Upload} -> + loop_deliver_upload(Delivery#delivery{upload = Upload}); + {error, Reason} -> + %% TODO: retries + _ = emqx_s3_upload:abort(Upload0), + exit({upload_failed, Reason}) + end. + +complete_delivery(#delivery{name = Name, empty = true}) -> + ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => empty}), + exit({shutdown, {skipped, empty}}); +complete_delivery(#delivery{name = Name, container = Container, upload = Upload0}) -> + Trailer = emqx_bridge_s3_aggreg_csv:close(Container), + {ok, Upload} = emqx_s3_upload:append(Trailer, Upload0), + case emqx_s3_upload:complete(Upload) of + {ok, Completed} -> + ?tp(s3_aggreg_delivery_completed, #{action => Name, upload => Completed}), + ok; + {error, Reason} -> + %% TODO: retries + _ = emqx_s3_upload:abort(Upload), + exit({upload_failed, Reason}) + end. + +mk_container(#{type := csv, column_order := OrderOpt}) -> + %% TODO: Deduplicate? + ColumnOrder = lists:map(fun emqx_utils_conv:bin/1, OrderOpt), + emqx_bridge_s3_aggreg_csv:new(#{column_order => ColumnOrder}). + +mk_upload( + Buffer, + Opts = #{ + bucket := Bucket, + upload_options := UploadOpts, + client_config := Config, + uploader_config := UploaderConfig + } +) -> + Client = emqx_s3_client:create(Bucket, Config), + Key = mk_object_key(Buffer, Opts), + emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig). + +mk_object_key(Buffer, #{action := Name, key := Template}) -> + emqx_template:render_strict(Template, {?MODULE, {Name, Buffer}}). + +open_buffer(#buffer{filename = Filename}) -> + case file:open(Filename, [read, binary, raw]) of + {ok, FD} -> + {_Meta, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(FD), + Reader; + {error, Reason} -> + error({buffer_open_failed, Reason}) + end. + +%% + +-spec lookup(emqx_template:accessor(), {_Name, buffer()}) -> + {ok, integer() | string()} | {error, undefined}. +lookup([<<"action">>], {Name, _Buffer}) -> + {ok, mk_fs_safe_string(Name)}; +lookup(Accessor, {_Name, Buffer = #buffer{}}) -> + lookup_buffer_var(Accessor, Buffer); +lookup(_Accessor, _Context) -> + {error, undefined}. + +lookup_buffer_var([<<"datetime">>, Format], #buffer{since = Since}) -> + {ok, format_timestamp(Since, Format)}; +lookup_buffer_var([<<"datetime_until">>, Format], #buffer{until = Until}) -> + {ok, format_timestamp(Until, Format)}; +lookup_buffer_var([<<"sequence">>], #buffer{seq = Seq}) -> + {ok, Seq}; +lookup_buffer_var([<<"node">>], #buffer{}) -> + {ok, mk_fs_safe_string(atom_to_binary(erlang:node()))}; +lookup_buffer_var(_Binding, _Context) -> + {error, undefined}. 
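+
+%% Illustrative rendering against this lookup (action name and buffer values
+%% are hypothetical): given Buffer = #buffer{since = 1704067200, seq = 0, ...},
+%% i.e. 2024-01-01T00:00:00Z, the template below renders roughly to
+%% "a1/emqx@127.0.0.1/2024-01-01T00_00_00Z_N0.csv", with colons replaced by
+%% mk_fs_safe_string/1 below:
+%%
+%%   Template = emqx_bridge_s3_aggreg_upload:mk_key_template(
+%%       "${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv"
+%%   ),
+%%   Key = emqx_template:render_strict(Template, {?MODULE, {<<"a1">>, Buffer}}).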
+
+format_timestamp(Timestamp, <<"rfc3339utc">>) ->
+    String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]),
+    mk_fs_safe_string(String);
+format_timestamp(Timestamp, <<"rfc3339">>) ->
+    String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]),
+    mk_fs_safe_string(String);
+format_timestamp(Timestamp, <<"unix">>) ->
+    Timestamp.
+
+mk_fs_safe_string(String) ->
+    unicode:characters_to_binary(string:replace(String, ":", "_", all)).
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl
new file mode 100644
index 000000000..a62b47c9d
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl
@@ -0,0 +1,240 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_aggreg_upload).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include("emqx_bridge_s3.hrl").
+
+-define(ACTION, ?ACTION_AGGREGATED_UPLOAD).
+
+-define(DEFAULT_BATCH_SIZE, 100).
+-define(DEFAULT_BATCH_TIME, <<"10ms">>).
+
+-behaviour(hocon_schema).
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+%% Interpreting options
+-export([
+    mk_key_template/1
+]).
+
+%% emqx_bridge_v2_schema API
+-export([bridge_v2_examples/1]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `hocon_schema' API
+%%-------------------------------------------------------------------------------------------------
+
+namespace() ->
+    "bridge_s3".
+
+roots() ->
+    [].
+
+fields(Field) when
+    Field == "get_bridge_v2";
+    Field == "put_bridge_v2";
+    Field == "post_bridge_v2"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION));
+fields(action) ->
+    {?ACTION,
+        hoconsc:mk(
+            hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
+            #{
+                desc => <<"S3 Aggregated Upload Action Config">>,
+                required => false
+            }
+        )};
+fields(?ACTION) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        hoconsc:mk(
+            ?R_REF(s3_aggregated_upload_parameters),
+            #{
+                required => true,
+                desc => ?DESC(s3_aggregated_upload)
+            }
+        ),
+        #{
+            resource_opts_ref => ?R_REF(s3_aggreg_upload_resource_opts)
+        }
+    );
+fields(s3_aggregated_upload_parameters) ->
+    [
+        {container,
+            hoconsc:mk(
+                %% TODO: Support selectors once there is more than one container.
+                hoconsc:union(fun
+                    (all_union_members) -> [?REF(s3_container_csv)];
+                    ({value, _Value}) -> [?REF(s3_container_csv)]
+                end),
+                #{
+                    required => true,
+                    default => #{<<"type">> => <<"csv">>},
+                    desc => ?DESC(s3_aggregated_container)
+                }
+            )},
+        {aggregation,
+            hoconsc:mk(
+                ?REF(s3_aggregation),
+                #{
+                    required => true,
+                    desc => ?DESC(s3_aggregation)
+                }
+            )}
+    ] ++
+        emqx_s3_schema:fields(s3_upload) ++
+        emqx_s3_schema:fields(s3_uploader);
+fields(s3_container_csv) ->
+    [
+        {type,
+            hoconsc:mk(
+                csv,
+                #{
+                    required => true,
+                    desc => ?DESC(s3_aggregated_container_csv)
+                }
+            )},
+        {column_order,
+            hoconsc:mk(
+                hoconsc:array(string()),
+                #{
+                    required => false,
+                    default => [],
+                    desc => ?DESC(s3_aggregated_container_csv_column_order)
+                }
+            )}
+    ];
+fields(s3_aggregation) ->
+    [
+        %% TODO: Needs bucketing? (e.g. messages falling in this 1h interval)
+        {time_interval,
+            hoconsc:mk(
+                emqx_schema:duration_s(),
+                #{
+                    required => false,
+                    default => <<"1h">>,
+                    desc => ?DESC(s3_aggregation_interval)
+                }
+            )},
+        {max_records,
+            hoconsc:mk(
+                pos_integer(),
+                #{
+                    required => false,
+                    default => <<"1000000">>,
+                    desc => ?DESC(s3_aggregation_max_records)
+                }
+            )}
+    ];
+fields(s3_aggreg_upload_resource_opts) ->
+    %% NOTE: This action should benefit from generous batching defaults.
+    emqx_bridge_v2_schema:action_resource_opts_fields([
+        {batch_size, #{default => ?DEFAULT_BATCH_SIZE}},
+        {batch_time, #{default => ?DEFAULT_BATCH_TIME}}
+    ]).
+
+desc(?ACTION) ->
+    ?DESC(s3_aggregated_upload);
+desc(s3_aggregated_upload_parameters) ->
+    ?DESC(s3_aggregated_upload_parameters);
+desc(s3_aggreg_upload_resource_opts) ->
+    ?DESC(emqx_resource_schema, resource_opts);
+desc(_Name) ->
+    undefined.
+
+%% Interpreting options
+
+-spec mk_key_template(string()) -> emqx_template:str().
+mk_key_template(Key) ->
+    Template = emqx_template:parse(Key),
+    {_, BindingErrors} = emqx_template:render(Template, #{}),
+    {UsedBindings, _} = lists:unzip(BindingErrors),
+    SuffixTemplate = mk_suffix_template(UsedBindings),
+    case emqx_template:is_const(SuffixTemplate) of
+        true ->
+            Template;
+        false ->
+            Template ++ SuffixTemplate
+    end.
+
+mk_suffix_template(UsedBindings) ->
+    RequiredBindings = ["action", "node", "datetime.", "sequence"],
+    SuffixBindings = [
+        mk_default_binding(RB)
+     || RB <- RequiredBindings,
+        lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings)
+    ],
+    SuffixTemplate = [["/", B] || B <- SuffixBindings],
+    emqx_template:parse(SuffixTemplate).
+
+mk_default_binding("datetime.") ->
+    "${datetime.rfc3339utc}";
+mk_default_binding(Binding) ->
+    "${" ++ Binding ++ "}".
+
+%% Examples
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"s3_aggregated_upload">> => #{
+                summary => <<"S3 Aggregated Upload">>,
+                value => s3_action_example(Method)
+            }
+        }
+    ].
+
+s3_action_example(post) ->
+    maps:merge(
+        s3_action_example(put),
+        #{
+            type => atom_to_binary(?ACTION_AGGREGATED_UPLOAD),
+            name => <<"my_s3_action">>
+        }
+    );
+s3_action_example(get) ->
+    maps:merge(
+        s3_action_example(put),
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        }
+    );
+s3_action_example(put) ->
+    #{
+        enable => true,
+        connector => <<"my_s3_connector">>,
+        description => <<"My action">>,
+        parameters => #{
+            bucket => <<"mqtt-aggregated">>,
+            key => <<"${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv">>,
+            acl => <<"public_read">>,
+            aggregation => #{
+                time_interval => <<"15m">>,
+                max_records => 100_000
+            },
+            container => #{
+                type => <<"csv">>,
+                column_order => [<<"clientid">>, <<"topic">>, <<"publish_received_at">>]
+            }
+        },
+        resource_opts => #{
+            health_check_interval => <<"10s">>,
+            query_mode => <<"async">>,
+            inflight_window => 100
+        }
+    }.
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_action_info.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_action_info.erl
new file mode 100644
index 000000000..b179073e5
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_action_info.erl
@@ -0,0 +1,21 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_aggreg_upload_action_info).
+
+-behaviour(emqx_action_info).
+
+-include("emqx_bridge_s3.hrl").
+
+-export([
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+action_type_name() -> ?ACTION_AGGREGATED_UPLOAD.
+
+connector_type_name() -> s3.
+
+schema_module() -> emqx_bridge_s3_aggreg_upload.
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_sup.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_sup.erl
new file mode 100644
index 000000000..973187b7e
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_sup.erl
@@ -0,0 +1,72 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_aggreg_upload_sup).
+
+-export([
+    start_link/3,
+    start_link_delivery_sup/2
+]).
+
+-export([
+    start_delivery/2,
+    start_delivery_proc/3
+]).
+
+-behaviour(supervisor).
+-export([init/1]).
+
+-define(SUPREF(NAME), {via, gproc, {n, l, {?MODULE, NAME}}}).
+
+%%
+
+start_link(Name, AggregOpts, DeliveryOpts) ->
+    supervisor:start_link(?MODULE, {root, Name, AggregOpts, DeliveryOpts}).
+
+start_link_delivery_sup(Name, DeliveryOpts) ->
+    supervisor:start_link(?SUPREF(Name), ?MODULE, {delivery, Name, DeliveryOpts}).
+
+%%
+
+start_delivery(Name, Buffer) ->
+    supervisor:start_child(?SUPREF(Name), [Buffer]).
+
+start_delivery_proc(Name, DeliveryOpts, Buffer) ->
+    emqx_bridge_s3_aggreg_delivery:start_link(Name, Buffer, DeliveryOpts).
+
+%%
+
+init({root, Name, AggregOpts, DeliveryOpts}) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 5
+    },
+    AggregatorChildSpec = #{
+        id => aggregator,
+        start => {emqx_bridge_s3_aggregator, start_link, [Name, AggregOpts]},
+        type => worker,
+        restart => permanent
+    },
+    DeliverySupChildSpec = #{
+        id => delivery_sup,
+        start => {?MODULE, start_link_delivery_sup, [Name, DeliveryOpts]},
+        type => supervisor,
+        restart => permanent
+    },
+    {ok, {SupFlags, [DeliverySupChildSpec, AggregatorChildSpec]}};
+init({delivery, Name, DeliveryOpts}) ->
+    SupFlags = #{
+        strategy => simple_one_for_one,
+        intensity => 100,
+        period => 5
+    },
+    ChildSpec = #{
+        id => delivery,
+        start => {?MODULE, start_delivery_proc, [Name, DeliveryOpts]},
+        type => worker,
+        restart => temporary,
+        shutdown => 1000
+    },
+    {ok, {SupFlags, [ChildSpec]}}.
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl
new file mode 100644
index 000000000..c49a29cb8
--- /dev/null
+++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.erl
@@ -0,0 +1,485 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% This module manages buffers for aggregating records and offloads them
+%% to separate "delivery" processes when they are full or the time interval
+%% is over.
+-module(emqx_bridge_s3_aggregator).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
+
+-include("emqx_bridge_s3_aggregator.hrl").
+
+-export([
+    start_link/2,
+    push_records/3,
+    tick/2,
+    take_error/1
+]).
+
+-behaviour(gen_server).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
+
+-export_type([
+    record/0,
+    timestamp/0
+]).
+
+%% Record.
+-type record() :: #{binary() => _}.
+
+%% Unix timestamp, seconds since epoch.
+-type timestamp() :: _Seconds :: non_neg_integer(). + +%% + +-define(VSN, 1). +-define(SRVREF(NAME), {via, gproc, {n, l, {?MODULE, NAME}}}). + +%% + +start_link(Name, Opts) -> + gen_server:start_link(?SRVREF(Name), ?MODULE, mk_state(Name, Opts), []). + +push_records(Name, Timestamp, Records = [_ | _]) -> + %% FIXME: Error feedback. + case pick_buffer(Name, Timestamp) of + undefined -> + BufferNext = next_buffer(Name, Timestamp), + write_records_limited(Name, BufferNext, Records); + Buffer -> + write_records_limited(Name, Buffer, Records) + end; +push_records(_Name, _Timestamp, []) -> + ok. + +tick(Name, Timestamp) -> + case pick_buffer(Name, Timestamp) of + #buffer{} -> + ok; + _Outdated -> + send_close_buffer(Name, Timestamp) + end. + +take_error(Name) -> + gen_server:call(?SRVREF(Name), take_error). + +%% + +write_records_limited(Name, Buffer = #buffer{max_records = undefined}, Records) -> + write_records(Name, Buffer, Records); +write_records_limited(Name, Buffer = #buffer{max_records = MaxRecords}, Records) -> + NR = length(Records), + case inc_num_records(Buffer, NR) of + NR -> + %% NOTE: Allow unconditionally if it's the first write. + write_records(Name, Buffer, Records); + NWritten when NWritten > MaxRecords -> + NextBuffer = rotate_buffer(Name, Buffer), + write_records_limited(Name, NextBuffer, Records); + _ -> + write_records(Name, Buffer, Records) + end. + +write_records(Name, Buffer = #buffer{fd = Writer}, Records) -> + case emqx_bridge_s3_aggreg_buffer:write(Records, Writer) of + ok -> + ?tp(s3_aggreg_records_written, #{action => Name, records => Records}), + ok; + {error, Reason} when Reason == terminated orelse Reason == closed -> + BufferNext = rotate_buffer(Name, Buffer), + write_records(Name, BufferNext, Records); + {error, _} = Error -> + Error + end. + +inc_num_records(#buffer{cnt_records = Counter}, Size) -> + inc_counter(Counter, Size). + +next_buffer(Name, Timestamp) -> + gen_server:call(?SRVREF(Name), {next_buffer, Timestamp}). + +rotate_buffer(Name, #buffer{fd = FD}) -> + gen_server:call(?SRVREF(Name), {rotate_buffer, FD}). + +send_close_buffer(Name, Timestamp) -> + gen_server:cast(?SRVREF(Name), {close_buffer, Timestamp}). + +%% + +-record(st, { + name :: _Name, + tab :: ets:tid() | undefined, + buffer :: buffer() | undefined, + queued :: buffer() | undefined, + deliveries = #{} :: #{reference() => buffer()}, + errors = queue:new() :: queue:queue(_Error), + interval :: emqx_schema:duration_s(), + max_records :: pos_integer(), + work_dir :: file:filename() +}). + +-type state() :: #st{}. + +mk_state(Name, Opts) -> + Interval = maps:get(time_interval, Opts), + MaxRecords = maps:get(max_records, Opts), + WorkDir = maps:get(work_dir, Opts), + ok = ensure_workdir(WorkDir), + #st{ + name = Name, + interval = Interval, + max_records = MaxRecords, + work_dir = WorkDir + }. + +ensure_workdir(WorkDir) -> + %% NOTE + %% Writing MANIFEST as a means to ensure the work directory is writable. It's not + %% (yet) read back because there's only one version of the implementation. + ok = filelib:ensure_path(WorkDir), + ok = write_manifest(WorkDir). + +write_manifest(WorkDir) -> + Manifest = #{<<"version">> => ?VSN}, + file:write_file(filename:join(WorkDir, "MANIFEST"), hocon_pp:do(Manifest, #{})). + +%% + +-spec init(state()) -> {ok, state()}. +init(St0 = #st{name = Name}) -> + _ = erlang:process_flag(trap_exit, true), + St1 = St0#st{tab = create_tab(Name)}, + St = recover(St1), + _ = announce_current_buffer(St), + {ok, St}. 
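+
+%% Illustrative use of the public API above (the action name is hypothetical):
+%% callers stamp each batch with the current time, tick the clock from health
+%% checks, and drain any delivery errors:
+%%
+%%   Now = erlang:system_time(second),
+%%   ok = emqx_bridge_s3_aggregator:push_records(my_action, Now, [#{<<"topic">> => <<"t/1">>}]),
+%%   ok = emqx_bridge_s3_aggregator:tick(my_action, Now),
+%%   [] = emqx_bridge_s3_aggregator:take_error(my_action).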
+ +handle_call({next_buffer, Timestamp}, _From, St0) -> + St = #st{buffer = Buffer} = handle_next_buffer(Timestamp, St0), + {reply, Buffer, St, 0}; +handle_call({rotate_buffer, FD}, _From, St0) -> + St = #st{buffer = Buffer} = handle_rotate_buffer(FD, St0), + {reply, Buffer, St, 0}; +handle_call(take_error, _From, St0) -> + {MaybeError, St} = handle_take_error(St0), + {reply, MaybeError, St}. + +handle_cast({close_buffer, Timestamp}, St) -> + {noreply, handle_close_buffer(Timestamp, St)}; +handle_cast(_Cast, St) -> + {noreply, St}. + +handle_info(timeout, St) -> + {noreply, handle_queued_buffer(St)}; +handle_info({'DOWN', MRef, _, Pid, Reason}, St0 = #st{name = Name, deliveries = Ds0}) -> + case maps:take(MRef, Ds0) of + {Buffer, Ds} -> + St = St0#st{deliveries = Ds}, + {noreply, handle_delivery_exit(Buffer, Reason, St)}; + error -> + ?SLOG(notice, #{ + msg => "unexpected_down_signal", + action => Name, + pid => Pid, + reason => Reason + }), + {noreply, St0} + end; +handle_info(_Msg, St) -> + {noreply, St}. + +terminate(_Reason, #st{name = Name}) -> + cleanup_tab(Name). + +%% + +handle_next_buffer(Timestamp, St = #st{buffer = #buffer{until = Until}}) when Timestamp < Until -> + St; +handle_next_buffer(Timestamp, St0 = #st{buffer = Buffer = #buffer{since = PrevSince}}) -> + BufferClosed = close_buffer(Buffer), + St = enqueue_closed_buffer(BufferClosed, St0), + handle_next_buffer(Timestamp, PrevSince, St); +handle_next_buffer(Timestamp, St = #st{buffer = undefined}) -> + handle_next_buffer(Timestamp, Timestamp, St). + +handle_next_buffer(Timestamp, PrevSince, St0) -> + NextBuffer = allocate_next_buffer(Timestamp, PrevSince, St0), + St = St0#st{buffer = NextBuffer}, + _ = announce_current_buffer(St), + St. + +handle_rotate_buffer( + FD, + St0 = #st{buffer = Buffer = #buffer{since = Since, seq = Seq, fd = FD}} +) -> + BufferClosed = close_buffer(Buffer), + NextBuffer = allocate_buffer(Since, Seq + 1, St0), + St = enqueue_closed_buffer(BufferClosed, St0#st{buffer = NextBuffer}), + _ = announce_current_buffer(St), + St; +handle_rotate_buffer(_ClosedFD, St) -> + St. + +enqueue_closed_buffer(Buffer, St = #st{queued = undefined}) -> + St#st{queued = Buffer}; +enqueue_closed_buffer(Buffer, St0) -> + %% NOTE: Should never really happen unless interval / max records are too tight. + St = handle_queued_buffer(St0), + St#st{queued = Buffer}. + +handle_queued_buffer(St = #st{queued = undefined}) -> + St; +handle_queued_buffer(St = #st{queued = Buffer}) -> + enqueue_delivery(Buffer, St#st{queued = undefined}). + +allocate_next_buffer(Timestamp, PrevSince, St = #st{interval = Interval}) -> + Since = compute_since(Timestamp, PrevSince, Interval), + allocate_buffer(Since, 0, St). + +compute_since(Timestamp, PrevSince, Interval) -> + Timestamp - (Timestamp - PrevSince) rem Interval. + +allocate_buffer(Since, Seq, St = #st{name = Name}) -> + Buffer = #buffer{filename = Filename, cnt_records = Counter} = mk_buffer(Since, Seq, St), + {ok, FD} = file:open(Filename, [write, binary]), + Writer = emqx_bridge_s3_aggreg_buffer:new_writer(FD, _Meta = []), + _ = add_counter(Counter), + ?tp(s3_aggreg_buffer_allocated, #{action => Name, filename => Filename}), + Buffer#buffer{fd = Writer}. 
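+
+%% Worked example for compute_since/3 above: with PrevSince = 1000 and
+%% Interval = 3600, a record arriving at Timestamp = 5000 yields
+%% 5000 - ((5000 - 1000) rem 3600) = 5000 - 400 = 4600, so buffer windows
+%% stay aligned to the PrevSince + K * Interval grid even when whole
+%% intervals pass without traffic.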
+ +recover_buffer(Buffer = #buffer{filename = Filename, cnt_records = Counter}) -> + {ok, FD} = file:open(Filename, [read, write, binary]), + case recover_buffer_writer(FD, Filename) of + {ok, Writer, NWritten} -> + _ = add_counter(Counter, NWritten), + Buffer#buffer{fd = Writer}; + {error, Reason} -> + ?SLOG(warning, #{ + msg => "existing_buffer_recovery_failed", + filename => Filename, + reason => Reason, + details => "Buffer is corrupted beyond repair, will be discarded." + }), + _ = file:close(FD), + _ = file:delete(Filename), + undefined + end. + +recover_buffer_writer(FD, Filename) -> + try emqx_bridge_s3_aggreg_buffer:new_reader(FD) of + {_Meta, Reader} -> recover_buffer_writer(FD, Filename, Reader, 0) + catch + error:Reason -> + {error, Reason} + end. + +recover_buffer_writer(FD, Filename, Reader0, NWritten) -> + try emqx_bridge_s3_aggreg_buffer:read(Reader0) of + {Records, Reader} when is_list(Records) -> + recover_buffer_writer(FD, Filename, Reader, NWritten + length(Records)); + {Unexpected, _Reader} -> + %% Buffer is corrupted, should be discarded. + {error, {buffer_unexpected_record, Unexpected}}; + eof -> + %% Buffer is fine, continue writing at the end. + {ok, FD, NWritten} + catch + error:Reason -> + %% Buffer is truncated or corrupted somewhere in the middle. + %% Continue writing after the last valid record. + ?SLOG(warning, #{ + msg => "existing_buffer_recovered_partially", + filename => Filename, + reason => Reason, + details => + "Buffer is truncated or corrupted somewhere in the middle. " + "Corrupted records will be discarded." + }), + Writer = emqx_bridge_s3_aggreg_buffer:takeover(Reader0), + {ok, Writer, NWritten} + end. + +mk_buffer( + Since, + Seq, + #st{tab = Tab, interval = Interval, max_records = MaxRecords, work_dir = WorkDir} +) -> + Name = mk_filename(Since, Seq), + Counter = {Tab, {Since, Seq}}, + #buffer{ + since = Since, + until = Since + Interval, + seq = Seq, + filename = filename:join(WorkDir, Name), + max_records = MaxRecords, + cnt_records = Counter + }. + +handle_close_buffer( + Timestamp, + St0 = #st{buffer = Buffer = #buffer{until = Until}} +) when Timestamp >= Until -> + St = St0#st{buffer = undefined}, + _ = announce_current_buffer(St), + enqueue_delivery(close_buffer(Buffer), St); +handle_close_buffer(_Timestamp, St = #st{buffer = undefined}) -> + St. + +close_buffer(Buffer = #buffer{fd = FD}) -> + ok = file:close(FD), + Buffer#buffer{fd = undefined}. + +discard_buffer(#buffer{filename = Filename, cnt_records = Counter}) -> + %% NOTE: Hopefully, no process is touching this counter anymore. + _ = del_counter(Counter), + file:delete(Filename). + +pick_buffer(Name, Timestamp) -> + case lookup_current_buffer(Name) of + #buffer{until = Until} = Buffer when Timestamp < Until -> + Buffer; + #buffer{since = Since} when Timestamp < Since -> + %% TODO: Support timestamps going back. + error({invalid_timestamp, Timestamp}); + _Outdated -> + undefined + end. + +announce_current_buffer(#st{tab = Tab, buffer = Buffer}) -> + ets:insert(Tab, {buffer, Buffer}). + +lookup_current_buffer(Name) -> + ets:lookup_element(lookup_tab(Name), buffer, 2). + +%% + +enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) -> + {ok, Pid} = emqx_bridge_s3_aggreg_upload_sup:start_delivery(Name, Buffer), + MRef = erlang:monitor(process, Pid), + St#st{deliveries = Ds#{MRef => Buffer}}. 
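+
+%% The delivery process reports back through its exit reason, handled below:
+%% `normal` (or `noproc`) means the buffer was uploaded and may be discarded,
+%% `{shutdown, {skipped, empty}}` means there was nothing to upload, and
+%% `{upload_failed, _}` is queued so take_error/1 can surface it on the next
+%% health check.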
+ +handle_delivery_exit(Buffer, Normal, St = #st{name = Name}) when + Normal == normal; Normal == noproc +-> + ?SLOG(debug, #{ + msg => "aggregated_buffer_delivery_completed", + action => Name, + buffer => Buffer#buffer.filename + }), + ok = discard_buffer(Buffer), + St; +handle_delivery_exit(Buffer, {shutdown, {skipped, Reason}}, St = #st{name = Name}) -> + ?SLOG(info, #{ + msg => "aggregated_buffer_delivery_skipped", + action => Name, + buffer => {Buffer#buffer.since, Buffer#buffer.seq}, + reason => Reason + }), + ok = discard_buffer(Buffer), + St; +handle_delivery_exit(Buffer, Error, St = #st{name = Name}) -> + ?SLOG(error, #{ + msg => "aggregated_buffer_delivery_failed", + action => Name, + buffer => {Buffer#buffer.since, Buffer#buffer.seq}, + filename => Buffer#buffer.filename, + reason => Error + }), + enqueue_status_error(Error, St). + +enqueue_status_error({upload_failed, Error}, St = #st{errors = QErrors}) -> + %% TODO + %% This code feels too specific, errors probably need classification. + St#st{errors = queue:in(Error, QErrors)}; +enqueue_status_error(_AnotherError, St) -> + St. + +handle_take_error(St = #st{errors = QErrors0}) -> + case queue:out(QErrors0) of + {{value, Error}, QErrors} -> + {[Error], St#st{errors = QErrors}}; + {empty, QErrors} -> + {[], St#st{errors = QErrors}} + end. + +%% + +recover(St0 = #st{work_dir = WorkDir}) -> + {ok, Filenames} = file:list_dir(WorkDir), + ExistingBuffers = lists:flatmap(fun(FN) -> read_existing_file(FN, St0) end, Filenames), + case lists:reverse(lists:keysort(#buffer.since, ExistingBuffers)) of + [Buffer | ClosedBuffers] -> + St = lists:foldl(fun enqueue_delivery/2, St0, ClosedBuffers), + St#st{buffer = recover_buffer(Buffer)}; + [] -> + St0 + end. + +read_existing_file("MANIFEST", _St) -> + []; +read_existing_file(Name, St) -> + case parse_filename(Name) of + {Since, Seq} -> + [read_existing_buffer(Since, Seq, Name, St)]; + error -> + %% TODO: log? + [] + end. + +read_existing_buffer(Since, Seq, Name, St = #st{work_dir = WorkDir}) -> + Filename = filename:join(WorkDir, Name), + Buffer = mk_buffer(Since, Seq, St), + Buffer#buffer{filename = Filename}. + +%% + +mk_filename(Since, Seq) -> + "T" ++ integer_to_list(Since) ++ "_" ++ pad_number(Seq, 4). + +parse_filename(Filename) -> + case re:run(Filename, "^T(\\d+)_(\\d+)$", [{capture, all_but_first, list}]) of + {match, [Since, Seq]} -> + {list_to_integer(Since), list_to_integer(Seq)}; + nomatch -> + error + end. + +%% + +add_counter({Tab, Counter}) -> + add_counter({Tab, Counter}, 0). + +add_counter({Tab, Counter}, N) -> + ets:insert(Tab, {Counter, N}). + +inc_counter({Tab, Counter}, Size) -> + ets:update_counter(Tab, Counter, {2, Size}). + +del_counter({Tab, Counter}) -> + ets:delete(Tab, Counter). + +%% + +create_tab(Name) -> + Tab = ets:new(?MODULE, [public, set, {write_concurrency, auto}]), + ok = persistent_term:put({?MODULE, Name}, Tab), + Tab. + +lookup_tab(Name) -> + persistent_term:get({?MODULE, Name}). + +cleanup_tab(Name) -> + persistent_term:erase({?MODULE, Name}). + +%% + +pad_number(I, L) -> + string:pad(integer_to_list(I), L, leading, $0). diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.hrl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.hrl new file mode 100644 index 000000000..7ac62c6b5 --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggregator.hrl @@ -0,0 +1,15 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. 
+%%-------------------------------------------------------------------- + +-record(buffer, { + since :: emqx_bridge_s3_aggregator:timestamp(), + until :: emqx_bridge_s3_aggregator:timestamp(), + seq :: non_neg_integer(), + filename :: file:filename(), + fd :: file:io_device() | undefined, + max_records :: pos_integer() | undefined, + cnt_records :: {ets:tab(), _Counter} | undefined +}). + +-type buffer() :: #buffer{}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl new file mode 100644 index 000000000..e5b77f9d6 --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl @@ -0,0 +1,16 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_app). + +-behaviour(application). +-export([start/2, stop/1]). + +%% + +start(_StartType, _StartArgs) -> + emqx_bridge_s3_sup:start_link(). + +stop(_State) -> + ok. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 5d3ed19f8..972294ef7 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -7,6 +7,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include("emqx_bridge_s3.hrl"). -behaviour(emqx_resource). -export([ @@ -17,7 +18,7 @@ on_remove_channel/3, on_get_channels/1, on_query/3, - % on_batch_query/3, + on_batch_query/3, on_get_status/2, on_get_channel_status/3 ]). @@ -31,12 +32,31 @@ }. -type channel_config() :: #{ - parameters := #{ - bucket := string(), - key := string(), - content := string(), - acl => emqx_s3:acl() - } + bridge_type := binary(), + parameters := s3_upload_parameters() | s3_aggregated_upload_parameters() +}. + +-type s3_upload_parameters() :: #{ + bucket := string(), + key := string(), + content := string(), + acl => emqx_s3:acl() +}. + +-type s3_aggregated_upload_parameters() :: #{ + bucket := string(), + key := string(), + acl => emqx_s3:acl(), + aggregation => #{ + time_interval := emqx_schema:duration_s(), + max_records := pos_integer() + }, + container := #{ + type := csv, + column_order => [string()] + }, + min_part_size := emqx_schema:bytesize(), + max_part_size := emqx_schema:bytesize() }. -type channel_state() :: #{ @@ -123,12 +143,13 @@ on_get_status(_InstId, State = #{client_config := Config}) -> -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) -> {ok, state()} | {error, _Reason}. on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) -> - ChannelState = init_channel_state(Config), + ChannelState = start_channel(State, Config), {ok, State#{channels => Channels#{ChannelId => ChannelState}}}. -spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) -> {ok, state()}. on_remove_channel(_InstId, State = #{channels := Channels}, ChannelId) -> + ok = stop_channel(maps:get(ChannelId, Channels, undefined)), {ok, State#{channels => maps:remove(ChannelId, Channels)}}. -spec on_get_channels(_InstanceId :: resource_id()) -> @@ -138,27 +159,122 @@ on_get_channels(InstId) -> -spec on_get_channel_status(_InstanceId :: resource_id(), channel_id(), state()) -> channel_status(). 
-on_get_channel_status(_InstId, ChannelId, #{channels := Channels}) -> +on_get_channel_status(_InstId, ChannelId, State = #{channels := Channels}) -> case maps:get(ChannelId, Channels, undefined) of - _ChannelState = #{} -> - %% TODO - %% Since bucket name may be templated, we can't really provide any - %% additional information regarding the channel health. - ?status_connected; + ChannelState = #{} -> + channel_status(ChannelState, State); undefined -> ?status_disconnected end. -init_channel_state(#{parameters := Parameters}) -> +start_channel(_State, #{ + bridge_type := ?BRIDGE_TYPE_UPLOAD, + parameters := Parameters = #{ + bucket := Bucket, + key := Key, + content := Content + } +}) -> #{ - bucket => emqx_template:parse(maps:get(bucket, Parameters)), - key => emqx_template:parse(maps:get(key, Parameters)), - content => emqx_template:parse(maps:get(content, Parameters)), - upload_options => #{ - acl => maps:get(acl, Parameters, undefined) - } + type => ?ACTION_UPLOAD, + bucket => emqx_template:parse(Bucket), + key => emqx_template:parse(Key), + content => emqx_template:parse(Content), + upload_options => upload_options(Parameters) + }; +start_channel(State, #{ + bridge_type := Type = ?BRIDGE_TYPE_AGGREGATED_UPLOAD, + bridge_name := Name, + parameters := Parameters = #{ + aggregation := #{ + time_interval := TimeInterval, + max_records := MaxRecords + }, + container := Container, + bucket := Bucket, + key := Key + } +}) -> + AggregOpts = #{ + time_interval => TimeInterval, + max_records => MaxRecords, + work_dir => work_dir(Type, Name) + }, + DeliveryOpts = #{ + bucket => Bucket, + key => emqx_bridge_s3_aggreg_upload:mk_key_template(Key), + container => Container, + upload_options => upload_options(Parameters), + client_config => maps:get(client_config, State), + uploader_config => maps:with([min_part_size, max_part_size], Parameters) + }, + _ = emqx_bridge_s3_sup:delete_child({Type, Name}), + {ok, SupPid} = emqx_bridge_s3_sup:start_child(#{ + id => {Type, Name}, + start => {emqx_bridge_s3_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]}, + type => supervisor, + restart => permanent + }), + #{ + type => ?ACTION_AGGREGATED_UPLOAD, + name => Name, + bucket => Bucket, + supervisor => SupPid, + on_stop => fun() -> emqx_bridge_s3_sup:delete_child({Type, Name}) end }. +upload_options(Parameters) -> + #{acl => maps:get(acl, Parameters, undefined)}. + +work_dir(Type, Name) -> + filename:join([emqx:data_dir(), bridge, Type, Name]). + +stop_channel(#{on_stop := OnStop}) -> + OnStop(); +stop_channel(_ChannelState) -> + ok. + +channel_status(#{type := ?ACTION_UPLOAD}, _State) -> + %% TODO + %% Since bucket name may be templated, we can't really provide any additional + %% information regarding the channel health. + ?status_connected; +channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) -> + %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded. + Timestamp = erlang:system_time(second), + ok = emqx_bridge_s3_aggregator:tick(Name, Timestamp), + ok = check_bucket_accessible(Bucket, State), + ok = check_aggreg_upload_errors(Name), + ?status_connected. 
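+
+%% Both helpers below signal failure by throwing `{unhealthy_target, Reason}`;
+%% the resource health-check machinery is expected to surface that as an
+%% unhealthy channel, while a plain `ok` keeps the channel `?status_connected`.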
+ +check_bucket_accessible(Bucket, #{client_config := Config}) -> + case emqx_s3_client:aws_config(Config) of + {error, Reason} -> + throw({unhealthy_target, Reason}); + AWSConfig -> + try erlcloud_s3:list_objects(Bucket, [{max_keys, 1}], AWSConfig) of + Props when is_list(Props) -> + ok + catch + error:{aws_error, {http_error, 404, _, _Reason}} -> + throw({unhealthy_target, "Bucket does not exist"}); + error:{aws_error, {socket_error, Reason}} -> + throw({unhealthy_target, emqx_utils:format(Reason)}) + end + end. + +check_aggreg_upload_errors(Name) -> + case emqx_bridge_s3_aggregator:take_error(Name) of + [Error] -> + %% TODO + %% This approach means that, for example, 3 upload failures will cause + %% the channel to be marked as unhealthy for 3 consecutive health checks. + ErrorMessage = emqx_utils:format(Error), + throw({unhealthy_target, ErrorMessage}); + [] -> + ok + end. + %% Queries -type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}. @@ -167,8 +283,21 @@ init_channel_state(#{parameters := Parameters}) -> {ok, _Result} | {error, _Reason}. on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) -> case maps:get(Tag, Channels, undefined) of - ChannelState = #{} -> + ChannelState = #{type := ?ACTION_UPLOAD} -> run_simple_upload(InstId, Tag, Data, ChannelState, Config); + ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} -> + run_aggregated_upload(InstId, [Data], ChannelState); + undefined -> + {error, {unrecoverable_error, {invalid_message_tag, Tag}}} + end. + +-spec on_batch_query(_InstanceId :: resource_id(), [query()], state()) -> + {ok, _Result} | {error, _Reason}. +on_batch_query(InstId, [{Tag, Data0} | Rest], #{channels := Channels}) -> + case maps:get(Tag, Channels, undefined) of + ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} -> + Records = [Data0 | [Data || {_, Data} <- Rest]], + run_aggregated_upload(InstId, Records, ChannelState); undefined -> {error, {unrecoverable_error, {invalid_message_tag, Tag}}} end. @@ -206,6 +335,16 @@ run_simple_upload( {error, map_error(Reason)} end. +run_aggregated_upload(InstId, Records, #{name := Name}) -> + Timestamp = erlang:system_time(second), + case emqx_bridge_s3_aggregator:push_records(Name, Timestamp, Records) of + ok -> + ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}), + ok; + {error, Reason} -> + {error, {unrecoverable_error, Reason}} + end. + map_error({socket_error, _} = Reason) -> {recoverable_error, Reason}; map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 -> diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector_info.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector_info.erl index be3c29bae..560334610 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector_info.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector_info.erl @@ -20,7 +20,7 @@ type_name() -> s3. bridge_types() -> - [s3]. + [s3, s3_aggregated_upload]. resource_callback_module() -> emqx_bridge_s3_connector. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl new file mode 100644 index 000000000..230711a76 --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl @@ -0,0 +1,42 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_sup). + +-export([ + start_link/0, + start_child/1, + delete_child/1 +]). 
+ +-behaviour(supervisor). +-export([init/1]). + +-define(SUPREF, ?MODULE). + +%% + +start_link() -> + supervisor:start_link({local, ?SUPREF}, ?MODULE, root). + +start_child(ChildSpec) -> + supervisor:start_child(?SUPREF, ChildSpec). + +delete_child(ChildId) -> + case supervisor:terminate_child(?SUPREF, ChildId) of + ok -> + supervisor:delete_child(?SUPREF, ChildId); + Error -> + Error + end. + +%% + +init(root) -> + SupFlags = #{ + strategy => one_for_one, + intensity => 1, + period => 1 + }, + {ok, {SupFlags, []}}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl new file mode 100644 index 000000000..6a63321bd --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl @@ -0,0 +1,142 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_upload). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include("emqx_bridge_s3.hrl"). + +-define(ACTION, ?ACTION_UPLOAD). + +-behaviour(hocon_schema). +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-export([ + bridge_v2_examples/1 +]). + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> + "bridge_s3". + +roots() -> + []. + +fields(Field) when + Field == "get_bridge_v2"; + Field == "put_bridge_v2"; + Field == "post_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION)); +fields(action) -> + {?ACTION, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)), + #{ + desc => <<"S3 Upload Action Config">>, + required => false + } + )}; +fields(?ACTION) -> + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + ?R_REF(s3_upload_parameters), + #{ + required => true, + desc => ?DESC(s3_upload) + } + ), + #{ + resource_opts_ref => ?R_REF(s3_action_resource_opts) + } + ); +fields(s3_upload_parameters) -> + emqx_s3_schema:fields(s3_upload) ++ + [ + {content, + hoconsc:mk( + emqx_schema:template(), + #{ + required => false, + default => <<"${.}">>, + desc => ?DESC(s3_object_content) + } + )} + ]; +fields(s3_action_resource_opts) -> + UnsupportedOpts = [batch_size, batch_time], + lists:filter( + fun({N, _}) -> not lists:member(N, UnsupportedOpts) end, + emqx_bridge_v2_schema:action_resource_opts_fields() + ). + +desc(?ACTION) -> + ?DESC(s3_upload); +desc(s3_upload) -> + ?DESC(s3_upload); +desc(s3_upload_parameters) -> + ?DESC(s3_upload_parameters); +desc(s3_action_resource_opts) -> + ?DESC(emqx_resource_schema, resource_opts); +desc(_Name) -> + undefined. + +%% Examples + +bridge_v2_examples(Method) -> + [ + #{ + <<"s3">> => #{ + summary => <<"S3 Simple Upload">>, + value => s3_upload_action_example(Method) + } + } + ]. 
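+
+%% `s3_upload_action_example/1` below builds the per-method example payloads:
+%% `put` is the base config, `post` additionally carries the action type and
+%% name, and `get` additionally carries runtime status fields.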
+ +s3_upload_action_example(post) -> + maps:merge( + s3_upload_action_example(put), + #{ + type => atom_to_binary(?ACTION_UPLOAD), + name => <<"my_s3_action">> + } + ); +s3_upload_action_example(get) -> + maps:merge( + s3_upload_action_example(put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +s3_upload_action_example(put) -> + #{ + enable => true, + connector => <<"my_s3_connector">>, + description => <<"My action">>, + parameters => #{ + bucket => <<"${clientid}">>, + key => <<"${topic}">>, + content => <<"${payload}">>, + acl => <<"public_read">> + }, + resource_opts => #{ + query_mode => <<"sync">>, + inflight_window => 10 + } + }. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_action_info.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload_action_info.erl similarity index 69% rename from apps/emqx_bridge_s3/src/emqx_bridge_s3_action_info.erl rename to apps/emqx_bridge_s3/src/emqx_bridge_s3_upload_action_info.erl index 646173bf4..15a76ae7c 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_action_info.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload_action_info.erl @@ -2,18 +2,20 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_s3_action_info). +-module(emqx_bridge_s3_upload_action_info). -behaviour(emqx_action_info). +-include("emqx_bridge_s3.hrl"). + -export([ action_type_name/0, connector_type_name/0, schema_module/0 ]). -action_type_name() -> s3. +action_type_name() -> ?ACTION_UPLOAD. connector_type_name() -> s3. -schema_module() -> emqx_bridge_s3. +schema_module() -> emqx_bridge_s3_upload. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl index 738f9186f..322666b1f 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl @@ -11,8 +11,6 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/test_macros.hrl"). --import(emqx_utils_conv, [bin/1]). - %% See `emqx_bridge_s3.hrl`. -define(BRIDGE_TYPE, <<"s3">>). -define(CONNECTOR_TYPE, <<"s3">>). 
@@ -79,67 +77,56 @@ end_per_testcase(_TestCase, _Config) ->
 
 connector_config(Name, _Config) ->
     BaseConf = emqx_s3_test_helpers:base_raw_config(tcp),
-    parse_and_check_config(<<"connectors">>, ?CONNECTOR_TYPE, Name, #{
-        <<"enable">> => true,
-        <<"description">> => <<"S3 Connector">>,
-        <<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)),
-        <<"port">> => maps:get(<<"port">>, BaseConf),
-        <<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
-        <<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
-        <<"transport_options">> => #{
-            <<"headers">> => #{
-                <<"content-type">> => <<"application/json">>
-            },
-            <<"connect_timeout">> => <<"500ms">>,
-            <<"request_timeout">> => <<"1s">>,
-            <<"pool_size">> => 4,
-            <<"max_retries">> => 0,
-            <<"enable_pipelining">> => 1
-        },
-        <<"resource_opts">> => #{
-            <<"health_check_interval">> => <<"5s">>,
-            <<"start_timeout">> => <<"5s">>
-        }
-    }).
+    emqx_bridge_s3_test_helpers:parse_and_check_config(
+        <<"connectors">>, ?CONNECTOR_TYPE, Name, #{
+            <<"enable">> => true,
+            <<"description">> => <<"S3 Connector">>,
+            <<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)),
+            <<"port">> => maps:get(<<"port">>, BaseConf),
+            <<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
+            <<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
+            <<"transport_options">> => #{
+                <<"headers">> => #{
+                    <<"content-type">> => <<"application/json">>
+                },
+                <<"connect_timeout">> => <<"500ms">>,
+                <<"request_timeout">> => <<"1s">>,
+                <<"pool_size">> => 4,
+                <<"max_retries">> => 0,
+                <<"enable_pipelining">> => 1
+            },
+            <<"resource_opts">> => #{
+                <<"health_check_interval">> => <<"5s">>,
+                <<"start_timeout">> => <<"5s">>
+            }
+        }
+    ).
 
 action_config(Name, ConnectorId) ->
-    parse_and_check_config(<<"actions">>, ?BRIDGE_TYPE, Name, #{
-        <<"enable">> => true,
-        <<"connector">> => ConnectorId,
-        <<"parameters">> => #{
-            <<"bucket">> => <<"${clientid}">>,
-            <<"key">> => <<"${topic}">>,
-            <<"content">> => <<"${payload}">>,
-            <<"acl">> => <<"public_read">>
-        },
-        <<"resource_opts">> => #{
-            <<"buffer_mode">> => <<"memory_only">>,
-            <<"buffer_seg_bytes">> => <<"10MB">>,
-            <<"health_check_interval">> => <<"3s">>,
-            <<"inflight_window">> => 40,
-            <<"max_buffer_bytes">> => <<"256MB">>,
-            <<"metrics_flush_interval">> => <<"1s">>,
-            <<"query_mode">> => <<"sync">>,
-            <<"request_ttl">> => <<"60s">>,
-            <<"resume_interval">> => <<"3s">>,
-            <<"worker_pool_size">> => <<"4">>
-        }
-    }).
+    emqx_bridge_s3_test_helpers:parse_and_check_config(
+        <<"actions">>, ?BRIDGE_TYPE, Name, #{
+            <<"enable">> => true,
+            <<"connector">> => ConnectorId,
+            <<"parameters">> => #{
+                <<"bucket">> => <<"${clientid}">>,
+                <<"key">> => <<"${topic}">>,
+                <<"content">> => <<"${payload}">>,
+                <<"acl">> => <<"public_read">>
+            },
+            <<"resource_opts">> => #{
+                <<"buffer_mode">> => <<"memory_only">>,
+                <<"buffer_seg_bytes">> => <<"10MB">>,
+                <<"health_check_interval">> => <<"3s">>,
+                <<"inflight_window">> => 40,
+                <<"max_buffer_bytes">> => <<"256MB">>,
+                <<"metrics_flush_interval">> => <<"1s">>,
+                <<"query_mode">> => <<"sync">>,
+                <<"request_ttl">> => <<"60s">>,
+                <<"resume_interval">> => <<"3s">>,
+                <<"worker_pool_size">> => <<"4">>
+            }
+        }
- -parse_and_check_config(Root, Type, Name, ConfigIn) -> - Schema = - case Root of - <<"connectors">> -> emqx_connector_schema; - <<"actions">> -> emqx_bridge_v2_schema - end, - #{Root := #{Type := #{Name := Config}}} = - hocon_tconf:check_plain( - Schema, - #{Root => #{Type => #{Name => ConfigIn}}}, - #{required => false, atom_key => false} - ), - ct:pal("parsed config: ~p", [Config]), - ConfigIn. + ). t_start_stop(Config) -> emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped). @@ -190,7 +177,7 @@ t_sync_query(Config) -> ok = erlcloud_s3:create_bucket(Bucket, AwsConfig), ok = emqx_bridge_v2_testlib:t_sync_query( Config, - fun() -> mk_message(Bucket, Topic, Payload) end, + fun() -> emqx_bridge_s3_test_helpers:mk_message_event(Bucket, Topic, Payload) end, fun(Res) -> ?assertMatch(ok, Res) end, s3_bridge_connector_upload_ok ), @@ -224,15 +211,10 @@ t_query_retry_recoverable(Config) -> heal_failure, [timeout, ?PROXY_NAME, ProxyHost, ProxyPort] ), - Message = mk_message(Bucket, Topic, Payload), + Message = emqx_bridge_s3_test_helpers:mk_message_event(Bucket, Topic, Payload), %% Verify that the message is sent eventually. ok = emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{}), ?assertMatch( #{content := Payload}, maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig)) ). - -mk_message(ClientId, Topic, Payload) -> - Message = emqx_message:make(bin(ClientId), bin(Topic), Payload), - {Event, _} = emqx_rule_events:eventmsg_publish(Message), - Event. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_buffer_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_buffer_SUITE.erl new file mode 100644 index 000000000..199e070d3 --- /dev/null +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_buffer_SUITE.erl @@ -0,0 +1,181 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_aggreg_buffer_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +%% CT Setup + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + WorkDir = emqx_cth_suite:work_dir(Config), + ok = filelib:ensure_path(WorkDir), + [{work_dir, WorkDir} | Config]. + +end_per_suite(_Config) -> + ok. + +%% Testcases + +t_write_read_cycle(Config) -> + Filename = mk_filename(?FUNCTION_NAME, Config), + Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}}, + {ok, WFD} = file:open(Filename, [write, binary]), + Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata), + Terms = [ + [], + [[[[[[[[]]]]]]]], + 123, + lists:seq(1, 100), + lists:seq(1, 1000), + lists:seq(1, 10000), + lists:seq(1, 100000), + #{<<"id">> => 123456789, <<"ts">> => <<"2028-02-29T12:34:56Z">>, <<"gauge">> => 42.42}, + {<<"text/plain">>, _Huge = rand:bytes(1048576)}, + {<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})} + ], + ok = lists:foreach( + fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end, + Terms + ), + ok = file:close(WFD), + {ok, RFD} = file:open(Filename, [read, binary, raw]), + {MetadataRead, Reader} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD), + ?assertEqual(Metadata, MetadataRead), + TermsRead = read_until_eof(Reader), + ?assertEqual(Terms, TermsRead). 
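The cycle above exercises the framing laid out in the buffer module's header comment: every record is a size marker (`term_to_binary/1` of the encoded term's byte size) followed by the ETF-encoded term itself. A minimal decode step over an already-loaded binary, as a sketch (the real reader works incrementally against the IO device and tolerates truncation):

%% Decode one framed record from Bin; returns the term and the remaining bytes.
%% The `used` option makes binary_to_term/2 also report how many bytes the
%% size marker occupied, which tells us where the term's payload begins.
decode_record(Bin) ->
    {Size, Used} = binary_to_term(Bin, [used]),
    <<_Marker:Used/binary, TermBin:Size/binary, Rest/binary>> = Bin,
    {binary_to_term(TermBin), Rest}.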
+ +t_read_empty(Config) -> + Filename = mk_filename(?FUNCTION_NAME, Config), + {ok, WFD} = file:open(Filename, [write, binary]), + ok = file:close(WFD), + {ok, RFD} = file:open(Filename, [read, binary]), + ?assertError( + {buffer_incomplete, header}, + emqx_bridge_s3_aggreg_buffer:new_reader(RFD) + ). + +t_read_garbage(Config) -> + Filename = mk_filename(?FUNCTION_NAME, Config), + {ok, WFD} = file:open(Filename, [write, binary]), + ok = file:write(WFD, rand:bytes(1048576)), + ok = file:close(WFD), + {ok, RFD} = file:open(Filename, [read, binary]), + ?assertError( + badarg, + emqx_bridge_s3_aggreg_buffer:new_reader(RFD) + ). + +t_read_truncated(Config) -> + Filename = mk_filename(?FUNCTION_NAME, Config), + {ok, WFD} = file:open(Filename, [write, binary]), + Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}}, + Writer = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata), + Terms = [ + [[[[[[[[[[[]]]]]]]]]]], + lists:seq(1, 100000), + #{<<"id">> => 123456789, <<"ts">> => <<"2029-02-30T12:34:56Z">>, <<"gauge">> => 42.42}, + {<<"text/plain">>, _Huge = rand:bytes(1048576)} + ], + LastTerm = + {<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})}, + ok = lists:foreach( + fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer)) end, + Terms + ), + {ok, WPos} = file:position(WFD, cur), + ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(LastTerm, Writer)), + ok = file:close(WFD), + ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos + 1), + {ok, RFD1} = file:open(Filename, [read, binary]), + {Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD1), + {ReadTerms1, Reader1} = read_terms(length(Terms), Reader0), + ?assertEqual(Terms, ReadTerms1), + ?assertError( + badarg, + emqx_bridge_s3_aggreg_buffer:read(Reader1) + ), + ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2), + {ok, RFD2} = file:open(Filename, [read, binary]), + {Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD2), + {ReadTerms2, Reader3} = read_terms(_FitsInto = 3, Reader2), + ?assertEqual(lists:sublist(Terms, 3), ReadTerms2), + ?assertError( + badarg, + emqx_bridge_s3_aggreg_buffer:read(Reader3) + ). 
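Together, these cases pin down the recovery contract: reads succeed up to the first truncated or corrupted record and then fail with `badarg`. A consumer can lean on that to salvage the intact prefix; a sketch using only the calls exercised above:

%% Drain a reader, returning whatever was readable before EOF or corruption.
drain(Reader0, Acc) ->
    try emqx_bridge_s3_aggreg_buffer:read(Reader0) of
        {Term, Reader} -> drain(Reader, [Term | Acc]);
        eof -> {ok, lists:reverse(Acc)}
    catch
        error:badarg -> {truncated, lists:reverse(Acc)}
    end.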
+ +t_read_truncated_takeover_write(Config) -> + Filename = mk_filename(?FUNCTION_NAME, Config), + {ok, WFD} = file:open(Filename, [write, binary]), + Metadata = {?MODULE, #{tc => ?FUNCTION_NAME}}, + Writer1 = emqx_bridge_s3_aggreg_buffer:new_writer(WFD, Metadata), + Terms1 = [ + [[[[[[[[[[[]]]]]]]]]]], + lists:seq(1, 10000), + lists:duplicate(1000, ?FUNCTION_NAME), + {<<"text/plain">>, _Huge = rand:bytes(1048576)} + ], + Terms2 = [ + {<<"application/json">>, emqx_utils_json:encode(#{j => <<"son">>, null => null})}, + {<<"application/x-octet-stream">>, rand:bytes(102400)} + ], + ok = lists:foreach( + fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer1)) end, + Terms1 + ), + {ok, WPos} = file:position(WFD, cur), + ok = file:close(WFD), + ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, WPos div 2), + {ok, RWFD} = file:open(Filename, [read, write, binary]), + {Metadata, Reader0} = emqx_bridge_s3_aggreg_buffer:new_reader(RWFD), + {ReadTerms1, Reader1} = read_terms(_Survived = 3, Reader0), + ?assertEqual( + lists:sublist(Terms1, 3), + ReadTerms1 + ), + ?assertError( + badarg, + emqx_bridge_s3_aggreg_buffer:read(Reader1) + ), + Writer2 = emqx_bridge_s3_aggreg_buffer:takeover(Reader1), + ok = lists:foreach( + fun(T) -> ?assertEqual(ok, emqx_bridge_s3_aggreg_buffer:write(T, Writer2)) end, + Terms2 + ), + ok = file:close(RWFD), + {ok, RFD} = file:open(Filename, [read, binary]), + {Metadata, Reader2} = emqx_bridge_s3_aggreg_buffer:new_reader(RFD), + ReadTerms2 = read_until_eof(Reader2), + ?assertEqual( + lists:sublist(Terms1, 3) ++ Terms2, + ReadTerms2 + ). + +%% + +mk_filename(Name, Config) -> + filename:join(?config(work_dir, Config), Name). + +read_terms(0, Reader) -> + {[], Reader}; +read_terms(N, Reader0) -> + {Term, Reader1} = emqx_bridge_s3_aggreg_buffer:read(Reader0), + {Terms, Reader} = read_terms(N - 1, Reader1), + {[Term | Terms], Reader}. + +read_until_eof(Reader0) -> + case emqx_bridge_s3_aggreg_buffer:read(Reader0) of + {Term, Reader} -> + [Term | read_until_eof(Reader)]; + eof -> + [] + end. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_csv_tests.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_csv_tests.erl new file mode 100644 index 000000000..6da70c6fe --- /dev/null +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_csv_tests.erl @@ -0,0 +1,72 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_aggreg_csv_tests). + +-include_lib("eunit/include/eunit.hrl"). + +encoding_test() -> + CSV = emqx_bridge_s3_aggreg_csv:new(#{}), + ?assertEqual( + "A,B,Ç\n" + "1.2345,string,0.0\n" + "0.3333333333,\"[]\",-0.0\n" + "111111,🫠,0.0\n" + "111.111,\"\"\"quoted\"\"\",\"line\r\nbreak\"\n" + "222.222,,\n", + fill_close(CSV, [ + [ + #{<<"A">> => 1.2345, <<"B">> => "string", <<"Ç"/utf8>> => +0.0}, + #{<<"A">> => 1 / 3, <<"B">> => "[]", <<"Ç"/utf8>> => -0.0}, + #{<<"A">> => 111111, <<"B">> => "🫠", <<"Ç"/utf8>> => 0.0}, + #{<<"A">> => 111.111, <<"B">> => "\"quoted\"", <<"Ç"/utf8>> => "line\r\nbreak"}, + #{<<"A">> => 222.222, <<"B">> => "", <<"Ç"/utf8>> => undefined} + ] + ]) + ). 
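The test drives the CSV container through its streaming interface: `fill/2` takes a batch of records and returns iodata to append plus the updated container state, while `close/1` flushes whatever remains. A usage sketch (the filename and single-batch shape are illustrative):

%% Stream one batch of records into a CSV file and finalize it.
write_csv(Filename, Records) ->
    CSV0 = emqx_bridge_s3_aggreg_csv:new(#{}),
    {Writes, CSV1} = emqx_bridge_s3_aggreg_csv:fill(Records, CSV0),
    Trailer = emqx_bridge_s3_aggreg_csv:close(CSV1),
    file:write_file(Filename, [Writes, Trailer]).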
+ +column_order_test() -> + Order = [<<"ID">>, <<"TS">>], + CSV = emqx_bridge_s3_aggreg_csv:new(#{column_order => Order}), + ?assertEqual( + "ID,TS,A,B,D\n" + "1,2024-01-01,12.34,str,\"[]\"\n" + "2,2024-01-02,23.45,ing,\n" + "3,,45,,'\n" + "4,2024-01-04,,,\n", + fill_close(CSV, [ + [ + #{ + <<"A">> => 12.34, + <<"B">> => "str", + <<"ID">> => 1, + <<"TS">> => "2024-01-01", + <<"D">> => <<"[]">> + }, + #{ + <<"TS">> => "2024-01-02", + <<"C">> => <<"null">>, + <<"ID">> => 2, + <<"A">> => 23.45, + <<"B">> => "ing" + } + ], + [ + #{<<"A">> => 45, <<"D">> => <<"'">>, <<"ID">> => 3}, + #{<<"ID">> => 4, <<"TS">> => "2024-01-04"} + ] + ]) + ). + +fill_close(CSV, LRecords) -> + string(fill_close_(CSV, LRecords)). + +fill_close_(CSV0, [Records | LRest]) -> + {Writes, CSV} = emqx_bridge_s3_aggreg_csv:fill(Records, CSV0), + [Writes | fill_close_(CSV, LRest)]; +fill_close_(CSV, []) -> + [emqx_bridge_s3_aggreg_csv:close(CSV)]. + +string(Writes) -> + unicode:characters_to_list(Writes). diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl new file mode 100644 index 000000000..45a830294 --- /dev/null +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -0,0 +1,372 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_aggreg_upload_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/test_macros.hrl"). + +-import(emqx_utils_conv, [bin/1]). + +%% See `emqx_bridge_s3.hrl`. +-define(BRIDGE_TYPE, <<"s3_aggregated_upload">>). +-define(CONNECTOR_TYPE, <<"s3">>). + +-define(PROXY_NAME, "minio_tcp"). + +-define(CONF_TIME_INTERVAL, 4000). +-define(CONF_MAX_RECORDS, 100). +-define(CONF_COLUMN_ORDER, ?CONF_COLUMN_ORDER([])). +-define(CONF_COLUMN_ORDER(T), [ + <<"publish_received_at">>, + <<"clientid">>, + <<"topic">>, + <<"payload">> + | T +]). + +-define(LIMIT_TOLERANCE, 1.1). + +%% CT Setup + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + % Setup toxiproxy + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + _ = emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge_s3, + emqx_bridge, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _} = emqx_common_test_http:create_default_app(), + [ + {apps, Apps}, + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {proxy_name, ?PROXY_NAME} + | Config + ]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). 
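Note that `?CONF_COLUMN_ORDER/1` above is a parameterized macro: it prepends the four fixed columns to whatever tail it is given, which lets the testcases below use it both as the configured column order and as a match pattern for the CSV header row. For example (illustration only):

%% The macro simply prepends the fixed columns to its argument.
conf_column_order_example() ->
    [<<"publish_received_at">>, <<"clientid">>, <<"topic">>, <<"payload">>, <<"qos">>] =
        ?CONF_COLUMN_ORDER([<<"qos">>]),
    ok.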
+ +%% Testcases + +init_per_testcase(TestCase, Config) -> + ct:timetrap(timer:seconds(10)), + ok = snabbkaffe:start_trace(), + TS = erlang:system_time(), + Name = iolist_to_binary(io_lib:format("~s-~p", [TestCase, TS])), + Bucket = unicode:characters_to_list(string:replace(Name, "_", "-", all)), + ConnectorConfig = connector_config(Name, Config), + ActionConfig = action_config(Name, Name, Bucket), + ok = emqx_bridge_s3_test_helpers:create_bucket(Bucket), + [ + {connector_type, ?CONNECTOR_TYPE}, + {connector_name, Name}, + {connector_config, ConnectorConfig}, + {bridge_type, ?BRIDGE_TYPE}, + {bridge_name, Name}, + {bridge_config, ActionConfig}, + {s3_bucket, Bucket} + | Config + ]. + +end_per_testcase(_TestCase, _Config) -> + ok = snabbkaffe:stop(), + ok. + +connector_config(Name, _Config) -> + BaseConf = emqx_s3_test_helpers:base_raw_config(tcp), + emqx_bridge_s3_test_helpers:parse_and_check_config( + <<"connectors">>, ?CONNECTOR_TYPE, Name, #{ + <<"enable">> => true, + <<"description">> => <<"S3 Connector">>, + <<"host">> => emqx_utils_conv:bin(maps:get(<<"host">>, BaseConf)), + <<"port">> => maps:get(<<"port">>, BaseConf), + <<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf), + <<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf), + <<"transport_options">> => #{ + <<"connect_timeout">> => <<"500ms">>, + <<"request_timeout">> => <<"1s">>, + <<"pool_size">> => 4, + <<"max_retries">> => 0 + }, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"1s">> + } + } + ). + +action_config(Name, ConnectorId, Bucket) -> + emqx_bridge_s3_test_helpers:parse_and_check_config( + <<"actions">>, ?BRIDGE_TYPE, Name, #{ + <<"enable">> => true, + <<"connector">> => ConnectorId, + <<"parameters">> => #{ + <<"bucket">> => unicode:characters_to_binary(Bucket), + <<"key">> => <<"${action}/${node}/${datetime.rfc3339}">>, + <<"acl">> => <<"public_read">>, + <<"aggregation">> => #{ + <<"time_interval">> => <<"4s">>, + <<"max_records">> => ?CONF_MAX_RECORDS + }, + <<"container">> => #{ + <<"type">> => <<"csv">>, + <<"column_order">> => ?CONF_COLUMN_ORDER + } + }, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"1s">>, + <<"max_buffer_bytes">> => <<"64MB">>, + <<"query_mode">> => <<"async">>, + <<"worker_pool_size">> => 4 + } + } + ). + +t_start_stop(Config) -> + emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped). + +t_create_via_http(Config) -> + emqx_bridge_v2_testlib:t_create_via_http(Config). + +t_on_get_status(Config) -> + emqx_bridge_v2_testlib:t_on_get_status(Config, #{}). + +t_aggreg_upload(Config) -> + Bucket = ?config(s3_bucket, Config), + BridgeName = ?config(bridge_name, Config), + BridgeNameString = unicode:characters_to_list(BridgeName), + NodeString = atom_to_list(node()), + %% Create a bridge with the sample configuration. + ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), + %% Prepare some sample messages that look like Rule SQL productions. + MessageEvents = lists:map(fun mk_message_event/1, [ + {<<"C1">>, T1 = <<"a/b/c">>, P1 = <<"{\"hello\":\"world\"}">>}, + {<<"C2">>, T2 = <<"foo/bar">>, P2 = <<"baz">>}, + {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>} + ]), + ok = send_messages(BridgeName, MessageEvents), + %% Wait until the delivery is completed. + ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + %% Check the uploaded objects. 
+    _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
+    ?assertMatch(
+        [BridgeNameString, NodeString, _Datetime, _Seq = "0"],
+        string:split(Key, "/", all)
+    ),
+    _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
+    %% Verify that column order is respected.
+    ?assertMatch(
+        {ok, [
+            ?CONF_COLUMN_ORDER(_),
+            [TS, <<"C1">>, T1, P1 | _],
+            [TS, <<"C2">>, T2, P2 | _],
+            [TS, <<"C3">>, T3, P3 | _]
+        ]},
+        erl_csv:decode(Content)
+    ).
+
+t_aggreg_upload_restart(Config) ->
+    %% NOTE
+    %% This test verifies that the bridge will reuse the existing aggregation buffer
+    %% after a restart.
+    Bucket = ?config(s3_bucket, Config),
+    BridgeName = ?config(bridge_name, Config),
+    %% Create a bridge with the sample configuration.
+    ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
+    %% Send some sample messages that look like Rule SQL productions.
+    MessageEvents = lists:map(fun mk_message_event/1, [
+        {<<"C1">>, T1 = <<"a/b/c">>, P1 = <<"{\"hello\":\"world\"}">>},
+        {<<"C2">>, T2 = <<"foo/bar">>, P2 = <<"baz">>},
+        {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>}
+    ]),
+    ok = send_messages(BridgeName, MessageEvents),
+    {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}),
+    %% Restart the bridge.
+    {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
+    {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
+    %% Send some more messages.
+    ok = send_messages(BridgeName, MessageEvents),
+    {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_records_written, action := BridgeName}),
+    %% Wait until the delivery is completed.
+    {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
+    %% Check there's still only one upload.
+    _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
+    _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
+    %% Verify that column order is respected.
+    ?assertMatch(
+        {ok, [
+            _Header = [_ | _],
+            [TS1, <<"C1">>, T1, P1 | _],
+            [TS1, <<"C2">>, T2, P2 | _],
+            [TS1, <<"C3">>, T3, P3 | _],
+            [TS2, <<"C1">>, T1, P1 | _],
+            [TS2, <<"C2">>, T2, P2 | _],
+            [TS2, <<"C3">>, T3, P3 | _]
+        ]},
+        erl_csv:decode(Content)
+    ).
+
+t_aggreg_upload_restart_corrupted(Config) ->
+    %% NOTE
+    %% This test verifies that the bridge can recover from buffer file corruption,
+    %% and does so while preserving uncompromised data.
+    Bucket = ?config(s3_bucket, Config),
+    BridgeName = ?config(bridge_name, Config),
+    BatchSize = ?CONF_MAX_RECORDS div 2,
+    %% Create a bridge with the sample configuration.
+    ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
+    %% Send some sample messages that look like Rule SQL productions.
+    Messages1 = [
+        {integer_to_binary(N), <<"a/b/c">>, <<"{\"hello\":\"world\"}">>}
+     || N <- lists:seq(1, BatchSize)
+    ],
+    %% Ensure that they span multiple batch queries.
+    ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1),
+    {ok, _} = ?block_until(
+        #{?snk_kind := s3_aggreg_records_written, action := BridgeName},
+        infinity,
+        0
+    ),
+    %% Find out the buffer file.
+    {ok, #{filename := Filename}} = ?block_until(
+        #{?snk_kind := s3_aggreg_buffer_allocated, action := BridgeName}
+    ),
+    %% Stop the bridge, corrupt the buffer file, and restart the bridge.
+ {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), + BufferFileSize = filelib:file_size(Filename), + ok = emqx_bridge_s3_test_helpers:truncate_at(Filename, BufferFileSize div 2), + {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), + %% Send some more messages. + Messages2 = [ + {integer_to_binary(N), <<"c/d/e">>, <<"{\"hello\":\"world\"}">>} + || N <- lists:seq(1, BatchSize) + ], + ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0), + %% Wait until the delivery is completed. + {ok, _} = ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + %% Check that upload contains part of the first batch and all of the second batch. + _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), + CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key), + NRows = length(Rows), + ?assert( + NRows > BatchSize, + CSV + ), + ?assertEqual( + lists:sublist(Messages1, NRows - BatchSize) ++ Messages2, + [{ClientID, Topic, Payload} || [_TS, ClientID, Topic, Payload | _] <- Rows], + CSV + ). + +t_aggreg_next_rotate(Config) -> + %% NOTE + %% This is essentially a stress test that tries to verify that buffer rotation + %% and windowing work correctly under high rate, high concurrency conditions. + Bucket = ?config(s3_bucket, Config), + BridgeName = ?config(bridge_name, Config), + NSenders = 4, + %% Create a bridge with the sample configuration. + ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), + %% Start separate processes to send messages. + Senders = [ + spawn_link(fun() -> run_message_sender(BridgeName, N) end) + || N <- lists:seq(1, NSenders) + ], + %% Give them some time to send messages so that rotation and windowing will happen. + ok = timer:sleep(round(?CONF_TIME_INTERVAL * 1.5)), + %% Stop the senders. + _ = [Sender ! {stop, self()} || Sender <- Senders], + NSent = receive_sender_reports(Senders), + %% Wait for the last delivery to complete. + ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)), + ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}, infinity, 0), + %% There should be at least 2 time windows of aggregated records. + Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)], + DTs = [DT || K <- Uploads, [_Action, _Node, DT | _] <- [string:split(K, "/", all)]], + ?assert( + ordsets:size(ordsets:from_list(DTs)) > 1, + Uploads + ), + %% Uploads should not contain more than max allowed records. + CSVs = [{K, fetch_parse_csv(Bucket, K)} || K <- Uploads], + NRecords = [{K, length(CSV) - 1} || {K, CSV} <- CSVs], + ?assertEqual( + [], + [{K, NR} || {K, NR} <- NRecords, NR > ?CONF_MAX_RECORDS * ?LIMIT_TOLERANCE] + ), + %% No message should be lost. + ?assertEqual( + NSent, + lists:sum([NR || {_, NR} <- NRecords]) + ). + +run_message_sender(BridgeName, N) -> + ClientID = integer_to_binary(N), + Topic = <<"a/b/c/", ClientID/binary>>, + run_message_sender(BridgeName, N, ClientID, Topic, N, 0). + +run_message_sender(BridgeName, N, ClientID, Topic, Delay, NSent) -> + Payload = integer_to_binary(N * 1_000_000 + NSent), + Message = emqx_bridge_s3_test_helpers:mk_message_event(ClientID, Topic, Payload), + _ = send_message(BridgeName, Message), + receive + {stop, From} -> + From ! {sent, self(), NSent + 1} + after Delay -> + run_message_sender(BridgeName, N, ClientID, Topic, Delay, NSent + 1) + end. 
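The window-count check in `t_aggreg_next_rotate/1` relies on the `${action}/${node}/${datetime.rfc3339}` key template producing slash-separated keys, so the time window of an upload is simply the third segment; the comprehension above restated as a helper:

%% Time window (the rfc3339 segment) of an uploaded object key.
window_of(Key) ->
    [_Action, _Node, DT | _] = string:split(Key, "/", all),
    DT.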
+ +receive_sender_reports([Sender | Rest]) -> + receive + {sent, Sender, NSent} -> NSent + receive_sender_reports(Rest) + end; +receive_sender_reports([]) -> + 0. + +%% + +mk_message_event({ClientID, Topic, Payload}) -> + emqx_bridge_s3_test_helpers:mk_message_event(ClientID, Topic, Payload). + +send_messages(BridgeName, MessageEvents) -> + lists:foreach( + fun(M) -> send_message(BridgeName, M) end, + MessageEvents + ). + +send_messages_delayed(BridgeName, MessageEvents, Delay) -> + lists:foreach( + fun(M) -> + send_message(BridgeName, M), + timer:sleep(Delay) + end, + MessageEvents + ). + +send_message(BridgeName, Message) -> + ?assertEqual(ok, emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{})). + +fetch_parse_csv(Bucket, Key) -> + #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), + {ok, CSV} = erl_csv:decode(Content), + CSV. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl new file mode 100644 index 000000000..de19c8028 --- /dev/null +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_test_helpers.erl @@ -0,0 +1,52 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_test_helpers). + +-compile(nowarn_export_all). +-compile(export_all). + +-import(emqx_utils_conv, [bin/1]). + +parse_and_check_config(Root, Type, Name, Config) -> + Schema = + case Root of + <<"connectors">> -> emqx_connector_schema; + <<"actions">> -> emqx_bridge_v2_schema + end, + #{Root := #{Type := #{Name := _ConfigParsed}}} = + hocon_tconf:check_plain( + Schema, + #{Root => #{Type => #{Name => Config}}}, + #{required => false, atom_key => false} + ), + Config. + +mk_message_event(ClientId, Topic, Payload) -> + Message = emqx_message:make(bin(ClientId), bin(Topic), Payload), + {Event, _} = emqx_rule_events:eventmsg_publish(Message), + emqx_utils_maps:binary_key_map(Event). + +create_bucket(Bucket) -> + AwsConfig = emqx_s3_test_helpers:aws_config(tcp), + erlcloud_s3:create_bucket(Bucket, AwsConfig). + +list_objects(Bucket) -> + AwsConfig = emqx_s3_test_helpers:aws_config(tcp), + Response = erlcloud_s3:list_objects(Bucket, AwsConfig), + false = proplists:get_value(is_truncated, Response), + Contents = proplists:get_value(contents, Response), + lists:map(fun maps:from_list/1, Contents). + +get_object(Bucket, Key) -> + AwsConfig = emqx_s3_test_helpers:aws_config(tcp), + maps:from_list(erlcloud_s3:get_object(Bucket, Key, AwsConfig)). + +%% File utilities + +truncate_at(Filename, Pos) -> + {ok, FD} = file:open(Filename, [read, write, binary]), + {ok, Pos} = file:position(FD, Pos), + ok = file:truncate(FD), + ok = file:close(FD).
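Taken together, these helpers compose into the fetch-and-assert shape the suites use once a delivery completes; a sketch (assumes exactly one object has been uploaded to Bucket):

%% Fetch the single uploaded object and return its key and content.
fetch_single_upload(Bucket) ->
    [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
    #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
    {Key, Content}.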