Merge pull request #12495 from keynslug/ft/EMQX-11830/s3-bridge-simple

feat(s3): introduce S3 connector and action
This commit is contained in:
Andrew Mayorov 2024-02-12 16:05:22 +01:00 committed by GitHub
commit a9fdf9f1f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 1060 additions and 197 deletions

View File

@ -106,7 +106,8 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_opents_action_info,
emqx_bridge_rabbitmq_action_info,
emqx_bridge_greptimedb_action_info,
emqx_bridge_tdengine_action_info
emqx_bridge_tdengine_action_info,
emqx_bridge_s3_action_info
].
-else.
hard_coded_action_info_modules_ee() ->

View File

@ -0,0 +1,94 @@
Business Source License 1.1
Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2023
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2028-01-26
Change License: Apache License, Version 2.0
For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact
Notice
The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.
-----------------------------------------------------------------------------
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.
MariaDB hereby grants you permission to use this Licenses text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.
Covenants of Licensor
In consideration of the right to use this Licenses text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:
1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.
2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.
3. To specify a Change Date.
4. Not to modify this License in any other way.

View File

@ -0,0 +1,16 @@
# EMQX S3 Bridge
This application provides connector and action implementations for the EMQX to integrate with Amazon S3 compatible storage services as part of the EMQX data integration pipelines.
Users can leverage [EMQX Rule Engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) to create rules that publish message data to S3 storage service.
## Documentation
Refer to [Rules engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) for the EMQX rules engine introduction.
## Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).
## License
EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

View File

@ -0,0 +1,2 @@
minio
toxiproxy

View File

@ -0,0 +1,6 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [
{emqx_resource, {path, "../../apps/emqx_resource"}}
]}.

View File

@ -0,0 +1,17 @@
{application, emqx_bridge_s3, [
{description, "EMQX Enterprise S3 Bridge"},
{vsn, "0.1.0"},
{registered, []},
{applications, [
kernel,
stdlib,
erlcloud,
emqx_resource,
emqx_s3
]},
{env, [
{emqx_action_info_modules, [emqx_bridge_s3_action_info]}
]},
{modules, []},
{links, []}
]}.

View File

@ -0,0 +1,217 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include("emqx_bridge_s3.hrl").
-behaviour(hocon_schema).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
-export([
bridge_v2_examples/1,
connector_examples/1
]).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
"bridge_s3".
roots() ->
[].
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
emqx_connector_schema:api_fields(Field, ?CONNECTOR, fields(s3_connector_config));
fields(Field) when
Field == "get_bridge_v2";
Field == "put_bridge_v2";
Field == "post_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION));
fields(action) ->
{?ACTION,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
#{
desc => <<"S3 Action Config">>,
required => false
}
)};
fields("config_connector") ->
lists:append([
emqx_connector_schema:common_fields(),
fields(s3_connector_config),
emqx_connector_schema:resource_opts_ref(?MODULE, s3_connector_resource_opts)
]);
fields(?ACTION) ->
emqx_bridge_v2_schema:make_producer_action_schema(
hoconsc:mk(
?R_REF(s3_upload_parameters),
#{
required => true,
desc => ?DESC(s3_upload)
}
),
#{
resource_opts_ref => ?R_REF(s3_action_resource_opts)
}
);
fields(s3_connector_config) ->
emqx_s3_schema:fields(s3_client);
fields(s3_upload_parameters) ->
emqx_s3_schema:fields(s3_upload) ++
[
{content,
hoconsc:mk(
string(),
#{
required => false,
default => <<"${.}">>,
desc => ?DESC(s3_object_content)
}
)}
];
fields(s3_action_resource_opts) ->
UnsupportedOpts = [batch_size, batch_time],
lists:filter(
fun({N, _}) -> not lists:member(N, UnsupportedOpts) end,
emqx_bridge_v2_schema:action_resource_opts_fields()
);
fields(s3_connector_resource_opts) ->
CommonOpts = emqx_connector_schema:common_resource_opts_subfields(),
lists:filter(
fun({N, _}) -> lists:member(N, CommonOpts) end,
emqx_connector_schema:resource_opts_fields()
).
desc("config_connector") ->
?DESC(config_connector);
desc(?ACTION) ->
?DESC(s3_upload);
desc(s3_upload) ->
?DESC(s3_upload);
desc(s3_upload_parameters) ->
?DESC(s3_upload_parameters);
desc(s3_action_resource_opts) ->
?DESC(emqx_resource_schema, resource_opts);
desc(s3_connector_resource_opts) ->
?DESC(emqx_resource_schema, resource_opts);
desc(_Name) ->
undefined.
%% Examples
bridge_v2_examples(Method) ->
[
#{
<<"s3">> => #{
summary => <<"S3 Simple Upload">>,
value => action_example(Method)
}
}
].
action_example(post) ->
maps:merge(
action_example(put),
#{
type => atom_to_binary(?ACTION),
name => <<"my_s3_action">>
}
);
action_example(get) ->
maps:merge(
action_example(put),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
action_example(put) ->
#{
enable => true,
connector => <<"my_s3_connector">>,
description => <<"My action">>,
parameters => #{
bucket => <<"${clientid}">>,
key => <<"${topic}">>,
content => <<"${payload}">>,
acl => <<"public_read">>
},
resource_opts => #{
query_mode => <<"sync">>,
inflight_window => 10
}
}.
connector_examples(Method) ->
[
#{
<<"s3_aws">> => #{
summary => <<"S3 Connector">>,
value => connector_example(Method)
}
}
].
connector_example(get) ->
maps:merge(
connector_example(put),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
connector_example(post) ->
maps:merge(
connector_example(put),
#{
type => atom_to_binary(?CONNECTOR),
name => <<"my_s3_connector">>
}
);
connector_example(put) ->
#{
enable => true,
description => <<"My S3 connector">>,
host => <<"s3.eu-east-1.amazonaws.com">>,
port => 443,
access_key_id => <<"ACCESS">>,
secret_access_key => <<"SECRET">>,
transport_options => #{
ssl => #{
enable => true,
verify => <<"verify_peer">>
},
connect_timeout => <<"1s">>,
request_timeout => <<"60s">>,
pool_size => 4,
max_retries => 1,
enable_pipelining => 1
}
}.

View File

@ -0,0 +1,11 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-ifndef(__EMQX_BRIDGE_S3_HRL__).
-define(__EMQX_BRIDGE_S3_HRL__, true).
-define(ACTION, s3).
-define(CONNECTOR, s3).
-endif.

View File

@ -0,0 +1,19 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_action_info).
-behaviour(emqx_action_info).
-export([
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
action_type_name() -> s3.
connector_type_name() -> s3.
schema_module() -> emqx_bridge_s3.

View File

@ -0,0 +1,222 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_connector).
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-behaviour(emqx_resource).
-export([
callback_mode/0,
on_start/2,
on_stop/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_query/3,
% on_batch_query/3,
on_get_status/2,
on_get_channel_status/3
]).
-type config() :: #{
access_key_id => string(),
secret_access_key => emqx_secret:t(string()),
host := string(),
port := pos_integer(),
transport_options => emqx_s3:transport_options()
}.
-type channel_config() :: #{
parameters := #{
bucket := string(),
key := string(),
content := string(),
acl => emqx_s3:acl()
}
}.
-type channel_state() :: #{
bucket := emqx_template:str(),
key := emqx_template:str(),
upload_options => emqx_s3_client:upload_options()
}.
-type state() :: #{
pool_name := resource_id(),
pool_pid => pid(),
client_config := emqx_s3_client:config(),
channels := #{channel_id() => channel_state()}
}.
%%
-spec callback_mode() -> callback_mode().
callback_mode() ->
always_sync.
%% Management
-spec on_start(_InstanceId :: resource_id(), config()) ->
{ok, state()} | {error, _Reason}.
on_start(InstId, Config) ->
PoolName = InstId,
S3Config = Config#{url_expire_time => 0},
State = #{
pool_name => PoolName,
client_config => emqx_s3_profile_conf:client_config(S3Config, PoolName),
channels => #{}
},
HttpConfig = emqx_s3_profile_conf:http_config(Config),
case ehttpc_sup:start_pool(PoolName, HttpConfig) of
{ok, Pid} ->
?SLOG(info, #{msg => "s3_connector_start_http_pool_success", pool_name => PoolName}),
{ok, State#{pool_pid => Pid}};
{error, Reason} = Error ->
?SLOG(error, #{
msg => "s3_connector_start_http_pool_fail",
pool_name => PoolName,
http_config => HttpConfig,
reason => Reason
}),
Error
end.
-spec on_stop(_InstanceId :: resource_id(), state()) ->
ok.
on_stop(InstId, _State = #{pool_name := PoolName}) ->
case ehttpc_sup:stop_pool(PoolName) of
ok ->
?tp(s3_bridge_stopped, #{instance_id => InstId}),
ok;
{error, Reason} ->
?SLOG(error, #{
msg => "s3_connector_http_pool_stop_fail",
pool_name => PoolName,
reason => Reason
}),
ok
end.
-spec on_get_status(_InstanceId :: resource_id(), state()) ->
health_check_status().
on_get_status(_InstId, State = #{client_config := Config}) ->
try erlcloud_s3:list_buckets(emqx_s3_client:aws_config(Config)) of
Props when is_list(Props) ->
?status_connected
catch
error:{aws_error, {http_error, _Code, _, Reason}} ->
{?status_disconnected, State, Reason};
error:{aws_error, {socket_error, Reason}} ->
{?status_disconnected, State, Reason}
end.
-spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) ->
{ok, state()} | {error, _Reason}.
on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) ->
ChannelState = init_channel_state(Config),
{ok, State#{channels => Channels#{ChannelId => ChannelState}}}.
-spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) ->
{ok, state()}.
on_remove_channel(_InstId, State = #{channels := Channels}, ChannelId) ->
{ok, State#{channels => maps:remove(ChannelId, Channels)}}.
-spec on_get_channels(_InstanceId :: resource_id()) ->
[_ChannelConfig].
on_get_channels(InstId) ->
emqx_bridge_v2:get_channels_for_connector(InstId).
-spec on_get_channel_status(_InstanceId :: resource_id(), channel_id(), state()) ->
channel_status().
on_get_channel_status(_InstId, ChannelId, #{channels := Channels}) ->
case maps:get(ChannelId, Channels, undefined) of
_ChannelState = #{} ->
%% TODO
%% Since bucket name may be templated, we can't really provide any
%% additional information regarding the channel health.
?status_connected;
undefined ->
?status_disconnected
end.
init_channel_state(#{parameters := Parameters}) ->
#{
bucket => emqx_template:parse(maps:get(bucket, Parameters)),
key => emqx_template:parse(maps:get(key, Parameters)),
content => emqx_template:parse(maps:get(content, Parameters)),
upload_options => #{
acl => maps:get(acl, Parameters, undefined)
}
}.
%% Queries
-type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}.
-spec on_query(_InstanceId :: resource_id(), query(), state()) ->
{ok, _Result} | {error, _Reason}.
on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) ->
case maps:get(Tag, Channels, undefined) of
ChannelState = #{} ->
run_simple_upload(InstId, Data, ChannelState, Config);
undefined ->
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
end.
run_simple_upload(
InstId,
Data,
#{
bucket := BucketTemplate,
key := KeyTemplate,
content := ContentTemplate,
upload_options := UploadOpts
},
Config
) ->
Bucket = render_bucket(BucketTemplate, Data),
Client = emqx_s3_client:create(Bucket, Config),
Key = render_key(KeyTemplate, Data),
Content = render_content(ContentTemplate, Data),
case emqx_s3_client:put_object(Client, Key, UploadOpts, Content) of
ok ->
?tp(s3_bridge_connector_upload_ok, #{
instance_id => InstId,
bucket => Bucket,
key => Key
}),
ok;
{error, Reason} ->
{error, map_error(Reason)}
end.
map_error({socket_error, _} = Reason) ->
{recoverable_error, Reason};
map_error(Reason) ->
%% TODO: Recoverable errors.
{unrecoverable_error, Reason}.
render_bucket(Template, Data) ->
case emqx_template:render(Template, {emqx_jsonish, Data}) of
{Result, []} ->
iolist_to_string(Result);
{_, Errors} ->
erlang:error({unrecoverable_error, {bucket_undefined, Errors}})
end.
render_key(Template, Data) ->
%% NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`.
{Result, _Errors} = emqx_template:render(Template, {emqx_jsonish, Data}),
iolist_to_string(Result).
render_content(Template, Data) ->
%% NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`.
{Result, _Errors} = emqx_template:render(Template, {emqx_jsonish, Data}),
Result.
iolist_to_string(IOList) ->
unicode:characters_to_list(IOList).

View File

@ -0,0 +1,171 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-import(emqx_utils_conv, [bin/1]).
%% See `emqx_bridge_s3.hrl`.
-define(BRIDGE_TYPE, <<"s3">>).
-define(CONNECTOR_TYPE, <<"s3">>).
-define(PROXY_NAME, "minio_tcp").
-define(CONTENT_TYPE, "application/x-emqx-payload").
%% CT Setup
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
% Setup toxiproxy
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
_ = emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge_s3,
emqx_bridge,
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
[
{apps, Apps},
{proxy_host, ProxyHost},
{proxy_port, ProxyPort},
{proxy_name, ?PROXY_NAME}
| Config
].
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)).
%% Testcases
init_per_testcase(TestCase, Config) ->
ct:timetrap(timer:seconds(30)),
ok = snabbkaffe:start_trace(),
Name = iolist_to_binary(io_lib:format("~s~p", [TestCase, erlang:unique_integer()])),
ConnectorConfig = connector_config(Name, Config),
ActionConfig = action_config(Name, Name),
[
{connector_type, ?CONNECTOR_TYPE},
{connector_name, Name},
{connector_config, ConnectorConfig},
{bridge_type, ?BRIDGE_TYPE},
{bridge_name, Name},
{bridge_config, ActionConfig}
| Config
].
end_per_testcase(_TestCase, _Config) ->
ok = snabbkaffe:stop(),
ok.
connector_config(Name, _Config) ->
BaseConf = emqx_s3_test_helpers:base_raw_config(tcp),
parse_and_check_config(<<"connectors">>, ?CONNECTOR_TYPE, Name, #{
<<"enable">> => true,
<<"description">> => <<"S3 Connector">>,
<<"host">> => maps:get(<<"host">>, BaseConf),
<<"port">> => maps:get(<<"port">>, BaseConf),
<<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
<<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
<<"transport_options">> => #{
<<"headers">> => #{
<<"content-type">> => <<?CONTENT_TYPE>>
},
<<"connect_timeout">> => 1000,
<<"request_timeout">> => 1000,
<<"pool_size">> => 4,
<<"max_retries">> => 0,
<<"enable_pipelining">> => 1
}
}).
action_config(Name, ConnectorId) ->
parse_and_check_config(<<"actions">>, ?BRIDGE_TYPE, Name, #{
<<"enable">> => true,
<<"connector">> => ConnectorId,
<<"parameters">> => #{
<<"bucket">> => <<"${clientid}">>,
<<"key">> => <<"${topic}">>,
<<"content">> => <<"${payload}">>,
<<"acl">> => <<"public_read">>
},
<<"resource_opts">> => #{
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"5s">>,
<<"inflight_window">> => 40,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"60s">>,
<<"resume_interval">> => <<"5s">>,
<<"worker_pool_size">> => <<"4">>
}
}).
parse_and_check_config(Root, Type, Name, ConfigIn) ->
Schema =
case Root of
<<"connectors">> -> emqx_connector_schema;
<<"actions">> -> emqx_bridge_v2_schema
end,
#{Root := #{Type := #{Name := Config}}} =
hocon_tconf:check_plain(
Schema,
#{Root => #{Type => #{Name => ConfigIn}}},
#{required => false, atom_key => false}
),
ct:pal("parsed config: ~p", [Config]),
ConfigIn.
t_start_stop(Config) ->
emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
t_create_via_http(Config) ->
emqx_bridge_v2_testlib:t_create_via_http(Config).
t_on_get_status(Config) ->
emqx_bridge_v2_testlib:t_on_get_status(Config, #{}).
t_sync_query(Config) ->
Bucket = emqx_s3_test_helpers:unique_bucket(),
Topic = "a/b/c",
Payload = rand:bytes(1024),
AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
ok = erlcloud_s3:create_bucket(Bucket, AwsConfig),
ok = emqx_bridge_v2_testlib:t_sync_query(
Config,
fun() -> mk_message(Bucket, Topic, Payload) end,
fun(Res) -> ?assertMatch(ok, Res) end,
s3_bridge_connector_upload_ok
),
?assertMatch(
#{
content := Payload,
content_type := ?CONTENT_TYPE
},
maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
).
mk_message(ClientId, Topic, Payload) ->
Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
{Event, _} = emqx_rule_events:eventmsg_publish(Message),
Event.

View File

@ -66,6 +66,8 @@ resource_type(tdengine) ->
emqx_bridge_tdengine_connector;
resource_type(rabbitmq) ->
emqx_bridge_rabbitmq_connector;
resource_type(s3) ->
emqx_bridge_s3_connector;
resource_type(Type) ->
error({unknown_connector_type, Type}).
@ -270,6 +272,14 @@ connector_structs() ->
desc => <<"RabbitMQ Connector Config">>,
required => false
}
)},
{s3,
mk(
hoconsc:map(name, ref(emqx_bridge_s3, "config_connector")),
#{
desc => <<"S3 Connector Config">>,
required => false
}
)}
].
@ -296,7 +306,8 @@ schema_modules() ->
emqx_bridge_rabbitmq_connector_schema,
emqx_bridge_opents_connector,
emqx_bridge_greptimedb,
emqx_bridge_tdengine_connector
emqx_bridge_tdengine_connector,
emqx_bridge_s3
].
api_schemas(Method) ->
@ -332,7 +343,8 @@ api_schemas(Method) ->
api_ref(emqx_bridge_opents_connector, <<"opents">>, Method),
api_ref(emqx_bridge_rabbitmq_connector_schema, <<"rabbitmq">>, Method),
api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector"),
api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method)
api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method),
api_ref(emqx_bridge_s3, <<"s3">>, Method ++ "_connector")
].
api_ref(Module, Type, Method) ->

View File

@ -167,7 +167,9 @@ connector_type_to_bridge_types(greptimedb) ->
connector_type_to_bridge_types(tdengine) ->
[tdengine];
connector_type_to_bridge_types(rabbitmq) ->
[rabbitmq].
[rabbitmq];
connector_type_to_bridge_types(s3) ->
[s3].
actions_config_name(action) -> <<"actions">>;
actions_config_name(source) -> <<"sources">>.

View File

@ -364,8 +364,8 @@ parse_spec_ref(Module, Path, Options) ->
-ifdef(TEST).
-spec failed_to_generate_swagger_spec(_, _, _, _, _) -> no_return().
failed_to_generate_swagger_spec(Module, Path, _Error, _Reason, _Stacktrace) ->
error({failed_to_generate_swagger_spec, Module, Path}).
failed_to_generate_swagger_spec(Module, Path, Error, Reason, Stacktrace) ->
error({failed_to_generate_swagger_spec, Module, Path, Error, Reason, Stacktrace}).
-else.
-spec failed_to_generate_swagger_spec(_, _, _, _, _) -> no_return().
failed_to_generate_swagger_spec(Module, Path, Error, Reason, Stacktrace) ->

View File

@ -310,7 +310,7 @@ t_nest_ref(_Config) ->
t_none_ref(_Config) ->
Path = "/ref/none",
?assertError(
{failed_to_generate_swagger_spec, ?MODULE, Path},
{failed_to_generate_swagger_spec, ?MODULE, Path, error, _FunctionClause, _Stacktrace},
emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{})
),
ok.

View File

@ -282,7 +282,7 @@ t_bad_ref(_Config) ->
t_none_ref(_Config) ->
Path = "/ref/none",
?assertError(
{failed_to_generate_swagger_spec, ?MODULE, Path},
{failed_to_generate_swagger_spec, ?MODULE, Path, error, _FunctionClause, _Stacktrace},
validate(Path, #{}, [])
),
ok.

View File

@ -69,11 +69,9 @@
-spec start_export(options(), transfer(), filemeta()) ->
{ok, export_st()} | {error, term()}.
start_export(_Options, Transfer, Filemeta) ->
Options = #{
key => s3_key(Transfer, Filemeta),
headers => s3_headers(Transfer, Filemeta)
},
case emqx_s3:start_uploader(?S3_PROFILE_ID, Options) of
Key = s3_key(Transfer, Filemeta),
UploadOpts = #{headers => s3_headers(Transfer, Filemeta)},
case emqx_s3:start_uploader(?S3_PROFILE_ID, Key, UploadOpts) of
{ok, Pid} ->
true = erlang:link(Pid),
{ok, #{filemeta => Filemeta, pid => Pid}};
@ -180,22 +178,24 @@ list_pages(Client, Marker, Limit, Acc) ->
ListOptions = [{marker, Marker} || Marker =/= undefined],
case list_key_info(Client, [{max_keys, MaxKeys} | ListOptions]) of
{ok, {Exports, NextMarker}} ->
list_accumulate(Client, Limit, NextMarker, [Exports | Acc]);
Left = update_limit(Limit, Exports),
NextAcc = [Exports | Acc],
case NextMarker of
undefined ->
{ok, {flatten_pages(NextAcc), undefined}};
_ when Left =< 0 ->
{ok, {flatten_pages(NextAcc), NextMarker}};
_ ->
list_pages(Client, NextMarker, Left, NextAcc)
end;
{error, _Reason} = Error ->
Error
end.
list_accumulate(_Client, _Limit, undefined, Acc) ->
{ok, {flatten_pages(Acc), undefined}};
list_accumulate(Client, undefined, Marker, Acc) ->
list_pages(Client, Marker, undefined, Acc);
list_accumulate(Client, Limit, Marker, Acc = [Exports | _]) ->
case Limit - length(Exports) of
0 ->
{ok, {flatten_pages(Acc), Marker}};
Left ->
list_pages(Client, Marker, Left, Acc)
end.
update_limit(undefined, _Exports) ->
undefined;
update_limit(Limit, Exports) ->
Limit - length(Exports).
flatten_pages(Pages) ->
lists:append(lists:reverse(Pages)).

View File

@ -114,6 +114,7 @@
emqx_bridge_oracle,
emqx_bridge_rabbitmq,
emqx_bridge_azure_event_hub,
emqx_bridge_s3,
emqx_schema_registry,
emqx_eviction_agent,
emqx_node_rebalance,

View File

@ -18,7 +18,7 @@ The steps to integrate this application are:
`ProfileName` is a unique name used to distinguish different sets of S3 settings. Each profile has its own connection pool and configuration.
To use S3 from a _client_ application:
* Create an uploader process with `{ok, Pid} = emqx_s3:start_uploader(ProfileName, #{key => MyKey})`.
* Create an uploader process with `{ok, Pid} = emqx_s3:start_uploader(ProfileName, MyKey, _Opts = #{})`.
* Write data with `emqx_s3_uploader:write(Pid, <<"data">>)`.
* Finish the uploader with `emqx_s3_uploader:complete(Pid)` or `emqx_s3_uploader:abort(Pid)`.

View File

@ -10,7 +10,7 @@
start_profile/2,
stop_profile/1,
update_profile/2,
start_uploader/2,
start_uploader/3,
with_client/2
]).
@ -22,6 +22,7 @@
-export_type([
profile_id/0,
profile_config/0,
transport_options/0,
acl/0
]).
@ -81,18 +82,18 @@ stop_profile(ProfileId) when ?IS_PROFILE_ID(ProfileId) ->
update_profile(ProfileId, ProfileConfig) when ?IS_PROFILE_ID(ProfileId) ->
emqx_s3_profile_conf:update_config(ProfileId, ProfileConfig).
-spec start_uploader(profile_id(), emqx_s3_uploader:opts()) ->
-spec start_uploader(profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) ->
emqx_types:startlink_ret() | {error, profile_not_found}.
start_uploader(ProfileId, Opts) when ?IS_PROFILE_ID(ProfileId) ->
emqx_s3_profile_uploader_sup:start_uploader(ProfileId, Opts).
start_uploader(ProfileId, Key, Props) when ?IS_PROFILE_ID(ProfileId) ->
emqx_s3_profile_uploader_sup:start_uploader(ProfileId, Key, Props).
-spec with_client(profile_id(), fun((emqx_s3_client:client()) -> Result)) ->
{error, profile_not_found} | Result.
with_client(ProfileId, Fun) when is_function(Fun, 1) andalso ?IS_PROFILE_ID(ProfileId) ->
case emqx_s3_profile_conf:checkout_config(ProfileId) of
{ok, ClientConfig, _UploadConfig} ->
{Bucket, ClientConfig, _UploadOpts, _UploadConfig} ->
try
Fun(emqx_s3_client:create(ClientConfig))
Fun(emqx_s3_client:create(Bucket, ClientConfig))
after
emqx_s3_profile_conf:checkin_config(ProfileId)
end;

View File

@ -9,12 +9,11 @@
-include_lib("erlcloud/include/erlcloud_aws.hrl").
-export([
create/1,
create/2,
put_object/3,
put_object/4,
start_multipart/2,
start_multipart/3,
upload_part/5,
complete_multipart/4,
@ -26,10 +25,15 @@
format_request/1
]).
%% For connectors
-export([aws_config/1]).
-export_type([
client/0,
headers/0,
bucket/0,
key/0,
upload_options/0,
upload_id/0,
etag/0,
part_number/0,
@ -39,18 +43,17 @@
-type headers() :: #{binary() | string() => iodata()}.
-type erlcloud_headers() :: list({string(), iodata()}).
-type bucket() :: string().
-type key() :: string().
-type part_number() :: non_neg_integer().
-type upload_id() :: string().
-type etag() :: string().
-type http_pool() :: ehttpc:pool_name().
-type pool_type() :: random | hash.
-type upload_options() :: list({acl, emqx_s3:acl()}).
-opaque client() :: #{
aws_config := aws_config(),
upload_options := upload_options(),
bucket := string(),
bucket := bucket(),
headers := erlcloud_headers(),
url_expire_time := non_neg_integer(),
pool_type := pool_type()
@ -60,9 +63,7 @@
scheme := string(),
host := string(),
port := part_number(),
bucket := string(),
headers := headers(),
acl := emqx_s3:acl() | undefined,
url_expire_time := pos_integer(),
access_key_id := string() | undefined,
secret_access_key := emqx_secret:t(string()) | undefined,
@ -72,6 +73,11 @@
max_retries := non_neg_integer() | undefined
}.
-type upload_options() :: #{
acl => emqx_s3:acl() | undefined,
headers => headers()
}.
-type s3_options() :: proplists:proplist().
-define(DEFAULT_REQUEST_TIMEOUT, 30000).
@ -81,12 +87,11 @@
%% API
%%--------------------------------------------------------------------
-spec create(config()) -> client().
create(Config) ->
-spec create(bucket(), config()) -> client().
create(Bucket, Config) ->
#{
aws_config => aws_config(Config),
upload_options => upload_options(Config),
bucket => maps:get(bucket, Config),
bucket => Bucket,
url_expire_time => maps:get(url_expire_time, Config),
headers => headers(Config),
pool_type => maps:get(pool_type, Config)
@ -94,17 +99,19 @@ create(Config) ->
-spec put_object(client(), key(), iodata()) -> ok_or_error(term()).
put_object(Client, Key, Value) ->
put_object(Client, #{}, Key, Value).
put_object(Client, Key, #{}, Value).
-spec put_object(client(), headers(), key(), iodata()) -> ok_or_error(term()).
-spec put_object(client(), key(), upload_options(), iodata()) -> ok_or_error(term()).
put_object(
#{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig},
SpecialHeaders,
#{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig},
Key,
Value
UploadOpts,
Content
) ->
AllHeaders = join_headers(Headers, SpecialHeaders),
try erlcloud_s3:put_object(Bucket, erlcloud_key(Key), Value, Options, AllHeaders, AwsConfig) of
ECKey = erlcloud_key(Key),
ECOpts = erlcloud_upload_options(UploadOpts),
Headers = join_headers(BaseHeaders, maps:get(headers, UploadOpts, undefined)),
try erlcloud_s3:put_object(Bucket, ECKey, Content, ECOpts, Headers, AwsConfig) of
Props when is_list(Props) ->
ok
catch
@ -113,18 +120,16 @@ put_object(
{error, Reason}
end.
-spec start_multipart(client(), key()) -> ok_or_error(upload_id(), term()).
start_multipart(Client, Key) ->
start_multipart(Client, #{}, Key).
-spec start_multipart(client(), headers(), key()) -> ok_or_error(upload_id(), term()).
-spec start_multipart(client(), key(), upload_options()) -> ok_or_error(upload_id(), term()).
start_multipart(
#{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig},
SpecialHeaders,
Key
#{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig},
Key,
UploadOpts
) ->
AllHeaders = join_headers(Headers, SpecialHeaders),
case erlcloud_s3:start_multipart(Bucket, erlcloud_key(Key), Options, AllHeaders, AwsConfig) of
ECKey = erlcloud_key(Key),
ECOpts = erlcloud_upload_options(UploadOpts),
Headers = join_headers(BaseHeaders, maps:get(headers, UploadOpts, undefined)),
case erlcloud_s3:start_multipart(Bucket, ECKey, ECOpts, Headers, AwsConfig) of
{ok, Props} ->
{ok, response_property('uploadId', Props)};
{error, Reason} ->
@ -204,11 +209,11 @@ format(#{aws_config := AwsConfig} = Client) ->
%% Internal functions
%%--------------------------------------------------------------------
upload_options(#{acl := Acl}) when Acl =/= undefined ->
erlcloud_upload_options(#{acl := Acl}) when Acl =/= undefined ->
[
{acl, Acl}
];
upload_options(#{}) ->
erlcloud_upload_options(#{}) ->
[].
headers(#{headers := Headers}) ->
@ -273,10 +278,10 @@ request_fun(HttpPool, PoolType, MaxRetries) ->
end)
end.
ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries) ->
try timer:tc(fun() -> ehttpc:request(HttpPool, Method, Request, Timeout, MaxRetries) end) of
ehttpc_request(Worker, Method, Request, Timeout, MaxRetries) ->
try timer:tc(fun() -> ehttpc:request(Worker, Method, Request, Timeout, MaxRetries) end) of
{Time, {ok, StatusCode, RespHeaders}} ->
?SLOG(info, #{
?SLOG(debug, #{
msg => "s3_ehttpc_request_ok",
status_code => StatusCode,
headers => RespHeaders,
@ -286,7 +291,7 @@ ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries) ->
{StatusCode, undefined}, headers_ehttpc_to_erlcloud_response(RespHeaders), undefined
}};
{Time, {ok, StatusCode, RespHeaders, RespBody}} ->
?SLOG(info, #{
?SLOG(debug, #{
msg => "s3_ehttpc_request_ok",
status_code => StatusCode,
headers => RespHeaders,
@ -297,31 +302,31 @@ ehttpc_request(HttpPool, Method, Request, Timeout, MaxRetries) ->
{StatusCode, undefined}, headers_ehttpc_to_erlcloud_response(RespHeaders), RespBody
}};
{Time, {error, Reason}} ->
?SLOG(error, #{
?SLOG(warning, #{
msg => "s3_ehttpc_request_fail",
reason => Reason,
timeout => Timeout,
pool => HttpPool,
worker => Worker,
method => Method,
time => Time
}),
{error, Reason}
catch
error:badarg ->
?SLOG(error, #{
?SLOG(warning, #{
msg => "s3_ehttpc_request_fail",
reason => badarg,
timeout => Timeout,
pool => HttpPool,
worker => Worker,
method => Method
}),
{error, no_ehttpc_pool};
error:Reason ->
?SLOG(error, #{
?SLOG(warning, #{
msg => "s3_ehttpc_request_fail",
reason => Reason,
timeout => Timeout,
pool => HttpPool,
worker => Worker,
method => Method
}),
{error, Reason}
@ -401,6 +406,8 @@ headers_ehttpc_to_erlcloud_response(EhttpcHeaders) ->
headers_erlcloud_request_to_ehttpc(ErlcloudHeaders) ->
[{to_binary(K), V} || {K, V} <- ErlcloudHeaders].
join_headers(ErlcloudHeaders, undefined) ->
ErlcloudHeaders;
join_headers(ErlcloudHeaders, UserSpecialHeaders) ->
ErlcloudHeaders ++ headers_user_to_erlcloud_request(UserSpecialHeaders).

View File

@ -37,13 +37,25 @@
code_change/3
]).
%% For test purposes
%% For connectors
-export([
client_config/2,
http_config/1
]).
%% For test purposes
-export([
start_http_pool/2,
id/1
]).
-type config_checkout() :: {
emqx_s3_client:bucket(),
emqx_s3_client:config(),
emqx_s3_client:upload_options(),
emqx_s3_uploader:config()
}.
-define(DEFAULT_CALL_TIMEOUT, 5000).
-define(DEFAULT_HTTP_POOL_TIMEOUT, 60000).
@ -78,12 +90,12 @@ update_config(ProfileId, ProfileConfig, Timeout) ->
?SAFE_CALL_VIA_GPROC(ProfileId, {update_config, ProfileConfig}, Timeout).
-spec checkout_config(emqx_s3:profile_id()) ->
{ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}.
config_checkout() | {error, profile_not_found}.
checkout_config(ProfileId) ->
checkout_config(ProfileId, ?DEFAULT_CALL_TIMEOUT).
-spec checkout_config(emqx_s3:profile_id(), timeout()) ->
{ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}.
config_checkout() | {error, profile_not_found}.
checkout_config(ProfileId, Timeout) ->
?SAFE_CALL_VIA_GPROC(ProfileId, {checkout_config, self()}, Timeout).
@ -108,6 +120,8 @@ init([ProfileId, ProfileConfig]) ->
{ok, #{
profile_id => ProfileId,
profile_config => ProfileConfig,
bucket => bucket(ProfileConfig),
upload_options => upload_options(ProfileConfig),
client_config => client_config(ProfileConfig, PoolName),
uploader_config => uploader_config(ProfileConfig),
pool_name => PoolName,
@ -128,12 +142,14 @@ handle_call(
{checkout_config, Pid},
_From,
#{
bucket := Bucket,
upload_options := Options,
client_config := ClientConfig,
uploader_config := UploaderConfig
} = State
) ->
ok = register_client(Pid, State),
{reply, {ok, ClientConfig, UploaderConfig}, State};
{reply, {Bucket, ClientConfig, Options, UploaderConfig}, State};
handle_call({checkin_config, Pid}, _From, State) ->
ok = unregister_client(Pid, State),
{reply, ok, State};
@ -146,6 +162,8 @@ handle_call(
{ok, PoolName} ->
NewState = State#{
profile_config => NewProfileConfig,
bucket => bucket(NewProfileConfig),
upload_options => upload_options(NewProfileConfig),
client_config => client_config(NewProfileConfig, PoolName),
uploader_config => uploader_config(NewProfileConfig),
http_pool_timeout => http_pool_timeout(NewProfileConfig),
@ -198,8 +216,6 @@ client_config(ProfileConfig, PoolName) ->
port => maps:get(port, ProfileConfig),
url_expire_time => maps:get(url_expire_time, ProfileConfig),
headers => maps:get(headers, HTTPOpts, #{}),
acl => maps:get(acl, ProfileConfig, undefined),
bucket => maps:get(bucket, ProfileConfig),
access_key_id => maps:get(access_key_id, ProfileConfig, undefined),
secret_access_key => maps:get(secret_access_key, ProfileConfig, undefined),
request_timeout => maps:get(request_timeout, HTTPOpts, undefined),
@ -214,6 +230,12 @@ uploader_config(#{max_part_size := MaxPartSize, min_part_size := MinPartSize} =
max_part_size => MaxPartSize
}.
bucket(ProfileConfig) ->
maps:get(bucket, ProfileConfig).
upload_options(ProfileConfig) ->
#{acl => maps:get(acl, ProfileConfig, undefined)}.
scheme(#{ssl := #{enable := true}}) -> "https://";
scheme(_TransportOpts) -> "http://".

View File

@ -15,7 +15,7 @@
start_link/1,
child_spec/1,
id/1,
start_uploader/2
start_uploader/3
]).
-export([init/1]).
@ -43,10 +43,10 @@ child_spec(ProfileId) ->
id(ProfileId) ->
{?MODULE, ProfileId}.
-spec start_uploader(emqx_s3:profile_id(), emqx_s3_uploader:opts()) ->
-spec start_uploader(emqx_s3:profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) ->
emqx_types:startlink_ret() | {error, profile_not_found}.
start_uploader(ProfileId, Opts) ->
try supervisor:start_child(?VIA_GPROC(id(ProfileId)), [Opts]) of
start_uploader(ProfileId, Key, UploadOpts) ->
try supervisor:start_child(?VIA_GPROC(id(ProfileId)), [Key, UploadOpts]) of
Result -> Result
catch
exit:{noproc, _} -> {error, profile_not_found}

View File

@ -14,9 +14,6 @@
-export([translate/1]).
-export([translate/2]).
-type secret_access_key() :: string() | function().
-reflect_type([secret_access_key/0]).
roots() ->
[s3].
@ -26,6 +23,13 @@ tags() ->
[<<"S3">>].
fields(s3) ->
lists:append([
fields(s3_client),
fields(s3_uploader),
fields(s3_url_options),
props_with([bucket, acl], fields(s3_upload))
]);
fields(s3_client) ->
[
{access_key_id,
mk(
@ -36,21 +40,9 @@ fields(s3) ->
}
)},
{secret_access_key,
mk(
typerefl:alias("string", secret_access_key()),
emqx_schema_secret:mk(
#{
desc => ?DESC("secret_access_key"),
required => false,
sensitive => true,
converter => fun secret/2
}
)},
{bucket,
mk(
string(),
#{
desc => ?DESC("bucket"),
required => true
desc => ?DESC("secret_access_key")
}
)},
{host,
@ -69,16 +61,51 @@ fields(s3) ->
required => true
}
)},
{url_expire_time,
{transport_options,
mk(
%% not used in a `receive ... after' block, just timestamp comparison
emqx_schema:duration_s(),
ref(?MODULE, transport_options),
#{
default => <<"1h">>,
desc => ?DESC("url_expire_time"),
desc => ?DESC("transport_options"),
required => false
}
)}
];
fields(s3_upload) ->
[
{bucket,
mk(
string(),
#{
desc => ?DESC("bucket"),
required => true
}
)},
{key,
mk(
string(),
#{
desc => ?DESC("key"),
required => true
}
)},
{acl,
mk(
hoconsc:enum([
private,
public_read,
public_read_write,
authenticated_read,
bucket_owner_read,
bucket_owner_full_control
]),
#{
desc => ?DESC("acl"),
required => false
}
)}
];
fields(s3_uploader) ->
[
{min_part_size,
mk(
emqx_schema:bytesize(),
@ -98,27 +125,17 @@ fields(s3) ->
required => true,
validator => fun part_size_validator/1
}
)},
{acl,
)}
];
fields(s3_url_options) ->
[
{url_expire_time,
mk(
hoconsc:enum([
private,
public_read,
public_read_write,
authenticated_read,
bucket_owner_read,
bucket_owner_full_control
]),
%% not used in a `receive ... after' block, just timestamp comparison
emqx_schema:duration_s(),
#{
desc => ?DESC("acl"),
required => false
}
)},
{transport_options,
mk(
ref(?MODULE, transport_options),
#{
desc => ?DESC("transport_options"),
default => <<"1h">>,
desc => ?DESC("url_expire_time"),
required => false
}
)}
@ -145,17 +162,13 @@ fields(transport_options) ->
desc(s3) ->
"S3 connection options";
desc(s3_client) ->
"S3 connection options";
desc(s3_upload) ->
"S3 upload options";
desc(transport_options) ->
"Options for the HTTP transport layer used by the S3 client".
secret(undefined, #{}) ->
undefined;
secret(Secret, #{make_serializable := true}) ->
unicode:characters_to_binary(emqx_secret:unwrap(Secret));
secret(Secret, #{}) ->
_ = is_binary(Secret) orelse throw({expected_type, string}),
emqx_secret:wrap(unicode:characters_to_list(Secret)).
translate(Conf) ->
translate(Conf, #{}).

View File

@ -9,7 +9,7 @@
-behaviour(gen_statem).
-export([
start_link/2,
start_link/3,
write/2,
write/3,
@ -33,30 +33,25 @@
format_status/2
]).
-export_type([opts/0, config/0]).
-export_type([config/0]).
-type config() :: #{
min_part_size => pos_integer(),
max_part_size => pos_integer()
}.
-type opts() :: #{
key := string(),
headers => emqx_s3_client:headers()
}.
-type data() :: #{
profile_id := emqx_s3:profile_id(),
profile_id => emqx_s3:profile_id(),
client := emqx_s3_client:client(),
key := emqx_s3_client:key(),
upload_opts := emqx_s3_client:upload_options(),
buffer := iodata(),
buffer_size := non_neg_integer(),
min_part_size := pos_integer(),
max_part_size := pos_integer(),
upload_id := undefined | emqx_s3_client:upload_id(),
etags := [emqx_s3_client:etag()],
part_number := emqx_s3_client:part_number(),
headers := emqx_s3_client:headers()
part_number := emqx_s3_client:part_number()
}.
%% 5MB
@ -66,9 +61,10 @@
-define(DEFAULT_TIMEOUT, 30000).
-spec start_link(emqx_s3:profile_id(), opts()) -> gen_statem:start_ret().
start_link(ProfileId, #{key := Key} = Opts) when is_list(Key) ->
gen_statem:start_link(?MODULE, [ProfileId, Opts], []).
-spec start_link(emqx_s3:profile_id(), emqx_s3_client:key(), emqx_s3_client:upload_options()) ->
gen_statem:start_ret().
start_link(ProfileId, Key, UploadOpts) when is_list(Key) ->
gen_statem:start_link(?MODULE, {profile, ProfileId, Key, UploadOpts}, []).
-spec write(pid(), iodata()) -> ok_or_error(term()).
write(Pid, WriteData) ->
@ -105,19 +101,23 @@ shutdown(Pid) ->
callback_mode() -> handle_event_function.
init([ProfileId, #{key := Key} = Opts]) ->
process_flag(trap_exit, true),
{ok, ClientConfig, UploaderConfig} = emqx_s3_profile_conf:checkout_config(ProfileId),
Client = client(ClientConfig),
{ok, upload_not_started, #{
init({profile, ProfileId, Key, UploadOpts}) ->
{Bucket, ClientConfig, BaseOpts, UploaderConfig} =
emqx_s3_profile_conf:checkout_config(ProfileId),
Upload = #{
profile_id => ProfileId,
client => Client,
headers => maps:get(headers, Opts, #{}),
client => client(Bucket, ClientConfig),
key => Key,
upload_opts => maps:merge(BaseOpts, UploadOpts)
},
init({upload, UploaderConfig, Upload});
init({upload, Config, Upload}) ->
process_flag(trap_exit, true),
{ok, upload_not_started, Upload#{
buffer => [],
buffer_size => 0,
min_part_size => maps:get(min_part_size, UploaderConfig, ?DEFAULT_MIN_PART_SIZE),
max_part_size => maps:get(max_part_size, UploaderConfig, ?DEFAULT_MAX_PART_SIZE),
min_part_size => maps:get(min_part_size, Config, ?DEFAULT_MIN_PART_SIZE),
max_part_size => maps:get(max_part_size, Config, ?DEFAULT_MAX_PART_SIZE),
upload_id => undefined,
etags => [],
part_number => 1
@ -221,8 +221,8 @@ maybe_start_upload(#{buffer_size := BufferSize, min_part_size := MinPartSize} =
end.
-spec start_upload(data()) -> {started, data()} | {error, term()}.
start_upload(#{client := Client, key := Key, headers := Headers} = Data) ->
case emqx_s3_client:start_multipart(Client, Headers, Key) of
start_upload(#{client := Client, key := Key, upload_opts := UploadOpts} = Data) ->
case emqx_s3_client:start_multipart(Client, Key, UploadOpts) of
{ok, UploadId} ->
NewData = Data#{upload_id => UploadId},
{started, NewData};
@ -274,12 +274,9 @@ complete_upload(
} = Data0
) ->
case upload_part(Data0) of
{ok, #{etags := ETags} = Data1} ->
case
emqx_s3_client:complete_multipart(
Client, Key, UploadId, lists:reverse(ETags)
)
of
{ok, #{etags := ETagsRev} = Data1} ->
ETags = lists:reverse(ETagsRev),
case emqx_s3_client:complete_multipart(Client, Key, UploadId, ETags) of
ok ->
{ok, Data1};
{error, _} = Error ->
@ -309,11 +306,11 @@ put_object(
#{
client := Client,
key := Key,
buffer := Buffer,
headers := Headers
upload_opts := UploadOpts,
buffer := Buffer
}
) ->
case emqx_s3_client:put_object(Client, Headers, Key, Buffer) of
case emqx_s3_client:put_object(Client, Key, UploadOpts, Buffer) of
ok ->
ok;
{error, _} = Error ->
@ -337,5 +334,5 @@ unwrap(WrappedData) ->
is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) ->
BufferSize + iolist_size(WriteData) =< MaxPartSize.
client(Config) ->
emqx_s3_client:create(Config).
client(Bucket, Config) ->
emqx_s3_client:create(Bucket, Config).

View File

@ -79,7 +79,7 @@ t_multipart_upload(Config) ->
Client = client(Config),
{ok, UploadId} = emqx_s3_client:start_multipart(Client, Key),
{ok, UploadId} = emqx_s3_client:start_multipart(Client, Key, #{}),
Data = data(6_000_000),
@ -97,7 +97,7 @@ t_simple_put(Config) ->
Data = data(6_000_000),
ok = emqx_s3_client:put_object(Client, Key, Data).
ok = emqx_s3_client:put_object(Client, Key, #{acl => private}, Data).
t_list(Config) ->
Key = ?config(key, Config),
@ -123,7 +123,7 @@ t_url(Config) ->
Key = ?config(key, Config),
Client = client(Config),
ok = emqx_s3_client:put_object(Client, Key, <<"data">>),
ok = emqx_s3_client:put_object(Client, Key, #{acl => public_read}, <<"data">>),
Url = emqx_s3_client:uri(Client, Key),
@ -135,20 +135,18 @@ t_url(Config) ->
t_no_acl(Config) ->
Key = ?config(key, Config),
ClientConfig = emqx_s3_profile_conf:client_config(
profile_config(Config), ?config(ehttpc_pool_name, Config)
),
Client = emqx_s3_client:create(maps:without([acl], ClientConfig)),
Client = client(Config),
ok = emqx_s3_client:put_object(Client, Key, <<"data">>).
ok = emqx_s3_client:put_object(Client, Key, #{}, <<"data">>).
t_extra_headers(Config0) ->
Config = [{extra_headers, #{'Content-Type' => <<"application/json">>}} | Config0],
Key = ?config(key, Config),
Client = client(Config),
Opts = #{acl => public_read},
Data = #{foo => bar},
ok = emqx_s3_client:put_object(Client, Key, emqx_utils_json:encode(Data)),
ok = emqx_s3_client:put_object(Client, Key, Opts, emqx_utils_json:encode(Data)),
Url = emqx_s3_client:uri(Client, Key),
@ -164,10 +162,11 @@ t_extra_headers(Config0) ->
%%--------------------------------------------------------------------
client(Config) ->
Bucket = ?config(bucket, Config),
ClientConfig = emqx_s3_profile_conf:client_config(
profile_config(Config), ?config(ehttpc_pool_name, Config)
),
emqx_s3_client:create(ClientConfig).
emqx_s3_client:create(Bucket, ClientConfig).
profile_config(Config) ->
ProfileConfig0 = emqx_s3_test_helpers:base_config(?config(conn_type, Config)),
@ -175,7 +174,6 @@ profile_config(Config) ->
fun inject_config/3,
ProfileConfig0,
#{
bucket => ?config(bucket, Config),
[transport_options, pool_type] => ?config(pool_type, Config),
[transport_options, headers] => ?config(extra_headers, Config)
}

View File

@ -46,7 +46,7 @@ end_per_testcase(_TestCase, _Config) ->
t_regular_outdated_pool_cleanup(Config) ->
_ = process_flag(trap_exit, true),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
[OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
@ -94,7 +94,7 @@ t_timeout_pool_cleanup(Config) ->
%% Start uploader
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
ok = emqx_s3_uploader:write(Pid, <<"data">>),
[OldPool] = emqx_s3_profile_http_pools:all(profile_id()),

View File

@ -132,7 +132,7 @@ t_sensitive_config_no_leak(_Config) ->
Error = #{
kind := validation_error,
path := "s3.secret_access_key",
reason := {expected_type, string}
reason := invalid_type
}
]} when map_size(Error) == 3,
emqx_s3_schema:translate(

View File

@ -133,7 +133,7 @@ end_per_testcase(_TestCase, _Config) ->
t_happy_path_simple_put(Config) ->
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
@ -165,7 +165,7 @@ t_happy_path_simple_put(Config) ->
t_happy_path_multi(Config) ->
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
@ -233,7 +233,7 @@ t_signed_nonascii_url_download(_Config) ->
t_abort_multi(Config) ->
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
@ -260,7 +260,7 @@ t_abort_multi(Config) ->
t_abort_simple_put(_Config) ->
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
@ -278,7 +278,7 @@ t_abort_simple_put(_Config) ->
t_config_switch(Config) ->
Key = emqx_s3_test_helpers:unique_key(),
OldBucket = ?config(bucket, Config),
{ok, Pid0} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid0} = emqx_s3:start_uploader(profile_id(), Key, #{}),
[Data0, Data1] = data($a, 6 * 1024 * 1024, 2),
@ -304,7 +304,7 @@ t_config_switch(Config) ->
),
%% Now check that new uploader uses new config
{ok, Pid1} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid1} = emqx_s3:start_uploader(profile_id(), Key, #{}),
ok = emqx_s3_uploader:write(Pid1, Data0),
ok = emqx_s3_uploader:complete(Pid1),
@ -318,7 +318,7 @@ t_config_switch(Config) ->
t_config_switch_http_settings(Config) ->
Key = emqx_s3_test_helpers:unique_key(),
OldBucket = ?config(bucket, Config),
{ok, Pid0} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid0} = emqx_s3:start_uploader(profile_id(), Key, #{}),
[Data0, Data1] = data($a, 6 * 1024 * 1024, 2),
@ -345,7 +345,7 @@ t_config_switch_http_settings(Config) ->
),
%% Now check that new uploader uses new config
{ok, Pid1} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid1} = emqx_s3:start_uploader(profile_id(), Key, #{}),
ok = emqx_s3_uploader:write(Pid1, Data0),
ok = emqx_s3_uploader:complete(Pid1),
@ -360,7 +360,7 @@ t_start_multipart_error(Config) ->
_ = process_flag(trap_exit, true),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
@ -386,7 +386,7 @@ t_upload_part_error(Config) ->
_ = process_flag(trap_exit, true),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
@ -414,7 +414,7 @@ t_abort_multipart_error(Config) ->
_ = process_flag(trap_exit, true),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
@ -442,7 +442,7 @@ t_complete_multipart_error(Config) ->
_ = process_flag(trap_exit, true),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
@ -470,7 +470,7 @@ t_put_object_error(Config) ->
_ = process_flag(trap_exit, true),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
@ -496,7 +496,7 @@ t_put_object_error(Config) ->
t_too_large(Config) ->
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
@ -533,7 +533,7 @@ t_tls_error(Config) ->
),
ok = emqx_s3:update_profile(profile_id(), ProfileConfig),
Key = emqx_s3_test_helpers:unique_key(),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),
@ -553,7 +553,7 @@ t_no_profile(_Config) ->
Key = emqx_s3_test_helpers:unique_key(),
?assertMatch(
{error, profile_not_found},
emqx_s3:start_uploader(<<"no-profile">>, #{key => Key})
emqx_s3:start_uploader(<<"no-profile">>, Key, #{})
).
%%--------------------------------------------------------------------
@ -572,7 +572,7 @@ list_objects(Config) ->
proplists:get_value(contents, Props).
upload(Key, ChunkSize, ChunkCount) ->
{ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}),
{ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
_ = erlang:monitor(process, Pid),

View File

@ -182,6 +182,7 @@ defmodule EMQXUmbrella.MixProject do
:emqx_ft,
:emqx_license,
:emqx_s3,
:emqx_bridge_s3,
:emqx_schema_registry,
:emqx_enterprise,
:emqx_bridge_kinesis,

View File

@ -103,6 +103,7 @@ is_community_umbrella_app("apps/emqx_oracle") -> false;
is_community_umbrella_app("apps/emqx_bridge_rabbitmq") -> false;
is_community_umbrella_app("apps/emqx_ft") -> false;
is_community_umbrella_app("apps/emqx_s3") -> false;
is_community_umbrella_app("apps/emqx_bridge_s3") -> false;
is_community_umbrella_app("apps/emqx_schema_registry") -> false;
is_community_umbrella_app("apps/emqx_enterprise") -> false;
is_community_umbrella_app("apps/emqx_bridge_kinesis") -> false;

View File

@ -0,0 +1,23 @@
emqx_bridge_s3 {
config_connector.label:
"""S3 Connector Configuration"""
config_connector.desc:
"""Configuration for a connector to S3 API compatible storage service."""
s3_upload.label:
"""S3 Simple Upload"""
s3_upload.desc:
"""Action to upload a single object to the S3 service."""
s3_upload_parameters.label:
"""S3 Upload action parameters"""
s3_upload_parameters.desc:
"""Set of parameters for the upload action. Action supports templates in S3 bucket name, object key and object content."""
s3_object_content.label:
"""S3 Object Content"""
s3_object_content.desc:
"""Content of the S3 object being uploaded. Supports templates."""
}

View File

@ -9,6 +9,15 @@ secret_access_key.desc:
bucket.desc:
"""The name of the S3 bucket."""
bucket.label:
"""Bucket"""
key.desc:
"""Key of the S3 object."""
key.label:
"""Object Key"""
host.desc:
"""The host of the S3 endpoint."""