feat(s3): introduce S3 connector and action
This is a trivial connector based on `emqx_s3` and simple action that maps each incoming event into an S3 object upload. Due to current `emqx_s3` limitation this bridge is compatible with backends providing S3 API with path-style bucket access.
This commit is contained in:
parent
8f66bd9ddf
commit
802c760406
|
@ -106,7 +106,8 @@ hard_coded_action_info_modules_ee() ->
|
|||
emqx_bridge_opents_action_info,
|
||||
emqx_bridge_rabbitmq_action_info,
|
||||
emqx_bridge_greptimedb_action_info,
|
||||
emqx_bridge_tdengine_action_info
|
||||
emqx_bridge_tdengine_action_info,
|
||||
emqx_bridge_s3_action_info
|
||||
].
|
||||
-else.
|
||||
hard_coded_action_info_modules_ee() ->
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
Business Source License 1.1
|
||||
|
||||
Licensor: Hangzhou EMQ Technologies Co., Ltd.
|
||||
Licensed Work: EMQX Enterprise Edition
|
||||
The Licensed Work is (c) 2023
|
||||
Hangzhou EMQ Technologies Co., Ltd.
|
||||
Additional Use Grant: Students and educators are granted right to copy,
|
||||
modify, and create derivative work for research
|
||||
or education.
|
||||
Change Date: 2028-01-26
|
||||
Change License: Apache License, Version 2.0
|
||||
|
||||
For information about alternative licensing arrangements for the Software,
|
||||
please contact Licensor: https://www.emqx.com/en/contact
|
||||
|
||||
Notice
|
||||
|
||||
The Business Source License (this document, or the “License”) is not an Open
|
||||
Source license. However, the Licensed Work will eventually be made available
|
||||
under an Open Source License, as stated in this License.
|
||||
|
||||
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
|
||||
“Business Source License” is a trademark of MariaDB Corporation Ab.
|
||||
|
||||
-----------------------------------------------------------------------------
|
||||
|
||||
Business Source License 1.1
|
||||
|
||||
Terms
|
||||
|
||||
The Licensor hereby grants you the right to copy, modify, create derivative
|
||||
works, redistribute, and make non-production use of the Licensed Work. The
|
||||
Licensor may make an Additional Use Grant, above, permitting limited
|
||||
production use.
|
||||
|
||||
Effective on the Change Date, or the fourth anniversary of the first publicly
|
||||
available distribution of a specific version of the Licensed Work under this
|
||||
License, whichever comes first, the Licensor hereby grants you rights under
|
||||
the terms of the Change License, and the rights granted in the paragraph
|
||||
above terminate.
|
||||
|
||||
If your use of the Licensed Work does not comply with the requirements
|
||||
currently in effect as described in this License, you must purchase a
|
||||
commercial license from the Licensor, its affiliated entities, or authorized
|
||||
resellers, or you must refrain from using the Licensed Work.
|
||||
|
||||
All copies of the original and modified Licensed Work, and derivative works
|
||||
of the Licensed Work, are subject to this License. This License applies
|
||||
separately for each version of the Licensed Work and the Change Date may vary
|
||||
for each version of the Licensed Work released by Licensor.
|
||||
|
||||
You must conspicuously display this License on each original or modified copy
|
||||
of the Licensed Work. If you receive the Licensed Work in original or
|
||||
modified form from a third party, the terms and conditions set forth in this
|
||||
License apply to your use of that work.
|
||||
|
||||
Any use of the Licensed Work in violation of this License will automatically
|
||||
terminate your rights under this License for the current and all other
|
||||
versions of the Licensed Work.
|
||||
|
||||
This License does not grant you any right in any trademark or logo of
|
||||
Licensor or its affiliates (provided that you may use a trademark or logo of
|
||||
Licensor as expressly required by this License).
|
||||
|
||||
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
|
||||
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
|
||||
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
|
||||
TITLE.
|
||||
|
||||
MariaDB hereby grants you permission to use this License’s text to license
|
||||
your works, and to refer to it using the trademark “Business Source License”,
|
||||
as long as you comply with the Covenants of Licensor below.
|
||||
|
||||
Covenants of Licensor
|
||||
|
||||
In consideration of the right to use this License’s text and the “Business
|
||||
Source License” name and trademark, Licensor covenants to MariaDB, and to all
|
||||
other recipients of the licensed work to be provided by Licensor:
|
||||
|
||||
1. To specify as the Change License the GPL Version 2.0 or any later version,
|
||||
or a license that is compatible with GPL Version 2.0 or a later version,
|
||||
where “compatible” means that software provided under the Change License can
|
||||
be included in a program with software provided under GPL Version 2.0 or a
|
||||
later version. Licensor may specify additional Change Licenses without
|
||||
limitation.
|
||||
|
||||
2. To either: (a) specify an additional grant of rights to use that does not
|
||||
impose any additional restriction on the right granted in this License, as
|
||||
the Additional Use Grant; or (b) insert the text “None”.
|
||||
|
||||
3. To specify a Change Date.
|
||||
|
||||
4. Not to modify this License in any other way.
|
|
@ -0,0 +1,16 @@
|
|||
# EMQX S3 Bridge
|
||||
|
||||
This application provides connector and action implementations for the EMQX to integrate with Amazon S3 compatible storage services as part of the EMQX data integration pipelines.
|
||||
Users can leverage [EMQX Rule Engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) to create rules that publish message data to S3 storage service.
|
||||
|
||||
## Documentation
|
||||
|
||||
Refer to [Rules engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) for the EMQX rules engine introduction.
|
||||
|
||||
## Contributing
|
||||
|
||||
Please see our [contributing.md](../../CONTRIBUTING.md).
|
||||
|
||||
## License
|
||||
|
||||
EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).
|
|
@ -0,0 +1,2 @@
|
|||
minio
|
||||
toxiproxy
|
|
@ -0,0 +1,6 @@
|
|||
%% -*- mode: erlang; -*-
|
||||
|
||||
{erl_opts, [debug_info]}.
|
||||
{deps, [
|
||||
{emqx_resource, {path, "../../apps/emqx_resource"}}
|
||||
]}.
|
|
@ -0,0 +1,17 @@
|
|||
{application, emqx_bridge_s3, [
|
||||
{description, "EMQX Enterprise S3 Bridge"},
|
||||
{vsn, "0.1.0"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
erlcloud,
|
||||
emqx_resource,
|
||||
emqx_s3
|
||||
]},
|
||||
{env, [
|
||||
{emqx_action_info_modules, [emqx_bridge_s3_action_info]}
|
||||
]},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
]}.
|
|
@ -0,0 +1,221 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_s3).
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include("emqx_bridge_s3.hrl").
|
||||
|
||||
-behaviour(hocon_schema).
|
||||
-export([
|
||||
namespace/0,
|
||||
roots/0,
|
||||
fields/1,
|
||||
desc/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
bridge_v2_examples/1,
|
||||
connector_examples/1
|
||||
]).
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
%% `hocon_schema' API
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
namespace() ->
|
||||
"bridge_s3".
|
||||
|
||||
roots() ->
|
||||
[].
|
||||
|
||||
fields(Field) when
|
||||
Field == "get_connector";
|
||||
Field == "put_connector";
|
||||
Field == "post_connector"
|
||||
->
|
||||
emqx_connector_schema:api_fields(Field, ?CONNECTOR, fields(s3_connector_config));
|
||||
fields(Field) when
|
||||
Field == "get_bridge_v2";
|
||||
Field == "put_bridge_v2";
|
||||
Field == "post_bridge_v2"
|
||||
->
|
||||
emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION));
|
||||
fields(action) ->
|
||||
{?ACTION,
|
||||
hoconsc:mk(
|
||||
hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
|
||||
#{
|
||||
desc => <<"S3 Action Config">>,
|
||||
required => false
|
||||
}
|
||||
)};
|
||||
fields("config_connector") ->
|
||||
lists:append([
|
||||
emqx_connector_schema:common_fields(),
|
||||
fields(s3_connector_config),
|
||||
[
|
||||
{resource_opts,
|
||||
hoconsc:mk(
|
||||
?R_REF(s3_connector_resource_opts),
|
||||
emqx_resource_schema:resource_opts_meta()
|
||||
)}
|
||||
]
|
||||
]);
|
||||
fields(?ACTION) ->
|
||||
emqx_bridge_v2_schema:make_producer_action_schema(
|
||||
hoconsc:mk(
|
||||
?R_REF(s3_upload_parameters),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC(s3_upload)
|
||||
}
|
||||
),
|
||||
#{
|
||||
resource_opts_ref => ?R_REF(s3_action_resource_opts)
|
||||
}
|
||||
);
|
||||
fields(s3_connector_config) ->
|
||||
emqx_s3_schema:fields(s3_client);
|
||||
fields(s3_upload_parameters) ->
|
||||
emqx_s3_schema:fields(s3_upload) ++
|
||||
[
|
||||
{content,
|
||||
hoconsc:mk(
|
||||
string(),
|
||||
#{
|
||||
required => false,
|
||||
default => <<"${.}">>,
|
||||
desc => ?DESC(s3_object_content)
|
||||
}
|
||||
)}
|
||||
];
|
||||
fields(s3_action_resource_opts) ->
|
||||
UnsupportedOpts = [batch_size, batch_time],
|
||||
lists:filter(
|
||||
fun({N, _}) -> not lists:member(N, UnsupportedOpts) end,
|
||||
emqx_bridge_v2_schema:action_resource_opts_fields()
|
||||
);
|
||||
fields(s3_connector_resource_opts) ->
|
||||
CommonOpts = emqx_connector_schema:common_resource_opts_subfields(),
|
||||
lists:filter(
|
||||
fun({N, _}) -> lists:member(N, CommonOpts) end,
|
||||
emqx_connector_schema:resource_opts_fields()
|
||||
).
|
||||
|
||||
desc("config_connector") ->
|
||||
?DESC(config_connector);
|
||||
desc(s3_upload) ->
|
||||
?DESC(s3_upload);
|
||||
desc(s3_upload_parameters) ->
|
||||
?DESC(s3_upload_parameters);
|
||||
desc(s3_action_resource_opts) ->
|
||||
?DESC(emqx_resource_schema, resource_opts);
|
||||
desc(s3_connector_resource_opts) ->
|
||||
?DESC(emqx_resource_schema, resource_opts);
|
||||
desc(_Name) ->
|
||||
undefined.
|
||||
|
||||
%% Examples
|
||||
|
||||
bridge_v2_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"s3">> => #{
|
||||
summary => <<"S3 Simple Upload">>,
|
||||
value => action_example(Method)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
action_example(post) ->
|
||||
maps:merge(
|
||||
action_example(put),
|
||||
#{
|
||||
type => atom_to_binary(?ACTION),
|
||||
name => <<"my_s3_action">>
|
||||
}
|
||||
);
|
||||
action_example(get) ->
|
||||
maps:merge(
|
||||
action_example(put),
|
||||
#{
|
||||
status => <<"connected">>,
|
||||
node_status => [
|
||||
#{
|
||||
node => <<"emqx@localhost">>,
|
||||
status => <<"connected">>
|
||||
}
|
||||
]
|
||||
}
|
||||
);
|
||||
action_example(put) ->
|
||||
#{
|
||||
enable => true,
|
||||
connector => <<"my_s3_connector">>,
|
||||
description => <<"My action">>,
|
||||
parameters => #{
|
||||
bucket => <<"${clientid}">>,
|
||||
key => <<"${topic}">>,
|
||||
content => <<"${payload}">>,
|
||||
acl => <<"public_read">>
|
||||
},
|
||||
resource_opts => #{
|
||||
query_mode => <<"sync">>,
|
||||
inflight_window => 10
|
||||
}
|
||||
}.
|
||||
|
||||
connector_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"s3_aws">> => #{
|
||||
summary => <<"S3 Connector">>,
|
||||
value => connector_example(Method)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
connector_example(get) ->
|
||||
maps:merge(
|
||||
connector_example(put),
|
||||
#{
|
||||
status => <<"connected">>,
|
||||
node_status => [
|
||||
#{
|
||||
node => <<"emqx@localhost">>,
|
||||
status => <<"connected">>
|
||||
}
|
||||
]
|
||||
}
|
||||
);
|
||||
connector_example(post) ->
|
||||
maps:merge(
|
||||
connector_example(put),
|
||||
#{
|
||||
type => atom_to_binary(?CONNECTOR),
|
||||
name => <<"my_s3_connector">>
|
||||
}
|
||||
);
|
||||
connector_example(put) ->
|
||||
#{
|
||||
enable => true,
|
||||
description => <<"My S3 connector">>,
|
||||
host => <<"s3.eu-east-1.amazonaws.com">>,
|
||||
port => 443,
|
||||
access_key_id => <<"ACCESS">>,
|
||||
secret_access_key => <<"SECRET">>,
|
||||
transport_options => #{
|
||||
ssl => #{
|
||||
enable => true,
|
||||
verify => <<"verify_peer">>
|
||||
},
|
||||
connect_timeout => <<"1s">>,
|
||||
request_timeout => <<"60s">>,
|
||||
pool_size => 4,
|
||||
max_retries => 1,
|
||||
enable_pipelining => 1
|
||||
}
|
||||
}.
|
|
@ -0,0 +1,11 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-ifndef(__EMQX_BRIDGE_S3_HRL__).
|
||||
-define(__EMQX_BRIDGE_S3_HRL__, true).
|
||||
|
||||
-define(ACTION, s3_upload).
|
||||
-define(CONNECTOR, s3).
|
||||
|
||||
-endif.
|
|
@ -0,0 +1,19 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_s3_action_info).
|
||||
|
||||
-behaviour(emqx_action_info).
|
||||
|
||||
-export([
|
||||
action_type_name/0,
|
||||
connector_type_name/0,
|
||||
schema_module/0
|
||||
]).
|
||||
|
||||
action_type_name() -> s3_upload.
|
||||
|
||||
connector_type_name() -> s3.
|
||||
|
||||
schema_module() -> emqx_bridge_s3.
|
|
@ -0,0 +1,222 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_s3_connector).
|
||||
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||
|
||||
-behaviour(emqx_resource).
|
||||
-export([
|
||||
callback_mode/0,
|
||||
on_start/2,
|
||||
on_stop/2,
|
||||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_query/3,
|
||||
% on_batch_query/3,
|
||||
on_get_status/2,
|
||||
on_get_channel_status/3
|
||||
]).
|
||||
|
||||
-type config() :: #{
|
||||
access_key_id => string(),
|
||||
secret_access_key => emqx_secret:t(string()),
|
||||
host := string(),
|
||||
port := pos_integer(),
|
||||
transport_options => emqx_s3:transport_options()
|
||||
}.
|
||||
|
||||
-type channel_config() :: #{
|
||||
parameters := #{
|
||||
bucket := string(),
|
||||
key := string(),
|
||||
content := string(),
|
||||
acl => emqx_s3:acl()
|
||||
}
|
||||
}.
|
||||
|
||||
-type channel_state() :: #{
|
||||
bucket := emqx_template:str(),
|
||||
key := emqx_template:str(),
|
||||
upload_options => emqx_s3_client:upload_options()
|
||||
}.
|
||||
|
||||
-type state() :: #{
|
||||
pool_name => resource_id(),
|
||||
pool_pid => pid(),
|
||||
client_config := emqx_s3_client:config(),
|
||||
channels := #{channel_id() => channel_state()}
|
||||
}.
|
||||
|
||||
%%
|
||||
|
||||
-spec callback_mode() -> callback_mode().
|
||||
callback_mode() ->
|
||||
always_sync.
|
||||
|
||||
%% Management
|
||||
|
||||
-spec on_start(_InstanceId :: resource_id(), config()) ->
|
||||
{ok, state()} | {error, _Reason}.
|
||||
on_start(InstId, Config) ->
|
||||
PoolName = InstId,
|
||||
S3Config = Config#{url_expire_time => 0},
|
||||
State = #{
|
||||
pool_name => PoolName,
|
||||
client_config => emqx_s3_profile_conf:client_config(S3Config, PoolName),
|
||||
channels => #{}
|
||||
},
|
||||
HttpConfig = emqx_s3_profile_conf:http_config(Config),
|
||||
case ehttpc_sup:start_pool(PoolName, HttpConfig) of
|
||||
{ok, Pid} ->
|
||||
?SLOG(info, #{msg => "s3_connector_start_http_pool_success", pool_name => PoolName}),
|
||||
{ok, State#{pool_pid => Pid}};
|
||||
{error, Reason} = Error ->
|
||||
?SLOG(error, #{
|
||||
msg => "s3_connector_start_http_pool_fail",
|
||||
pool_name => PoolName,
|
||||
http_config => HttpConfig,
|
||||
reason => Reason
|
||||
}),
|
||||
Error
|
||||
end.
|
||||
|
||||
-spec on_stop(_InstanceId :: resource_id(), state()) ->
|
||||
ok.
|
||||
on_stop(InstId, _State = #{pool_name := PoolName}) ->
|
||||
case ehttpc_sup:stop_pool(PoolName) of
|
||||
ok ->
|
||||
?tp(s3_bridge_stopped, #{instance_id => InstId}),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{
|
||||
msg => "s3_connector_http_pool_stop_fail",
|
||||
pool_name => PoolName,
|
||||
reason => Reason
|
||||
}),
|
||||
ok
|
||||
end.
|
||||
|
||||
-spec on_get_status(_InstanceId :: resource_id(), state()) ->
|
||||
health_check_status().
|
||||
on_get_status(_InstId, State = #{client_config := Config}) ->
|
||||
try erlcloud_s3:list_buckets(emqx_s3_client:aws_config(Config)) of
|
||||
Props when is_list(Props) ->
|
||||
?status_connected
|
||||
catch
|
||||
error:{aws_error, {http_error, _Code, _, Reason}} ->
|
||||
{?status_disconnected, State, Reason};
|
||||
error:{aws_error, {socket_error, Reason}} ->
|
||||
{?status_disconnected, State, Reason}
|
||||
end.
|
||||
|
||||
-spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) ->
|
||||
{ok, state()} | {error, _Reason}.
|
||||
on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) ->
|
||||
ChannelState = init_channel_state(Config),
|
||||
{ok, State#{channels => Channels#{ChannelId => ChannelState}}}.
|
||||
|
||||
-spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) ->
|
||||
{ok, state()}.
|
||||
on_remove_channel(_InstId, State = #{channels := Channels}, ChannelId) ->
|
||||
{ok, State#{channels => maps:remove(ChannelId, Channels)}}.
|
||||
|
||||
-spec on_get_channels(_InstanceId :: resource_id()) ->
|
||||
[_ChannelConfig].
|
||||
on_get_channels(InstId) ->
|
||||
emqx_bridge_v2:get_channels_for_connector(InstId).
|
||||
|
||||
-spec on_get_channel_status(_InstanceId :: resource_id(), channel_id(), state()) ->
|
||||
channel_status().
|
||||
on_get_channel_status(_InstId, ChannelId, #{channels := Channels}) ->
|
||||
case maps:get(ChannelId, Channels, undefined) of
|
||||
_ChannelState = #{} ->
|
||||
%% TODO
|
||||
%% Since bucket name may be templated, we can't really provide any
|
||||
%% additional information regarding the channel health.
|
||||
?status_connected;
|
||||
undefined ->
|
||||
?status_disconnected
|
||||
end.
|
||||
|
||||
init_channel_state(#{parameters := Parameters}) ->
|
||||
#{
|
||||
bucket => emqx_template:parse(maps:get(bucket, Parameters)),
|
||||
key => emqx_template:parse(maps:get(key, Parameters)),
|
||||
content => emqx_template:parse(maps:get(content, Parameters)),
|
||||
upload_options => #{
|
||||
acl => maps:get(acl, Parameters, undefined)
|
||||
}
|
||||
}.
|
||||
|
||||
%% Queries
|
||||
|
||||
-type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}.
|
||||
|
||||
-spec on_query(_InstanceId :: resource_id(), query(), state()) ->
|
||||
{ok, _Result} | {error, _Reason}.
|
||||
on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) ->
|
||||
case maps:get(Tag, Channels, undefined) of
|
||||
ChannelState = #{} ->
|
||||
run_simple_upload(InstId, Data, ChannelState, Config);
|
||||
undefined ->
|
||||
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
|
||||
end.
|
||||
|
||||
run_simple_upload(
|
||||
InstId,
|
||||
Data,
|
||||
#{
|
||||
bucket := BucketTemplate,
|
||||
key := KeyTemplate,
|
||||
content := ContentTemplate,
|
||||
upload_options := UploadOpts
|
||||
},
|
||||
Config
|
||||
) ->
|
||||
Bucket = render_bucket(BucketTemplate, Data),
|
||||
Client = emqx_s3_client:create(Bucket, Config),
|
||||
Key = render_key(KeyTemplate, Data),
|
||||
Content = render_content(ContentTemplate, Data),
|
||||
case emqx_s3_client:put_object(Client, Key, UploadOpts, Content) of
|
||||
ok ->
|
||||
?tp(s3_bridge_connector_upload_ok, #{
|
||||
instance_id => InstId,
|
||||
bucket => Bucket,
|
||||
key => Key
|
||||
}),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
{error, map_error(Reason)}
|
||||
end.
|
||||
|
||||
map_error({socket_error, _} = Reason) ->
|
||||
{recoverable_error, Reason};
|
||||
map_error(Reason) ->
|
||||
%% TODO: Recoverable errors.
|
||||
{unrecoverable_error, Reason}.
|
||||
|
||||
render_bucket(Template, Data) ->
|
||||
case emqx_template:render(Template, {emqx_jsonish, Data}) of
|
||||
{Result, []} ->
|
||||
iolist_to_string(Result);
|
||||
{_, Errors} ->
|
||||
erlang:error({unrecoverable_error, {bucket_undefined, Errors}})
|
||||
end.
|
||||
|
||||
render_key(Template, Data) ->
|
||||
%% NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`.
|
||||
{Result, _Errors} = emqx_template:render(Template, {emqx_jsonish, Data}),
|
||||
iolist_to_string(Result).
|
||||
|
||||
render_content(Template, Data) ->
|
||||
%% NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`.
|
||||
{Result, _Errors} = emqx_template:render(Template, {emqx_jsonish, Data}),
|
||||
Result.
|
||||
|
||||
iolist_to_string(IOList) ->
|
||||
unicode:characters_to_list(IOList).
|
|
@ -0,0 +1,171 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_s3_SUITE).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-import(emqx_utils_conv, [bin/1]).
|
||||
|
||||
%% See `emqx_bridge_s3.hrl`.
|
||||
-define(BRIDGE_TYPE, <<"s3_upload">>).
|
||||
-define(CONNECTOR_TYPE, <<"s3">>).
|
||||
|
||||
-define(PROXY_NAME, "minio_tcp").
|
||||
-define(CONTENT_TYPE, "application/x-emqx-payload").
|
||||
|
||||
%% CT Setup
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
% Setup toxiproxy
|
||||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
_ = emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
emqx,
|
||||
emqx_conf,
|
||||
emqx_connector,
|
||||
emqx_bridge_s3,
|
||||
emqx_bridge,
|
||||
emqx_rule_engine,
|
||||
emqx_management,
|
||||
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
|
||||
],
|
||||
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||
),
|
||||
{ok, _} = emqx_common_test_http:create_default_app(),
|
||||
[
|
||||
{apps, Apps},
|
||||
{proxy_host, ProxyHost},
|
||||
{proxy_port, ProxyPort},
|
||||
{proxy_name, ?PROXY_NAME}
|
||||
| Config
|
||||
].
|
||||
|
||||
end_per_suite(Config) ->
|
||||
ok = emqx_cth_suite:stop(?config(apps, Config)).
|
||||
|
||||
%% Testcases
|
||||
|
||||
init_per_testcase(TestCase, Config) ->
|
||||
ct:timetrap(timer:seconds(30)),
|
||||
ok = snabbkaffe:start_trace(),
|
||||
Name = iolist_to_binary(io_lib:format("~s~p", [TestCase, erlang:unique_integer()])),
|
||||
ConnectorConfig = connector_config(Name, Config),
|
||||
ActionConfig = action_config(Name, Name),
|
||||
[
|
||||
{connector_type, ?CONNECTOR_TYPE},
|
||||
{connector_name, Name},
|
||||
{connector_config, ConnectorConfig},
|
||||
{bridge_type, ?BRIDGE_TYPE},
|
||||
{bridge_name, Name},
|
||||
{bridge_config, ActionConfig}
|
||||
| Config
|
||||
].
|
||||
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
ok = snabbkaffe:stop(),
|
||||
ok.
|
||||
|
||||
connector_config(Name, _Config) ->
|
||||
BaseConf = emqx_s3_test_helpers:base_raw_config(tcp),
|
||||
parse_and_check_config(<<"connectors">>, ?CONNECTOR_TYPE, Name, #{
|
||||
<<"enable">> => true,
|
||||
<<"description">> => <<"S3 Connector">>,
|
||||
<<"host">> => maps:get(<<"host">>, BaseConf),
|
||||
<<"port">> => maps:get(<<"port">>, BaseConf),
|
||||
<<"access_key_id">> => maps:get(<<"access_key_id">>, BaseConf),
|
||||
<<"secret_access_key">> => maps:get(<<"secret_access_key">>, BaseConf),
|
||||
<<"transport_options">> => #{
|
||||
<<"headers">> => #{
|
||||
<<"content-type">> => <<?CONTENT_TYPE>>
|
||||
},
|
||||
<<"connect_timeout">> => 1000,
|
||||
<<"request_timeout">> => 1000,
|
||||
<<"pool_size">> => 4,
|
||||
<<"max_retries">> => 0,
|
||||
<<"enable_pipelining">> => 1
|
||||
}
|
||||
}).
|
||||
|
||||
action_config(Name, ConnectorId) ->
|
||||
parse_and_check_config(<<"actions">>, ?BRIDGE_TYPE, Name, #{
|
||||
<<"enable">> => true,
|
||||
<<"connector">> => ConnectorId,
|
||||
<<"parameters">> => #{
|
||||
<<"bucket">> => <<"${clientid}">>,
|
||||
<<"key">> => <<"${topic}">>,
|
||||
<<"content">> => <<"${payload}">>,
|
||||
<<"acl">> => <<"public_read">>
|
||||
},
|
||||
<<"resource_opts">> => #{
|
||||
<<"buffer_mode">> => <<"memory_only">>,
|
||||
<<"buffer_seg_bytes">> => <<"10MB">>,
|
||||
<<"health_check_interval">> => <<"5s">>,
|
||||
<<"inflight_window">> => 40,
|
||||
<<"max_buffer_bytes">> => <<"256MB">>,
|
||||
<<"metrics_flush_interval">> => <<"1s">>,
|
||||
<<"query_mode">> => <<"sync">>,
|
||||
<<"request_ttl">> => <<"60s">>,
|
||||
<<"resume_interval">> => <<"5s">>,
|
||||
<<"worker_pool_size">> => <<"4">>
|
||||
}
|
||||
}).
|
||||
|
||||
parse_and_check_config(Root, Type, Name, ConfigIn) ->
|
||||
Schema =
|
||||
case Root of
|
||||
<<"connectors">> -> emqx_connector_schema;
|
||||
<<"actions">> -> emqx_bridge_v2_schema
|
||||
end,
|
||||
#{Root := #{Type := #{Name := Config}}} =
|
||||
hocon_tconf:check_plain(
|
||||
Schema,
|
||||
#{Root => #{Type => #{Name => ConfigIn}}},
|
||||
#{required => false, atom_key => false}
|
||||
),
|
||||
ct:pal("parsed config: ~p", [Config]),
|
||||
ConfigIn.
|
||||
|
||||
t_start_stop(Config) ->
|
||||
emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
|
||||
|
||||
t_create_via_http(Config) ->
|
||||
emqx_bridge_v2_testlib:t_create_via_http(Config).
|
||||
|
||||
t_on_get_status(Config) ->
|
||||
emqx_bridge_v2_testlib:t_on_get_status(Config, #{}).
|
||||
|
||||
t_sync_query(Config) ->
|
||||
Bucket = emqx_s3_test_helpers:unique_bucket(),
|
||||
Topic = "a/b/c",
|
||||
Payload = rand:bytes(1024),
|
||||
AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
|
||||
ok = erlcloud_s3:create_bucket(Bucket, AwsConfig),
|
||||
ok = emqx_bridge_v2_testlib:t_sync_query(
|
||||
Config,
|
||||
fun() -> mk_message(Bucket, Topic, Payload) end,
|
||||
fun(Res) -> ?assertMatch(ok, Res) end,
|
||||
s3_bridge_connector_upload_ok
|
||||
),
|
||||
?assertMatch(
|
||||
#{
|
||||
content := Payload,
|
||||
content_type := ?CONTENT_TYPE
|
||||
},
|
||||
maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
|
||||
).
|
||||
|
||||
mk_message(ClientId, Topic, Payload) ->
|
||||
Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
|
||||
{Event, _} = emqx_rule_events:eventmsg_publish(Message),
|
||||
Event.
|
|
@ -66,6 +66,8 @@ resource_type(tdengine) ->
|
|||
emqx_bridge_tdengine_connector;
|
||||
resource_type(rabbitmq) ->
|
||||
emqx_bridge_rabbitmq_connector;
|
||||
resource_type(s3) ->
|
||||
emqx_bridge_s3_connector;
|
||||
resource_type(Type) ->
|
||||
error({unknown_connector_type, Type}).
|
||||
|
||||
|
@ -270,6 +272,14 @@ connector_structs() ->
|
|||
desc => <<"RabbitMQ Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{s3,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_s3, "config_connector")),
|
||||
#{
|
||||
desc => <<"S3 Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)}
|
||||
].
|
||||
|
||||
|
@ -296,7 +306,8 @@ schema_modules() ->
|
|||
emqx_bridge_rabbitmq_connector_schema,
|
||||
emqx_bridge_opents_connector,
|
||||
emqx_bridge_greptimedb,
|
||||
emqx_bridge_tdengine_connector
|
||||
emqx_bridge_tdengine_connector,
|
||||
emqx_bridge_s3
|
||||
].
|
||||
|
||||
api_schemas(Method) ->
|
||||
|
@ -332,7 +343,8 @@ api_schemas(Method) ->
|
|||
api_ref(emqx_bridge_opents_connector, <<"opents">>, Method),
|
||||
api_ref(emqx_bridge_rabbitmq_connector_schema, <<"rabbitmq">>, Method),
|
||||
api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method)
|
||||
api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method),
|
||||
api_ref(emqx_bridge_s3, <<"s3">>, Method ++ "_connector")
|
||||
].
|
||||
|
||||
api_ref(Module, Type, Method) ->
|
||||
|
|
|
@ -167,7 +167,9 @@ connector_type_to_bridge_types(greptimedb) ->
|
|||
connector_type_to_bridge_types(tdengine) ->
|
||||
[tdengine];
|
||||
connector_type_to_bridge_types(rabbitmq) ->
|
||||
[rabbitmq].
|
||||
[rabbitmq];
|
||||
connector_type_to_bridge_types(s3) ->
|
||||
[s3].
|
||||
|
||||
actions_config_name(action) -> <<"actions">>;
|
||||
actions_config_name(source) -> <<"sources">>.
|
||||
|
|
1
mix.exs
1
mix.exs
|
@ -182,6 +182,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
:emqx_ft,
|
||||
:emqx_license,
|
||||
:emqx_s3,
|
||||
:emqx_bridge_s3,
|
||||
:emqx_schema_registry,
|
||||
:emqx_enterprise,
|
||||
:emqx_bridge_kinesis,
|
||||
|
|
|
@ -103,6 +103,7 @@ is_community_umbrella_app("apps/emqx_oracle") -> false;
|
|||
is_community_umbrella_app("apps/emqx_bridge_rabbitmq") -> false;
|
||||
is_community_umbrella_app("apps/emqx_ft") -> false;
|
||||
is_community_umbrella_app("apps/emqx_s3") -> false;
|
||||
is_community_umbrella_app("apps/emqx_bridge_s3") -> false;
|
||||
is_community_umbrella_app("apps/emqx_schema_registry") -> false;
|
||||
is_community_umbrella_app("apps/emqx_enterprise") -> false;
|
||||
is_community_umbrella_app("apps/emqx_bridge_kinesis") -> false;
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
emqx_bridge_s3 {
|
||||
|
||||
config_connector.label:
|
||||
"""S3 Connector Configuration"""
|
||||
config_connector.desc:
|
||||
"""Configuration for a connector to S3 API compatible storage service."""
|
||||
|
||||
s3_upload.label:
|
||||
"""S3 Simple Upload"""
|
||||
s3_upload.desc:
|
||||
"""Action to upload a single object to the S3 service."""
|
||||
|
||||
s3_upload_parameters.label:
|
||||
"""S3 Upload action parameters"""
|
||||
s3_upload_parameters.desc:
|
||||
"""Set of parameters for the upload action. Action supports templates in S3 bucket name, object key and object content."""
|
||||
|
||||
s3_object_content.label:
|
||||
"""S3 Object Content"""
|
||||
s3_object_content.desc:
|
||||
"""Content of the S3 object being uploaded. Supports templates."""
|
||||
|
||||
}
|
|
@ -9,6 +9,9 @@ secret_access_key.desc:
|
|||
bucket.desc:
|
||||
"""The name of the S3 bucket."""
|
||||
|
||||
key.desc:
|
||||
"""Key of the S3 object being manipulated."""
|
||||
|
||||
host.desc:
|
||||
"""The host of the S3 endpoint."""
|
||||
|
||||
|
|
Loading…
Reference in New Issue