diff --git a/.ci/docker-compose-file/credentials.env b/.ci/docker-compose-file/credentials.env new file mode 100644 index 000000000..50cc83a3f --- /dev/null +++ b/.ci/docker-compose-file/credentials.env @@ -0,0 +1,7 @@ +MONGO_USERNAME=emqx +MONGO_PASSWORD=passw0rd +MONGO_AUTHSOURCE=admin + +# See "Environment Variables" @ https://hub.docker.com/_/mongo +MONGO_INITDB_ROOT_USERNAME=${MONGO_USERNAME} +MONGO_INITDB_ROOT_PASSWORD=${MONGO_PASSWORD} diff --git a/.ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml b/.ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml index 39f37e66c..0eae6c358 100644 --- a/.ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml +++ b/.ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml @@ -9,6 +9,9 @@ services: - emqx_bridge ports: - "27017:27017" + env_file: + - .env + - credentials.env command: --ipv6 --bind_ip_all diff --git a/.ci/docker-compose-file/docker-compose.yaml b/.ci/docker-compose-file/docker-compose.yaml index d4a44bfb0..f3943b010 100644 --- a/.ci/docker-compose-file/docker-compose.yaml +++ b/.ci/docker-compose-file/docker-compose.yaml @@ -5,6 +5,7 @@ services: container_name: erlang image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.2-3:1.14.5-25.3.2-2-ubuntu22.04} env_file: + - credentials.env - conf.env environment: GITHUB_ACTIONS: ${GITHUB_ACTIONS:-} diff --git a/apps/emqx_auth_mongodb/test/emqx_authn_mongodb_SUITE.erl b/apps/emqx_auth_mongodb/test/emqx_authn_mongodb_SUITE.erl index c6623c11f..9ccad551d 100644 --- a/apps/emqx_auth_mongodb/test/emqx_authn_mongodb_SUITE.erl +++ b/apps/emqx_auth_mongodb/test/emqx_authn_mongodb_SUITE.erl @@ -278,6 +278,10 @@ raw_mongo_auth_config() -> <<"server">> => mongo_server(), <<"w_mode">> => <<"unsafe">>, + <<"auth_source">> => mongo_authsource(), + <<"username">> => mongo_username(), + <<"password">> => mongo_password(), + <<"filter">> => #{<<"username">> => <<"${username}">>}, <<"password_hash_field">> => <<"password_hash">>, <<"salt_field">> => <<"salt">>, @@ -464,9 +468,21 @@ mongo_config() -> {database, <<"mqtt">>}, {host, ?MONGO_HOST}, {port, ?MONGO_DEFAULT_PORT}, + {auth_source, mongo_authsource()}, + {login, mongo_username()}, + {password, mongo_password()}, {register, ?MONGO_CLIENT} ]. +mongo_authsource() -> + iolist_to_binary(os:getenv("MONGO_AUTHSOURCE", "admin")). + +mongo_username() -> + iolist_to_binary(os:getenv("MONGO_USERNAME", "")). + +mongo_password() -> + iolist_to_binary(os:getenv("MONGO_PASSWORD", "")). + start_apps(Apps) -> lists:foreach(fun application:ensure_all_started/1, Apps). diff --git a/apps/emqx_auth_mongodb/test/emqx_authz_mongodb_SUITE.erl b/apps/emqx_auth_mongodb/test/emqx_authz_mongodb_SUITE.erl index c57dce860..b19d7fba2 100644 --- a/apps/emqx_auth_mongodb/test/emqx_authz_mongodb_SUITE.erl +++ b/apps/emqx_auth_mongodb/test/emqx_authz_mongodb_SUITE.erl @@ -397,6 +397,10 @@ raw_mongo_authz_config() -> <<"collection">> => <<"acl">>, <<"server">> => mongo_server(), + <<"auth_source">> => mongo_authsource(), + <<"username">> => mongo_username(), + <<"password">> => mongo_password(), + <<"filter">> => #{<<"username">> => <<"${username}">>} }. @@ -408,9 +412,21 @@ mongo_config() -> {database, <<"mqtt">>}, {host, ?MONGO_HOST}, {port, ?MONGO_DEFAULT_PORT}, + {auth_source, mongo_authsource()}, + {login, mongo_username()}, + {password, mongo_password()}, {register, ?MONGO_CLIENT} ]. +mongo_authsource() -> + iolist_to_binary(os:getenv("MONGO_AUTHSOURCE", "admin")). + +mongo_username() -> + iolist_to_binary(os:getenv("MONGO_USERNAME", "")). + +mongo_password() -> + iolist_to_binary(os:getenv("MONGO_PASSWORD", "")). + start_apps(Apps) -> lists:foreach(fun application:ensure_all_started/1, Apps). diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index c7646faf4..20b3b08f6 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -356,9 +356,10 @@ parse_confs(<<"iotdb">>, Name, Conf) -> authentication := #{ username := Username, - password := Password + password := Secret } } = Conf, + Password = emqx_secret:unwrap(Secret), BasicToken = base64:encode(<>), %% This version atom correspond to the macro ?VSN_1_1_X in %% emqx_bridge_iotdb.hrl. It would be better to use the macro directly, but diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index afea652ef..e29dc7931 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -70,7 +70,7 @@ cassandra_db_fields() -> {keyspace, fun keyspace/1}, {pool_size, fun emqx_connector_schema_lib:pool_size/1}, {username, fun emqx_connector_schema_lib:username/1}, - {password, fun emqx_connector_schema_lib:password/1}, + {password, emqx_connector_schema_lib:password_field()}, {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} ]. @@ -111,14 +111,14 @@ on_start( emqx_schema:parse_servers(Servers0, ?DEFAULT_SERVER_OPTION) ), - Options = [ - {nodes, Servers}, - {keyspace, Keyspace}, - {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, - {pool_size, PoolSize} - ], - Options1 = maybe_add_opt(username, Config, Options), - Options2 = maybe_add_opt(password, Config, Options1, _IsSensitive = true), + Options = + maps:to_list(maps:with([username, password], Config)) ++ + [ + {nodes, Servers}, + {keyspace, Keyspace}, + {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, + {pool_size, PoolSize} + ], SslOpts = case maps:get(enable, SSL) of @@ -131,7 +131,7 @@ on_start( [] end, State = parse_prepare_cql(Config), - case emqx_resource_pool:start(InstId, ?MODULE, Options2 ++ SslOpts) of + case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; {error, Reason} -> @@ -387,6 +387,7 @@ conn_opts(Opts) -> conn_opts([], Acc) -> Acc; conn_opts([{password, Password} | Opts], Acc) -> + %% TODO: teach `ecql` to accept 0-arity closures as passwords. conn_opts(Opts, [{password, emqx_secret:unwrap(Password)} | Acc]); conn_opts([Opt | Opts], Acc) -> conn_opts(Opts, [Opt | Acc]). @@ -512,19 +513,3 @@ maybe_assign_type(V) when is_integer(V) -> maybe_assign_type(V) when is_float(V) -> {double, V}; maybe_assign_type(V) -> V. - -maybe_add_opt(Key, Conf, Opts) -> - maybe_add_opt(Key, Conf, Opts, _IsSensitive = false). - -maybe_add_opt(Key, Conf, Opts, IsSensitive) -> - case Conf of - #{Key := Val} -> - [{Key, maybe_wrap(IsSensitive, Val)} | Opts]; - _ -> - Opts - end. - -maybe_wrap(false = _IsSensitive, Val) -> - Val; -maybe_wrap(true, Val) -> - emqx_secret:wrap(Val). diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl index fcd482b47..de306e3f0 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl @@ -40,10 +40,9 @@ all() -> ]. groups() -> - TCs = emqx_common_test_helpers:all(?MODULE), [ - {auth, TCs}, - {noauth, TCs} + {auth, [t_lifecycle, t_start_passfile]}, + {noauth, [t_lifecycle]} ]. cassandra_servers(CassandraHost) -> @@ -115,32 +114,37 @@ end_per_testcase(_, _Config) -> t_lifecycle(Config) -> perform_lifecycle_check( - <<"emqx_connector_cassandra_SUITE">>, + <>, cassandra_config(Config) ). -show(X) -> - erlang:display(X), - X. - -show(Label, What) -> - erlang:display({Label, What}), - What. +t_start_passfile(Config) -> + ResourceID = atom_to_binary(?FUNCTION_NAME), + PasswordFilename = filename:join(?config(priv_dir, Config), "passfile"), + ok = file:write_file(PasswordFilename, ?CASSA_PASSWORD), + InitialConfig = emqx_utils_maps:deep_merge( + cassandra_config(Config), + #{ + <<"config">> => #{ + password => iolist_to_binary(["file://", PasswordFilename]) + } + } + ), + ?assertMatch( + #{status := connected}, + create_local_resource(ResourceID, check_config(InitialConfig)) + ), + ?assertEqual( + ok, + emqx_resource:remove_local(ResourceID) + ). perform_lifecycle_check(ResourceId, InitialConfig) -> - {ok, #{config := CheckedConfig}} = - emqx_resource:check_config(?CASSANDRA_RESOURCE_MOD, InitialConfig), - {ok, #{ + CheckedConfig = check_config(InitialConfig), + #{ state := #{pool_name := PoolName} = State, status := InitialStatus - }} = - emqx_resource:create_local( - ResourceId, - ?CONNECTOR_RESOURCE_GROUP, - ?CASSANDRA_RESOURCE_MOD, - CheckedConfig, - #{} - ), + } = create_local_resource(ResourceId, CheckedConfig), ?assertEqual(InitialStatus, connected), % Instance should match the state and status of the just started resource {ok, ?CONNECTOR_RESOURCE_GROUP, #{ @@ -191,6 +195,21 @@ perform_lifecycle_check(ResourceId, InitialConfig) -> %% utils %%-------------------------------------------------------------------- +check_config(Config) -> + {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?CASSANDRA_RESOURCE_MOD, Config), + CheckedConfig. + +create_local_resource(ResourceId, CheckedConfig) -> + {ok, Bridge} = + emqx_resource:create_local( + ResourceId, + ?CONNECTOR_RESOURCE_GROUP, + ?CASSANDRA_RESOURCE_MOD, + CheckedConfig, + #{} + ), + Bridge. + cassandra_config(Config) -> Host = ?config(cassa_host, Config), AuthOpts = maps:from_list(?config(cassa_auth_opts, Config)), diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src index 4f7519440..85c035be1 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_clickhouse, [ {description, "EMQX Enterprise ClickHouse Bridge"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 97b855ad2..8f575dd8d 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -145,7 +145,7 @@ on_start( Options = [ {url, URL}, {user, maps:get(username, Config, "default")}, - {key, emqx_secret:wrap(maps:get(password, Config, "public"))}, + {key, maps:get(password, Config, emqx_secret:wrap("public"))}, {database, DB}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {pool_size, PoolSize}, @@ -243,6 +243,7 @@ connect(Options) -> URL = iolist_to_binary(emqx_http_lib:normalize(proplists:get_value(url, Options))), User = proplists:get_value(user, Options), Database = proplists:get_value(database, Options), + %% TODO: teach `clickhouse` to accept 0-arity closures as passwords. Key = emqx_secret:unwrap(proplists:get_value(key, Options)), Pool = proplists:get_value(pool, Options), PoolSize = proplists:get_value(pool_size, Options), diff --git a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl index 12d678e85..e1d3149db 100644 --- a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl +++ b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl @@ -10,10 +10,12 @@ -include("emqx_connector.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("stdlib/include/assert.hrl"). +-include_lib("common_test/include/ct.hrl"). -define(APP, emqx_bridge_clickhouse). -define(CLICKHOUSE_HOST, "clickhouse"). -define(CLICKHOUSE_RESOURCE_MOD, emqx_bridge_clickhouse_connector). +-define(CLICKHOUSE_PASSWORD, "public"). %% This test SUITE requires a running clickhouse instance. If you don't want to %% bring up the whole CI infrastuctucture with the `scripts/ct/run.sh` script @@ -57,7 +59,7 @@ init_per_suite(Config) -> clickhouse:start_link([ {url, clickhouse_url()}, {user, <<"default">>}, - {key, "public"}, + {key, ?CLICKHOUSE_PASSWORD}, {pool, tmp_pool} ]), {ok, _, _} = clickhouse:query(Conn, <<"CREATE DATABASE IF NOT EXISTS mqtt">>, #{}), @@ -92,6 +94,31 @@ t_lifecycle(_Config) -> clickhouse_config() ). +t_start_passfile(Config) -> + ResourceID = atom_to_binary(?FUNCTION_NAME), + PasswordFilename = filename:join(?config(priv_dir, Config), "passfile"), + ok = file:write_file(PasswordFilename, <>), + InitialConfig = clickhouse_config(#{ + password => iolist_to_binary(["file://", PasswordFilename]) + }), + {ok, #{config := ResourceConfig}} = + emqx_resource:check_config(?CLICKHOUSE_RESOURCE_MOD, InitialConfig), + ?assertMatch( + {ok, #{status := connected}}, + emqx_resource:create_local( + ResourceID, + ?CONNECTOR_RESOURCE_GROUP, + ?CLICKHOUSE_RESOURCE_MOD, + ResourceConfig, + #{} + ) + ), + ?assertEqual( + ok, + emqx_resource:remove_local(ResourceID) + ), + ok. + show(X) -> erlang:display(X), X. @@ -168,12 +195,15 @@ perform_lifecycle_check(ResourceID, InitialConfig) -> % %%------------------------------------------------------------------------------ clickhouse_config() -> + clickhouse_config(#{}). + +clickhouse_config(Overrides) -> Config = #{ auto_reconnect => true, database => <<"mqtt">>, username => <<"default">>, - password => <<"public">>, + password => <>, pool_size => 8, url => iolist_to_binary( io_lib:format( @@ -186,7 +216,7 @@ clickhouse_config() -> ), connect_timeout => <<"10s">> }, - #{<<"config">> => Config}. + #{<<"config">> => maps:merge(Config, Overrides)}. test_query_no_params() -> {query, <<"SELECT 1">>}. diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src index ed5078432..a4b372056 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_dynamo, [ {description, "EMQX Enterprise Dynamo Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 0d62845fd..9cdb8886c 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -45,12 +45,10 @@ fields(config) -> #{required => true, desc => ?DESC("aws_access_key_id")} )}, {aws_secret_access_key, - mk( - binary(), + emqx_schema_secret:mk( #{ required => true, - desc => ?DESC("aws_secret_access_key"), - sensitive => true + desc => ?DESC("aws_secret_access_key") } )}, {pool_size, fun emqx_connector_schema_lib:pool_size/1}, @@ -89,7 +87,7 @@ on_start( host => Host, port => Port, aws_access_key_id => to_str(AccessKeyID), - aws_secret_access_key => to_str(SecretAccessKey), + aws_secret_access_key => SecretAccessKey, schema => Schema }}, {pool_size, PoolSize} @@ -182,9 +180,8 @@ do_query( end. connect(Opts) -> - Options = proplists:get_value(config, Opts), - {ok, _Pid} = Result = emqx_bridge_dynamo_connector_client:start_link(Options), - Result. + Config = proplists:get_value(config, Opts), + {ok, _Pid} = emqx_bridge_dynamo_connector_client:start_link(Config). parse_template(Config) -> Templates = diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl index 1b379298f..1cb326cf7 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl @@ -20,8 +20,7 @@ handle_cast/2, handle_info/2, terminate/2, - code_change/3, - format_status/2 + code_change/3 ]). -ifdef(TEST). @@ -62,11 +61,13 @@ start_link(Options) -> %% Initialize dynamodb data bridge init(#{ aws_access_key_id := AccessKeyID, - aws_secret_access_key := SecretAccessKey, + aws_secret_access_key := Secret, host := Host, port := Port, schema := Schema }) -> + %% TODO: teach `erlcloud` to to accept 0-arity closures as passwords. + SecretAccessKey = to_str(emqx_secret:unwrap(Secret)), erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema), {ok, #{}}. @@ -101,13 +102,6 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. --spec format_status( - Opt :: normal | terminate, - Status :: list() -) -> Status :: term(). -format_status(_Opt, Status) -> - Status. - %%%=================================================================== %%% Internal functions %%%=================================================================== @@ -184,3 +178,8 @@ convert2binary(Value) when is_list(Value) -> unicode:characters_to_binary(Value); convert2binary(Value) when is_map(Value) -> emqx_utils_json:encode(Value). + +to_str(List) when is_list(List) -> + List; +to_str(Bin) when is_binary(Bin) -> + erlang:binary_to_list(Bin). diff --git a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl index 9490e6455..936d2d506 100644 --- a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl +++ b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl @@ -22,8 +22,6 @@ -define(BATCH_SIZE, 10). -define(PAYLOAD, <<"HELLO">>). --define(GET_CONFIG(KEY__, CFG__), proplists:get_value(KEY__, CFG__)). - %% How to run it locally (all commands are run in $PROJ_ROOT dir): %% run ct in docker container %% run script: @@ -84,7 +82,9 @@ end_per_group(_Group, _Config) -> ok. init_per_suite(Config) -> - Config. + SecretFile = filename:join(?config(priv_dir, Config), "secret"), + ok = file:write_file(SecretFile, <>), + [{dynamo_secretfile, SecretFile} | Config]. end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), @@ -158,32 +158,35 @@ common_init(ConfigT) -> end. dynamo_config(BridgeType, Config) -> - Port = integer_to_list(?GET_CONFIG(port, Config)), - Url = "http://" ++ ?GET_CONFIG(host, Config) ++ ":" ++ Port, + Host = ?config(host, Config), + Port = ?config(port, Config), Name = atom_to_binary(?MODULE), - BatchSize = ?GET_CONFIG(batch_size, Config), - QueryMode = ?GET_CONFIG(query_mode, Config), + BatchSize = ?config(batch_size, Config), + QueryMode = ?config(query_mode, Config), + SecretFile = ?config(dynamo_secretfile, Config), ConfigString = io_lib:format( - "bridges.~s.~s {\n" - " enable = true\n" - " url = ~p\n" - " table = ~p\n" - " aws_access_key_id = ~p\n" - " aws_secret_access_key = ~p\n" - " resource_opts = {\n" - " request_ttl = 500ms\n" - " batch_size = ~b\n" - " query_mode = ~s\n" - " }\n" - "}", + "bridges.~s.~s {" + "\n enable = true" + "\n url = \"http://~s:~p\"" + "\n table = ~p" + "\n aws_access_key_id = ~p" + "\n aws_secret_access_key = ~p" + "\n resource_opts = {" + "\n request_ttl = 500ms" + "\n batch_size = ~b" + "\n query_mode = ~s" + "\n }" + "\n }", [ BridgeType, Name, - Url, + Host, + Port, ?TABLE, ?ACCESS_KEY_ID, - ?SECRET_ACCESS_KEY, + %% NOTE: using file-based secrets with HOCON configs + "file://" ++ SecretFile, BatchSize, QueryMode ] @@ -252,8 +255,8 @@ delete_table(_Config) -> erlcloud_ddb2:delete_table(?TABLE_BIN). setup_dynamo(Config) -> - Host = ?GET_CONFIG(host, Config), - Port = ?GET_CONFIG(port, Config), + Host = ?config(host, Config), + Port = ?config(port, Config), erlcloud_ddb2:configure(?ACCESS_KEY_ID, ?SECRET_ACCESS_KEY, Host, Port, ?SCHEMA). directly_setup_dynamo() -> @@ -313,7 +316,9 @@ t_setup_via_http_api_and_publish(Config) -> PgsqlConfig0 = ?config(dynamo_config, Config), PgsqlConfig = PgsqlConfig0#{ <<"name">> => Name, - <<"type">> => BridgeType + <<"type">> => BridgeType, + %% NOTE: using literal secret with HTTP API requests. + <<"aws_secret_access_key">> => <> }, ?assertMatch( {ok, _}, @@ -400,7 +405,7 @@ t_simple_query(Config) -> ), Request = {get_item, {<<"id">>, <<"not_exists">>}}, Result = query_resource(Config, Request), - case ?GET_CONFIG(batch_size, Config) of + case ?config(batch_size, Config) of ?BATCH_SIZE -> ?assertMatch({error, {unrecoverable_error, {invalid_request, _}}}, Result); 1 -> diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src index 4c5a15b79..a8a938a0b 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_greptimedb, [ {description, "EMQX GreptimeDB Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index ff4ba313e..d588f7f8c 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -147,13 +147,7 @@ fields(greptimedb) -> [ {dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})}, {username, mk(binary(), #{desc => ?DESC("username")})}, - {password, - mk(binary(), #{ - desc => ?DESC("password"), - format => <<"password">>, - sensitive => true, - converter => fun emqx_schema:password_converter/2 - })} + {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})} ] ++ emqx_connector_schema_lib:ssl_fields(). server() -> @@ -302,7 +296,8 @@ ssl_config(SSL = #{enable := true}) -> auth(#{username := Username, password := Password}) -> [ - {auth, {basic, #{username => str(Username), password => str(Password)}}} + %% TODO: teach `greptimedb` to accept 0-arity closures as passwords. + {auth, {basic, #{username => str(Username), password => emqx_secret:unwrap(Password)}}} ]; auth(_) -> []. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src index 27fe1659c..c6236d97c 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_influxdb, [ {description, "EMQX Enterprise InfluxDB Bridge"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index c8053a53d..2b4fb8d74 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -192,20 +192,14 @@ fields(influxdb_api_v1) -> [ {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, {username, mk(binary(), #{desc => ?DESC("username")})}, - {password, - mk(binary(), #{ - desc => ?DESC("password"), - format => <<"password">>, - sensitive => true, - converter => fun emqx_schema:password_converter/2 - })} + {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})} ] ++ emqx_connector_schema_lib:ssl_fields(); fields(influxdb_api_v2) -> fields(common) ++ [ {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, {org, mk(binary(), #{required => true, desc => ?DESC("org")})}, - {token, mk(binary(), #{required => true, desc => ?DESC("token")})} + {token, emqx_schema_secret:mk(#{required => true, desc => ?DESC("token")})} ] ++ emqx_connector_schema_lib:ssl_fields(). server() -> @@ -363,7 +357,8 @@ protocol_config(#{ {version, v2}, {bucket, str(Bucket)}, {org, str(Org)}, - {token, Token} + %% TODO: teach `influxdb` to accept 0-arity closures as passwords. + {token, emqx_secret:unwrap(Token)} ] ++ ssl_config(SSL). ssl_config(#{enable := false}) -> @@ -383,7 +378,8 @@ username(_) -> []. password(#{password := Password}) -> - [{password, str(Password)}]; + %% TODO: teach `influxdb` to accept 0-arity closures as passwords. + [{password, str(emqx_secret:unwrap(Password))}]; password(_) -> []. diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index b79c4c2ce..42b3c165f 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_iotdb, [ {description, "EMQX Enterprise Apache IoTDB Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {modules, [ emqx_bridge_iotdb, emqx_bridge_iotdb_impl diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index 38dfebe97..25bafbd00 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -51,12 +51,9 @@ fields(auth_basic) -> [ {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})}, {password, - mk(binary(), #{ + emqx_schema_secret:mk(#{ required => true, - desc => ?DESC("config_auth_basic_password"), - format => <<"password">>, - sensitive => true, - converter => fun emqx_schema:password_converter/2 + desc => ?DESC("config_auth_basic_password") })} ]. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 5b83a6af2..800a87601 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -283,11 +283,9 @@ fields(auth_username_password) -> })}, {username, mk(binary(), #{required => true, desc => ?DESC(auth_sasl_username)})}, {password, - mk(binary(), #{ + emqx_connector_schema_lib:password_field(#{ required => true, - sensitive => true, - desc => ?DESC(auth_sasl_password), - converter => fun emqx_schema:password_converter/2 + desc => ?DESC(auth_sasl_password) })} ]; fields(auth_gssapi_kerberos) -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl index b3ad2ca36..eb8f36fb5 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl @@ -31,8 +31,8 @@ make_client_id(BridgeType0, BridgeName0) -> sasl(none) -> undefined; -sasl(#{mechanism := Mechanism, username := Username, password := Password}) -> - {Mechanism, Username, emqx_secret:wrap(Password)}; +sasl(#{mechanism := Mechanism, username := Username, password := Secret}) -> + {Mechanism, Username, Secret}; sasl(#{ kerberos_principal := Principal, kerberos_keytab_file := KeyTabFile diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 943f30629..4fd08c154 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -30,29 +30,41 @@ all() -> ]. groups() -> - AllTCs = emqx_common_test_helpers:all(?MODULE), - SASLAuths = [ - sasl_auth_plain, - sasl_auth_scram256, - sasl_auth_scram512, - sasl_auth_kerberos + SASLGroups = [ + {sasl_auth_plain, testcases(sasl)}, + {sasl_auth_scram256, testcases(sasl)}, + {sasl_auth_scram512, testcases(sasl)}, + {sasl_auth_kerberos, testcases(sasl_auth_kerberos)} ], - SASLAuthGroups = [{group, Type} || Type <- SASLAuths], - OnlyOnceTCs = only_once_tests(), - MatrixTCs = AllTCs -- OnlyOnceTCs, - SASLTests = [{Group, MatrixTCs} || Group <- SASLAuths], + SASLAuthGroups = [{group, Group} || {Group, _} <- SASLGroups], [ - {plain, MatrixTCs ++ OnlyOnceTCs}, - {ssl, MatrixTCs}, + {plain, testcases(plain)}, + {ssl, testcases(common)}, {sasl_plain, SASLAuthGroups}, {sasl_ssl, SASLAuthGroups} - ] ++ SASLTests. + | SASLGroups + ]. -sasl_only_tests() -> - [t_failed_creation_then_fixed]. - -%% tests that do not need to be run on all groups -only_once_tests() -> +testcases(all) -> + emqx_common_test_helpers:all(?MODULE); +testcases(plain) -> + %% NOTE: relevant only for a subset of SASL testcases + Exclude = [t_failed_creation_then_fixed], + testcases(all) -- Exclude; +testcases(common) -> + testcases(plain) -- testcases(once); +testcases(sasl) -> + testcases(all) -- testcases(once); +testcases(sasl_auth_kerberos) -> + %% NOTE: need a proxy to run these tests + Exclude = [ + t_failed_creation_then_fixed, + t_on_get_status, + t_receive_after_recovery + ], + testcases(sasl) -- Exclude; +testcases(once) -> + %% tests that do not need to be run on all groups [ t_begin_offset_earliest, t_bridge_rule_action_source, @@ -220,7 +232,7 @@ init_per_group(sasl_auth_kerberos, Config0) -> (KV) -> KV end, - [{has_proxy, false}, {sasl_auth_mechanism, kerberos} | Config0] + [{sasl_auth_mechanism, kerberos} | Config0] ), Config; init_per_group(_Group, Config) -> @@ -264,43 +276,6 @@ end_per_group(Group, Config) when end_per_group(_Group, _Config) -> ok. -init_per_testcase(TestCase, Config) when - TestCase =:= t_failed_creation_then_fixed --> - KafkaType = ?config(kafka_type, Config), - AuthMechanism = ?config(sasl_auth_mechanism, Config), - IsSASL = lists:member(KafkaType, [sasl_plain, sasl_ssl]), - case {IsSASL, AuthMechanism} of - {true, kerberos} -> - [{skip_does_not_apply, true}]; - {true, _} -> - common_init_per_testcase(TestCase, Config); - {false, _} -> - [{skip_does_not_apply, true}] - end; -init_per_testcase(TestCase, Config) when - TestCase =:= t_failed_creation_then_fixed --> - %% test with one partiton only for this case because - %% the wait probe may not be always sent to the same partition - HasProxy = proplists:get_value(has_proxy, Config, true), - case HasProxy of - false -> - [{skip_does_not_apply, true}]; - true -> - common_init_per_testcase(TestCase, [{num_partitions, 1} | Config]) - end; -init_per_testcase(TestCase, Config) when - TestCase =:= t_on_get_status; - TestCase =:= t_receive_after_recovery --> - HasProxy = proplists:get_value(has_proxy, Config, true), - case HasProxy of - false -> - [{skip_does_not_apply, true}]; - true -> - common_init_per_testcase(TestCase, Config) - end; init_per_testcase(t_cluster_group = TestCase, Config0) -> Config = emqx_utils:merge_opts(Config0, [{num_partitions, 6}]), common_init_per_testcase(TestCase, Config); @@ -393,30 +368,24 @@ common_init_per_testcase(TestCase, Config0) -> ]. end_per_testcase(_Testcase, Config) -> - case proplists:get_bool(skip_does_not_apply, Config) of - true -> - ok; - false -> - ProxyHost = ?config(proxy_host, Config), - ProxyPort = ?config(proxy_port, Config), - ProducersConfigs = ?config(kafka_producers, Config), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - delete_all_bridges(), - #{clientid := KafkaProducerClientId, producers := ProducersMapping} = - ProducersConfigs, - lists:foreach( - fun(Producers) -> - ok = wolff:stop_and_delete_supervised_producers(Producers) - end, - maps:values(ProducersMapping) - ), - ok = wolff:stop_and_delete_supervised_client(KafkaProducerClientId), - %% in CI, apparently this needs more time since the - %% machines struggle with all the containers running... - emqx_common_test_helpers:call_janitor(60_000), - ok = snabbkaffe:stop(), - ok - end. + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + ProducersConfigs = ?config(kafka_producers, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + delete_all_bridges(), + #{clientid := KafkaProducerClientId, producers := ProducersMapping} = + ProducersConfigs, + lists:foreach( + fun(Producers) -> + ok = wolff:stop_and_delete_supervised_producers(Producers) + end, + maps:values(ProducersMapping) + ), + ok = wolff:stop_and_delete_supervised_client(KafkaProducerClientId), + %% in CI, apparently this needs more time since the + %% machines struggle with all the containers running... + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(). %%------------------------------------------------------------------------------ %% Helper fns @@ -1391,14 +1360,6 @@ t_multiple_topic_mappings(Config) -> ok. t_on_get_status(Config) -> - case proplists:get_bool(skip_does_not_apply, Config) of - true -> - ok; - false -> - do_t_on_get_status(Config) - end. - -do_t_on_get_status(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), @@ -1421,14 +1382,6 @@ do_t_on_get_status(Config) -> %% ensure that we can create and use the bridge successfully after %% creating it with bad config. t_failed_creation_then_fixed(Config) -> - case proplists:get_bool(skip_does_not_apply, Config) of - true -> - ok; - false -> - ?check_trace(do_t_failed_creation_then_fixed(Config), []) - end. - -do_t_failed_creation_then_fixed(Config) -> ct:timetrap({seconds, 180}), MQTTTopic = ?config(mqtt_topic, Config), MQTTQoS = ?config(mqtt_qos, Config), @@ -1516,14 +1469,6 @@ do_t_failed_creation_then_fixed(Config) -> %% recovering from a network partition will make the subscribers %% consume the messages produced during the down time. t_receive_after_recovery(Config) -> - case proplists:get_bool(skip_does_not_apply, Config) of - true -> - ok; - false -> - do_t_receive_after_recovery(Config) - end. - -do_t_receive_after_recovery(Config) -> ct:timetrap(120_000), ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index b37ef00e9..2a8a42a09 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -28,13 +28,8 @@ ). -include_lib("eunit/include/eunit.hrl"). --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx_dashboard/include/emqx_dashboard.hrl"). -define(HOST, "http://127.0.0.1:18083"). - -%% -define(API_VERSION, "v5"). - -define(BASE_PATH, "/api/v5"). %% NOTE: it's "kafka", but not "kafka_producer" @@ -48,13 +43,6 @@ %%------------------------------------------------------------------------------ all() -> - case code:get_object_code(cthr) of - {Module, Code, Filename} -> - {module, Module} = code:load_binary(Module, Filename, Code), - ok; - error -> - error - end, All0 = emqx_common_test_helpers:all(?MODULE), All = All0 -- matrix_cases(), Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()), @@ -105,23 +93,12 @@ init_per_suite(Config0) -> emqx_connector, emqx_bridge_kafka, emqx_bridge, - emqx_rule_engine + emqx_rule_engine, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} ], #{work_dir => emqx_cth_suite:work_dir(Config)} ), - emqx_mgmt_api_test_util:init_suite(), wait_until_kafka_is_up(), - %% Wait until bridges API is up - (fun WaitUntilRestApiUp() -> - case http_get(["bridges"]) of - {ok, 200, _Res} -> - ok; - Val -> - ct:pal("REST API for bridges not up. Wait and try again. Response: ~p", [Val]), - timer:sleep(1000), - WaitUntilRestApiUp() - end - end)(), [{apps, Apps} | Config]. end_per_suite(Config) -> @@ -183,6 +160,7 @@ t_query_mode_async(CtConfig) -> t_publish(matrix) -> {publish, [ [tcp, none, key_dispatch, sync], + [ssl, plain_passfile, random, sync], [ssl, scram_sha512, random, async], [ssl, kerberos, random, sync] ]}; @@ -200,9 +178,15 @@ t_publish(Config) -> end, Auth1 = case Auth of - none -> "none"; - scram_sha512 -> valid_sasl_scram512_settings(); - kerberos -> valid_sasl_kerberos_settings() + none -> + "none"; + plain_passfile -> + Passfile = filename:join(?config(priv_dir, Config), "passfile"), + valid_sasl_plain_passfile_settings(Passfile); + scram_sha512 -> + valid_sasl_scram512_settings(); + kerberos -> + valid_sasl_kerberos_settings() end, ConnCfg = #{ "bootstrap_hosts" => Hosts, @@ -1018,112 +1002,89 @@ hocon_config(Args, ConfigTemplateFun) -> ), Hocon. -%% erlfmt-ignore hocon_config_template() -> -""" -bridges.kafka.{{ bridge_name }} { - bootstrap_hosts = \"{{ kafka_hosts_string }}\" - enable = true - authentication = {{{ authentication }}} - ssl = {{{ ssl }}} - local_topic = \"{{ local_topic }}\" - kafka = { - message = { - key = \"${clientid}\" - value = \"${.payload}\" - timestamp = \"${timestamp}\" - } - buffer = { - memory_overload_protection = false - } - partition_strategy = {{ partition_strategy }} - topic = \"{{ kafka_topic }}\" - query_mode = {{ query_mode }} - } - metadata_request_timeout = 5s - min_metadata_refresh_interval = 3s - socket_opts { - nodelay = true - } - connect_timeout = 5s -} -""". + "bridges.kafka.{{ bridge_name }} {" + "\n bootstrap_hosts = \"{{ kafka_hosts_string }}\"" + "\n enable = true" + "\n authentication = {{{ authentication }}}" + "\n ssl = {{{ ssl }}}" + "\n local_topic = \"{{ local_topic }}\"" + "\n kafka = {" + "\n message = {" + "\n key = \"${clientid}\"" + "\n value = \"${.payload}\"" + "\n timestamp = \"${timestamp}\"" + "\n }" + "\n buffer = {" + "\n memory_overload_protection = false" + "\n }" + "\n partition_strategy = {{ partition_strategy }}" + "\n topic = \"{{ kafka_topic }}\"" + "\n query_mode = {{ query_mode }}" + "\n }" + "\n metadata_request_timeout = 5s" + "\n min_metadata_refresh_interval = 3s" + "\n socket_opts {" + "\n nodelay = true" + "\n }" + "\n connect_timeout = 5s" + "\n }". -%% erlfmt-ignore hocon_config_template_with_headers() -> -""" -bridges.kafka.{{ bridge_name }} { - bootstrap_hosts = \"{{ kafka_hosts_string }}\" - enable = true - authentication = {{{ authentication }}} - ssl = {{{ ssl }}} - local_topic = \"{{ local_topic }}\" - kafka = { - message = { - key = \"${clientid}\" - value = \"${.payload}\" - timestamp = \"${timestamp}\" - } - buffer = { - memory_overload_protection = false - } - kafka_headers = \"{{ kafka_headers }}\" - kafka_header_value_encode_mode: json - kafka_ext_headers: {{{ kafka_ext_headers }}} - partition_strategy = {{ partition_strategy }} - topic = \"{{ kafka_topic }}\" - query_mode = {{ query_mode }} - } - metadata_request_timeout = 5s - min_metadata_refresh_interval = 3s - socket_opts { - nodelay = true - } - connect_timeout = 5s -} -""". + "bridges.kafka.{{ bridge_name }} {" + "\n bootstrap_hosts = \"{{ kafka_hosts_string }}\"" + "\n enable = true" + "\n authentication = {{{ authentication }}}" + "\n ssl = {{{ ssl }}}" + "\n local_topic = \"{{ local_topic }}\"" + "\n kafka = {" + "\n message = {" + "\n key = \"${clientid}\"" + "\n value = \"${.payload}\"" + "\n timestamp = \"${timestamp}\"" + "\n }" + "\n buffer = {" + "\n memory_overload_protection = false" + "\n }" + "\n kafka_headers = \"{{ kafka_headers }}\"" + "\n kafka_header_value_encode_mode: json" + "\n kafka_ext_headers: {{{ kafka_ext_headers }}}" + "\n partition_strategy = {{ partition_strategy }}" + "\n topic = \"{{ kafka_topic }}\"" + "\n query_mode = {{ query_mode }}" + "\n }" + "\n metadata_request_timeout = 5s" + "\n min_metadata_refresh_interval = 3s" + "\n socket_opts {" + "\n nodelay = true" + "\n }" + "\n connect_timeout = 5s" + "\n }". -%% erlfmt-ignore hocon_config_template_authentication("none") -> "none"; hocon_config_template_authentication(#{"mechanism" := _}) -> -""" -{ - mechanism = {{ mechanism }} - password = {{ password }} - username = {{ username }} -} -"""; + "{" + "\n mechanism = {{ mechanism }}" + "\n password = \"{{ password }}\"" + "\n username = \"{{ username }}\"" + "\n }"; hocon_config_template_authentication(#{"kerberos_principal" := _}) -> -""" -{ - kerberos_principal = \"{{ kerberos_principal }}\" - kerberos_keytab_file = \"{{ kerberos_keytab_file }}\" -} -""". + "{" + "\n kerberos_principal = \"{{ kerberos_principal }}\"" + "\n kerberos_keytab_file = \"{{ kerberos_keytab_file }}\"" + "\n }". -%% erlfmt-ignore hocon_config_template_ssl(Map) when map_size(Map) =:= 0 -> -""" -{ - enable = false -} -"""; + "{ enable = false }"; hocon_config_template_ssl(#{"enable" := "false"}) -> -""" -{ - enable = false -} -"""; + "{ enable = false }"; hocon_config_template_ssl(#{"enable" := "true"}) -> -""" -{ - enable = true - cacertfile = \"{{{cacertfile}}}\" - certfile = \"{{{certfile}}}\" - keyfile = \"{{{keyfile}}}\" -} -""". + "{ enable = true" + "\n cacertfile = \"{{{cacertfile}}}\"" + "\n certfile = \"{{{certfile}}}\"" + "\n keyfile = \"{{{keyfile}}}\"" + "\n }". kafka_hosts_string(tcp, none) -> kafka_hosts_string(); @@ -1197,6 +1158,13 @@ valid_sasl_kerberos_settings() -> "kerberos_keytab_file" => shared_secret(rig_keytab) }. +valid_sasl_plain_passfile_settings(Passfile) -> + Auth = valid_sasl_plain_settings(), + ok = file:write_file(Passfile, maps:get("password", Auth)), + Auth#{ + "password" := "file://" ++ Passfile + }. + kafka_hosts() -> kpro:parse_endpoints(kafka_hosts_string()). diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 1d9682b9b..ff4334a85 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -223,144 +223,136 @@ check_atom_key(Conf) when is_map(Conf) -> %% Data section %%=========================================================================== -%% erlfmt-ignore kafka_producer_old_hocon(_WithLocalTopic = true) -> kafka_producer_old_hocon("mqtt {topic = \"mqtt/local\"}\n"); kafka_producer_old_hocon(_WithLocalTopic = false) -> kafka_producer_old_hocon("mqtt {}\n"); kafka_producer_old_hocon(MQTTConfig) when is_list(MQTTConfig) -> -""" -bridges.kafka { - myproducer { - authentication = \"none\" - bootstrap_hosts = \"toxiproxy:9292\" - connect_timeout = \"5s\" - metadata_request_timeout = \"5s\" - min_metadata_refresh_interval = \"3s\" - producer { - kafka { - buffer { - memory_overload_protection = false - mode = \"memory\" - per_partition_limit = \"2GB\" - segment_bytes = \"100MB\" - } - compression = \"no_compression\" - max_batch_bytes = \"896KB\" - max_inflight = 10 - message { - key = \"${.clientid}\" - timestamp = \"${.timestamp}\" - value = \"${.}\" - } - partition_count_refresh_interval = \"60s\" - partition_strategy = \"random\" - required_acks = \"all_isr\" - topic = \"test-topic-two-partitions\" - } -""" ++ MQTTConfig ++ -""" - } - socket_opts { - nodelay = true - recbuf = \"1024KB\" - sndbuf = \"1024KB\" - } - ssl {enable = false, verify = \"verify_peer\"} - } -} -""". + [ + "bridges.kafka {" + "\n myproducer {" + "\n authentication = \"none\"" + "\n bootstrap_hosts = \"toxiproxy:9292\"" + "\n connect_timeout = \"5s\"" + "\n metadata_request_timeout = \"5s\"" + "\n min_metadata_refresh_interval = \"3s\"" + "\n producer {" + "\n kafka {" + "\n buffer {" + "\n memory_overload_protection = false" + "\n mode = \"memory\"" + "\n per_partition_limit = \"2GB\"" + "\n segment_bytes = \"100MB\"" + "\n }" + "\n compression = \"no_compression\"" + "\n max_batch_bytes = \"896KB\"" + "\n max_inflight = 10" + "\n message {" + "\n key = \"${.clientid}\"" + "\n timestamp = \"${.timestamp}\"" + "\n value = \"${.}\"" + "\n }" + "\n partition_count_refresh_interval = \"60s\"" + "\n partition_strategy = \"random\"" + "\n required_acks = \"all_isr\"" + "\n topic = \"test-topic-two-partitions\"" + "\n }", + MQTTConfig, + "\n }" + "\n socket_opts {" + "\n nodelay = true" + "\n recbuf = \"1024KB\"" + "\n sndbuf = \"1024KB\"" + "\n }" + "\n ssl {enable = false, verify = \"verify_peer\"}" + "\n }" + "\n}" + ]. kafka_producer_new_hocon() -> - "" - "\n" - "bridges.kafka {\n" - " myproducer {\n" - " authentication = \"none\"\n" - " bootstrap_hosts = \"toxiproxy:9292\"\n" - " connect_timeout = \"5s\"\n" - " metadata_request_timeout = \"5s\"\n" - " min_metadata_refresh_interval = \"3s\"\n" - " kafka {\n" - " buffer {\n" - " memory_overload_protection = false\n" - " mode = \"memory\"\n" - " per_partition_limit = \"2GB\"\n" - " segment_bytes = \"100MB\"\n" - " }\n" - " compression = \"no_compression\"\n" - " max_batch_bytes = \"896KB\"\n" - " max_inflight = 10\n" - " message {\n" - " key = \"${.clientid}\"\n" - " timestamp = \"${.timestamp}\"\n" - " value = \"${.}\"\n" - " }\n" - " partition_count_refresh_interval = \"60s\"\n" - " partition_strategy = \"random\"\n" - " required_acks = \"all_isr\"\n" - " topic = \"test-topic-two-partitions\"\n" - " }\n" - " local_topic = \"mqtt/local\"\n" - " socket_opts {\n" - " nodelay = true\n" - " recbuf = \"1024KB\"\n" - " sndbuf = \"1024KB\"\n" - " }\n" - " ssl {enable = false, verify = \"verify_peer\"}\n" - " resource_opts {\n" - " health_check_interval = 10s\n" - " }\n" - " }\n" - "}\n" - "". + "bridges.kafka {" + "\n myproducer {" + "\n authentication = \"none\"" + "\n bootstrap_hosts = \"toxiproxy:9292\"" + "\n connect_timeout = \"5s\"" + "\n metadata_request_timeout = \"5s\"" + "\n min_metadata_refresh_interval = \"3s\"" + "\n kafka {" + "\n buffer {" + "\n memory_overload_protection = false" + "\n mode = \"memory\"" + "\n per_partition_limit = \"2GB\"" + "\n segment_bytes = \"100MB\"" + "\n }" + "\n compression = \"no_compression\"" + "\n max_batch_bytes = \"896KB\"" + "\n max_inflight = 10" + "\n message {" + "\n key = \"${.clientid}\"" + "\n timestamp = \"${.timestamp}\"" + "\n value = \"${.}\"" + "\n }" + "\n partition_count_refresh_interval = \"60s\"" + "\n partition_strategy = \"random\"" + "\n required_acks = \"all_isr\"" + "\n topic = \"test-topic-two-partitions\"" + "\n }" + "\n local_topic = \"mqtt/local\"" + "\n socket_opts {" + "\n nodelay = true" + "\n recbuf = \"1024KB\"" + "\n sndbuf = \"1024KB\"" + "\n }" + "\n ssl {enable = false, verify = \"verify_peer\"}" + "\n resource_opts {" + "\n health_check_interval = 10s" + "\n }" + "\n }" + "\n}". -%% erlfmt-ignore kafka_consumer_hocon() -> -""" -bridges.kafka_consumer.my_consumer { - enable = true - bootstrap_hosts = \"kafka-1.emqx.net:9292\" - connect_timeout = 5s - min_metadata_refresh_interval = 3s - metadata_request_timeout = 5s - authentication = { - mechanism = plain - username = emqxuser - password = password - } - kafka { - max_batch_bytes = 896KB - max_rejoin_attempts = 5 - offset_commit_interval_seconds = 3s - offset_reset_policy = latest - } - topic_mapping = [ - { - kafka_topic = \"kafka-topic-1\" - mqtt_topic = \"mqtt/topic/1\" - qos = 1 - payload_template = \"${.}\" - }, - { - kafka_topic = \"kafka-topic-2\" - mqtt_topic = \"mqtt/topic/2\" - qos = 2 - payload_template = \"v = ${.value}\" - } - ] - key_encoding_mode = none - value_encoding_mode = none - ssl { - enable = false - verify = verify_none - server_name_indication = \"auto\" - } - resource_opts { - health_check_interval = 10s - } -} -""". + "bridges.kafka_consumer.my_consumer {" + "\n enable = true" + "\n bootstrap_hosts = \"kafka-1.emqx.net:9292\"" + "\n connect_timeout = 5s" + "\n min_metadata_refresh_interval = 3s" + "\n metadata_request_timeout = 5s" + "\n authentication = {" + "\n mechanism = plain" + "\n username = emqxuser" + "\n password = password" + "\n }" + "\n kafka {" + "\n max_batch_bytes = 896KB" + "\n max_rejoin_attempts = 5" + "\n offset_commit_interval_seconds = 3s" + "\n offset_reset_policy = latest" + "\n }" + "\n topic_mapping = [" + "\n {" + "\n kafka_topic = \"kafka-topic-1\"" + "\n mqtt_topic = \"mqtt/topic/1\"" + "\n qos = 1" + "\n payload_template = \"${.}\"" + "\n }," + "\n {" + "\n kafka_topic = \"kafka-topic-2\"" + "\n mqtt_topic = \"mqtt/topic/2\"" + "\n qos = 2" + "\n payload_template = \"v = ${.value}\"" + "\n }" + "\n ]" + "\n key_encoding_mode = none" + "\n value_encoding_mode = none" + "\n ssl {" + "\n enable = false" + "\n verify = verify_none" + "\n server_name_indication = \"auto\"" + "\n }" + "\n resource_opts {" + "\n health_check_interval = 10s" + "\n }" + "\n }". %% assert compatibility bridge_schema_json_test() -> diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src index 6066e2495..74d7dc94f 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_kinesis, [ {description, "EMQX Enterprise Amazon Kinesis Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl index d98e7ab11..14e197113 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl @@ -62,12 +62,10 @@ fields(connector_config) -> } )}, {aws_secret_access_key, - mk( - binary(), + emqx_schema_secret:mk( #{ required => true, - desc => ?DESC("aws_secret_access_key"), - sensitive => true + desc => ?DESC("aws_secret_access_key") } )}, {endpoint, diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl index d9dc0220f..959b539a0 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl @@ -97,7 +97,13 @@ init(#{ partition_key => PartitionKey, stream_name => StreamName }, - New = + %% TODO: teach `erlcloud` to to accept 0-arity closures as passwords. + ok = erlcloud_config:configure( + to_str(AwsAccessKey), + to_str(emqx_secret:unwrap(AwsSecretAccessKey)), + Host, + Port, + Scheme, fun(AccessKeyID, SecretAccessKey, HostAddr, HostPort, ConnectionScheme) -> Config0 = erlcloud_kinesis:new( AccessKeyID, @@ -107,9 +113,7 @@ init(#{ ConnectionScheme ++ "://" ), Config0#aws_config{retry_num = MaxRetries} - end, - erlcloud_config:configure( - to_str(AwsAccessKey), to_str(AwsSecretAccessKey), Host, Port, Scheme, New + end ), % check the connection case erlcloud_kinesis:list_streams() of diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl index 1e07ae96e..decf3e83b 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl @@ -15,7 +15,7 @@ -type config() :: #{ aws_access_key_id := binary(), - aws_secret_access_key := binary(), + aws_secret_access_key := emqx_secret:t(binary()), endpoint := binary(), stream_name := binary(), partition_key := binary(), diff --git a/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl b/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl index ea926fc33..61b354ea3 100644 --- a/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl @@ -11,10 +11,11 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --define(PRODUCER, emqx_bridge_kinesis_impl_producer). -define(BRIDGE_TYPE, kinesis_producer). -define(BRIDGE_TYPE_BIN, <<"kinesis_producer">>). -define(KINESIS_PORT, 4566). +-define(KINESIS_ACCESS_KEY, "aws_access_key_id"). +-define(KINESIS_SECRET_KEY, "aws_secret_access_key"). -define(TOPIC, <<"t/topic">>). %%------------------------------------------------------------------------------ @@ -38,6 +39,8 @@ init_per_suite(Config) -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy.emqx.net"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), ProxyName = "kinesis", + SecretFile = filename:join(?config(priv_dir, Config), "secret"), + ok = file:write_file(SecretFile, <>), ok = emqx_common_test_helpers:start_apps([emqx_conf]), ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]), {ok, _} = application:ensure_all_started(emqx_connector), @@ -46,6 +49,7 @@ init_per_suite(Config) -> {proxy_host, ProxyHost}, {proxy_port, ProxyPort}, {kinesis_port, ?KINESIS_PORT}, + {kinesis_secretfile, SecretFile}, {proxy_name, ProxyName} | Config ]. @@ -130,6 +134,7 @@ kinesis_config(Config) -> Scheme = proplists:get_value(connection_scheme, Config, "http"), ProxyHost = proplists:get_value(proxy_host, Config), KinesisPort = proplists:get_value(kinesis_port, Config), + SecretFile = proplists:get_value(kinesis_secretfile, Config), BatchSize = proplists:get_value(batch_size, Config, 100), BatchTime = proplists:get_value(batch_time, Config, <<"500ms">>), PayloadTemplate = proplists:get_value(payload_template, Config, "${payload}"), @@ -140,29 +145,32 @@ kinesis_config(Config) -> Name = <<(atom_to_binary(?MODULE))/binary, (GUID)/binary>>, ConfigString = io_lib:format( - "bridges.kinesis_producer.~s {\n" - " enable = true\n" - " aws_access_key_id = \"aws_access_key_id\"\n" - " aws_secret_access_key = \"aws_secret_access_key\"\n" - " endpoint = \"~s://~s:~b\"\n" - " stream_name = \"~s\"\n" - " partition_key = \"~s\"\n" - " payload_template = \"~s\"\n" - " max_retries = ~b\n" - " pool_size = 1\n" - " resource_opts = {\n" - " health_check_interval = \"3s\"\n" - " request_ttl = 30s\n" - " resume_interval = 1s\n" - " metrics_flush_interval = \"700ms\"\n" - " worker_pool_size = 1\n" - " query_mode = ~s\n" - " batch_size = ~b\n" - " batch_time = \"~s\"\n" - " }\n" - "}\n", + "bridges.kinesis_producer.~s {" + "\n enable = true" + "\n aws_access_key_id = ~p" + "\n aws_secret_access_key = ~p" + "\n endpoint = \"~s://~s:~b\"" + "\n stream_name = \"~s\"" + "\n partition_key = \"~s\"" + "\n payload_template = \"~s\"" + "\n max_retries = ~b" + "\n pool_size = 1" + "\n resource_opts = {" + "\n health_check_interval = \"3s\"" + "\n request_ttl = 30s" + "\n resume_interval = 1s" + "\n metrics_flush_interval = \"700ms\"" + "\n worker_pool_size = 1" + "\n query_mode = ~s" + "\n batch_size = ~b" + "\n batch_time = \"~s\"" + "\n }" + "\n }", [ Name, + ?KINESIS_ACCESS_KEY, + %% NOTE: using file-based secrets with HOCON configs. + "file://" ++ SecretFile, Scheme, ProxyHost, KinesisPort, @@ -203,9 +211,6 @@ delete_bridge(Config) -> ct:pal("deleting bridge ~p", [{Type, Name}]), emqx_bridge:remove(Type, Name). -create_bridge_http(Config) -> - create_bridge_http(Config, _KinesisConfigOverrides = #{}). - create_bridge_http(Config, KinesisConfigOverrides) -> TypeBin = ?BRIDGE_TYPE_BIN, Name = ?config(kinesis_name, Config), @@ -489,7 +494,11 @@ to_bin(Str) when is_list(Str) -> %%------------------------------------------------------------------------------ t_create_via_http(Config) -> - ?assertMatch({ok, _}, create_bridge_http(Config)), + Overrides = #{ + %% NOTE: using literal secret with HTTP API requests. + <<"aws_secret_access_key">> => <> + }, + ?assertMatch({ok, _}, create_bridge_http(Config, Overrides)), ok. t_start_failed_then_fix(Config) -> diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src index 35bcc3fc4..5545ac967 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mongodb, [ {description, "EMQX Enterprise MongoDB Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl index 8c004d829..741db9550 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl @@ -6,9 +6,6 @@ -behaviour(emqx_resource). --include_lib("emqx_connector/include/emqx_connector_tables.hrl"). --include_lib("emqx_resource/include/emqx_resource.hrl"). --include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). diff --git a/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl b/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl index f2d0bc1c5..cedb19b88 100644 --- a/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl +++ b/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl @@ -11,6 +11,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-import(emqx_utils_conv, [bin/1]). + %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ @@ -96,14 +98,27 @@ init_per_group(Type = single, Config) -> true -> ok = start_apps(), emqx_mgmt_api_test_util:init_suite(), - {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config), + %% NOTE: `mongo-single` has auth enabled, see `credentials.env`. + AuthSource = bin(os:getenv("MONGO_AUTHSOURCE", "admin")), + Username = bin(os:getenv("MONGO_USERNAME", "")), + Password = bin(os:getenv("MONGO_PASSWORD", "")), + Passfile = filename:join(?config(priv_dir, Config), "passfile"), + ok = file:write_file(Passfile, Password), + NConfig = [ + {mongo_authsource, AuthSource}, + {mongo_username, Username}, + {mongo_password, Password}, + {mongo_passfile, Passfile} + | Config + ], + {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, NConfig), [ {mongo_host, MongoHost}, {mongo_port, MongoPort}, {mongo_config, MongoConfig}, {mongo_type, Type}, {mongo_name, Name} - | Config + | NConfig ]; false -> {skip, no_mongo} @@ -121,13 +136,13 @@ end_per_suite(_Config) -> ok. init_per_testcase(_Testcase, Config) -> - catch clear_db(Config), + clear_db(Config), delete_bridge(Config), snabbkaffe:start_trace(), Config. end_per_testcase(_Testcase, Config) -> - catch clear_db(Config), + clear_db(Config), delete_bridge(Config), snabbkaffe:stop(), ok. @@ -175,19 +190,19 @@ mongo_config(MongoHost, MongoPort0, rs = Type, Config) -> Name = atom_to_binary(?MODULE), ConfigString = io_lib:format( - "bridges.mongodb_rs.~s {\n" - " enable = true\n" - " collection = mycol\n" - " replica_set_name = rs0\n" - " servers = [~p]\n" - " w_mode = safe\n" - " use_legacy_protocol = auto\n" - " database = mqtt\n" - " resource_opts = {\n" - " query_mode = ~s\n" - " worker_pool_size = 1\n" - " }\n" - "}", + "bridges.mongodb_rs.~s {" + "\n enable = true" + "\n collection = mycol" + "\n replica_set_name = rs0" + "\n servers = [~p]" + "\n w_mode = safe" + "\n use_legacy_protocol = auto" + "\n database = mqtt" + "\n resource_opts = {" + "\n query_mode = ~s" + "\n worker_pool_size = 1" + "\n }" + "\n }", [ Name, Servers, @@ -202,18 +217,18 @@ mongo_config(MongoHost, MongoPort0, sharded = Type, Config) -> Name = atom_to_binary(?MODULE), ConfigString = io_lib:format( - "bridges.mongodb_sharded.~s {\n" - " enable = true\n" - " collection = mycol\n" - " servers = [~p]\n" - " w_mode = safe\n" - " use_legacy_protocol = auto\n" - " database = mqtt\n" - " resource_opts = {\n" - " query_mode = ~s\n" - " worker_pool_size = 1\n" - " }\n" - "}", + "bridges.mongodb_sharded.~s {" + "\n enable = true" + "\n collection = mycol" + "\n servers = [~p]" + "\n w_mode = safe" + "\n use_legacy_protocol = auto" + "\n database = mqtt" + "\n resource_opts = {" + "\n query_mode = ~s" + "\n worker_pool_size = 1" + "\n }" + "\n }", [ Name, Servers, @@ -228,21 +243,27 @@ mongo_config(MongoHost, MongoPort0, single = Type, Config) -> Name = atom_to_binary(?MODULE), ConfigString = io_lib:format( - "bridges.mongodb_single.~s {\n" - " enable = true\n" - " collection = mycol\n" - " server = ~p\n" - " w_mode = safe\n" - " use_legacy_protocol = auto\n" - " database = mqtt\n" - " resource_opts = {\n" - " query_mode = ~s\n" - " worker_pool_size = 1\n" - " }\n" - "}", + "bridges.mongodb_single.~s {" + "\n enable = true" + "\n collection = mycol" + "\n server = ~p" + "\n w_mode = safe" + "\n use_legacy_protocol = auto" + "\n database = mqtt" + "\n auth_source = ~s" + "\n username = ~s" + "\n password = \"file://~s\"" + "\n resource_opts = {" + "\n query_mode = ~s" + "\n worker_pool_size = 1" + "\n }" + "\n }", [ Name, Server, + ?config(mongo_authsource, Config), + ?config(mongo_username, Config), + ?config(mongo_passfile, Config), QueryMode ] ), @@ -284,8 +305,24 @@ clear_db(Config) -> Host = ?config(mongo_host, Config), Port = ?config(mongo_port, Config), Server = Host ++ ":" ++ integer_to_list(Port), - #{<<"database">> := Db, <<"collection">> := Collection} = ?config(mongo_config, Config), - {ok, Client} = mongo_api:connect(Type, [Server], [], [{database, Db}, {w_mode, unsafe}]), + #{ + <<"database">> := Db, + <<"collection">> := Collection + } = ?config(mongo_config, Config), + WorkerOpts = [ + {database, Db}, + {w_mode, unsafe} + | lists:flatmap( + fun + ({mongo_authsource, AS}) -> [{auth_source, AS}]; + ({mongo_username, User}) -> [{login, User}]; + ({mongo_password, Pass}) -> [{password, Pass}]; + (_) -> [] + end, + Config + ) + ], + {ok, Client} = mongo_api:connect(Type, [Server], [], WorkerOpts), {true, _} = mongo_api:delete(Client, Collection, _Selector = #{}), mongo_api:disconnect(Client). @@ -386,13 +423,21 @@ t_setup_via_config_and_publish(Config) -> ok. t_setup_via_http_api_and_publish(Config) -> - Type = mongo_type_bin(?config(mongo_type, Config)), + Type = ?config(mongo_type, Config), Name = ?config(mongo_name, Config), MongoConfig0 = ?config(mongo_config, Config), - MongoConfig = MongoConfig0#{ + MongoConfig1 = MongoConfig0#{ <<"name">> => Name, - <<"type">> => Type + <<"type">> => mongo_type_bin(Type) }, + MongoConfig = + case Type of + single -> + %% NOTE: using literal password with HTTP API requests. + MongoConfig1#{<<"password">> => ?config(mongo_password, Config)}; + _ -> + MongoConfig1 + end, ?assertMatch( {ok, _}, create_bridge_http(MongoConfig) diff --git a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl index a34b65ede..98b957b19 100644 --- a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl +++ b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl @@ -21,7 +21,6 @@ "DEFAULT CHARSET=utf8MB4;" ). -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test"). --define(SQL_DELETE, "DELETE from mqtt_test"). -define(SQL_SELECT, "SELECT payload FROM mqtt_test"). % DB defaults @@ -112,8 +111,8 @@ end_per_suite(_Config) -> ok. init_per_testcase(_Testcase, Config) -> + connect_and_drop_table(Config), connect_and_create_table(Config), - connect_and_clear_table(Config), delete_bridge(Config), snabbkaffe:start_trace(), Config. @@ -122,9 +121,7 @@ end_per_testcase(_Testcase, Config) -> ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - connect_and_clear_table(Config), ok = snabbkaffe:stop(), - delete_bridge(Config), emqx_common_test_helpers:call_janitor(), ok. @@ -323,9 +320,6 @@ connect_and_create_table(Config) -> connect_and_drop_table(Config) -> query_direct_mysql(Config, ?SQL_DROP_TABLE). -connect_and_clear_table(Config) -> - query_direct_mysql(Config, ?SQL_DELETE). - connect_and_get_payload(Config) -> query_direct_mysql(Config, ?SQL_SELECT). @@ -777,28 +771,21 @@ t_table_removed(Config) -> Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - ?check_trace( - begin - connect_and_create_table(Config), - ?assertMatch({ok, _}, create_bridge(Config)), - ?retry( - _Sleep = 1_000, - _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) - ), - connect_and_drop_table(Config), - Val = integer_to_binary(erlang:unique_integer()), - SentData = #{payload => Val, timestamp => 1668602148000}, - Timeout = 1000, - ?assertMatch( - {error, - {unrecoverable_error, - {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}}, - sync_query_resource(Config, {send_message, SentData, [], Timeout}) - ), - ok - end, - [] + connect_and_create_table(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ), + connect_and_drop_table(Config), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + Timeout = 1000, + ?assertMatch( + {error, + {unrecoverable_error, {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}}, + sync_query_resource(Config, {send_message, SentData, [], Timeout}) ), ok. @@ -807,38 +794,31 @@ t_nested_payload_template(Config) -> BridgeType = ?config(mysql_bridge_type, Config), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), Value = integer_to_binary(erlang:unique_integer()), - ?check_trace( - begin - connect_and_create_table(Config), - {ok, _} = create_bridge( - Config, - #{ - <<"sql">> => - "INSERT INTO mqtt_test(payload, arrived) " - "VALUES (${payload.value}, FROM_UNIXTIME(${timestamp}/1000))" - } - ), - {ok, #{<<"from">> := [Topic]}} = create_rule_and_action_http(Config), - ?retry( - _Sleep = 1_000, - _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) - ), - %% send message via rule action - Payload = emqx_utils_json:encode(#{value => Value}), - Message = emqx_message:make(Topic, Payload), - {_, {ok, _}} = - ?wait_async_action( - emqx:publish(Message), - #{?snk_kind := mysql_connector_query_return}, - 10_000 - ), - ?assertEqual( - {ok, [<<"payload">>], [[Value]]}, - connect_and_get_payload(Config) - ), - ok - end, - [] + {ok, _} = create_bridge( + Config, + #{ + <<"sql">> => + "INSERT INTO mqtt_test(payload, arrived) " + "VALUES (${payload.value}, FROM_UNIXTIME(${timestamp}/1000))" + } + ), + {ok, #{<<"from">> := [Topic]}} = create_rule_and_action_http(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ), + %% send message via rule action + Payload = emqx_utils_json:encode(#{value => Value}), + Message = emqx_message:make(Topic, Payload), + {_, {ok, _}} = + ?wait_async_action( + emqx:publish(Message), + #{?snk_kind := mysql_connector_query_return}, + 10_000 + ), + ?assertEqual( + {ok, [<<"payload">>], [[Value]]}, + connect_and_get_payload(Config) ), ok. diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index 6b949b047..878ae2e1d 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -16,7 +16,6 @@ -define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_oracle, emqx_bridge_oracle]). -define(SID, "XE"). -define(RULE_TOPIC, "mqtt/rule"). -% -define(RULE_TOPIC_BIN, <>). %%------------------------------------------------------------------------------ %% CT boilerplate @@ -33,9 +32,6 @@ groups() -> {plain, AllTCs} ]. -only_once_tests() -> - [t_create_via_http]. - init_per_suite(Config) -> Config. diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index 156d4bd16..722489ba6 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -183,31 +183,33 @@ pgsql_config(BridgeType, Config) -> end, QueryMode = ?config(query_mode, Config), TlsEnabled = ?config(enable_tls, Config), + %% NOTE: supplying password through a file here, to verify that it works. + Password = create_passfile(BridgeType, Config), ConfigString = io_lib:format( - "bridges.~s.~s {\n" - " enable = true\n" - " server = ~p\n" - " database = ~p\n" - " username = ~p\n" - " password = ~p\n" - " sql = ~p\n" - " resource_opts = {\n" - " request_ttl = 500ms\n" - " batch_size = ~b\n" - " query_mode = ~s\n" - " }\n" - " ssl = {\n" - " enable = ~w\n" - " }\n" - "}", + "bridges.~s.~s {" + "\n enable = true" + "\n server = ~p" + "\n database = ~p" + "\n username = ~p" + "\n password = ~p" + "\n sql = ~p" + "\n resource_opts = {" + "\n request_ttl = 500ms" + "\n batch_size = ~b" + "\n query_mode = ~s" + "\n }" + "\n ssl = {" + "\n enable = ~w" + "\n }" + "\n }", [ BridgeType, Name, Server, ?PGSQL_DATABASE, ?PGSQL_USERNAME, - ?PGSQL_PASSWORD, + Password, ?SQL_BRIDGE, BatchSize, QueryMode, @@ -216,6 +218,12 @@ pgsql_config(BridgeType, Config) -> ), {Name, parse_and_check(ConfigString, BridgeType, Name)}. +create_passfile(BridgeType, Config) -> + Filename = binary_to_list(BridgeType) ++ ".passfile", + Filepath = filename:join(?config(priv_dir, Config), Filename), + ok = file:write_file(Filepath, ?PGSQL_PASSWORD), + "file://" ++ Filepath. + parse_and_check(ConfigString, BridgeType, Name) -> {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), @@ -379,7 +387,9 @@ t_setup_via_http_api_and_publish(Config) -> QueryMode = ?config(query_mode, Config), PgsqlConfig = PgsqlConfig0#{ <<"name">> => Name, - <<"type">> => BridgeType + <<"type">> => BridgeType, + %% NOTE: using literal passwords with HTTP API requests. + <<"password">> => <> }, ?assertMatch( {ok, _}, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index beb8452b2..c7b378617 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -170,21 +170,17 @@ fields(auth_basic) -> [ {username, mk(binary(), #{required => true, desc => ?DESC("auth_basic_username")})}, {password, - mk(binary(), #{ + emqx_schema_secret:mk(#{ required => true, - desc => ?DESC("auth_basic_password"), - sensitive => true, - converter => fun emqx_schema:password_converter/2 + desc => ?DESC("auth_basic_password") })} ]; fields(auth_token) -> [ {jwt, - mk(binary(), #{ + emqx_schema_secret:mk(#{ required => true, - desc => ?DESC("auth_token_jwt"), - sensitive => true, - converter => fun emqx_schema:password_converter/2 + desc => ?DESC("auth_token_jwt") })} ]; fields("get_" ++ Type) -> diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 33ac83ee1..fed0142c5 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -78,7 +78,6 @@ query_mode(_Config) -> -spec on_start(resource_id(), config()) -> {ok, state()}. on_start(InstanceId, Config) -> #{ - authentication := _Auth, bridge_name := BridgeName, servers := Servers0, ssl := SSL @@ -263,12 +262,14 @@ conn_opts(#{authentication := none}) -> #{}; conn_opts(#{authentication := #{username := Username, password := Password}}) -> #{ - auth_data => iolist_to_binary([Username, <<":">>, Password]), + %% TODO: teach `pulsar` to accept 0-arity closures as passwords. + auth_data => iolist_to_binary([Username, <<":">>, emqx_secret:unwrap(Password)]), auth_method_name => <<"basic">> }; conn_opts(#{authentication := #{jwt := JWT}}) -> #{ - auth_data => JWT, + %% TODO: teach `pulsar` to accept 0-arity closures as passwords. + auth_data => emqx_secret:unwrap(JWT), auth_method_name => <<"token">> }. diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src index c7d931c93..7e32b5a89 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rabbitmq, [ {description, "EMQX Enterprise RabbitMQ Bridge"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index ff439b676..04a93e08e 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -72,7 +72,7 @@ fields(config) -> desc => ?DESC("username") } )}, - {password, fun emqx_connector_schema_lib:password_required/1}, + {password, emqx_connector_schema_lib:password_field(#{required => true})}, {pool_size, hoconsc:mk( typerefl:pos_integer(), @@ -194,7 +194,6 @@ on_start( #{ pool_size := PoolSize, payload_template := PayloadTemplate, - password := Password, delivery_mode := InitialDeliveryMode } = InitialConfig ) -> @@ -204,7 +203,6 @@ on_start( persistent -> 2 end, Config = InitialConfig#{ - password => emqx_secret:wrap(Password), delivery_mode => DeliveryMode }, ?SLOG(info, #{ @@ -240,13 +238,11 @@ on_start( ok -> {ok, State}; {error, Reason} -> - LogMessage = - #{ - msg => "rabbitmq_connector_start_failed", - error_reason => Reason, - config => emqx_utils:redact(Config) - }, - ?SLOG(info, LogMessage), + ?SLOG(info, #{ + msg => "rabbitmq_connector_start_failed", + error_reason => Reason, + config => emqx_utils:redact(Config) + }), {error, Reason} end. @@ -319,6 +315,7 @@ create_rabbitmq_connection_and_channel(Config) -> heartbeat := Heartbeat, wait_for_publish_confirmations := WaitForPublishConfirmations } = Config, + %% TODO: teach `amqp` to accept 0-arity closures as passwords. Password = emqx_secret:unwrap(WrappedPassword), SSLOptions = case maps:get(ssl, Config, #{}) of diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl index 106a4d67b..689c39dc5 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl @@ -10,6 +10,7 @@ -include("emqx_connector.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("stdlib/include/assert.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). %% This test SUITE requires a running RabbitMQ instance. If you don't want to @@ -26,6 +27,9 @@ rabbit_mq_host() -> rabbit_mq_port() -> 5672. +rabbit_mq_password() -> + <<"guest">>. + rabbit_mq_exchange() -> <<"test_exchange">>. @@ -45,12 +49,12 @@ init_per_suite(Config) -> ) of true -> - ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_connector_test_helpers:start_apps([emqx_resource]), - {ok, _} = application:ensure_all_started(emqx_connector), - {ok, _} = application:ensure_all_started(amqp_client), + Apps = emqx_cth_suite:start( + [emqx_conf, emqx_connector, emqx_bridge_rabbitmq], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), ChannelConnection = setup_rabbit_mq_exchange_and_queue(), - [{channel_connection, ChannelConnection} | Config]; + [{channel_connection, ChannelConnection}, {suite_apps, Apps} | Config]; false -> case os:getenv("IS_CI") of "yes" -> @@ -106,13 +110,11 @@ end_per_suite(Config) -> connection := Connection, channel := Channel } = get_channel_connection(Config), - ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), - _ = application:stop(emqx_connector), %% Close the channel ok = amqp_channel:close(Channel), %% Close the connection - ok = amqp_connection:close(Connection). + ok = amqp_connection:close(Connection), + ok = emqx_cth_suite:stop(?config(suite_apps, Config)). % %%------------------------------------------------------------------------------ % %% Testcases @@ -125,23 +127,31 @@ t_lifecycle(Config) -> Config ). +t_start_passfile(Config) -> + ResourceID = atom_to_binary(?FUNCTION_NAME), + PasswordFilename = filename:join(?config(priv_dir, Config), "passfile"), + ok = file:write_file(PasswordFilename, rabbit_mq_password()), + InitialConfig = rabbitmq_config(#{ + password => iolist_to_binary(["file://", PasswordFilename]) + }), + ?assertMatch( + #{status := connected}, + create_local_resource(ResourceID, check_config(InitialConfig)) + ), + ?assertEqual( + ok, + emqx_resource:remove_local(ResourceID) + ). + perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) -> #{ channel := Channel } = get_channel_connection(TestConfig), - {ok, #{config := CheckedConfig}} = - emqx_resource:check_config(emqx_bridge_rabbitmq_connector, InitialConfig), - {ok, #{ + CheckedConfig = check_config(InitialConfig), + #{ state := #{poolname := PoolName} = State, status := InitialStatus - }} = - emqx_resource:create_local( - ResourceID, - ?CONNECTOR_RESOURCE_GROUP, - emqx_bridge_rabbitmq_connector, - CheckedConfig, - #{} - ), + } = create_local_resource(ResourceID, CheckedConfig), ?assertEqual(InitialStatus, connected), %% Instance should match the state and status of the just started resource {ok, ?CONNECTOR_RESOURCE_GROUP, #{ @@ -184,6 +194,21 @@ perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) -> % %% Helpers % %%------------------------------------------------------------------------------ +check_config(Config) -> + {ok, #{config := CheckedConfig}} = + emqx_resource:check_config(emqx_bridge_rabbitmq_connector, Config), + CheckedConfig. + +create_local_resource(ResourceID, CheckedConfig) -> + {ok, Bridge} = emqx_resource:create_local( + ResourceID, + ?CONNECTOR_RESOURCE_GROUP, + emqx_bridge_rabbitmq_connector, + CheckedConfig, + #{} + ), + Bridge. + perform_query(PoolName, Channel) -> %% Send message to queue: ok = emqx_resource:query(PoolName, {query, test_data()}), @@ -216,16 +241,19 @@ receive_simple_test_message(Channel) -> end. rabbitmq_config() -> + rabbitmq_config(#{}). + +rabbitmq_config(Overrides) -> Config = #{ server => rabbit_mq_host(), port => 5672, username => <<"guest">>, - password => <<"guest">>, + password => rabbit_mq_password(), exchange => rabbit_mq_exchange(), routing_key => rabbit_mq_routing_key() }, - #{<<"config">> => Config}. + #{<<"config">> => maps:merge(Config, Overrides)}. test_data() -> #{<<"msg_field">> => <<"Hello">>}. diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src index e158a2e46..38c00e7ee 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rocketmq, [ {description, "EMQX Enterprise RocketMQ Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, rocketmq]}, {env, []}, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index dbac88249..81045ade4 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -48,13 +48,8 @@ fields(config) -> binary(), #{default => <<>>, desc => ?DESC("access_key")} )}, - {secret_key, - mk( - binary(), - #{default => <<>>, desc => ?DESC("secret_key"), sensitive => true} - )}, - {security_token, - mk(binary(), #{default => <<>>, desc => ?DESC(security_token), sensitive => true})}, + {secret_key, emqx_schema_secret:mk(#{default => <<>>, desc => ?DESC("secret_key")})}, + {security_token, emqx_schema_secret:mk(#{default => <<>>, desc => ?DESC(security_token)})}, {sync_timeout, mk( emqx_schema:timeout_duration(), @@ -294,21 +289,19 @@ make_producer_opts( acl_info => emqx_secret:wrap(ACLInfo) }. -acl_info(<<>>, <<>>, <<>>) -> +acl_info(<<>>, _, _) -> #{}; -acl_info(AccessKey, SecretKey, <<>>) when is_binary(AccessKey), is_binary(SecretKey) -> - #{ +acl_info(AccessKey, SecretKey, SecurityToken) when is_binary(AccessKey) -> + Info = #{ access_key => AccessKey, - secret_key => SecretKey - }; -acl_info(AccessKey, SecretKey, SecurityToken) when - is_binary(AccessKey), is_binary(SecretKey), is_binary(SecurityToken) --> - #{ - access_key => AccessKey, - secret_key => SecretKey, - security_token => SecurityToken - }; + secret_key => emqx_maybe:define(emqx_secret:unwrap(SecretKey), <<>>) + }, + case emqx_maybe:define(emqx_secret:unwrap(SecurityToken), <<>>) of + <<>> -> + Info; + Token -> + Info#{security_token => Token} + end; acl_info(_, _, _) -> #{}. diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index 1664fee59..331f9c29f 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_sqlserver, [ {description, "EMQX Enterprise SQL Server Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, odbc]}, {env, []}, diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 6db8c2877..a87e71e31 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -199,7 +199,7 @@ on_start( Options = [ {server, to_bin(Server)}, {username, Username}, - {password, emqx_secret:wrap(maps:get(password, Config, ""))}, + {password, maps:get(password, Config, emqx_secret:wrap(""))}, {driver, Driver}, {database, Database}, {pool_size, PoolSize} diff --git a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl index 101ead838..62214cb5e 100644 --- a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl +++ b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl @@ -130,7 +130,9 @@ end_per_group(_Group, _Config) -> ok. init_per_suite(Config) -> - Config. + Passfile = filename:join(?config(priv_dir, Config), "passfile"), + ok = file:write_file(Passfile, <>), + [{sqlserver_passfile, Passfile} | Config]. end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), @@ -193,7 +195,9 @@ t_setup_via_http_api_and_publish(Config) -> SQLServerConfig0 = ?config(sqlserver_config, Config), SQLServerConfig = SQLServerConfig0#{ <<"name">> => Name, - <<"type">> => BridgeType + <<"type">> => BridgeType, + %% NOTE: using literal password with HTTP API requests. + <<"password">> => <> }, ?assertMatch( {ok, _}, @@ -449,6 +453,7 @@ sqlserver_config(BridgeType, Config) -> Name = atom_to_binary(?MODULE), BatchSize = batch_size(Config), QueryMode = ?config(query_mode, Config), + Passfile = ?config(sqlserver_passfile, Config), ConfigString = io_lib:format( "bridges.~s.~s {\n" @@ -472,7 +477,7 @@ sqlserver_config(BridgeType, Config) -> Server, ?SQL_SERVER_DATABASE, ?SQL_SERVER_USERNAME, - ?SQL_SERVER_PASSWORD, + "file://" ++ Passfile, ?SQL_BRIDGE, ?SQL_SERVER_DRIVER, BatchSize, diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src index e363f2f9c..5375a6ba9 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_tdengine, [ {description, "EMQX Enterprise TDEngine Bridge"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index dcef8506c..522007cbc 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -6,7 +6,6 @@ -behaviour(emqx_resource). --include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -48,8 +47,8 @@ adjust_fields(Fields) -> fun ({username, OrigUsernameFn}) -> {username, add_default_fn(OrigUsernameFn, <<"root">>)}; - ({password, OrigPasswordFn}) -> - {password, make_required_fn(OrigPasswordFn)}; + ({password, _}) -> + {password, emqx_connector_schema_lib:password_field(#{required => true})}; (Field) -> Field end, @@ -62,12 +61,6 @@ add_default_fn(OrigFn, Default) -> (Field) -> OrigFn(Field) end. -make_required_fn(OrigFn) -> - fun - (required) -> true; - (Field) -> OrigFn(Field) - end. - server() -> Meta = #{desc => ?DESC("server")}, emqx_schema:servers_sc(Meta, ?TD_HOST_OPTIONS). @@ -223,7 +216,10 @@ aggregate_query(BatchTks, BatchReqs, Acc) -> ). connect(Opts) -> - tdengine:start_link(Opts). + %% TODO: teach `tdengine` to accept 0-arity closures as passwords. + {value, {password, Secret}, OptsRest} = lists:keytake(password, 1, Opts), + NOpts = [{password, emqx_secret:unwrap(Secret)} | OptsRest], + tdengine:start_link(NOpts). query_opts(#{database := Database} = _Opts) -> [{db_name, Database}]. diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index a277fe8c8..d2357b360 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -22,15 +22,15 @@ -export([ relational_db_fields/0, ssl_fields/0, - prepare_statement_fields/0 + prepare_statement_fields/0, + password_field/0, + password_field/1 ]). -export([ pool_size/1, database/1, username/1, - password/1, - password_required/1, auto_reconnect/1 ]). @@ -68,10 +68,19 @@ relational_db_fields() -> %% See emqx_resource.hrl {pool_size, fun pool_size/1}, {username, fun username/1}, - {password, fun password/1}, + {password, password_field()}, {auto_reconnect, fun auto_reconnect/1} ]. +-spec password_field() -> hocon_schema:field_schema(). +password_field() -> + password_field(#{}). + +-spec password_field(#{atom() => _}) -> hocon_schema:field_schema(). +password_field(Overrides) -> + Base = #{desc => ?DESC("password")}, + emqx_schema_secret:mk(maps:merge(Base, Overrides)). + prepare_statement_fields() -> [{prepare_statement, fun prepare_statement/1}]. @@ -97,22 +106,6 @@ username(desc) -> ?DESC("username"); username(required) -> false; username(_) -> undefined. -password(type) -> binary(); -password(desc) -> ?DESC("password"); -password(required) -> false; -password(format) -> <<"password">>; -password(sensitive) -> true; -password(converter) -> fun emqx_schema:password_converter/2; -password(_) -> undefined. - -password_required(type) -> binary(); -password_required(desc) -> ?DESC("password"); -password_required(required) -> true; -password_required(format) -> <<"password">>; -password_required(sensitive) -> true; -password_required(converter) -> fun emqx_schema:password_converter/2; -password_required(_) -> undefined. - auto_reconnect(type) -> boolean(); auto_reconnect(desc) -> ?DESC("auto_reconnect"); auto_reconnect(default) -> true; diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_api.erl b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_api.erl index c023ace51..830b50676 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_api.erl +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_api.erl @@ -204,7 +204,7 @@ backend(get, #{bindings := #{backend := Type}}) -> undefined -> {404, #{code => ?BACKEND_NOT_FOUND, message => <<"Backend not found">>}}; Backend -> - {200, to_json(Backend)} + {200, to_redacted_json(Backend)} end; backend(put, #{bindings := #{backend := Backend}, body := Config}) -> ?SLOG(info, #{ @@ -264,9 +264,9 @@ valid_config(_, _, _) -> {error, invalid_config}. handle_backend_update_result({ok, #{backend := saml} = State}, _Config) -> - {200, to_json(maps:without([idp_meta, sp], State))}; + {200, to_redacted_json(maps:without([idp_meta, sp], State))}; handle_backend_update_result({ok, _State}, Config) -> - {200, to_json(Config)}; + {200, to_redacted_json(Config)}; handle_backend_update_result(ok, _) -> 204; handle_backend_update_result({error, not_exists}, _) -> @@ -278,9 +278,9 @@ handle_backend_update_result({error, Reason}, _) when is_binary(Reason) -> handle_backend_update_result({error, Reason}, _) -> {400, #{code => ?BAD_REQUEST, message => emqx_dashboard_sso:format(["Reason: ", Reason])}}. -to_json(Data) -> +to_redacted_json(Data) -> emqx_utils_maps:jsonable_map( - Data, + emqx_utils:redact(Data), fun(K, V) -> {K, emqx_utils_maps:binary_string(V)} end diff --git a/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl b/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl index 8966ffca9..9e831b4d2 100644 --- a/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl +++ b/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl @@ -10,9 +10,11 @@ -include_lib("emqx_dashboard/include/emqx_dashboard.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). -define(LDAP_HOST, "ldap"). -define(LDAP_DEFAULT_PORT, 389). +-define(LDAP_PASSWORD, <<"public">>). -define(LDAP_USER, <<"viewer1">>). -define(LDAP_USER_PASSWORD, <<"viewer1">>). -define(LDAP_BASE_DN, <<"ou=dashboard,dc=emqx,dc=io">>). @@ -128,9 +130,19 @@ t_update({init, Config}) -> Config; t_update({'end', _Config}) -> ok; -t_update(_) -> +t_update(Config) -> Path = uri(["sso", "ldap"]), - {ok, 200, Result} = request(put, Path, ldap_config(#{<<"enable">> => <<"true">>})), + %% NOTE: this time verify that supplying password through file-based secret works. + PasswordFilename = filename:join([?config(priv_dir, Config), "passfile"]), + ok = file:write_file(PasswordFilename, ?LDAP_PASSWORD), + {ok, 200, Result} = request( + put, + Path, + ldap_config(#{ + <<"enable">> => <<"true">>, + <<"password">> => iolist_to_binary(["file://", PasswordFilename]) + }) + ), check_running([<<"ldap">>]), ?assertMatch(#{backend := <<"ldap">>, enable := true}, decode_json(Result)), ?assertMatch([#{backend := <<"ldap">>, enable := true}], get_sso()), @@ -287,7 +299,7 @@ ldap_config(Override) -> <<"base_dn">> => ?LDAP_BASE_DN, <<"filter">> => ?LDAP_FILTER_WITH_UID, <<"username">> => <<"cn=root,dc=emqx,dc=io">>, - <<"password">> => <<"public">>, + <<"password">> => ?LDAP_PASSWORD, <<"pool_size">> => 8 }, Override diff --git a/apps/emqx_ldap/src/emqx_ldap.erl b/apps/emqx_ldap/src/emqx_ldap.erl index a77a8ecf0..315733b79 100644 --- a/apps/emqx_ldap/src/emqx_ldap.erl +++ b/apps/emqx_ldap/src/emqx_ldap.erl @@ -53,8 +53,6 @@ filter_tokens := params_tokens() }. --define(ECS, emqx_connector_schema_lib). - %%===================================================================== %% Hocon schema roots() -> @@ -63,9 +61,9 @@ roots() -> fields(config) -> [ {server, server()}, - {pool_size, fun ?ECS:pool_size/1}, + {pool_size, fun emqx_connector_schema_lib:pool_size/1}, {username, fun ensure_username/1}, - {password, fun ?ECS:password/1}, + {password, emqx_connector_schema_lib:password_field()}, {base_dn, ?HOCON(binary(), #{ desc => ?DESC(base_dn), @@ -124,7 +122,7 @@ server() -> ensure_username(required) -> true; ensure_username(Field) -> - ?ECS:username(Field). + emqx_connector_schema_lib:username(Field). %% =================================================================== callback_mode() -> always_sync. @@ -223,7 +221,8 @@ connect(Options) -> OpenOpts = maps:to_list(maps:with([port, sslopts], Conf)), case eldap:open([Host], [{log, fun log/3}, {timeout, RequestTimeout} | OpenOpts]) of {ok, Handle} = Ret -> - case eldap:simple_bind(Handle, Username, Password) of + %% TODO: teach `eldap` to accept 0-arity closures as passwords. + case eldap:simple_bind(Handle, Username, emqx_secret:unwrap(Password)) of ok -> Ret; Error -> Error end; @@ -320,13 +319,13 @@ log(Level, Format, Args) -> ). prepare_template(Config, State) -> - do_prepare_template(maps:to_list(maps:with([base_dn, filter], Config)), State). + maps:fold(fun prepare_template/3, State, Config). -do_prepare_template([{base_dn, V} | T], State) -> - do_prepare_template(T, State#{base_tokens => emqx_placeholder:preproc_tmpl(V)}); -do_prepare_template([{filter, V} | T], State) -> - do_prepare_template(T, State#{filter_tokens => emqx_placeholder:preproc_tmpl(V)}); -do_prepare_template([], State) -> +prepare_template(base_dn, V, State) -> + State#{base_tokens => emqx_placeholder:preproc_tmpl(V)}; +prepare_template(filter, V, State) -> + State#{filter_tokens => emqx_placeholder:preproc_tmpl(V)}; +prepare_template(_Entry, _, State) -> State. filter_escape(Binary) when is_binary(Binary) -> diff --git a/apps/emqx_mongodb/src/emqx_mongodb.app.src b/apps/emqx_mongodb/src/emqx_mongodb.app.src index eb846a7ab..2212ac7d4 100644 --- a/apps/emqx_mongodb/src/emqx_mongodb.app.src +++ b/apps/emqx_mongodb/src/emqx_mongodb.app.src @@ -1,6 +1,6 @@ {application, emqx_mongodb, [ {description, "EMQX MongoDB Connector"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_mongodb/src/emqx_mongodb.erl b/apps/emqx_mongodb/src/emqx_mongodb.erl index 77161911a..6e623ea23 100644 --- a/apps/emqx_mongodb/src/emqx_mongodb.erl +++ b/apps/emqx_mongodb/src/emqx_mongodb.erl @@ -140,7 +140,7 @@ mongo_fields() -> {srv_record, fun srv_record/1}, {pool_size, fun emqx_connector_schema_lib:pool_size/1}, {username, fun emqx_connector_schema_lib:username/1}, - {password, fun emqx_connector_schema_lib:password/1}, + {password, emqx_connector_schema_lib:password_field()}, {use_legacy_protocol, hoconsc:mk(hoconsc:enum([auto, true, false]), #{ default => auto, @@ -428,8 +428,8 @@ init_worker_options([{auth_source, V} | R], Acc) -> init_worker_options(R, [{auth_source, V} | Acc]); init_worker_options([{username, V} | R], Acc) -> init_worker_options(R, [{login, V} | Acc]); -init_worker_options([{password, V} | R], Acc) -> - init_worker_options(R, [{password, emqx_secret:wrap(V)} | Acc]); +init_worker_options([{password, Secret} | R], Acc) -> + init_worker_options(R, [{password, Secret} | Acc]); init_worker_options([{w_mode, V} | R], Acc) -> init_worker_options(R, [{w_mode, V} | Acc]); init_worker_options([{r_mode, V} | R], Acc) -> diff --git a/apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl b/apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl index 38cd83d47..90bad1f96 100644 --- a/apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl +++ b/apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl @@ -20,6 +20,7 @@ -include("emqx_connector.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("stdlib/include/assert.hrl"). @@ -65,27 +66,36 @@ t_lifecycle(_Config) -> mongo_config() ). +t_start_passfile(Config) -> + ResourceID = atom_to_binary(?FUNCTION_NAME), + PasswordFilename = filename:join(?config(priv_dir, Config), "passfile"), + ok = file:write_file(PasswordFilename, mongo_password()), + InitialConfig = emqx_utils_maps:deep_merge(mongo_config(), #{ + <<"config">> => #{ + <<"password">> => iolist_to_binary(["file://", PasswordFilename]) + } + }), + ?assertMatch( + #{status := connected}, + create_local_resource(ResourceID, check_config(InitialConfig)) + ), + ?assertEqual( + ok, + emqx_resource:remove_local(ResourceID) + ). + perform_lifecycle_check(ResourceId, InitialConfig) -> - {ok, #{config := CheckedConfig}} = - emqx_resource:check_config(?MONGO_RESOURCE_MOD, InitialConfig), - {ok, #{ + CheckedConfig = check_config(InitialConfig), + #{ state := #{pool_name := PoolName} = State, status := InitialStatus - }} = - emqx_resource:create_local( - ResourceId, - ?CONNECTOR_RESOURCE_GROUP, - ?MONGO_RESOURCE_MOD, - CheckedConfig, - #{} - ), + } = create_local_resource(ResourceId, CheckedConfig), ?assertEqual(InitialStatus, connected), % Instance should match the state and status of the just started resource {ok, ?CONNECTOR_RESOURCE_GROUP, #{ state := State, status := InitialStatus - }} = - emqx_resource:get_instance(ResourceId), + }} = emqx_resource:get_instance(ResourceId), ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)), % % Perform query as further check that the resource is working as expected ?assertMatch({ok, []}, emqx_resource:query(ResourceId, test_query_find())), @@ -123,24 +133,52 @@ perform_lifecycle_check(ResourceId, InitialConfig) -> % %% Helpers % %%------------------------------------------------------------------------------ +check_config(Config) -> + {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?MONGO_RESOURCE_MOD, Config), + CheckedConfig. + +create_local_resource(ResourceId, CheckedConfig) -> + {ok, Bridge} = emqx_resource:create_local( + ResourceId, + ?CONNECTOR_RESOURCE_GROUP, + ?MONGO_RESOURCE_MOD, + CheckedConfig, + #{} + ), + Bridge. + mongo_config() -> RawConfig = list_to_binary( io_lib:format( - "" - "\n" - " mongo_type = single\n" - " database = mqtt\n" - " pool_size = 8\n" - " server = \"~s:~b\"\n" - " " - "", - [?MONGO_HOST, ?MONGO_DEFAULT_PORT] + "\n mongo_type = single" + "\n database = mqtt" + "\n pool_size = 8" + "\n server = \"~s:~b\"" + "\n auth_source = ~p" + "\n username = ~p" + "\n password = ~p" + "\n", + [ + ?MONGO_HOST, + ?MONGO_DEFAULT_PORT, + mongo_authsource(), + mongo_username(), + mongo_password() + ] ) ), - {ok, Config} = hocon:binary(RawConfig), #{<<"config">> => Config}. +mongo_authsource() -> + os:getenv("MONGO_AUTHSOURCE", "admin"). + +mongo_username() -> + os:getenv("MONGO_USERNAME", ""). + +mongo_password() -> + os:getenv("MONGO_PASSWORD", ""). + test_query_find() -> {find, <<"foo">>, #{}, #{}}. diff --git a/apps/emqx_mysql/src/emqx_mysql.erl b/apps/emqx_mysql/src/emqx_mysql.erl index d8b7994ab..37dc3c207 100644 --- a/apps/emqx_mysql/src/emqx_mysql.erl +++ b/apps/emqx_mysql/src/emqx_mysql.erl @@ -280,7 +280,10 @@ do_check_prepares(#{prepares := {error, _}} = State) -> %% =================================================================== connect(Options) -> - mysql:start_link(Options). + %% TODO: teach `tdengine` to accept 0-arity closures as passwords. + {value, {password, Secret}, Rest} = lists:keytake(password, 1, Options), + NOptions = [{password, emqx_secret:unwrap(Secret)} | Rest], + mysql:start_link(NOptions). init_prepare(State = #{query_templates := Templates}) -> case maps:size(Templates) of diff --git a/apps/emqx_oracle/src/emqx_oracle.app.src b/apps/emqx_oracle/src/emqx_oracle.app.src index 185af51e8..7740517ca 100644 --- a/apps/emqx_oracle/src/emqx_oracle.app.src +++ b/apps/emqx_oracle/src/emqx_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_oracle, [ {description, "EMQX Enterprise Oracle Database Connector"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 487c1d389..c12086fce 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -95,7 +95,7 @@ on_start( {host, Host}, {port, Port}, {user, emqx_utils_conv:str(User)}, - {password, jamdb_secret:wrap(maps:get(password, Config, ""))}, + {password, maps:get(password, Config, "")}, {sid, emqx_utils_conv:str(Sid)}, {service_name, ServiceName}, {pool_size, maps:get(pool_size, Config, ?DEFAULT_POOL_SIZE)}, diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index 814d8a074..ba1ad4be5 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -131,7 +131,7 @@ on_start( {host, Host}, {port, Port}, {username, User}, - {password, emqx_secret:wrap(maps:get(password, Config, ""))}, + {password, maps:get(password, Config, emqx_secret:wrap(""))}, {database, DB}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {pool_size, PoolSize} @@ -357,6 +357,7 @@ validate_table_existence([], _SQL) -> connect(Opts) -> Host = proplists:get_value(host, Opts), Username = proplists:get_value(username, Opts), + %% TODO: teach `epgsql` to accept 0-arity closures as passwords. Password = emqx_secret:unwrap(proplists:get_value(password, Opts)), case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of {ok, _Conn} = Ok -> diff --git a/apps/emqx_redis/src/emqx_redis.erl b/apps/emqx_redis/src/emqx_redis.erl index d07501bcd..44137546d 100644 --- a/apps/emqx_redis/src/emqx_redis.erl +++ b/apps/emqx_redis/src/emqx_redis.erl @@ -147,7 +147,7 @@ on_start( [ {pool_size, PoolSize}, {username, maps:get(username, Config, undefined)}, - {password, eredis_secret:wrap(maps:get(password, Config, ""))}, + {password, maps:get(password, Config, "")}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL} ] ++ Database ++ Servers, Options = @@ -296,7 +296,7 @@ redis_fields() -> [ {pool_size, fun emqx_connector_schema_lib:pool_size/1}, {username, fun emqx_connector_schema_lib:username/1}, - {password, fun emqx_connector_schema_lib:password/1}, + {password, emqx_connector_schema_lib:password_field()}, {database, #{ type => non_neg_integer(), default => 0, diff --git a/changes/ce/feat-11896.en.md b/changes/ce/feat-11896.en.md new file mode 100644 index 000000000..75668e030 --- /dev/null +++ b/changes/ce/feat-11896.en.md @@ -0,0 +1 @@ +Support configuring authentication-related sensitive fields in bridges (i.e. passwords, tokens, secret keys) via secrets stored as files in the file system, through special `file://` prefix. diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index b6cec5d74..4de3b4d7c 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -40,6 +40,7 @@ ATTACH='no' STOP='no' IS_CI='no' ODBC_REQUEST='no' +UP='up' while [ "$#" -gt 0 ]; do case $1 in -h|--help) @@ -72,6 +73,7 @@ while [ "$#" -gt 0 ]; do ;; --ci) IS_CI='yes' + UP='up --quiet-pull' shift 1 ;; --) @@ -254,10 +256,8 @@ else INSTALL_ODBC="echo 'msodbc driver not requested'" fi -F_OPTIONS="" - for file in "${FILES[@]}"; do - F_OPTIONS="$F_OPTIONS -f $file" + DC="$DC -f $file" done DOCKER_USER="$(id -u)" @@ -275,15 +275,14 @@ if [ "$STOP" = 'no' ]; then # some left-over log file has to be deleted before a new docker-compose up rm -f '.ci/docker-compose-file/redis/*.log' set +e - # shellcheck disable=2086 # no quotes for F_OPTIONS - $DC $F_OPTIONS up -d --build --remove-orphans + # shellcheck disable=2086 # no quotes for UP + $DC $UP -d --build --remove-orphans RESULT=$? if [ $RESULT -ne 0 ]; then mkdir -p _build/test/logs LOG='_build/test/logs/docker-compose.log' echo "Dumping docker-compose log to $LOG" - # shellcheck disable=2086 # no quotes for F_OPTIONS - $DC $F_OPTIONS logs --no-color --timestamps > "$LOG" + $DC logs --no-color --timestamps > "$LOG" exit 1 fi set -e @@ -309,8 +308,7 @@ fi set +e if [ "$STOP" = 'yes' ]; then - # shellcheck disable=2086 # no quotes for F_OPTIONS - $DC $F_OPTIONS down --remove-orphans + $DC down --remove-orphans elif [ "$ATTACH" = 'yes' ]; then docker exec -it "$ERLANG_CONTAINER" bash elif [ "$CONSOLE" = 'yes' ]; then @@ -335,12 +333,10 @@ else if [ "$RESULT" -ne 0 ]; then LOG='_build/test/logs/docker-compose.log' echo "Dumping docker-compose log to $LOG" - # shellcheck disable=2086 # no quotes for F_OPTIONS - $DC $F_OPTIONS logs --no-color --timestamps > "$LOG" + $DC logs --no-color --timestamps > "$LOG" fi if [ "$KEEP_UP" != 'yes' ]; then - # shellcheck disable=2086 # no quotes for F_OPTIONS - $DC $F_OPTIONS down + $DC down fi exit "$RESULT" fi