diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src index 6066e2495..74d7dc94f 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_kinesis, [ {description, "EMQX Enterprise Amazon Kinesis Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl index d98e7ab11..14e197113 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl @@ -62,12 +62,10 @@ fields(connector_config) -> } )}, {aws_secret_access_key, - mk( - binary(), + emqx_schema_secret:mk( #{ required => true, - desc => ?DESC("aws_secret_access_key"), - sensitive => true + desc => ?DESC("aws_secret_access_key") } )}, {endpoint, diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl index d9dc0220f..959b539a0 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl @@ -97,7 +97,13 @@ init(#{ partition_key => PartitionKey, stream_name => StreamName }, - New = + %% TODO: teach `erlcloud` to to accept 0-arity closures as passwords. + ok = erlcloud_config:configure( + to_str(AwsAccessKey), + to_str(emqx_secret:unwrap(AwsSecretAccessKey)), + Host, + Port, + Scheme, fun(AccessKeyID, SecretAccessKey, HostAddr, HostPort, ConnectionScheme) -> Config0 = erlcloud_kinesis:new( AccessKeyID, @@ -107,9 +113,7 @@ init(#{ ConnectionScheme ++ "://" ), Config0#aws_config{retry_num = MaxRetries} - end, - erlcloud_config:configure( - to_str(AwsAccessKey), to_str(AwsSecretAccessKey), Host, Port, Scheme, New + end ), % check the connection case erlcloud_kinesis:list_streams() of diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl index 1e07ae96e..decf3e83b 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl @@ -15,7 +15,7 @@ -type config() :: #{ aws_access_key_id := binary(), - aws_secret_access_key := binary(), + aws_secret_access_key := emqx_secret:t(binary()), endpoint := binary(), stream_name := binary(), partition_key := binary(), diff --git a/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl b/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl index ea926fc33..61b354ea3 100644 --- a/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl @@ -11,10 +11,11 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --define(PRODUCER, emqx_bridge_kinesis_impl_producer). -define(BRIDGE_TYPE, kinesis_producer). -define(BRIDGE_TYPE_BIN, <<"kinesis_producer">>). -define(KINESIS_PORT, 4566). +-define(KINESIS_ACCESS_KEY, "aws_access_key_id"). +-define(KINESIS_SECRET_KEY, "aws_secret_access_key"). -define(TOPIC, <<"t/topic">>). %%------------------------------------------------------------------------------ @@ -38,6 +39,8 @@ init_per_suite(Config) -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy.emqx.net"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), ProxyName = "kinesis", + SecretFile = filename:join(?config(priv_dir, Config), "secret"), + ok = file:write_file(SecretFile, <>), ok = emqx_common_test_helpers:start_apps([emqx_conf]), ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]), {ok, _} = application:ensure_all_started(emqx_connector), @@ -46,6 +49,7 @@ init_per_suite(Config) -> {proxy_host, ProxyHost}, {proxy_port, ProxyPort}, {kinesis_port, ?KINESIS_PORT}, + {kinesis_secretfile, SecretFile}, {proxy_name, ProxyName} | Config ]. @@ -130,6 +134,7 @@ kinesis_config(Config) -> Scheme = proplists:get_value(connection_scheme, Config, "http"), ProxyHost = proplists:get_value(proxy_host, Config), KinesisPort = proplists:get_value(kinesis_port, Config), + SecretFile = proplists:get_value(kinesis_secretfile, Config), BatchSize = proplists:get_value(batch_size, Config, 100), BatchTime = proplists:get_value(batch_time, Config, <<"500ms">>), PayloadTemplate = proplists:get_value(payload_template, Config, "${payload}"), @@ -140,29 +145,32 @@ kinesis_config(Config) -> Name = <<(atom_to_binary(?MODULE))/binary, (GUID)/binary>>, ConfigString = io_lib:format( - "bridges.kinesis_producer.~s {\n" - " enable = true\n" - " aws_access_key_id = \"aws_access_key_id\"\n" - " aws_secret_access_key = \"aws_secret_access_key\"\n" - " endpoint = \"~s://~s:~b\"\n" - " stream_name = \"~s\"\n" - " partition_key = \"~s\"\n" - " payload_template = \"~s\"\n" - " max_retries = ~b\n" - " pool_size = 1\n" - " resource_opts = {\n" - " health_check_interval = \"3s\"\n" - " request_ttl = 30s\n" - " resume_interval = 1s\n" - " metrics_flush_interval = \"700ms\"\n" - " worker_pool_size = 1\n" - " query_mode = ~s\n" - " batch_size = ~b\n" - " batch_time = \"~s\"\n" - " }\n" - "}\n", + "bridges.kinesis_producer.~s {" + "\n enable = true" + "\n aws_access_key_id = ~p" + "\n aws_secret_access_key = ~p" + "\n endpoint = \"~s://~s:~b\"" + "\n stream_name = \"~s\"" + "\n partition_key = \"~s\"" + "\n payload_template = \"~s\"" + "\n max_retries = ~b" + "\n pool_size = 1" + "\n resource_opts = {" + "\n health_check_interval = \"3s\"" + "\n request_ttl = 30s" + "\n resume_interval = 1s" + "\n metrics_flush_interval = \"700ms\"" + "\n worker_pool_size = 1" + "\n query_mode = ~s" + "\n batch_size = ~b" + "\n batch_time = \"~s\"" + "\n }" + "\n }", [ Name, + ?KINESIS_ACCESS_KEY, + %% NOTE: using file-based secrets with HOCON configs. + "file://" ++ SecretFile, Scheme, ProxyHost, KinesisPort, @@ -203,9 +211,6 @@ delete_bridge(Config) -> ct:pal("deleting bridge ~p", [{Type, Name}]), emqx_bridge:remove(Type, Name). -create_bridge_http(Config) -> - create_bridge_http(Config, _KinesisConfigOverrides = #{}). - create_bridge_http(Config, KinesisConfigOverrides) -> TypeBin = ?BRIDGE_TYPE_BIN, Name = ?config(kinesis_name, Config), @@ -489,7 +494,11 @@ to_bin(Str) when is_list(Str) -> %%------------------------------------------------------------------------------ t_create_via_http(Config) -> - ?assertMatch({ok, _}, create_bridge_http(Config)), + Overrides = #{ + %% NOTE: using literal secret with HTTP API requests. + <<"aws_secret_access_key">> => <> + }, + ?assertMatch({ok, _}, create_bridge_http(Config, Overrides)), ok. t_start_failed_then_fix(Config) ->