diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index dc71d5806..1620e271a 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -118,8 +118,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_pulsar_action_info, emqx_bridge_greptimedb_action_info, emqx_bridge_tdengine_action_info, - emqx_bridge_s3_upload_action_info, - emqx_bridge_s3_aggreg_upload_action_info + emqx_bridge_s3_upload_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src index 05c8592d8..01a3e6c7c 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src @@ -12,8 +12,7 @@ ]}, {env, [ {emqx_action_info_modules, [ - emqx_bridge_s3_upload_action_info, - emqx_bridge_s3_aggreg_upload_action_info + emqx_bridge_s3_upload_action_info ]}, {emqx_connector_info_modules, [ emqx_bridge_s3_connector_info diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl b/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl index 62a80d260..0fd738255 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl @@ -8,8 +8,6 @@ %% Actions -define(ACTION_UPLOAD, s3). -define(BRIDGE_TYPE_UPLOAD, <<"s3">>). --define(ACTION_AGGREGATED_UPLOAD, s3_aggregated_upload). --define(BRIDGE_TYPE_AGGREGATED_UPLOAD, <<"s3_aggregated_upload">>). -define(CONNECTOR, s3). diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl deleted file mode 100644 index cea54f71f..000000000 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl +++ /dev/null @@ -1,275 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_bridge_s3_aggreg_upload). - --include_lib("typerefl/include/types.hrl"). --include_lib("hocon/include/hoconsc.hrl"). --include("emqx_bridge_s3.hrl"). - --define(ACTION, ?ACTION_AGGREGATED_UPLOAD). - --define(DEFAULT_BATCH_SIZE, 100). --define(DEFAULT_BATCH_TIME, <<"10ms">>). - --behaviour(hocon_schema). --export([ - namespace/0, - roots/0, - fields/1, - desc/1 -]). - -%% Interpreting options --export([ - mk_key_template/1, - mk_upload_options/1 -]). - -%% emqx_bridge_v2_schema API --export([bridge_v2_examples/1]). - -%%------------------------------------------------------------------------------------------------- -%% `hocon_schema' API -%%------------------------------------------------------------------------------------------------- - -namespace() -> - "bridge_s3_aggreg_upload". - -roots() -> - []. 
- -fields(Field) when - Field == "get_bridge_v2"; - Field == "put_bridge_v2"; - Field == "post_bridge_v2" --> - emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION)); -fields(action) -> - {?ACTION, - hoconsc:mk( - hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)), - #{ - desc => <<"S3 Aggregated Upload Action Config">>, - required => false - } - )}; -fields(?ACTION) -> - emqx_bridge_v2_schema:make_producer_action_schema( - hoconsc:mk( - ?R_REF(s3_aggregated_upload_parameters), - #{ - required => true, - desc => ?DESC(s3_aggregated_upload_parameters) - } - ), - #{ - resource_opts_ref => ?R_REF(s3_aggreg_upload_resource_opts) - } - ); -fields(s3_aggregated_upload_parameters) -> - lists:append([ - [ - {container, - hoconsc:mk( - %% TODO: Support selectors once there are more than one container. - hoconsc:union(fun - (all_union_members) -> [?REF(s3_aggregated_container_csv)]; - ({value, _Valur}) -> [?REF(s3_aggregated_container_csv)] - end), - #{ - required => true, - default => #{<<"type">> => <<"csv">>}, - desc => ?DESC(s3_aggregated_container) - } - )}, - {aggregation, - hoconsc:mk( - ?REF(s3_aggregation), - #{ - required => true, - desc => ?DESC(s3_aggregation) - } - )} - ], - emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [ - {key, #{desc => ?DESC(s3_aggregated_upload_key)}} - ]), - emqx_s3_schema:fields(s3_uploader) - ]); -fields(s3_aggregated_container_csv) -> - [ - {type, - hoconsc:mk( - csv, - #{ - required => true, - desc => ?DESC(s3_aggregated_container_csv) - } - )}, - {column_order, - hoconsc:mk( - hoconsc:array(string()), - #{ - required => false, - default => [], - desc => ?DESC(s3_aggregated_container_csv_column_order) - } - )} - ]; -fields(s3_aggregation) -> - [ - %% TODO: Needs bucketing? (e.g. messages falling in this 1h interval) - {time_interval, - hoconsc:mk( - emqx_schema:duration_s(), - #{ - required => false, - default => <<"1h">>, - desc => ?DESC(s3_aggregation_interval) - } - )}, - {max_records, - hoconsc:mk( - pos_integer(), - #{ - required => false, - default => <<"1000000">>, - desc => ?DESC(s3_aggregation_max_records) - } - )} - ]; -fields(s3_aggreg_upload_resource_opts) -> - %% NOTE: This action should benefit from generous batching defaults. - emqx_bridge_v2_schema:action_resource_opts_fields([ - {batch_size, #{default => ?DEFAULT_BATCH_SIZE}}, - {batch_time, #{default => ?DEFAULT_BATCH_TIME}} - ]). - -desc(Name) when - Name == s3_aggregated_upload; - Name == s3_aggregated_upload_parameters; - Name == s3_aggregation; - Name == s3_aggregated_container_csv --> - ?DESC(Name); -desc(s3_aggreg_upload_resource_opts) -> - ?DESC(emqx_resource_schema, resource_opts); -desc(_Name) -> - undefined. - -%% Interpreting options - --spec mk_key_template(_Parameters :: map()) -> emqx_template:str(). -mk_key_template(#{key := Key}) -> - Template = emqx_template:parse(Key), - {_, BindingErrors} = emqx_template:render(Template, #{}), - {UsedBindings, _} = lists:unzip(BindingErrors), - SuffixTemplate = mk_suffix_template(UsedBindings), - case emqx_template:is_const(SuffixTemplate) of - true -> - Template; - false -> - Template ++ SuffixTemplate - end. - -mk_suffix_template(UsedBindings) -> - RequiredBindings = ["action", "node", "datetime.", "sequence"], - SuffixBindings = [ - mk_default_binding(RB) - || RB <- RequiredBindings, - lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings) - ], - SuffixTemplate = [["/", B] || B <- SuffixBindings], - emqx_template:parse(SuffixTemplate). 
- -mk_default_binding("datetime.") -> - "${datetime.rfc3339utc}"; -mk_default_binding(Binding) -> - "${" ++ Binding ++ "}". - --spec mk_upload_options(_Parameters :: map()) -> emqx_s3_client:upload_options(). -mk_upload_options(Parameters) -> - Headers = mk_upload_headers(Parameters), - #{ - headers => Headers, - acl => maps:get(acl, Parameters, undefined) - }. - -mk_upload_headers(Parameters = #{container := Container}) -> - Headers = normalize_headers(maps:get(headers, Parameters, #{})), - ContainerHeaders = mk_container_headers(Container), - maps:merge(ContainerHeaders, Headers). - -normalize_headers(Headers) -> - maps:fold( - fun(Header, Value, Acc) -> - maps:put(string:lowercase(emqx_utils_conv:str(Header)), Value, Acc) - end, - #{}, - Headers - ). - -mk_container_headers(#{type := csv}) -> - #{"content-type" => "text/csv"}; -mk_container_headers(#{}) -> - #{}. - -%% Examples - -bridge_v2_examples(Method) -> - [ - #{ - <<"s3_aggregated_upload">> => #{ - summary => <<"S3 Aggregated Upload">>, - value => s3_action_example(Method) - } - } - ]. - -s3_action_example(post) -> - maps:merge( - s3_action_example(put), - #{ - type => atom_to_binary(?ACTION_UPLOAD), - name => <<"my_s3_action">> - } - ); -s3_action_example(get) -> - maps:merge( - s3_action_example(put), - #{ - status => <<"connected">>, - node_status => [ - #{ - node => <<"emqx@localhost">>, - status => <<"connected">> - } - ] - } - ); -s3_action_example(put) -> - #{ - enable => true, - connector => <<"my_s3_connector">>, - description => <<"My action">>, - parameters => #{ - bucket => <<"mqtt-aggregated">>, - key => <<"${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv">>, - acl => <<"public_read">>, - aggregation => #{ - time_interval => <<"15m">>, - max_records => 100_000 - }, - <<"container">> => #{ - type => <<"csv">>, - column_order => [<<"clientid">>, <<"topic">>, <<"publish_received_at">>] - } - }, - resource_opts => #{ - health_check_interval => <<"10s">>, - query_mode => <<"async">>, - inflight_window => 100 - } - }. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_action_info.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_action_info.erl deleted file mode 100644 index b179073e5..000000000 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload_action_info.erl +++ /dev/null @@ -1,21 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_bridge_s3_aggreg_upload_action_info). - --behaviour(emqx_action_info). - --include("emqx_bridge_s3.hrl"). - --export([ - action_type_name/0, - connector_type_name/0, - schema_module/0 -]). - -action_type_name() -> ?ACTION_AGGREGATED_UPLOAD. - -connector_type_name() -> s3. - -schema_module() -> emqx_bridge_s3_aggreg_upload. 
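
Reviewer note: the key-template logic deleted above moves verbatim into
emqx_bridge_s3_upload (see the diff below). A minimal sketch of its observable
behaviour, using hypothetical key values (not part of the diff):

    %% Sketch: mk_key_template/1 appends defaults for required bindings.
    example_key_templates() ->
        %% Every required binding is referenced, so the parsed template is
        %% returned unchanged.
        Full = emqx_bridge_s3_upload:mk_key_template(
            #{key => <<"${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv">>}
        ),
        %% No required binding is referenced, so defaults are appended as a path
        %% suffix; the result is equivalent to parsing
        %% "uploads/data/${action}/${node}/${datetime.rfc3339utc}/${sequence}".
        Partial = emqx_bridge_s3_upload:mk_key_template(#{key => <<"uploads/data">>}),
        {Full, Partial}.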
diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index ce1bee8d1..1fff09644 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -187,22 +187,24 @@ on_get_channel_status(_InstId, ChannelId, State = #{channels := Channels}) -> start_channel(_State, #{ bridge_type := ?BRIDGE_TYPE_UPLOAD, parameters := Parameters = #{ + mode := Mode = direct, bucket := Bucket, key := Key, content := Content } }) -> #{ - type => ?ACTION_UPLOAD, + mode => Mode, bucket => emqx_template:parse(Bucket), key => emqx_template:parse(Key), content => emqx_template:parse(Content), upload_options => upload_options(Parameters) }; start_channel(State, #{ - bridge_type := Type = ?BRIDGE_TYPE_AGGREGATED_UPLOAD, + bridge_type := Type = ?BRIDGE_TYPE_UPLOAD, bridge_name := Name, parameters := Parameters = #{ + mode := Mode = aggregated, aggregation := #{ time_interval := TimeInterval, max_records := MaxRecords @@ -219,9 +221,9 @@ start_channel(State, #{ }, DeliveryOpts = #{ bucket => Bucket, - key => emqx_bridge_s3_aggreg_upload:mk_key_template(Parameters), + key => emqx_bridge_s3_upload:mk_key_template(Parameters), container => Container, - upload_options => emqx_bridge_s3_aggreg_upload:mk_upload_options(Parameters), + upload_options => emqx_bridge_s3_upload:mk_upload_options(Parameters), callback_module => ?MODULE, client_config => maps:get(client_config, State), uploader_config => maps:with([min_part_size, max_part_size], Parameters) @@ -235,7 +237,7 @@ start_channel(State, #{ restart => permanent }), #{ - type => ?ACTION_AGGREGATED_UPLOAD, + mode => Mode, name => Name, aggreg_id => AggregId, bucket => Bucket, @@ -254,14 +256,12 @@ stop_channel(#{on_stop := OnStop}) -> stop_channel(_ChannelState) -> ok. -channel_status(#{type := ?ACTION_UPLOAD}, _State) -> +channel_status(#{mode := direct}, _State) -> %% TODO %% Since bucket name may be templated, we can't really provide any additional %% information regarding the channel health. ?status_connected; -channel_status( - #{type := ?ACTION_AGGREGATED_UPLOAD, aggreg_id := AggregId, bucket := Bucket}, State -) -> +channel_status(#{mode := aggregated, aggreg_id := AggregId, bucket := Bucket}, State) -> %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded. Timestamp = erlang:system_time(second), ok = emqx_connector_aggregator:tick(AggregId, Timestamp), @@ -305,9 +305,9 @@ check_aggreg_upload_errors(AggregId) -> {ok, _Result} | {error, _Reason}. on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) -> case maps:get(Tag, Channels, undefined) of - ChannelState = #{type := ?ACTION_UPLOAD} -> + ChannelState = #{mode := direct} -> run_simple_upload(InstId, Tag, Data, ChannelState, Config); - ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} -> + ChannelState = #{mode := aggregated} -> run_aggregated_upload(InstId, [Data], ChannelState); undefined -> {error, {unrecoverable_error, {invalid_message_tag, Tag}}} @@ -317,7 +317,7 @@ on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) {ok, _Result} | {error, _Reason}. 
on_batch_query(InstId, [{Tag, Data0} | Rest], #{channels := Channels}) -> case maps:get(Tag, Channels, undefined) of - ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} -> + ChannelState = #{mode := aggregated} -> Records = [Data0 | [Data || {_, Data} <- Rest]], run_aggregated_upload(InstId, Records, ChannelState); undefined -> diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl index 44e2360b8..6f79de907 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl @@ -18,10 +18,22 @@ desc/1 ]). +%% Interpreting options +-export([ + mk_key_template/1, + mk_upload_options/1 +]). + -export([ bridge_v2_examples/1 ]). +%% Internal exports +-export([convert_actions/2]). + +-define(DEFAULT_AGGREG_BATCH_SIZE, 100). +-define(DEFAULT_AGGREG_BATCH_TIME, <<"10ms">>). + %%------------------------------------------------------------------------------------------------- %% `hocon_schema' API %%------------------------------------------------------------------------------------------------- @@ -44,25 +56,37 @@ fields(action) -> hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)), #{ desc => <<"S3 Upload Action Config">>, - required => false + required => false, + converter => fun ?MODULE:convert_actions/2 } )}; fields(?ACTION) -> emqx_bridge_v2_schema:make_producer_action_schema( hoconsc:mk( - ?R_REF(s3_upload_parameters), + mkunion(mode, #{ + <<"direct">> => ?R_REF(s3_direct_upload_parameters), + <<"aggregated">> => ?R_REF(s3_aggregated_upload_parameters) + }), #{ required => true, desc => ?DESC(s3_upload) } ), #{ - resource_opts_ref => ?R_REF(s3_action_resource_opts) + resource_opts_ref => ?R_REF(s3_upload_resource_opts) } ); -fields(s3_upload_parameters) -> +fields(s3_direct_upload_parameters) -> emqx_s3_schema:fields(s3_upload) ++ [ + {mode, + hoconsc:mk( + direct, + #{ + required => true, + desc => ?DESC(s3_direct_upload_mode) + } + )}, {content, hoconsc:mk( emqx_schema:template(), @@ -73,49 +97,224 @@ fields(s3_upload_parameters) -> } )} ]; -fields(s3_action_resource_opts) -> - UnsupportedOpts = [batch_size, batch_time], - lists:filter( - fun({N, _}) -> not lists:member(N, UnsupportedOpts) end, - emqx_bridge_v2_schema:action_resource_opts_fields() - ). +fields(s3_aggregated_upload_parameters) -> + lists:append([ + [ + {mode, + hoconsc:mk( + aggregated, + #{ + required => true, + desc => ?DESC(s3_aggregated_upload_mode) + } + )}, + {container, + hoconsc:mk( + mkunion(type, #{ + <<"csv">> => ?REF(s3_aggregated_container_csv) + }), + #{ + required => true, + default => #{<<"type">> => <<"csv">>}, + desc => ?DESC(s3_aggregated_container) + } + )}, + {aggregation, + hoconsc:mk( + ?REF(s3_aggregation), + #{ + required => true, + desc => ?DESC(s3_aggregation) + } + )} + ], + emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [ + {key, #{desc => ?DESC(s3_aggregated_upload_key)}} + ]), + emqx_s3_schema:fields(s3_uploader) + ]); +fields(s3_aggregated_container_csv) -> + [ + {type, + hoconsc:mk( + csv, + #{ + required => true, + desc => ?DESC(s3_aggregated_container_csv) + } + )}, + {column_order, + hoconsc:mk( + hoconsc:array(string()), + #{ + required => false, + default => [], + desc => ?DESC(s3_aggregated_container_csv_column_order) + } + )} + ]; +fields(s3_aggregation) -> + [ + %% TODO: Needs bucketing? (e.g. 
messages falling in this 1h interval) + {time_interval, + hoconsc:mk( + emqx_schema:duration_s(), + #{ + required => false, + default => <<"1h">>, + desc => ?DESC(s3_aggregation_interval) + } + )}, + {max_records, + hoconsc:mk( + pos_integer(), + #{ + required => false, + default => <<"1000000">>, + desc => ?DESC(s3_aggregation_max_records) + } + )} + ]; +fields(s3_upload_resource_opts) -> + %% NOTE: Aggregated action should benefit from generous batching defaults. + emqx_bridge_v2_schema:action_resource_opts_fields([ + {batch_size, #{default => ?DEFAULT_AGGREG_BATCH_SIZE}}, + {batch_time, #{default => ?DEFAULT_AGGREG_BATCH_TIME}} + ]). + +mkunion(Field, Schemas) -> + hoconsc:union(fun(Arg) -> scunion(Field, Schemas, Arg) end). + +scunion(_Field, Schemas, all_union_members) -> + maps:values(Schemas); +scunion(Field, Schemas, {value, Value}) -> + Selector = maps:get(emqx_utils_conv:bin(Field), Value, undefined), + case Selector == undefined orelse maps:find(emqx_utils_conv:bin(Selector), Schemas) of + {ok, Schema} -> + [Schema]; + _Error -> + throw(#{field_name => Field, expected => maps:keys(Schemas)}) + end. desc(s3) -> ?DESC(s3_upload); desc(Name) when Name == s3_upload; - Name == s3_upload_parameters + Name == s3_direct_upload_parameters; + Name == s3_aggregated_upload_parameters; + Name == s3_aggregation; + Name == s3_aggregated_container_csv -> ?DESC(Name); -desc(s3_action_resource_opts) -> +desc(s3_upload_resource_opts) -> ?DESC(emqx_resource_schema, resource_opts); desc(_Name) -> undefined. +convert_actions(Conf = #{}, Opts) -> + maps:map(fun(_Name, ConfAction) -> convert_action(ConfAction, Opts) end, Conf); +convert_actions(undefined, _) -> + undefined. + +convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := ResourceOpts}, _) -> + case Params of + #{<<"mode">> := <<"direct">>} -> + %% NOTE: Disable batching for direct uploads. + NResourceOpts = ResourceOpts#{<<"batch_size">> => 1, <<"batch_time">> => 0}, + Conf#{<<"resource_opts">> := NResourceOpts}; + #{} -> + Conf + end. + +%% Interpreting options + +-spec mk_key_template(_Parameters :: map()) -> emqx_template:str(). +mk_key_template(#{key := Key}) -> + Template = emqx_template:parse(Key), + {_, BindingErrors} = emqx_template:render(Template, #{}), + {UsedBindings, _} = lists:unzip(BindingErrors), + SuffixTemplate = mk_suffix_template(UsedBindings), + case emqx_template:is_const(SuffixTemplate) of + true -> + Template; + false -> + Template ++ SuffixTemplate + end. + +mk_suffix_template(UsedBindings) -> + RequiredBindings = ["action", "node", "datetime.", "sequence"], + SuffixBindings = [ + mk_default_binding(RB) + || RB <- RequiredBindings, + lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings) + ], + SuffixTemplate = [["/", B] || B <- SuffixBindings], + emqx_template:parse(SuffixTemplate). + +mk_default_binding("datetime.") -> + "${datetime.rfc3339utc}"; +mk_default_binding(Binding) -> + "${" ++ Binding ++ "}". + +-spec mk_upload_options(_Parameters :: map()) -> emqx_s3_client:upload_options(). +mk_upload_options(Parameters) -> + Headers = mk_upload_headers(Parameters), + #{ + headers => Headers, + acl => maps:get(acl, Parameters, undefined) + }. + +mk_upload_headers(Parameters = #{container := Container}) -> + Headers = normalize_headers(maps:get(headers, Parameters, #{})), + ContainerHeaders = mk_container_headers(Container), + maps:merge(ContainerHeaders, Headers). 
+ +normalize_headers(Headers) -> + maps:fold( + fun(Header, Value, Acc) -> + maps:put(string:lowercase(emqx_utils_conv:str(Header)), Value, Acc) + end, + #{}, + Headers + ). + +mk_container_headers(#{type := csv}) -> + #{"content-type" => "text/csv"}; +mk_container_headers(#{}) -> + #{}. + %% Examples bridge_v2_examples(Method) -> [ #{ <<"s3">> => #{ - summary => <<"S3 Simple Upload">>, - value => s3_upload_action_example(Method) + summary => <<"S3 Direct Upload">>, + value => s3_upload_action_example(Method, direct) + }, + <<"s3_aggreg">> => #{ + summary => <<"S3 Aggregated Upload">>, + value => s3_upload_action_example(Method, aggreg) } } ]. -s3_upload_action_example(post) -> +s3_upload_action_example(post, Mode) -> maps:merge( - s3_upload_action_example(put), + s3_upload_action_example(put, Mode), #{ type => atom_to_binary(?ACTION_UPLOAD), - name => <<"my_s3_action">> + name => <<"my_s3_action">>, + enable => true, + connector => <<"my_s3_connector">> } ); -s3_upload_action_example(get) -> +s3_upload_action_example(get, Mode) -> maps:merge( - s3_upload_action_example(put), + s3_upload_action_example(put, Mode), #{ + enable => true, + connector => <<"my_s3_connector">>, status => <<"connected">>, node_status => [ #{ @@ -125,12 +324,11 @@ s3_upload_action_example(get) -> ] } ); -s3_upload_action_example(put) -> +s3_upload_action_example(put, direct) -> #{ - enable => true, - connector => <<"my_s3_connector">>, - description => <<"My action">>, + description => <<"My upload action">>, parameters => #{ + mode => <<"direct">>, bucket => <<"${clientid}">>, key => <<"${topic}">>, content => <<"${payload}">>, @@ -140,4 +338,27 @@ s3_upload_action_example(put) -> query_mode => <<"sync">>, inflight_window => 10 } + }; +s3_upload_action_example(put, aggreg) -> + #{ + description => <<"My aggregated upload action">>, + parameters => #{ + mode => <<"aggregated">>, + bucket => <<"mqtt-aggregated">>, + key => <<"${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv">>, + acl => <<"public_read">>, + aggregation => #{ + time_interval => <<"15m">>, + max_records => 100_000 + }, + <<"container">> => #{ + type => <<"csv">>, + column_order => [<<"clientid">>, <<"topic">>, <<"publish_received_at">>] + } + }, + resource_opts => #{ + health_check_interval => <<"10s">>, + query_mode => <<"async">>, + inflight_window => 100 + } }. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl index 322666b1f..f8eaa1b3a 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl @@ -108,6 +108,7 @@ action_config(Name, ConnectorId) -> <<"enable">> => true, <<"connector">> => ConnectorId, <<"parameters">> => #{ + <<"mode">> => <<"direct">>, <<"bucket">> => <<"${clientid}">>, <<"key">> => <<"${topic}">>, <<"content">> => <<"${payload}">>, @@ -122,6 +123,8 @@ action_config(Name, ConnectorId) -> <<"metrics_flush_interval">> => <<"1s">>, <<"query_mode">> => <<"sync">>, <<"request_ttl">> => <<"60s">>, + <<"batch_size">> => 42, + <<"batch_time">> => <<"100ms">>, <<"resume_interval">> => <<"3s">>, <<"worker_pool_size">> => <<"4">> } @@ -131,6 +134,13 @@ action_config(Name, ConnectorId) -> t_start_stop(Config) -> emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped). +t_ignore_batch_opts(Config) -> + {ok, {_Status, _, Bridge}} = emqx_bridge_v2_testlib:create_bridge_api(Config), + ?assertMatch( + #{<<"resource_opts">> := #{<<"batch_size">> := 1, <<"batch_time">> := 0}}, + Bridge + ). 
+ t_start_broken_update_restart(Config) -> Name = ?config(connector_name, Config), Type = ?config(connector_type, Config), diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl index af121ed8d..538e7b306 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -14,7 +14,7 @@ -import(emqx_utils_conv, [bin/1]). %% See `emqx_bridge_s3.hrl`. --define(BRIDGE_TYPE, <<"s3_aggregated_upload">>). +-define(BRIDGE_TYPE, <<"s3">>). -define(CONNECTOR_TYPE, <<"s3">>). -define(PROXY_NAME, "minio_tcp"). @@ -122,6 +122,7 @@ action_config(Name, ConnectorId, Bucket) -> <<"enable">> => true, <<"connector">> => ConnectorId, <<"parameters">> => #{ + <<"mode">> => <<"aggregated">>, <<"bucket">> => unicode:characters_to_binary(Bucket), <<"key">> => <<"${action}/${node}/${datetime.rfc3339}">>, <<"acl">> => <<"public_read">>, diff --git a/rel/i18n/emqx_bridge_s3_aggreg_upload.hocon b/rel/i18n/emqx_bridge_s3_aggreg_upload.hocon deleted file mode 100644 index 07239a32d..000000000 --- a/rel/i18n/emqx_bridge_s3_aggreg_upload.hocon +++ /dev/null @@ -1,64 +0,0 @@ -emqx_bridge_s3_aggreg_upload { - -s3_aggregated_upload.label: -"""S3 Aggregated Upload""" -s3_aggregated_upload.desc: -"""Action that enables time-based aggregation of incoming events and uploading them to the S3 service as a single object.""" - -s3_aggregated_upload_parameters.label: -"""S3 Aggregated Upload action parameters""" -s3_aggregated_upload_parameters.desc: -"""Set of parameters for the aggregated upload action.""" - -s3_aggregation.label: -"""Aggregation parameters""" -s3_aggregation.desc: -"""Set of parameters governing the aggregation process.""" - -s3_aggregation_interval.label: -"""Time interval""" -s3_aggregation_interval.desc: -"""Amount of time events will be aggregated in a single object before uploading.""" - -s3_aggregation_max_records.label: -"""Maximum number of records""" -s3_aggregation_max_records.desc: -"""Number of records (events) allowed per each aggregated object. Each aggregated upload will contain no more than that number of events, but may contain less.
-If event rate is high enough, there obviously may be more than one aggregated upload during the same time interval. These uploads will have different, but consecutive sequence numbers, which will be a part of S3 object key.""" - -s3_aggregated_container.label: -"""Container for aggregated events""" -s3_aggregated_container.desc: -"""Settings governing the file format of an upload containing aggregated events.""" - -s3_aggregated_container_csv.label: -"""CSV container""" -s3_aggregated_container_csv.desc: -"""Records (events) will be aggregated and uploaded as a CSV file.""" - -s3_aggregated_container_csv_column_order.label: -"""CSV column order""" -s3_aggregated_container_csv_column_order.desc: -"""Event fields that will be ordered first as columns in the resulting CSV file.
-Regardless of this setting, resulting CSV will contain all the fields of aggregated events, but all the columns not explicitly mentioned here will be ordered after the ones listed here in the lexicographical order.""" - -s3_aggregated_upload_key.label: -"""S3 object key template""" -s3_aggregated_upload_key.desc: -"""Template for the S3 object key of an aggregated upload.
-Template may contain placeholders for the following variables:
-${action} (required), ${node} (required), ${datetime.{format}} (required, e.g. ${datetime.rfc3339utc}), ${sequence} (required).
-All other placeholders are considered invalid. Note that placeholders marked as required will be added as a path suffix to the S3 object key if they are missing from the template."""
-}
diff --git a/rel/i18n/emqx_bridge_s3_upload.hocon b/rel/i18n/emqx_bridge_s3_upload.hocon
index 7d08cfaa5..8b3768078 100644
--- a/rel/i18n/emqx_bridge_s3_upload.hocon
+++ b/rel/i18n/emqx_bridge_s3_upload.hocon
@@ -1,13 +1,23 @@
 emqx_bridge_s3_upload {
 
 s3_upload.label:
-"""S3 Simple Upload"""
+"""Upload to S3"""
 s3_upload.desc:
-"""Action to upload a single object to the S3 service."""
+"""Action that takes incoming events and uploads them to an S3 API-compatible service."""
 
-s3_upload_parameters.label:
-"""S3 Upload action parameters"""
-s3_upload_parameters.desc:
+s3_parameters.label:
+"""S3 Upload parameters"""
+s3_parameters.desc:
+"""Set of parameters for the upload action."""
+
+s3_direct_upload_mode.label:
+"""Direct S3 Upload"""
+s3_direct_upload_mode.desc:
+"""Enables uploading of events to the S3 service as separate objects."""
+
+s3_direct_upload_parameters.label:
+"""Direct S3 Upload action parameters"""
+s3_direct_upload_parameters.desc:
 """Set of parameters for the upload action. Action supports templates in S3 bucket name, object key and object content."""
 
 s3_object_content.label:
@@ -15,4 +25,66 @@ s3_object_content.label:
 s3_object_content.desc:
 """Content of the S3 object being uploaded. Supports templates."""
 
+s3_aggregated_upload_mode.label:
+"""Aggregated S3 Upload"""
+s3_aggregated_upload_mode.desc:
+"""Enables time-based aggregation of incoming events and uploading them to the S3 service as a single object."""
+
+s3_aggregated_upload_parameters.label:
+"""Aggregated S3 Upload action parameters"""
+s3_aggregated_upload_parameters.desc:
+"""Set of parameters for the aggregated upload action."""
+
+s3_aggregation.label:
+"""Aggregation parameters"""
+s3_aggregation.desc:
+"""Set of parameters governing the aggregation process."""
+
+s3_aggregation_interval.label:
+"""Time interval"""
+s3_aggregation_interval.desc:
+"""Amount of time during which events will be aggregated into a single object before uploading."""
+
+s3_aggregation_max_records.label:
+"""Maximum number of records"""
+s3_aggregation_max_records.desc:
+"""Maximum number of records (events) per aggregated object. Each aggregated upload will contain no more than this number of events, but may contain fewer.
+If the event rate is high enough, there may be more than one aggregated upload within the same time interval. These uploads will have distinct but consecutive sequence numbers, which become part of the S3 object key."""
+
+s3_aggregated_container.label:
+"""Container for aggregated events"""
+s3_aggregated_container.desc:
+"""Settings governing the file format of an upload containing aggregated events."""
+
+s3_aggregated_container_csv.label:
+"""CSV container"""
+s3_aggregated_container_csv.desc:
+"""Records (events) will be aggregated and uploaded as a CSV file."""
+
+s3_aggregated_container_csv_column_order.label:
+"""CSV column order"""
+s3_aggregated_container_csv_column_order.desc:
+"""Event fields that will be ordered first as columns in the resulting CSV file.
+Regardless of this setting, the resulting CSV will contain all the fields of the aggregated events; columns not explicitly listed here will be ordered after the listed ones, lexicographically."""
+
+s3_aggregated_upload_key.label:
+"""S3 object key template"""
+s3_aggregated_upload_key.desc:
+"""Template for the S3 object key of an aggregated upload.
+Template may contain placeholders for the following variables:
+${action} (required), ${node} (required), ${datetime.{format}} (required, e.g. ${datetime.rfc3339utc}), ${sequence} (required).
+All other placeholders are considered invalid. Note that placeholders marked as required will be added as a path suffix to the S3 object key if they are missing from the template."""
+
 }
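
Reviewer note: a minimal sketch of the new mode handling, assuming raw action
configs arrive as binary-keyed maps (the action name is hypothetical; not part
of the diff):

    %% Sketch: convert_actions/2 pins batching off for direct-mode actions,
    %% which is exactly what t_ignore_batch_opts asserts over the API.
    example_convert_direct() ->
        Raw = #{
            <<"my_s3_action">> => #{
                <<"parameters">> => #{<<"mode">> => <<"direct">>},
                <<"resource_opts">> => #{
                    <<"batch_size">> => 42,
                    <<"batch_time">> => <<"100ms">>
                }
            }
        },
        #{<<"my_s3_action">> := #{<<"resource_opts">> := ResourceOpts}} =
            emqx_bridge_s3_upload:convert_actions(Raw, #{}),
        %% Direct uploads run one message at a time.
        #{<<"batch_size">> := 1, <<"batch_time">> := 0} = ResourceOpts,
        ok.

For an unrecognized mode, scunion/3 throws
#{field_name => mode, expected => [<<"aggregated">>, <<"direct">>]}.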