feat(kinesis): accept wrapped secrets as passwords

This commit is contained in:
Andrew Mayorov 2023-11-09 17:45:37 +07:00
parent 93eaf0caee
commit e2b7b33d14
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
5 changed files with 47 additions and 36 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_kinesis, [ {application, emqx_bridge_kinesis, [
{description, "EMQX Enterprise Amazon Kinesis Bridge"}, {description, "EMQX Enterprise Amazon Kinesis Bridge"},
{vsn, "0.1.2"}, {vsn, "0.1.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -62,12 +62,10 @@ fields(connector_config) ->
} }
)}, )},
{aws_secret_access_key, {aws_secret_access_key,
mk( emqx_schema_secret:mk(
binary(),
#{ #{
required => true, required => true,
desc => ?DESC("aws_secret_access_key"), desc => ?DESC("aws_secret_access_key")
sensitive => true
} }
)}, )},
{endpoint, {endpoint,

View File

@ -97,7 +97,13 @@ init(#{
partition_key => PartitionKey, partition_key => PartitionKey,
stream_name => StreamName stream_name => StreamName
}, },
New = %% TODO: teach `erlcloud` to to accept 0-arity closures as passwords.
ok = erlcloud_config:configure(
to_str(AwsAccessKey),
to_str(emqx_secret:unwrap(AwsSecretAccessKey)),
Host,
Port,
Scheme,
fun(AccessKeyID, SecretAccessKey, HostAddr, HostPort, ConnectionScheme) -> fun(AccessKeyID, SecretAccessKey, HostAddr, HostPort, ConnectionScheme) ->
Config0 = erlcloud_kinesis:new( Config0 = erlcloud_kinesis:new(
AccessKeyID, AccessKeyID,
@ -107,9 +113,7 @@ init(#{
ConnectionScheme ++ "://" ConnectionScheme ++ "://"
), ),
Config0#aws_config{retry_num = MaxRetries} Config0#aws_config{retry_num = MaxRetries}
end, end
erlcloud_config:configure(
to_str(AwsAccessKey), to_str(AwsSecretAccessKey), Host, Port, Scheme, New
), ),
% check the connection % check the connection
case erlcloud_kinesis:list_streams() of case erlcloud_kinesis:list_streams() of

View File

@ -15,7 +15,7 @@
-type config() :: #{ -type config() :: #{
aws_access_key_id := binary(), aws_access_key_id := binary(),
aws_secret_access_key := binary(), aws_secret_access_key := emqx_secret:t(binary()),
endpoint := binary(), endpoint := binary(),
stream_name := binary(), stream_name := binary(),
partition_key := binary(), partition_key := binary(),

View File

@ -11,10 +11,11 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(PRODUCER, emqx_bridge_kinesis_impl_producer).
-define(BRIDGE_TYPE, kinesis_producer). -define(BRIDGE_TYPE, kinesis_producer).
-define(BRIDGE_TYPE_BIN, <<"kinesis_producer">>). -define(BRIDGE_TYPE_BIN, <<"kinesis_producer">>).
-define(KINESIS_PORT, 4566). -define(KINESIS_PORT, 4566).
-define(KINESIS_ACCESS_KEY, "aws_access_key_id").
-define(KINESIS_SECRET_KEY, "aws_secret_access_key").
-define(TOPIC, <<"t/topic">>). -define(TOPIC, <<"t/topic">>).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -38,6 +39,8 @@ init_per_suite(Config) ->
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy.emqx.net"), ProxyHost = os:getenv("PROXY_HOST", "toxiproxy.emqx.net"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
ProxyName = "kinesis", ProxyName = "kinesis",
SecretFile = filename:join(?config(priv_dir, Config), "secret"),
ok = file:write_file(SecretFile, <<?KINESIS_SECRET_KEY>>),
ok = emqx_common_test_helpers:start_apps([emqx_conf]), ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]), ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
{ok, _} = application:ensure_all_started(emqx_connector), {ok, _} = application:ensure_all_started(emqx_connector),
@ -46,6 +49,7 @@ init_per_suite(Config) ->
{proxy_host, ProxyHost}, {proxy_host, ProxyHost},
{proxy_port, ProxyPort}, {proxy_port, ProxyPort},
{kinesis_port, ?KINESIS_PORT}, {kinesis_port, ?KINESIS_PORT},
{kinesis_secretfile, SecretFile},
{proxy_name, ProxyName} {proxy_name, ProxyName}
| Config | Config
]. ].
@ -130,6 +134,7 @@ kinesis_config(Config) ->
Scheme = proplists:get_value(connection_scheme, Config, "http"), Scheme = proplists:get_value(connection_scheme, Config, "http"),
ProxyHost = proplists:get_value(proxy_host, Config), ProxyHost = proplists:get_value(proxy_host, Config),
KinesisPort = proplists:get_value(kinesis_port, Config), KinesisPort = proplists:get_value(kinesis_port, Config),
SecretFile = proplists:get_value(kinesis_secretfile, Config),
BatchSize = proplists:get_value(batch_size, Config, 100), BatchSize = proplists:get_value(batch_size, Config, 100),
BatchTime = proplists:get_value(batch_time, Config, <<"500ms">>), BatchTime = proplists:get_value(batch_time, Config, <<"500ms">>),
PayloadTemplate = proplists:get_value(payload_template, Config, "${payload}"), PayloadTemplate = proplists:get_value(payload_template, Config, "${payload}"),
@ -140,29 +145,32 @@ kinesis_config(Config) ->
Name = <<(atom_to_binary(?MODULE))/binary, (GUID)/binary>>, Name = <<(atom_to_binary(?MODULE))/binary, (GUID)/binary>>,
ConfigString = ConfigString =
io_lib:format( io_lib:format(
"bridges.kinesis_producer.~s {\n" "bridges.kinesis_producer.~s {"
" enable = true\n" "\n enable = true"
" aws_access_key_id = \"aws_access_key_id\"\n" "\n aws_access_key_id = ~p"
" aws_secret_access_key = \"aws_secret_access_key\"\n" "\n aws_secret_access_key = ~p"
" endpoint = \"~s://~s:~b\"\n" "\n endpoint = \"~s://~s:~b\""
" stream_name = \"~s\"\n" "\n stream_name = \"~s\""
" partition_key = \"~s\"\n" "\n partition_key = \"~s\""
" payload_template = \"~s\"\n" "\n payload_template = \"~s\""
" max_retries = ~b\n" "\n max_retries = ~b"
" pool_size = 1\n" "\n pool_size = 1"
" resource_opts = {\n" "\n resource_opts = {"
" health_check_interval = \"3s\"\n" "\n health_check_interval = \"3s\""
" request_ttl = 30s\n" "\n request_ttl = 30s"
" resume_interval = 1s\n" "\n resume_interval = 1s"
" metrics_flush_interval = \"700ms\"\n" "\n metrics_flush_interval = \"700ms\""
" worker_pool_size = 1\n" "\n worker_pool_size = 1"
" query_mode = ~s\n" "\n query_mode = ~s"
" batch_size = ~b\n" "\n batch_size = ~b"
" batch_time = \"~s\"\n" "\n batch_time = \"~s\""
" }\n" "\n }"
"}\n", "\n }",
[ [
Name, Name,
?KINESIS_ACCESS_KEY,
%% NOTE: using file-based secrets with HOCON configs.
"file://" ++ SecretFile,
Scheme, Scheme,
ProxyHost, ProxyHost,
KinesisPort, KinesisPort,
@ -203,9 +211,6 @@ delete_bridge(Config) ->
ct:pal("deleting bridge ~p", [{Type, Name}]), ct:pal("deleting bridge ~p", [{Type, Name}]),
emqx_bridge:remove(Type, Name). emqx_bridge:remove(Type, Name).
create_bridge_http(Config) ->
create_bridge_http(Config, _KinesisConfigOverrides = #{}).
create_bridge_http(Config, KinesisConfigOverrides) -> create_bridge_http(Config, KinesisConfigOverrides) ->
TypeBin = ?BRIDGE_TYPE_BIN, TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(kinesis_name, Config), Name = ?config(kinesis_name, Config),
@ -489,7 +494,11 @@ to_bin(Str) when is_list(Str) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
t_create_via_http(Config) -> t_create_via_http(Config) ->
?assertMatch({ok, _}, create_bridge_http(Config)), Overrides = #{
%% NOTE: using literal secret with HTTP API requests.
<<"aws_secret_access_key">> => <<?KINESIS_SECRET_KEY>>
},
?assertMatch({ok, _}, create_bridge_http(Config, Overrides)),
ok. ok.
t_start_failed_then_fix(Config) -> t_start_failed_then_fix(Config) ->