diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index e74e6aa3e..f70d58e23 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -106,7 +106,8 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_opents_action_info, emqx_bridge_rabbitmq_action_info, emqx_bridge_greptimedb_action_info, - emqx_bridge_tdengine_action_info + emqx_bridge_tdengine_action_info, + emqx_bridge_s3_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge_s3/BSL.txt b/apps/emqx_bridge_s3/BSL.txt new file mode 100644 index 000000000..f0cd31c6f --- /dev/null +++ b/apps/emqx_bridge_s3/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2028-01-26 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_s3/README.md b/apps/emqx_bridge_s3/README.md new file mode 100644 index 000000000..ac542b468 --- /dev/null +++ b/apps/emqx_bridge_s3/README.md @@ -0,0 +1,16 @@ +# EMQX S3 Bridge + +This application provides connector and action implementations for the EMQX to integrate with Amazon S3 compatible storage services as part of the EMQX data integration pipelines. +Users can leverage [EMQX Rule Engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) to create rules that publish message data to S3 storage service. + +## Documentation + +Refer to [Rules engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) for the EMQX rules engine introduction. + +## Contributing + +Please see our [contributing.md](../../CONTRIBUTING.md). + +## License + +EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt). diff --git a/apps/emqx_bridge_s3/docker-ct b/apps/emqx_bridge_s3/docker-ct new file mode 100644 index 000000000..a5a001815 --- /dev/null +++ b/apps/emqx_bridge_s3/docker-ct @@ -0,0 +1,2 @@ +minio +toxiproxy diff --git a/apps/emqx_bridge_s3/rebar.config b/apps/emqx_bridge_s3/rebar.config new file mode 100644 index 000000000..51bf0e0b6 --- /dev/null +++ b/apps/emqx_bridge_s3/rebar.config @@ -0,0 +1,6 @@ +%% -*- mode: erlang; -*- + +{erl_opts, [debug_info]}. +{deps, [ + {emqx_resource, {path, "../../apps/emqx_resource"}} +]}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src new file mode 100644 index 000000000..0047b5e51 --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src @@ -0,0 +1,17 @@ +{application, emqx_bridge_s3, [ + {description, "EMQX Enterprise S3 Bridge"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + erlcloud, + emqx_resource, + emqx_s3 + ]}, + {env, [ + {emqx_action_info_modules, [emqx_bridge_s3_action_info]} + ]}, + {modules, []}, + {links, []} +]}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl new file mode 100644 index 000000000..eff5282db --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.erl @@ -0,0 +1,217 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include("emqx_bridge_s3.hrl"). + +-behaviour(hocon_schema). +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-export([ + bridge_v2_examples/1, + connector_examples/1 +]). + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> + "bridge_s3". + +roots() -> + []. + +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields(Field, ?CONNECTOR, fields(s3_connector_config)); +fields(Field) when + Field == "get_bridge_v2"; + Field == "put_bridge_v2"; + Field == "post_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION)); +fields(action) -> + {?ACTION, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)), + #{ + desc => <<"S3 Action Config">>, + required => false + } + )}; +fields("config_connector") -> + lists:append([ + emqx_connector_schema:common_fields(), + fields(s3_connector_config), + emqx_connector_schema:resource_opts_ref(?MODULE, s3_connector_resource_opts) + ]); +fields(?ACTION) -> + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + ?R_REF(s3_upload_parameters), + #{ + required => true, + desc => ?DESC(s3_upload) + } + ), + #{ + resource_opts_ref => ?R_REF(s3_action_resource_opts) + } + ); +fields(s3_connector_config) -> + emqx_s3_schema:fields(s3_client); +fields(s3_upload_parameters) -> + emqx_s3_schema:fields(s3_upload) ++ + [ + {content, + hoconsc:mk( + string(), + #{ + required => false, + default => <<"${.}">>, + desc => ?DESC(s3_object_content) + } + )} + ]; +fields(s3_action_resource_opts) -> + UnsupportedOpts = [batch_size, batch_time], + lists:filter( + fun({N, _}) -> not lists:member(N, UnsupportedOpts) end, + emqx_bridge_v2_schema:action_resource_opts_fields() + ); +fields(s3_connector_resource_opts) -> + CommonOpts = emqx_connector_schema:common_resource_opts_subfields(), + lists:filter( + fun({N, _}) -> lists:member(N, CommonOpts) end, + emqx_connector_schema:resource_opts_fields() + ). + +desc("config_connector") -> + ?DESC(config_connector); +desc(?ACTION) -> + ?DESC(s3_upload); +desc(s3_upload) -> + ?DESC(s3_upload); +desc(s3_upload_parameters) -> + ?DESC(s3_upload_parameters); +desc(s3_action_resource_opts) -> + ?DESC(emqx_resource_schema, resource_opts); +desc(s3_connector_resource_opts) -> + ?DESC(emqx_resource_schema, resource_opts); +desc(_Name) -> + undefined. + +%% Examples + +bridge_v2_examples(Method) -> + [ + #{ + <<"s3">> => #{ + summary => <<"S3 Simple Upload">>, + value => action_example(Method) + } + } + ]. + +action_example(post) -> + maps:merge( + action_example(put), + #{ + type => atom_to_binary(?ACTION), + name => <<"my_s3_action">> + } + ); +action_example(get) -> + maps:merge( + action_example(put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +action_example(put) -> + #{ + enable => true, + connector => <<"my_s3_connector">>, + description => <<"My action">>, + parameters => #{ + bucket => <<"${clientid}">>, + key => <<"${topic}">>, + content => <<"${payload}">>, + acl => <<"public_read">> + }, + resource_opts => #{ + query_mode => <<"sync">>, + inflight_window => 10 + } + }. + +connector_examples(Method) -> + [ + #{ + <<"s3_aws">> => #{ + summary => <<"S3 Connector">>, + value => connector_example(Method) + } + } + ]. + +connector_example(get) -> + maps:merge( + connector_example(put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +connector_example(post) -> + maps:merge( + connector_example(put), + #{ + type => atom_to_binary(?CONNECTOR), + name => <<"my_s3_connector">> + } + ); +connector_example(put) -> + #{ + enable => true, + description => <<"My S3 connector">>, + host => <<"s3.eu-east-1.amazonaws.com">>, + port => 443, + access_key_id => <<"ACCESS">>, + secret_access_key => <<"SECRET">>, + transport_options => #{ + ssl => #{ + enable => true, + verify => <<"verify_peer">> + }, + connect_timeout => <<"1s">>, + request_timeout => <<"60s">>, + pool_size => 4, + max_retries => 1, + enable_pipelining => 1 + } + }. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl b/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl new file mode 100644 index 000000000..6d500d056 --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.hrl @@ -0,0 +1,11 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(__EMQX_BRIDGE_S3_HRL__). +-define(__EMQX_BRIDGE_S3_HRL__, true). + +-define(ACTION, s3). +-define(CONNECTOR, s3). + +-endif. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_action_info.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_action_info.erl new file mode 100644 index 000000000..646173bf4 --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_action_info.erl @@ -0,0 +1,19 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_action_info). + +-behaviour(emqx_action_info). + +-export([ + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +action_type_name() -> s3. + +connector_type_name() -> s3. + +schema_module() -> emqx_bridge_s3. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl new file mode 100644 index 000000000..9a0f110fe --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -0,0 +1,222 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_connector). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-behaviour(emqx_resource). +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_query/3, + % on_batch_query/3, + on_get_status/2, + on_get_channel_status/3 +]). + +-type config() :: #{ + access_key_id => string(), + secret_access_key => emqx_secret:t(string()), + host := string(), + port := pos_integer(), + transport_options => emqx_s3:transport_options() +}. + +-type channel_config() :: #{ + parameters := #{ + bucket := string(), + key := string(), + content := string(), + acl => emqx_s3:acl() + } +}. + +-type channel_state() :: #{ + bucket := emqx_template:str(), + key := emqx_template:str(), + upload_options => emqx_s3_client:upload_options() +}. + +-type state() :: #{ + pool_name := resource_id(), + pool_pid => pid(), + client_config := emqx_s3_client:config(), + channels := #{channel_id() => channel_state()} +}. + +%% + +-spec callback_mode() -> callback_mode(). +callback_mode() -> + always_sync. + +%% Management + +-spec on_start(_InstanceId :: resource_id(), config()) -> + {ok, state()} | {error, _Reason}. +on_start(InstId, Config) -> + PoolName = InstId, + S3Config = Config#{url_expire_time => 0}, + State = #{ + pool_name => PoolName, + client_config => emqx_s3_profile_conf:client_config(S3Config, PoolName), + channels => #{} + }, + HttpConfig = emqx_s3_profile_conf:http_config(Config), + case ehttpc_sup:start_pool(PoolName, HttpConfig) of + {ok, Pid} -> + ?SLOG(info, #{msg => "s3_connector_start_http_pool_success", pool_name => PoolName}), + {ok, State#{pool_pid => Pid}}; + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "s3_connector_start_http_pool_fail", + pool_name => PoolName, + http_config => HttpConfig, + reason => Reason + }), + Error + end. + +-spec on_stop(_InstanceId :: resource_id(), state()) -> + ok. +on_stop(InstId, _State = #{pool_name := PoolName}) -> + case ehttpc_sup:stop_pool(PoolName) of + ok -> + ?tp(s3_bridge_stopped, #{instance_id => InstId}), + ok; + {error, Reason} -> + ?SLOG(error, #{ + msg => "s3_connector_http_pool_stop_fail", + pool_name => PoolName, + reason => Reason + }), + ok + end. + +-spec on_get_status(_InstanceId :: resource_id(), state()) -> + health_check_status(). +on_get_status(_InstId, State = #{client_config := Config}) -> + try erlcloud_s3:list_buckets(emqx_s3_client:aws_config(Config)) of + Props when is_list(Props) -> + ?status_connected + catch + error:{aws_error, {http_error, _Code, _, Reason}} -> + {?status_disconnected, State, Reason}; + error:{aws_error, {socket_error, Reason}} -> + {?status_disconnected, State, Reason} + end. + +-spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) -> + {ok, state()} | {error, _Reason}. +on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) -> + ChannelState = init_channel_state(Config), + {ok, State#{channels => Channels#{ChannelId => ChannelState}}}. + +-spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) -> + {ok, state()}. +on_remove_channel(_InstId, State = #{channels := Channels}, ChannelId) -> + {ok, State#{channels => maps:remove(ChannelId, Channels)}}. + +-spec on_get_channels(_InstanceId :: resource_id()) -> + [_ChannelConfig]. +on_get_channels(InstId) -> + emqx_bridge_v2:get_channels_for_connector(InstId). + +-spec on_get_channel_status(_InstanceId :: resource_id(), channel_id(), state()) -> + channel_status(). +on_get_channel_status(_InstId, ChannelId, #{channels := Channels}) -> + case maps:get(ChannelId, Channels, undefined) of + _ChannelState = #{} -> + %% TODO + %% Since bucket name may be templated, we can't really provide any + %% additional information regarding the channel health. + ?status_connected; + undefined -> + ?status_disconnected + end. + +init_channel_state(#{parameters := Parameters}) -> + #{ + bucket => emqx_template:parse(maps:get(bucket, Parameters)), + key => emqx_template:parse(maps:get(key, Parameters)), + content => emqx_template:parse(maps:get(content, Parameters)), + upload_options => #{ + acl => maps:get(acl, Parameters, undefined) + } + }. + +%% Queries + +-type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}. + +-spec on_query(_InstanceId :: resource_id(), query(), state()) -> + {ok, _Result} | {error, _Reason}. +on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) -> + case maps:get(Tag, Channels, undefined) of + ChannelState = #{} -> + run_simple_upload(InstId, Data, ChannelState, Config); + undefined -> + {error, {unrecoverable_error, {invalid_message_tag, Tag}}} + end. + +run_simple_upload( + InstId, + Data, + #{ + bucket := BucketTemplate, + key := KeyTemplate, + content := ContentTemplate, + upload_options := UploadOpts + }, + Config +) -> + Bucket = render_bucket(BucketTemplate, Data), + Client = emqx_s3_client:create(Bucket, Config), + Key = render_key(KeyTemplate, Data), + Content = render_content(ContentTemplate, Data), + case emqx_s3_client:put_object(Client, Key, UploadOpts, Content) of + ok -> + ?tp(s3_bridge_connector_upload_ok, #{ + instance_id => InstId, + bucket => Bucket, + key => Key + }), + ok; + {error, Reason} -> + {error, map_error(Reason)} + end. + +map_error({socket_error, _} = Reason) -> + {recoverable_error, Reason}; +map_error(Reason) -> + %% TODO: Recoverable errors. + {unrecoverable_error, Reason}. + +render_bucket(Template, Data) -> + case emqx_template:render(Template, {emqx_jsonish, Data}) of + {Result, []} -> + iolist_to_string(Result); + {_, Errors} -> + erlang:error({unrecoverable_error, {bucket_undefined, Errors}}) + end. + +render_key(Template, Data) -> + %% NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`. + {Result, _Errors} = emqx_template:render(Template, {emqx_jsonish, Data}), + iolist_to_string(Result). + +render_content(Template, Data) -> + %% NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`. + {Result, _Errors} = emqx_template:render(Template, {emqx_jsonish, Data}), + Result. + +iolist_to_string(IOList) -> + unicode:characters_to_list(IOList). diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl new file mode 100644 index 000000000..5de30578b --- /dev/null +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl @@ -0,0 +1,171 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-import(emqx_utils_conv, [bin/1]). + +%% See `emqx_bridge_s3.hrl`. +-define(BRIDGE_TYPE, <<"s3">>). +-define(CONNECTOR_TYPE, <<"s3">>). + +-define(PROXY_NAME, "minio_tcp"). +-define(CONTENT_TYPE, "application/x-emqx-payload"). + +%% CT Setup + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + % Setup toxiproxy + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + _ = emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge_s3, + emqx_bridge, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _} = emqx_common_test_http:create_default_app(), + [ + {apps, Apps}, + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {proxy_name, ?PROXY_NAME} + | Config + ]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). + +%% Testcases + +init_per_testcase(TestCase, Config) -> + ct:timetrap(timer:seconds(30)), + ok = snabbkaffe:start_trace(), + Name = iolist_to_binary(io_lib:format("~s~p", [TestCase, erlang:unique_integer()])), + ConnectorConfig = connector_config(Name, Config), + ActionConfig = action_config(Name, Name), + [ + {connector_type, ?CONNECTOR_TYPE}, + {connector_name, Name}, + {connector_config, ConnectorConfig}, + {bridge_type, ?BRIDGE_TYPE}, + {bridge_name, Name}, + {bridge_config, ActionConfig} + | Config + ]. + +end_per_testcase(_TestCase, _Config) -> + ok = snabbkaffe:stop(), + ok. + +connector_config(Name, _Config) -> + BaseConf = emqx_s3_test_helpers:base_raw_config(tcp), + parse_and_check_config(<<"connectors">>, ?CONNECTOR_TYPE, Name, #{ + <<"enable">> => true, + <<"description">> => <<"S3 Connector">>, + <<"host">> => maps:get(<<"host">>, BaseConf), + <<"port">> => maps:get(<<"port">>, BaseConf), + <<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf), + <<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf), + <<"transport_options">> => #{ + <<"headers">> => #{ + <<"content-type">> => <> + }, + <<"connect_timeout">> => 1000, + <<"request_timeout">> => 1000, + <<"pool_size">> => 4, + <<"max_retries">> => 0, + <<"enable_pipelining">> => 1 + } + }). + +action_config(Name, ConnectorId) -> + parse_and_check_config(<<"actions">>, ?BRIDGE_TYPE, Name, #{ + <<"enable">> => true, + <<"connector">> => ConnectorId, + <<"parameters">> => #{ + <<"bucket">> => <<"${clientid}">>, + <<"key">> => <<"${topic}">>, + <<"content">> => <<"${payload}">>, + <<"acl">> => <<"public_read">> + }, + <<"resource_opts">> => #{ + <<"buffer_mode">> => <<"memory_only">>, + <<"buffer_seg_bytes">> => <<"10MB">>, + <<"health_check_interval">> => <<"5s">>, + <<"inflight_window">> => 40, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"metrics_flush_interval">> => <<"1s">>, + <<"query_mode">> => <<"sync">>, + <<"request_ttl">> => <<"60s">>, + <<"resume_interval">> => <<"5s">>, + <<"worker_pool_size">> => <<"4">> + } + }). + +parse_and_check_config(Root, Type, Name, ConfigIn) -> + Schema = + case Root of + <<"connectors">> -> emqx_connector_schema; + <<"actions">> -> emqx_bridge_v2_schema + end, + #{Root := #{Type := #{Name := Config}}} = + hocon_tconf:check_plain( + Schema, + #{Root => #{Type => #{Name => ConfigIn}}}, + #{required => false, atom_key => false} + ), + ct:pal("parsed config: ~p", [Config]), + ConfigIn. + +t_start_stop(Config) -> + emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped). + +t_create_via_http(Config) -> + emqx_bridge_v2_testlib:t_create_via_http(Config). + +t_on_get_status(Config) -> + emqx_bridge_v2_testlib:t_on_get_status(Config, #{}). + +t_sync_query(Config) -> + Bucket = emqx_s3_test_helpers:unique_bucket(), + Topic = "a/b/c", + Payload = rand:bytes(1024), + AwsConfig = emqx_s3_test_helpers:aws_config(tcp), + ok = erlcloud_s3:create_bucket(Bucket, AwsConfig), + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, + fun() -> mk_message(Bucket, Topic, Payload) end, + fun(Res) -> ?assertMatch(ok, Res) end, + s3_bridge_connector_upload_ok + ), + ?assertMatch( + #{ + content := Payload, + content_type := ?CONTENT_TYPE + }, + maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig)) + ). + +mk_message(ClientId, Topic, Payload) -> + Message = emqx_message:make(bin(ClientId), bin(Topic), Payload), + {Event, _} = emqx_rule_events:eventmsg_publish(Message), + Event. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 2935233be..40e3e0e40 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -66,6 +66,8 @@ resource_type(tdengine) -> emqx_bridge_tdengine_connector; resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; +resource_type(s3) -> + emqx_bridge_s3_connector; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -270,6 +272,14 @@ connector_structs() -> desc => <<"RabbitMQ Connector Config">>, required => false } + )}, + {s3, + mk( + hoconsc:map(name, ref(emqx_bridge_s3, "config_connector")), + #{ + desc => <<"S3 Connector Config">>, + required => false + } )} ]. @@ -296,7 +306,8 @@ schema_modules() -> emqx_bridge_rabbitmq_connector_schema, emqx_bridge_opents_connector, emqx_bridge_greptimedb, - emqx_bridge_tdengine_connector + emqx_bridge_tdengine_connector, + emqx_bridge_s3 ]. api_schemas(Method) -> @@ -332,7 +343,8 @@ api_schemas(Method) -> api_ref(emqx_bridge_opents_connector, <<"opents">>, Method), api_ref(emqx_bridge_rabbitmq_connector_schema, <<"rabbitmq">>, Method), api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector"), - api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method) + api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method), + api_ref(emqx_bridge_s3, <<"s3">>, Method ++ "_connector") ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index b803ab9f2..b4e299ab2 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -167,7 +167,9 @@ connector_type_to_bridge_types(greptimedb) -> connector_type_to_bridge_types(tdengine) -> [tdengine]; connector_type_to_bridge_types(rabbitmq) -> - [rabbitmq]. + [rabbitmq]; +connector_type_to_bridge_types(s3) -> + [s3]. actions_config_name(action) -> <<"actions">>; actions_config_name(source) -> <<"sources">>. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 56ca4acb0..e5810dcc5 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -364,8 +364,8 @@ parse_spec_ref(Module, Path, Options) -> -ifdef(TEST). -spec failed_to_generate_swagger_spec(_, _, _, _, _) -> no_return(). -failed_to_generate_swagger_spec(Module, Path, _Error, _Reason, _Stacktrace) -> - error({failed_to_generate_swagger_spec, Module, Path}). +failed_to_generate_swagger_spec(Module, Path, Error, Reason, Stacktrace) -> + error({failed_to_generate_swagger_spec, Module, Path, Error, Reason, Stacktrace}). -else. -spec failed_to_generate_swagger_spec(_, _, _, _, _) -> no_return(). failed_to_generate_swagger_spec(Module, Path, Error, Reason, Stacktrace) -> diff --git a/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl b/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl index 1bb42f324..0e1264aeb 100644 --- a/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl @@ -310,7 +310,7 @@ t_nest_ref(_Config) -> t_none_ref(_Config) -> Path = "/ref/none", ?assertError( - {failed_to_generate_swagger_spec, ?MODULE, Path}, + {failed_to_generate_swagger_spec, ?MODULE, Path, error, _FunctionClause, _Stacktrace}, emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}) ), ok. diff --git a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl index 85cc4b16b..5ccb01b3e 100644 --- a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl @@ -282,7 +282,7 @@ t_bad_ref(_Config) -> t_none_ref(_Config) -> Path = "/ref/none", ?assertError( - {failed_to_generate_swagger_spec, ?MODULE, Path}, + {failed_to_generate_swagger_spec, ?MODULE, Path, error, _FunctionClause, _Stacktrace}, validate(Path, #{}, []) ), ok. diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl index 844896a2f..ad061ca0f 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl @@ -69,11 +69,9 @@ -spec start_export(options(), transfer(), filemeta()) -> {ok, export_st()} | {error, term()}. start_export(_Options, Transfer, Filemeta) -> - Options = #{ - key => s3_key(Transfer, Filemeta), - headers => s3_headers(Transfer, Filemeta) - }, - case emqx_s3:start_uploader(?S3_PROFILE_ID, Options) of + Key = s3_key(Transfer, Filemeta), + UploadOpts = #{headers => s3_headers(Transfer, Filemeta)}, + case emqx_s3:start_uploader(?S3_PROFILE_ID, Key, UploadOpts) of {ok, Pid} -> true = erlang:link(Pid), {ok, #{filemeta => Filemeta, pid => Pid}}; @@ -180,22 +178,24 @@ list_pages(Client, Marker, Limit, Acc) -> ListOptions = [{marker, Marker} || Marker =/= undefined], case list_key_info(Client, [{max_keys, MaxKeys} | ListOptions]) of {ok, {Exports, NextMarker}} -> - list_accumulate(Client, Limit, NextMarker, [Exports | Acc]); + Left = update_limit(Limit, Exports), + NextAcc = [Exports | Acc], + case NextMarker of + undefined -> + {ok, {flatten_pages(NextAcc), undefined}}; + _ when Left =< 0 -> + {ok, {flatten_pages(NextAcc), NextMarker}}; + _ -> + list_pages(Client, NextMarker, Left, NextAcc) + end; {error, _Reason} = Error -> Error end. -list_accumulate(_Client, _Limit, undefined, Acc) -> - {ok, {flatten_pages(Acc), undefined}}; -list_accumulate(Client, undefined, Marker, Acc) -> - list_pages(Client, Marker, undefined, Acc); -list_accumulate(Client, Limit, Marker, Acc = [Exports | _]) -> - case Limit - length(Exports) of - 0 -> - {ok, {flatten_pages(Acc), Marker}}; - Left -> - list_pages(Client, Marker, Left, Acc) - end. +update_limit(undefined, _Exports) -> + undefined; +update_limit(Limit, Exports) -> + Limit - length(Exports). flatten_pages(Pages) -> lists:append(lists:reverse(Pages)). diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index f7e78c360..a6559bcab 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -114,6 +114,7 @@ emqx_bridge_oracle, emqx_bridge_rabbitmq, emqx_bridge_azure_event_hub, + emqx_bridge_s3, emqx_schema_registry, emqx_eviction_agent, emqx_node_rebalance, diff --git a/apps/emqx_s3/README.md b/apps/emqx_s3/README.md index 4ce1b0c0a..c2627cc08 100644 --- a/apps/emqx_s3/README.md +++ b/apps/emqx_s3/README.md @@ -18,7 +18,7 @@ The steps to integrate this application are: `ProfileName` is a unique name used to distinguish different sets of S3 settings. Each profile has its own connection pool and configuration. To use S3 from a _client_ application: -* Create an uploader process with `{ok, Pid} = emqx_s3:start_uploader(ProfileName, #{key => MyKey})`. +* Create an uploader process with `{ok, Pid} = emqx_s3:start_uploader(ProfileName, MyKey, _Opts = #{})`. * Write data with `emqx_s3_uploader:write(Pid, <<"data">>)`. * Finish the uploader with `emqx_s3_uploader:complete(Pid)` or `emqx_s3_uploader:abort(Pid)`. diff --git a/apps/emqx_s3/src/emqx_s3.erl b/apps/emqx_s3/src/emqx_s3.erl index 87996e2fc..b499fbfd1 100644 --- a/apps/emqx_s3/src/emqx_s3.erl +++ b/apps/emqx_s3/src/emqx_s3.erl @@ -10,7 +10,7 @@ start_profile/2, stop_profile/1, update_profile/2, - start_uploader/2, + start_uploader/3, with_client/2 ]). @@ -22,6 +22,7 @@ -export_type([ profile_id/0, profile_config/0, + transport_options/0, acl/0 ]). @@ -81,18 +82,18 @@ stop_profile(ProfileId) when ?IS_PROFILE_ID(ProfileId) -> update_profile(ProfileId, ProfileConfig) when ?IS_PROFILE_ID(ProfileId) -> emqx_s3_profile_conf:update_config(ProfileId, ProfileConfig). --spec start_uploader(profile_id(), emqx_s3_uploader:opts()) -> +-spec start_uploader(profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) -> emqx_types:startlink_ret() | {error, profile_not_found}. -start_uploader(ProfileId, Opts) when ?IS_PROFILE_ID(ProfileId) -> - emqx_s3_profile_uploader_sup:start_uploader(ProfileId, Opts). +start_uploader(ProfileId, Key, Props) when ?IS_PROFILE_ID(ProfileId) -> + emqx_s3_profile_uploader_sup:start_uploader(ProfileId, Key, Props). -spec with_client(profile_id(), fun((emqx_s3_client:client()) -> Result)) -> {error, profile_not_found} | Result. with_client(ProfileId, Fun) when is_function(Fun, 1) andalso ?IS_PROFILE_ID(ProfileId) -> case emqx_s3_profile_conf:checkout_config(ProfileId) of - {ok, ClientConfig, _UploadConfig} -> + {Bucket, ClientConfig, _UploadOpts, _UploadConfig} -> try - Fun(emqx_s3_client:create(ClientConfig)) + Fun(emqx_s3_client:create(Bucket, ClientConfig)) after emqx_s3_profile_conf:checkin_config(ProfileId) end; diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl index fe0058433..e02134dc1 100644 --- a/apps/emqx_s3/src/emqx_s3_client.erl +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -9,12 +9,11 @@ -include_lib("erlcloud/include/erlcloud_aws.hrl"). -export([ - create/1, + create/2, put_object/3, put_object/4, - start_multipart/2, start_multipart/3, upload_part/5, complete_multipart/4, @@ -26,10 +25,15 @@ format_request/1 ]). +%% For connectors +-export([aws_config/1]). + -export_type([ client/0, headers/0, + bucket/0, key/0, + upload_options/0, upload_id/0, etag/0, part_number/0, @@ -39,18 +43,17 @@ -type headers() :: #{binary() | string() => iodata()}. -type erlcloud_headers() :: list({string(), iodata()}). +-type bucket() :: string(). -type key() :: string(). -type part_number() :: non_neg_integer(). -type upload_id() :: string(). -type etag() :: string(). -type http_pool() :: ehttpc:pool_name(). -type pool_type() :: random | hash. --type upload_options() :: list({acl, emqx_s3:acl()}). -opaque client() :: #{ aws_config := aws_config(), - upload_options := upload_options(), - bucket := string(), + bucket := bucket(), headers := erlcloud_headers(), url_expire_time := non_neg_integer(), pool_type := pool_type() @@ -60,9 +63,7 @@ scheme := string(), host := string(), port := part_number(), - bucket := string(), headers := headers(), - acl := emqx_s3:acl() | undefined, url_expire_time := pos_integer(), access_key_id := string() | undefined, secret_access_key := emqx_secret:t(string()) | undefined, @@ -72,6 +73,11 @@ max_retries := non_neg_integer() | undefined }. +-type upload_options() :: #{ + acl => emqx_s3:acl() | undefined, + headers => headers() +}. + -type s3_options() :: proplists:proplist(). -define(DEFAULT_REQUEST_TIMEOUT, 30000). @@ -81,12 +87,11 @@ %% API %%-------------------------------------------------------------------- --spec create(config()) -> client(). -create(Config) -> +-spec create(bucket(), config()) -> client(). +create(Bucket, Config) -> #{ aws_config => aws_config(Config), - upload_options => upload_options(Config), - bucket => maps:get(bucket, Config), + bucket => Bucket, url_expire_time => maps:get(url_expire_time, Config), headers => headers(Config), pool_type => maps:get(pool_type, Config) @@ -94,17 +99,19 @@ create(Config) -> -spec put_object(client(), key(), iodata()) -> ok_or_error(term()). put_object(Client, Key, Value) -> - put_object(Client, #{}, Key, Value). + put_object(Client, Key, #{}, Value). --spec put_object(client(), headers(), key(), iodata()) -> ok_or_error(term()). +-spec put_object(client(), key(), upload_options(), iodata()) -> ok_or_error(term()). put_object( - #{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig}, - SpecialHeaders, + #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig}, Key, - Value + UploadOpts, + Content ) -> - AllHeaders = join_headers(Headers, SpecialHeaders), - try erlcloud_s3:put_object(Bucket, erlcloud_key(Key), Value, Options, AllHeaders, AwsConfig) of + ECKey = erlcloud_key(Key), + ECOpts = erlcloud_upload_options(UploadOpts), + Headers = join_headers(BaseHeaders, maps:get(headers, UploadOpts, undefined)), + try erlcloud_s3:put_object(Bucket, ECKey, Content, ECOpts, Headers, AwsConfig) of Props when is_list(Props) -> ok catch @@ -113,18 +120,16 @@ put_object( {error, Reason} end. --spec start_multipart(client(), key()) -> ok_or_error(upload_id(), term()). -start_multipart(Client, Key) -> - start_multipart(Client, #{}, Key). - --spec start_multipart(client(), headers(), key()) -> ok_or_error(upload_id(), term()). +-spec start_multipart(client(), key(), upload_options()) -> ok_or_error(upload_id(), term()). start_multipart( - #{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig}, - SpecialHeaders, - Key + #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig}, + Key, + UploadOpts ) -> - AllHeaders = join_headers(Headers, SpecialHeaders), - case erlcloud_s3:start_multipart(Bucket, erlcloud_key(Key), Options, AllHeaders, AwsConfig) of + ECKey = erlcloud_key(Key), + ECOpts = erlcloud_upload_options(UploadOpts), + Headers = join_headers(BaseHeaders, maps:get(headers, UploadOpts, undefined)), + case erlcloud_s3:start_multipart(Bucket, ECKey, ECOpts, Headers, AwsConfig) of {ok, Props} -> {ok, response_property('uploadId', Props)}; {error, Reason} -> @@ -204,11 +209,11 @@ format(#{aws_config := AwsConfig} = Client) -> %% Internal functions %%-------------------------------------------------------------------- -upload_options(#{acl := Acl}) when Acl =/= undefined -> +erlcloud_upload_options(#{acl := Acl}) when Acl =/= undefined -> [ {acl, Acl} ]; -upload_options(#{}) -> +erlcloud_upload_options(#{}) -> []. headers(#{headers := Headers}) -> @@ -273,10 +278,10 @@ request_fun(HttpPool, PoolType, MaxRetries) -> end) end. -ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries) -> - try timer:tc(fun() -> ehttpc:request(HttpPool, Method, Request, Timeout, MaxRetries) end) of +ehttpc_request(Worker, Method, Request, Timeout, MaxRetries) -> + try timer:tc(fun() -> ehttpc:request(Worker, Method, Request, Timeout, MaxRetries) end) of {Time, {ok, StatusCode, RespHeaders}} -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => "s3_ehttpc_request_ok", status_code => StatusCode, headers => RespHeaders, @@ -286,7 +291,7 @@ ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries) -> {StatusCode, undefined}, headers_ehttpc_to_erlcloud_response(RespHeaders), undefined }}; {Time, {ok, StatusCode, RespHeaders, RespBody}} -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => "s3_ehttpc_request_ok", status_code => StatusCode, headers => RespHeaders, @@ -297,31 +302,31 @@ ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries) -> {StatusCode, undefined}, headers_ehttpc_to_erlcloud_response(RespHeaders), RespBody }}; {Time, {error, Reason}} -> - ?SLOG(error, #{ + ?SLOG(warning, #{ msg => "s3_ehttpc_request_fail", reason => Reason, timeout => Timeout, - pool => HttpPool, + worker => Worker, method => Method, time => Time }), {error, Reason} catch error:badarg -> - ?SLOG(error, #{ + ?SLOG(warning, #{ msg => "s3_ehttpc_request_fail", reason => badarg, timeout => Timeout, - pool => HttpPool, + worker => Worker, method => Method }), {error, no_ehttpc_pool}; error:Reason -> - ?SLOG(error, #{ + ?SLOG(warning, #{ msg => "s3_ehttpc_request_fail", reason => Reason, timeout => Timeout, - pool => HttpPool, + worker => Worker, method => Method }), {error, Reason} @@ -401,6 +406,8 @@ headers_ehttpc_to_erlcloud_response(EhttpcHeaders) -> headers_erlcloud_request_to_ehttpc(ErlcloudHeaders) -> [{to_binary(K), V} || {K, V} <- ErlcloudHeaders]. +join_headers(ErlcloudHeaders, undefined) -> + ErlcloudHeaders; join_headers(ErlcloudHeaders, UserSpecialHeaders) -> ErlcloudHeaders ++ headers_user_to_erlcloud_request(UserSpecialHeaders). diff --git a/apps/emqx_s3/src/emqx_s3_profile_conf.erl b/apps/emqx_s3/src/emqx_s3_profile_conf.erl index a449640a6..49c531777 100644 --- a/apps/emqx_s3/src/emqx_s3_profile_conf.erl +++ b/apps/emqx_s3/src/emqx_s3_profile_conf.erl @@ -37,13 +37,25 @@ code_change/3 ]). -%% For test purposes +%% For connectors -export([ client_config/2, + http_config/1 +]). + +%% For test purposes +-export([ start_http_pool/2, id/1 ]). +-type config_checkout() :: { + emqx_s3_client:bucket(), + emqx_s3_client:config(), + emqx_s3_client:upload_options(), + emqx_s3_uploader:config() +}. + -define(DEFAULT_CALL_TIMEOUT, 5000). -define(DEFAULT_HTTP_POOL_TIMEOUT, 60000). @@ -78,12 +90,12 @@ update_config(ProfileId, ProfileConfig, Timeout) -> ?SAFE_CALL_VIA_GPROC(ProfileId, {update_config, ProfileConfig}, Timeout). -spec checkout_config(emqx_s3:profile_id()) -> - {ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}. + config_checkout() | {error, profile_not_found}. checkout_config(ProfileId) -> checkout_config(ProfileId, ?DEFAULT_CALL_TIMEOUT). -spec checkout_config(emqx_s3:profile_id(), timeout()) -> - {ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}. + config_checkout() | {error, profile_not_found}. checkout_config(ProfileId, Timeout) -> ?SAFE_CALL_VIA_GPROC(ProfileId, {checkout_config, self()}, Timeout). @@ -108,6 +120,8 @@ init([ProfileId, ProfileConfig]) -> {ok, #{ profile_id => ProfileId, profile_config => ProfileConfig, + bucket => bucket(ProfileConfig), + upload_options => upload_options(ProfileConfig), client_config => client_config(ProfileConfig, PoolName), uploader_config => uploader_config(ProfileConfig), pool_name => PoolName, @@ -128,12 +142,14 @@ handle_call( {checkout_config, Pid}, _From, #{ + bucket := Bucket, + upload_options := Options, client_config := ClientConfig, uploader_config := UploaderConfig } = State ) -> ok = register_client(Pid, State), - {reply, {ok, ClientConfig, UploaderConfig}, State}; + {reply, {Bucket, ClientConfig, Options, UploaderConfig}, State}; handle_call({checkin_config, Pid}, _From, State) -> ok = unregister_client(Pid, State), {reply, ok, State}; @@ -146,6 +162,8 @@ handle_call( {ok, PoolName} -> NewState = State#{ profile_config => NewProfileConfig, + bucket => bucket(NewProfileConfig), + upload_options => upload_options(NewProfileConfig), client_config => client_config(NewProfileConfig, PoolName), uploader_config => uploader_config(NewProfileConfig), http_pool_timeout => http_pool_timeout(NewProfileConfig), @@ -198,8 +216,6 @@ client_config(ProfileConfig, PoolName) -> port => maps:get(port, ProfileConfig), url_expire_time => maps:get(url_expire_time, ProfileConfig), headers => maps:get(headers, HTTPOpts, #{}), - acl => maps:get(acl, ProfileConfig, undefined), - bucket => maps:get(bucket, ProfileConfig), access_key_id => maps:get(access_key_id, ProfileConfig, undefined), secret_access_key => maps:get(secret_access_key, ProfileConfig, undefined), request_timeout => maps:get(request_timeout, HTTPOpts, undefined), @@ -214,6 +230,12 @@ uploader_config(#{max_part_size := MaxPartSize, min_part_size := MinPartSize} = max_part_size => MaxPartSize }. +bucket(ProfileConfig) -> + maps:get(bucket, ProfileConfig). + +upload_options(ProfileConfig) -> + #{acl => maps:get(acl, ProfileConfig, undefined)}. + scheme(#{ssl := #{enable := true}}) -> "https://"; scheme(_TransportOpts) -> "http://". diff --git a/apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl b/apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl index 67a36a793..aea8334e8 100644 --- a/apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl +++ b/apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl @@ -15,7 +15,7 @@ start_link/1, child_spec/1, id/1, - start_uploader/2 + start_uploader/3 ]). -export([init/1]). @@ -43,10 +43,10 @@ child_spec(ProfileId) -> id(ProfileId) -> {?MODULE, ProfileId}. --spec start_uploader(emqx_s3:profile_id(), emqx_s3_uploader:opts()) -> +-spec start_uploader(emqx_s3:profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) -> emqx_types:startlink_ret() | {error, profile_not_found}. -start_uploader(ProfileId, Opts) -> - try supervisor:start_child(?VIA_GPROC(id(ProfileId)), [Opts]) of +start_uploader(ProfileId, Key, UploadOpts) -> + try supervisor:start_child(?VIA_GPROC(id(ProfileId)), [Key, UploadOpts]) of Result -> Result catch exit:{noproc, _} -> {error, profile_not_found} diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl index 5478f6416..ed818af69 100644 --- a/apps/emqx_s3/src/emqx_s3_schema.erl +++ b/apps/emqx_s3/src/emqx_s3_schema.erl @@ -14,9 +14,6 @@ -export([translate/1]). -export([translate/2]). --type secret_access_key() :: string() | function(). --reflect_type([secret_access_key/0]). - roots() -> [s3]. @@ -26,6 +23,13 @@ tags() -> [<<"S3">>]. fields(s3) -> + lists:append([ + fields(s3_client), + fields(s3_uploader), + fields(s3_url_options), + props_with([bucket, acl], fields(s3_upload)) + ]); +fields(s3_client) -> [ {access_key_id, mk( @@ -36,21 +40,9 @@ fields(s3) -> } )}, {secret_access_key, - mk( - typerefl:alias("string", secret_access_key()), + emqx_schema_secret:mk( #{ - desc => ?DESC("secret_access_key"), - required => false, - sensitive => true, - converter => fun secret/2 - } - )}, - {bucket, - mk( - string(), - #{ - desc => ?DESC("bucket"), - required => true + desc => ?DESC("secret_access_key") } )}, {host, @@ -69,16 +61,51 @@ fields(s3) -> required => true } )}, - {url_expire_time, + {transport_options, mk( - %% not used in a `receive ... after' block, just timestamp comparison - emqx_schema:duration_s(), + ref(?MODULE, transport_options), #{ - default => <<"1h">>, - desc => ?DESC("url_expire_time"), + desc => ?DESC("transport_options"), required => false } + )} + ]; +fields(s3_upload) -> + [ + {bucket, + mk( + string(), + #{ + desc => ?DESC("bucket"), + required => true + } )}, + {key, + mk( + string(), + #{ + desc => ?DESC("key"), + required => true + } + )}, + {acl, + mk( + hoconsc:enum([ + private, + public_read, + public_read_write, + authenticated_read, + bucket_owner_read, + bucket_owner_full_control + ]), + #{ + desc => ?DESC("acl"), + required => false + } + )} + ]; +fields(s3_uploader) -> + [ {min_part_size, mk( emqx_schema:bytesize(), @@ -98,27 +125,17 @@ fields(s3) -> required => true, validator => fun part_size_validator/1 } - )}, - {acl, + )} + ]; +fields(s3_url_options) -> + [ + {url_expire_time, mk( - hoconsc:enum([ - private, - public_read, - public_read_write, - authenticated_read, - bucket_owner_read, - bucket_owner_full_control - ]), + %% not used in a `receive ... after' block, just timestamp comparison + emqx_schema:duration_s(), #{ - desc => ?DESC("acl"), - required => false - } - )}, - {transport_options, - mk( - ref(?MODULE, transport_options), - #{ - desc => ?DESC("transport_options"), + default => <<"1h">>, + desc => ?DESC("url_expire_time"), required => false } )} @@ -145,17 +162,13 @@ fields(transport_options) -> desc(s3) -> "S3 connection options"; +desc(s3_client) -> + "S3 connection options"; +desc(s3_upload) -> + "S3 upload options"; desc(transport_options) -> "Options for the HTTP transport layer used by the S3 client". -secret(undefined, #{}) -> - undefined; -secret(Secret, #{make_serializable := true}) -> - unicode:characters_to_binary(emqx_secret:unwrap(Secret)); -secret(Secret, #{}) -> - _ = is_binary(Secret) orelse throw({expected_type, string}), - emqx_secret:wrap(unicode:characters_to_list(Secret)). - translate(Conf) -> translate(Conf, #{}). diff --git a/apps/emqx_s3/src/emqx_s3_uploader.erl b/apps/emqx_s3/src/emqx_s3_uploader.erl index 548eaf8c6..50736fe7c 100644 --- a/apps/emqx_s3/src/emqx_s3_uploader.erl +++ b/apps/emqx_s3/src/emqx_s3_uploader.erl @@ -9,7 +9,7 @@ -behaviour(gen_statem). -export([ - start_link/2, + start_link/3, write/2, write/3, @@ -33,30 +33,25 @@ format_status/2 ]). --export_type([opts/0, config/0]). +-export_type([config/0]). -type config() :: #{ min_part_size => pos_integer(), max_part_size => pos_integer() }. --type opts() :: #{ - key := string(), - headers => emqx_s3_client:headers() -}. - -type data() :: #{ - profile_id := emqx_s3:profile_id(), + profile_id => emqx_s3:profile_id(), client := emqx_s3_client:client(), key := emqx_s3_client:key(), + upload_opts := emqx_s3_client:upload_options(), buffer := iodata(), buffer_size := non_neg_integer(), min_part_size := pos_integer(), max_part_size := pos_integer(), upload_id := undefined | emqx_s3_client:upload_id(), etags := [emqx_s3_client:etag()], - part_number := emqx_s3_client:part_number(), - headers := emqx_s3_client:headers() + part_number := emqx_s3_client:part_number() }. %% 5MB @@ -66,9 +61,10 @@ -define(DEFAULT_TIMEOUT, 30000). --spec start_link(emqx_s3:profile_id(), opts()) -> gen_statem:start_ret(). -start_link(ProfileId, #{key := Key} = Opts) when is_list(Key) -> - gen_statem:start_link(?MODULE, [ProfileId, Opts], []). +-spec start_link(emqx_s3:profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) -> + gen_statem:start_ret(). +start_link(ProfileId, Key, UploadOpts) when is_list(Key) -> + gen_statem:start_link(?MODULE, {profile, ProfileId, Key, UploadOpts}, []). -spec write(pid(), iodata()) -> ok_or_error(term()). write(Pid, WriteData) -> @@ -105,19 +101,23 @@ shutdown(Pid) -> callback_mode() -> handle_event_function. -init([ProfileId, #{key := Key} = Opts]) -> - process_flag(trap_exit, true), - {ok, ClientConfig, UploaderConfig} = emqx_s3_profile_conf:checkout_config(ProfileId), - Client = client(ClientConfig), - {ok, upload_not_started, #{ +init({profile, ProfileId, Key, UploadOpts}) -> + {Bucket, ClientConfig, BaseOpts, UploaderConfig} = + emqx_s3_profile_conf:checkout_config(ProfileId), + Upload = #{ profile_id => ProfileId, - client => Client, - headers => maps:get(headers, Opts, #{}), + client => client(Bucket, ClientConfig), key => Key, + upload_opts => maps:merge(BaseOpts, UploadOpts) + }, + init({upload, UploaderConfig, Upload}); +init({upload, Config, Upload}) -> + process_flag(trap_exit, true), + {ok, upload_not_started, Upload#{ buffer => [], buffer_size => 0, - min_part_size => maps:get(min_part_size, UploaderConfig, ?DEFAULT_MIN_PART_SIZE), - max_part_size => maps:get(max_part_size, UploaderConfig, ?DEFAULT_MAX_PART_SIZE), + min_part_size => maps:get(min_part_size, Config, ?DEFAULT_MIN_PART_SIZE), + max_part_size => maps:get(max_part_size, Config, ?DEFAULT_MAX_PART_SIZE), upload_id => undefined, etags => [], part_number => 1 @@ -221,8 +221,8 @@ maybe_start_upload(#{buffer_size := BufferSize, min_part_size := MinPartSize} = end. -spec start_upload(data()) -> {started, data()} | {error, term()}. -start_upload(#{client := Client, key := Key, headers := Headers} = Data) -> - case emqx_s3_client:start_multipart(Client, Headers, Key) of +start_upload(#{client := Client, key := Key, upload_opts := UploadOpts} = Data) -> + case emqx_s3_client:start_multipart(Client, Key, UploadOpts) of {ok, UploadId} -> NewData = Data#{upload_id => UploadId}, {started, NewData}; @@ -274,12 +274,9 @@ complete_upload( } = Data0 ) -> case upload_part(Data0) of - {ok, #{etags := ETags} = Data1} -> - case - emqx_s3_client:complete_multipart( - Client, Key, UploadId, lists:reverse(ETags) - ) - of + {ok, #{etags := ETagsRev} = Data1} -> + ETags = lists:reverse(ETagsRev), + case emqx_s3_client:complete_multipart(Client, Key, UploadId, ETags) of ok -> {ok, Data1}; {error, _} = Error -> @@ -309,11 +306,11 @@ put_object( #{ client := Client, key := Key, - buffer := Buffer, - headers := Headers + upload_opts := UploadOpts, + buffer := Buffer } ) -> - case emqx_s3_client:put_object(Client, Headers, Key, Buffer) of + case emqx_s3_client:put_object(Client, Key, UploadOpts, Buffer) of ok -> ok; {error, _} = Error -> @@ -337,5 +334,5 @@ unwrap(WrappedData) -> is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) -> BufferSize + iolist_size(WriteData) =< MaxPartSize. -client(Config) -> - emqx_s3_client:create(Config). +client(Bucket, Config) -> + emqx_s3_client:create(Bucket, Config). diff --git a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl index 4db6245eb..619a09e76 100644 --- a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl @@ -79,7 +79,7 @@ t_multipart_upload(Config) -> Client = client(Config), - {ok, UploadId} = emqx_s3_client:start_multipart(Client, Key), + {ok, UploadId} = emqx_s3_client:start_multipart(Client, Key, #{}), Data = data(6_000_000), @@ -97,7 +97,7 @@ t_simple_put(Config) -> Data = data(6_000_000), - ok = emqx_s3_client:put_object(Client, Key, Data). + ok = emqx_s3_client:put_object(Client, Key, #{acl => private}, Data). t_list(Config) -> Key = ?config(key, Config), @@ -123,7 +123,7 @@ t_url(Config) -> Key = ?config(key, Config), Client = client(Config), - ok = emqx_s3_client:put_object(Client, Key, <<"data">>), + ok = emqx_s3_client:put_object(Client, Key, #{acl => public_read}, <<"data">>), Url = emqx_s3_client:uri(Client, Key), @@ -135,20 +135,18 @@ t_url(Config) -> t_no_acl(Config) -> Key = ?config(key, Config), - ClientConfig = emqx_s3_profile_conf:client_config( - profile_config(Config), ?config(ehttpc_pool_name, Config) - ), - Client = emqx_s3_client:create(maps:without([acl], ClientConfig)), + Client = client(Config), - ok = emqx_s3_client:put_object(Client, Key, <<"data">>). + ok = emqx_s3_client:put_object(Client, Key, #{}, <<"data">>). t_extra_headers(Config0) -> Config = [{extra_headers, #{'Content-Type' => <<"application/json">>}} | Config0], Key = ?config(key, Config), Client = client(Config), + Opts = #{acl => public_read}, Data = #{foo => bar}, - ok = emqx_s3_client:put_object(Client, Key, emqx_utils_json:encode(Data)), + ok = emqx_s3_client:put_object(Client, Key, Opts, emqx_utils_json:encode(Data)), Url = emqx_s3_client:uri(Client, Key), @@ -164,10 +162,11 @@ t_extra_headers(Config0) -> %%-------------------------------------------------------------------- client(Config) -> + Bucket = ?config(bucket, Config), ClientConfig = emqx_s3_profile_conf:client_config( profile_config(Config), ?config(ehttpc_pool_name, Config) ), - emqx_s3_client:create(ClientConfig). + emqx_s3_client:create(Bucket, ClientConfig). profile_config(Config) -> ProfileConfig0 = emqx_s3_test_helpers:base_config(?config(conn_type, Config)), @@ -175,7 +174,6 @@ profile_config(Config) -> fun inject_config/3, ProfileConfig0, #{ - bucket => ?config(bucket, Config), [transport_options, pool_type] => ?config(pool_type, Config), [transport_options, headers] => ?config(extra_headers, Config) } diff --git a/apps/emqx_s3/test/emqx_s3_profile_conf_SUITE.erl b/apps/emqx_s3/test/emqx_s3_profile_conf_SUITE.erl index 433cfe07b..699162bfd 100644 --- a/apps/emqx_s3/test/emqx_s3_profile_conf_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_profile_conf_SUITE.erl @@ -46,7 +46,7 @@ end_per_testcase(_TestCase, _Config) -> t_regular_outdated_pool_cleanup(Config) -> _ = process_flag(trap_exit, true), Key = emqx_s3_test_helpers:unique_key(), - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), [OldPool] = emqx_s3_profile_http_pools:all(profile_id()), @@ -94,7 +94,7 @@ t_timeout_pool_cleanup(Config) -> %% Start uploader Key = emqx_s3_test_helpers:unique_key(), - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), ok = emqx_s3_uploader:write(Pid, <<"data">>), [OldPool] = emqx_s3_profile_http_pools:all(profile_id()), diff --git a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl index ad887d1a6..323dd05c2 100644 --- a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl @@ -132,7 +132,7 @@ t_sensitive_config_no_leak(_Config) -> Error = #{ kind := validation_error, path := "s3.secret_access_key", - reason := {expected_type, string} + reason := invalid_type } ]} when map_size(Error) == 3, emqx_s3_schema:translate( diff --git a/apps/emqx_s3/test/emqx_s3_uploader_SUITE.erl b/apps/emqx_s3/test/emqx_s3_uploader_SUITE.erl index 90a32d948..292e065cf 100644 --- a/apps/emqx_s3/test/emqx_s3_uploader_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_uploader_SUITE.erl @@ -133,7 +133,7 @@ end_per_testcase(_TestCase, _Config) -> t_happy_path_simple_put(Config) -> Key = emqx_s3_test_helpers:unique_key(), - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), _ = erlang:monitor(process, Pid), @@ -165,7 +165,7 @@ t_happy_path_simple_put(Config) -> t_happy_path_multi(Config) -> Key = emqx_s3_test_helpers:unique_key(), - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), _ = erlang:monitor(process, Pid), @@ -233,7 +233,7 @@ t_signed_nonascii_url_download(_Config) -> t_abort_multi(Config) -> Key = emqx_s3_test_helpers:unique_key(), - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), _ = erlang:monitor(process, Pid), @@ -260,7 +260,7 @@ t_abort_multi(Config) -> t_abort_simple_put(_Config) -> Key = emqx_s3_test_helpers:unique_key(), - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), _ = erlang:monitor(process, Pid), @@ -278,7 +278,7 @@ t_abort_simple_put(_Config) -> t_config_switch(Config) -> Key = emqx_s3_test_helpers:unique_key(), OldBucket = ?config(bucket, Config), - {ok, Pid0} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid0} = emqx_s3:start_uploader(profile_id(), Key, #{}), [Data0, Data1] = data($a, 6 * 1024 * 1024, 2), @@ -304,7 +304,7 @@ t_config_switch(Config) -> ), %% Now check that new uploader uses new config - {ok, Pid1} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid1} = emqx_s3:start_uploader(profile_id(), Key, #{}), ok = emqx_s3_uploader:write(Pid1, Data0), ok = emqx_s3_uploader:complete(Pid1), @@ -318,7 +318,7 @@ t_config_switch(Config) -> t_config_switch_http_settings(Config) -> Key = emqx_s3_test_helpers:unique_key(), OldBucket = ?config(bucket, Config), - {ok, Pid0} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid0} = emqx_s3:start_uploader(profile_id(), Key, #{}), [Data0, Data1] = data($a, 6 * 1024 * 1024, 2), @@ -345,7 +345,7 @@ t_config_switch_http_settings(Config) -> ), %% Now check that new uploader uses new config - {ok, Pid1} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid1} = emqx_s3:start_uploader(profile_id(), Key, #{}), ok = emqx_s3_uploader:write(Pid1, Data0), ok = emqx_s3_uploader:complete(Pid1), @@ -360,7 +360,7 @@ t_start_multipart_error(Config) -> _ = process_flag(trap_exit, true), Key = emqx_s3_test_helpers:unique_key(), - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), _ = erlang:monitor(process, Pid), @@ -386,7 +386,7 @@ t_upload_part_error(Config) -> _ = process_flag(trap_exit, true), Key = emqx_s3_test_helpers:unique_key(), - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), _ = erlang:monitor(process, Pid), @@ -414,7 +414,7 @@ t_abort_multipart_error(Config) -> _ = process_flag(trap_exit, true), Key = emqx_s3_test_helpers:unique_key(), - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), _ = erlang:monitor(process, Pid), @@ -442,7 +442,7 @@ t_complete_multipart_error(Config) -> _ = process_flag(trap_exit, true), Key = emqx_s3_test_helpers:unique_key(), - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), _ = erlang:monitor(process, Pid), @@ -470,7 +470,7 @@ t_put_object_error(Config) -> _ = process_flag(trap_exit, true), Key = emqx_s3_test_helpers:unique_key(), - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), _ = erlang:monitor(process, Pid), @@ -496,7 +496,7 @@ t_put_object_error(Config) -> t_too_large(Config) -> Key = emqx_s3_test_helpers:unique_key(), - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), _ = erlang:monitor(process, Pid), @@ -533,7 +533,7 @@ t_tls_error(Config) -> ), ok = emqx_s3:update_profile(profile_id(), ProfileConfig), Key = emqx_s3_test_helpers:unique_key(), - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), _ = erlang:monitor(process, Pid), @@ -553,7 +553,7 @@ t_no_profile(_Config) -> Key = emqx_s3_test_helpers:unique_key(), ?assertMatch( {error, profile_not_found}, - emqx_s3:start_uploader(<<"no-profile">>, #{key => Key}) + emqx_s3:start_uploader(<<"no-profile">>, Key, #{}) ). %%-------------------------------------------------------------------- @@ -572,7 +572,7 @@ list_objects(Config) -> proplists:get_value(contents, Props). upload(Key, ChunkSize, ChunkCount) -> - {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}), _ = erlang:monitor(process, Pid), diff --git a/mix.exs b/mix.exs index 5bb099828..c2ef491e9 100644 --- a/mix.exs +++ b/mix.exs @@ -182,6 +182,7 @@ defmodule EMQXUmbrella.MixProject do :emqx_ft, :emqx_license, :emqx_s3, + :emqx_bridge_s3, :emqx_schema_registry, :emqx_enterprise, :emqx_bridge_kinesis, diff --git a/rebar.config.erl b/rebar.config.erl index afcd305c3..4600a4a83 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -103,6 +103,7 @@ is_community_umbrella_app("apps/emqx_oracle") -> false; is_community_umbrella_app("apps/emqx_bridge_rabbitmq") -> false; is_community_umbrella_app("apps/emqx_ft") -> false; is_community_umbrella_app("apps/emqx_s3") -> false; +is_community_umbrella_app("apps/emqx_bridge_s3") -> false; is_community_umbrella_app("apps/emqx_schema_registry") -> false; is_community_umbrella_app("apps/emqx_enterprise") -> false; is_community_umbrella_app("apps/emqx_bridge_kinesis") -> false; diff --git a/rel/i18n/emqx_bridge_s3.hocon b/rel/i18n/emqx_bridge_s3.hocon new file mode 100644 index 000000000..fe10313e0 --- /dev/null +++ b/rel/i18n/emqx_bridge_s3.hocon @@ -0,0 +1,23 @@ +emqx_bridge_s3 { + +config_connector.label: +"""S3 Connector Configuration""" +config_connector.desc: +"""Configuration for a connector to S3 API compatible storage service.""" + +s3_upload.label: +"""S3 Simple Upload""" +s3_upload.desc: +"""Action to upload a single object to the S3 service.""" + +s3_upload_parameters.label: +"""S3 Upload action parameters""" +s3_upload_parameters.desc: +"""Set of parameters for the upload action. Action supports templates in S3 bucket name, object key and object content.""" + +s3_object_content.label: +"""S3 Object Content""" +s3_object_content.desc: +"""Content of the S3 object being uploaded. Supports templates.""" + +} diff --git a/rel/i18n/emqx_s3_schema.hocon b/rel/i18n/emqx_s3_schema.hocon index df4b973fa..44f4bbc56 100644 --- a/rel/i18n/emqx_s3_schema.hocon +++ b/rel/i18n/emqx_s3_schema.hocon @@ -9,6 +9,15 @@ secret_access_key.desc: bucket.desc: """The name of the S3 bucket.""" +bucket.label: +"""Bucket""" + +key.desc: +"""Key of the S3 object.""" + +key.label: +"""Object Key""" + host.desc: """The host of the S3 endpoint."""