Merge pull request #11896 from keynslug/ft/EMQX-10808/opt-file-secret-bridges

feat(bridge): accept wrapped secrets as passwords
This commit is contained in:
Andrew Mayorov 2023-11-14 23:08:38 +07:00 committed by GitHub
commit e80600ca0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
63 changed files with 891 additions and 822 deletions

View File

@ -0,0 +1,7 @@
MONGO_USERNAME=emqx
MONGO_PASSWORD=passw0rd
MONGO_AUTHSOURCE=admin
# See "Environment Variables" @ https://hub.docker.com/_/mongo
MONGO_INITDB_ROOT_USERNAME=${MONGO_USERNAME}
MONGO_INITDB_ROOT_PASSWORD=${MONGO_PASSWORD}

View File

@ -9,6 +9,9 @@ services:
- emqx_bridge - emqx_bridge
ports: ports:
- "27017:27017" - "27017:27017"
env_file:
- .env
- credentials.env
command: command:
--ipv6 --ipv6
--bind_ip_all --bind_ip_all

View File

@ -5,6 +5,7 @@ services:
container_name: erlang container_name: erlang
image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.2-3:1.14.5-25.3.2-2-ubuntu22.04} image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.2-3:1.14.5-25.3.2-2-ubuntu22.04}
env_file: env_file:
- credentials.env
- conf.env - conf.env
environment: environment:
GITHUB_ACTIONS: ${GITHUB_ACTIONS:-} GITHUB_ACTIONS: ${GITHUB_ACTIONS:-}

View File

@ -278,6 +278,10 @@ raw_mongo_auth_config() ->
<<"server">> => mongo_server(), <<"server">> => mongo_server(),
<<"w_mode">> => <<"unsafe">>, <<"w_mode">> => <<"unsafe">>,
<<"auth_source">> => mongo_authsource(),
<<"username">> => mongo_username(),
<<"password">> => mongo_password(),
<<"filter">> => #{<<"username">> => <<"${username}">>}, <<"filter">> => #{<<"username">> => <<"${username}">>},
<<"password_hash_field">> => <<"password_hash">>, <<"password_hash_field">> => <<"password_hash">>,
<<"salt_field">> => <<"salt">>, <<"salt_field">> => <<"salt">>,
@ -464,9 +468,21 @@ mongo_config() ->
{database, <<"mqtt">>}, {database, <<"mqtt">>},
{host, ?MONGO_HOST}, {host, ?MONGO_HOST},
{port, ?MONGO_DEFAULT_PORT}, {port, ?MONGO_DEFAULT_PORT},
{auth_source, mongo_authsource()},
{login, mongo_username()},
{password, mongo_password()},
{register, ?MONGO_CLIENT} {register, ?MONGO_CLIENT}
]. ].
mongo_authsource() ->
iolist_to_binary(os:getenv("MONGO_AUTHSOURCE", "admin")).
mongo_username() ->
iolist_to_binary(os:getenv("MONGO_USERNAME", "")).
mongo_password() ->
iolist_to_binary(os:getenv("MONGO_PASSWORD", "")).
start_apps(Apps) -> start_apps(Apps) ->
lists:foreach(fun application:ensure_all_started/1, Apps). lists:foreach(fun application:ensure_all_started/1, Apps).

View File

@ -397,6 +397,10 @@ raw_mongo_authz_config() ->
<<"collection">> => <<"acl">>, <<"collection">> => <<"acl">>,
<<"server">> => mongo_server(), <<"server">> => mongo_server(),
<<"auth_source">> => mongo_authsource(),
<<"username">> => mongo_username(),
<<"password">> => mongo_password(),
<<"filter">> => #{<<"username">> => <<"${username}">>} <<"filter">> => #{<<"username">> => <<"${username}">>}
}. }.
@ -408,9 +412,21 @@ mongo_config() ->
{database, <<"mqtt">>}, {database, <<"mqtt">>},
{host, ?MONGO_HOST}, {host, ?MONGO_HOST},
{port, ?MONGO_DEFAULT_PORT}, {port, ?MONGO_DEFAULT_PORT},
{auth_source, mongo_authsource()},
{login, mongo_username()},
{password, mongo_password()},
{register, ?MONGO_CLIENT} {register, ?MONGO_CLIENT}
]. ].
mongo_authsource() ->
iolist_to_binary(os:getenv("MONGO_AUTHSOURCE", "admin")).
mongo_username() ->
iolist_to_binary(os:getenv("MONGO_USERNAME", "")).
mongo_password() ->
iolist_to_binary(os:getenv("MONGO_PASSWORD", "")).
start_apps(Apps) -> start_apps(Apps) ->
lists:foreach(fun application:ensure_all_started/1, Apps). lists:foreach(fun application:ensure_all_started/1, Apps).

View File

@ -356,9 +356,10 @@ parse_confs(<<"iotdb">>, Name, Conf) ->
authentication := authentication :=
#{ #{
username := Username, username := Username,
password := Password password := Secret
} }
} = Conf, } = Conf,
Password = emqx_secret:unwrap(Secret),
BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>), BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
%% This version atom correspond to the macro ?VSN_1_1_X in %% This version atom correspond to the macro ?VSN_1_1_X in
%% emqx_bridge_iotdb.hrl. It would be better to use the macro directly, but %% emqx_bridge_iotdb.hrl. It would be better to use the macro directly, but

View File

@ -70,7 +70,7 @@ cassandra_db_fields() ->
{keyspace, fun keyspace/1}, {keyspace, fun keyspace/1},
{pool_size, fun emqx_connector_schema_lib:pool_size/1}, {pool_size, fun emqx_connector_schema_lib:pool_size/1},
{username, fun emqx_connector_schema_lib:username/1}, {username, fun emqx_connector_schema_lib:username/1},
{password, fun emqx_connector_schema_lib:password/1}, {password, emqx_connector_schema_lib:password_field()},
{auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
]. ].
@ -111,14 +111,14 @@ on_start(
emqx_schema:parse_servers(Servers0, ?DEFAULT_SERVER_OPTION) emqx_schema:parse_servers(Servers0, ?DEFAULT_SERVER_OPTION)
), ),
Options = [ Options =
{nodes, Servers}, maps:to_list(maps:with([username, password], Config)) ++
{keyspace, Keyspace}, [
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {nodes, Servers},
{pool_size, PoolSize} {keyspace, Keyspace},
], {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
Options1 = maybe_add_opt(username, Config, Options), {pool_size, PoolSize}
Options2 = maybe_add_opt(password, Config, Options1, _IsSensitive = true), ],
SslOpts = SslOpts =
case maps:get(enable, SSL) of case maps:get(enable, SSL) of
@ -131,7 +131,7 @@ on_start(
[] []
end, end,
State = parse_prepare_cql(Config), State = parse_prepare_cql(Config),
case emqx_resource_pool:start(InstId, ?MODULE, Options2 ++ SslOpts) of case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok -> ok ->
{ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})};
{error, Reason} -> {error, Reason} ->
@ -387,6 +387,7 @@ conn_opts(Opts) ->
conn_opts([], Acc) -> conn_opts([], Acc) ->
Acc; Acc;
conn_opts([{password, Password} | Opts], Acc) -> conn_opts([{password, Password} | Opts], Acc) ->
%% TODO: teach `ecql` to accept 0-arity closures as passwords.
conn_opts(Opts, [{password, emqx_secret:unwrap(Password)} | Acc]); conn_opts(Opts, [{password, emqx_secret:unwrap(Password)} | Acc]);
conn_opts([Opt | Opts], Acc) -> conn_opts([Opt | Opts], Acc) ->
conn_opts(Opts, [Opt | Acc]). conn_opts(Opts, [Opt | Acc]).
@ -512,19 +513,3 @@ maybe_assign_type(V) when is_integer(V) ->
maybe_assign_type(V) when is_float(V) -> {double, V}; maybe_assign_type(V) when is_float(V) -> {double, V};
maybe_assign_type(V) -> maybe_assign_type(V) ->
V. V.
maybe_add_opt(Key, Conf, Opts) ->
maybe_add_opt(Key, Conf, Opts, _IsSensitive = false).
maybe_add_opt(Key, Conf, Opts, IsSensitive) ->
case Conf of
#{Key := Val} ->
[{Key, maybe_wrap(IsSensitive, Val)} | Opts];
_ ->
Opts
end.
maybe_wrap(false = _IsSensitive, Val) ->
Val;
maybe_wrap(true, Val) ->
emqx_secret:wrap(Val).

View File

@ -40,10 +40,9 @@ all() ->
]. ].
groups() -> groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[ [
{auth, TCs}, {auth, [t_lifecycle, t_start_passfile]},
{noauth, TCs} {noauth, [t_lifecycle]}
]. ].
cassandra_servers(CassandraHost) -> cassandra_servers(CassandraHost) ->
@ -115,32 +114,37 @@ end_per_testcase(_, _Config) ->
t_lifecycle(Config) -> t_lifecycle(Config) ->
perform_lifecycle_check( perform_lifecycle_check(
<<"emqx_connector_cassandra_SUITE">>, <<?MODULE_STRING>>,
cassandra_config(Config) cassandra_config(Config)
). ).
show(X) -> t_start_passfile(Config) ->
erlang:display(X), ResourceID = atom_to_binary(?FUNCTION_NAME),
X. PasswordFilename = filename:join(?config(priv_dir, Config), "passfile"),
ok = file:write_file(PasswordFilename, ?CASSA_PASSWORD),
show(Label, What) -> InitialConfig = emqx_utils_maps:deep_merge(
erlang:display({Label, What}), cassandra_config(Config),
What. #{
<<"config">> => #{
password => iolist_to_binary(["file://", PasswordFilename])
}
}
),
?assertMatch(
#{status := connected},
create_local_resource(ResourceID, check_config(InitialConfig))
),
?assertEqual(
ok,
emqx_resource:remove_local(ResourceID)
).
perform_lifecycle_check(ResourceId, InitialConfig) -> perform_lifecycle_check(ResourceId, InitialConfig) ->
{ok, #{config := CheckedConfig}} = CheckedConfig = check_config(InitialConfig),
emqx_resource:check_config(?CASSANDRA_RESOURCE_MOD, InitialConfig), #{
{ok, #{
state := #{pool_name := PoolName} = State, state := #{pool_name := PoolName} = State,
status := InitialStatus status := InitialStatus
}} = } = create_local_resource(ResourceId, CheckedConfig),
emqx_resource:create_local(
ResourceId,
?CONNECTOR_RESOURCE_GROUP,
?CASSANDRA_RESOURCE_MOD,
CheckedConfig,
#{}
),
?assertEqual(InitialStatus, connected), ?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource % Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{ {ok, ?CONNECTOR_RESOURCE_GROUP, #{
@ -191,6 +195,21 @@ perform_lifecycle_check(ResourceId, InitialConfig) ->
%% utils %% utils
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
check_config(Config) ->
{ok, #{config := CheckedConfig}} = emqx_resource:check_config(?CASSANDRA_RESOURCE_MOD, Config),
CheckedConfig.
create_local_resource(ResourceId, CheckedConfig) ->
{ok, Bridge} =
emqx_resource:create_local(
ResourceId,
?CONNECTOR_RESOURCE_GROUP,
?CASSANDRA_RESOURCE_MOD,
CheckedConfig,
#{}
),
Bridge.
cassandra_config(Config) -> cassandra_config(Config) ->
Host = ?config(cassa_host, Config), Host = ?config(cassa_host, Config),
AuthOpts = maps:from_list(?config(cassa_auth_opts, Config)), AuthOpts = maps:from_list(?config(cassa_auth_opts, Config)),

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_clickhouse, [ {application, emqx_bridge_clickhouse, [
{description, "EMQX Enterprise ClickHouse Bridge"}, {description, "EMQX Enterprise ClickHouse Bridge"},
{vsn, "0.2.3"}, {vsn, "0.2.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -145,7 +145,7 @@ on_start(
Options = [ Options = [
{url, URL}, {url, URL},
{user, maps:get(username, Config, "default")}, {user, maps:get(username, Config, "default")},
{key, emqx_secret:wrap(maps:get(password, Config, "public"))}, {key, maps:get(password, Config, emqx_secret:wrap("public"))},
{database, DB}, {database, DB},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize}, {pool_size, PoolSize},
@ -243,6 +243,7 @@ connect(Options) ->
URL = iolist_to_binary(emqx_http_lib:normalize(proplists:get_value(url, Options))), URL = iolist_to_binary(emqx_http_lib:normalize(proplists:get_value(url, Options))),
User = proplists:get_value(user, Options), User = proplists:get_value(user, Options),
Database = proplists:get_value(database, Options), Database = proplists:get_value(database, Options),
%% TODO: teach `clickhouse` to accept 0-arity closures as passwords.
Key = emqx_secret:unwrap(proplists:get_value(key, Options)), Key = emqx_secret:unwrap(proplists:get_value(key, Options)),
Pool = proplists:get_value(pool, Options), Pool = proplists:get_value(pool, Options),
PoolSize = proplists:get_value(pool_size, Options), PoolSize = proplists:get_value(pool_size, Options),

View File

@ -10,10 +10,12 @@
-include("emqx_connector.hrl"). -include("emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
-define(APP, emqx_bridge_clickhouse). -define(APP, emqx_bridge_clickhouse).
-define(CLICKHOUSE_HOST, "clickhouse"). -define(CLICKHOUSE_HOST, "clickhouse").
-define(CLICKHOUSE_RESOURCE_MOD, emqx_bridge_clickhouse_connector). -define(CLICKHOUSE_RESOURCE_MOD, emqx_bridge_clickhouse_connector).
-define(CLICKHOUSE_PASSWORD, "public").
%% This test SUITE requires a running clickhouse instance. If you don't want to %% This test SUITE requires a running clickhouse instance. If you don't want to
%% bring up the whole CI infrastuctucture with the `scripts/ct/run.sh` script %% bring up the whole CI infrastuctucture with the `scripts/ct/run.sh` script
@ -57,7 +59,7 @@ init_per_suite(Config) ->
clickhouse:start_link([ clickhouse:start_link([
{url, clickhouse_url()}, {url, clickhouse_url()},
{user, <<"default">>}, {user, <<"default">>},
{key, "public"}, {key, ?CLICKHOUSE_PASSWORD},
{pool, tmp_pool} {pool, tmp_pool}
]), ]),
{ok, _, _} = clickhouse:query(Conn, <<"CREATE DATABASE IF NOT EXISTS mqtt">>, #{}), {ok, _, _} = clickhouse:query(Conn, <<"CREATE DATABASE IF NOT EXISTS mqtt">>, #{}),
@ -92,6 +94,31 @@ t_lifecycle(_Config) ->
clickhouse_config() clickhouse_config()
). ).
t_start_passfile(Config) ->
ResourceID = atom_to_binary(?FUNCTION_NAME),
PasswordFilename = filename:join(?config(priv_dir, Config), "passfile"),
ok = file:write_file(PasswordFilename, <<?CLICKHOUSE_PASSWORD>>),
InitialConfig = clickhouse_config(#{
password => iolist_to_binary(["file://", PasswordFilename])
}),
{ok, #{config := ResourceConfig}} =
emqx_resource:check_config(?CLICKHOUSE_RESOURCE_MOD, InitialConfig),
?assertMatch(
{ok, #{status := connected}},
emqx_resource:create_local(
ResourceID,
?CONNECTOR_RESOURCE_GROUP,
?CLICKHOUSE_RESOURCE_MOD,
ResourceConfig,
#{}
)
),
?assertEqual(
ok,
emqx_resource:remove_local(ResourceID)
),
ok.
show(X) -> show(X) ->
erlang:display(X), erlang:display(X),
X. X.
@ -168,12 +195,15 @@ perform_lifecycle_check(ResourceID, InitialConfig) ->
% %%------------------------------------------------------------------------------ % %%------------------------------------------------------------------------------
clickhouse_config() -> clickhouse_config() ->
clickhouse_config(#{}).
clickhouse_config(Overrides) ->
Config = Config =
#{ #{
auto_reconnect => true, auto_reconnect => true,
database => <<"mqtt">>, database => <<"mqtt">>,
username => <<"default">>, username => <<"default">>,
password => <<"public">>, password => <<?CLICKHOUSE_PASSWORD>>,
pool_size => 8, pool_size => 8,
url => iolist_to_binary( url => iolist_to_binary(
io_lib:format( io_lib:format(
@ -186,7 +216,7 @@ clickhouse_config() ->
), ),
connect_timeout => <<"10s">> connect_timeout => <<"10s">>
}, },
#{<<"config">> => Config}. #{<<"config">> => maps:merge(Config, Overrides)}.
test_query_no_params() -> test_query_no_params() ->
{query, <<"SELECT 1">>}. {query, <<"SELECT 1">>}.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_dynamo, [ {application, emqx_bridge_dynamo, [
{description, "EMQX Enterprise Dynamo Bridge"}, {description, "EMQX Enterprise Dynamo Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -45,12 +45,10 @@ fields(config) ->
#{required => true, desc => ?DESC("aws_access_key_id")} #{required => true, desc => ?DESC("aws_access_key_id")}
)}, )},
{aws_secret_access_key, {aws_secret_access_key,
mk( emqx_schema_secret:mk(
binary(),
#{ #{
required => true, required => true,
desc => ?DESC("aws_secret_access_key"), desc => ?DESC("aws_secret_access_key")
sensitive => true
} }
)}, )},
{pool_size, fun emqx_connector_schema_lib:pool_size/1}, {pool_size, fun emqx_connector_schema_lib:pool_size/1},
@ -89,7 +87,7 @@ on_start(
host => Host, host => Host,
port => Port, port => Port,
aws_access_key_id => to_str(AccessKeyID), aws_access_key_id => to_str(AccessKeyID),
aws_secret_access_key => to_str(SecretAccessKey), aws_secret_access_key => SecretAccessKey,
schema => Schema schema => Schema
}}, }},
{pool_size, PoolSize} {pool_size, PoolSize}
@ -182,9 +180,8 @@ do_query(
end. end.
connect(Opts) -> connect(Opts) ->
Options = proplists:get_value(config, Opts), Config = proplists:get_value(config, Opts),
{ok, _Pid} = Result = emqx_bridge_dynamo_connector_client:start_link(Options), {ok, _Pid} = emqx_bridge_dynamo_connector_client:start_link(Config).
Result.
parse_template(Config) -> parse_template(Config) ->
Templates = Templates =

View File

@ -20,8 +20,7 @@
handle_cast/2, handle_cast/2,
handle_info/2, handle_info/2,
terminate/2, terminate/2,
code_change/3, code_change/3
format_status/2
]). ]).
-ifdef(TEST). -ifdef(TEST).
@ -62,11 +61,13 @@ start_link(Options) ->
%% Initialize dynamodb data bridge %% Initialize dynamodb data bridge
init(#{ init(#{
aws_access_key_id := AccessKeyID, aws_access_key_id := AccessKeyID,
aws_secret_access_key := SecretAccessKey, aws_secret_access_key := Secret,
host := Host, host := Host,
port := Port, port := Port,
schema := Schema schema := Schema
}) -> }) ->
%% TODO: teach `erlcloud` to to accept 0-arity closures as passwords.
SecretAccessKey = to_str(emqx_secret:unwrap(Secret)),
erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema), erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema),
{ok, #{}}. {ok, #{}}.
@ -101,13 +102,6 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
-spec format_status(
Opt :: normal | terminate,
Status :: list()
) -> Status :: term().
format_status(_Opt, Status) ->
Status.
%%%=================================================================== %%%===================================================================
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
@ -184,3 +178,8 @@ convert2binary(Value) when is_list(Value) ->
unicode:characters_to_binary(Value); unicode:characters_to_binary(Value);
convert2binary(Value) when is_map(Value) -> convert2binary(Value) when is_map(Value) ->
emqx_utils_json:encode(Value). emqx_utils_json:encode(Value).
to_str(List) when is_list(List) ->
List;
to_str(Bin) when is_binary(Bin) ->
erlang:binary_to_list(Bin).

View File

@ -22,8 +22,6 @@
-define(BATCH_SIZE, 10). -define(BATCH_SIZE, 10).
-define(PAYLOAD, <<"HELLO">>). -define(PAYLOAD, <<"HELLO">>).
-define(GET_CONFIG(KEY__, CFG__), proplists:get_value(KEY__, CFG__)).
%% How to run it locally (all commands are run in $PROJ_ROOT dir): %% How to run it locally (all commands are run in $PROJ_ROOT dir):
%% run ct in docker container %% run ct in docker container
%% run script: %% run script:
@ -84,7 +82,9 @@ end_per_group(_Group, _Config) ->
ok. ok.
init_per_suite(Config) -> init_per_suite(Config) ->
Config. SecretFile = filename:join(?config(priv_dir, Config), "secret"),
ok = file:write_file(SecretFile, <<?SECRET_ACCESS_KEY>>),
[{dynamo_secretfile, SecretFile} | Config].
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(), emqx_mgmt_api_test_util:end_suite(),
@ -158,32 +158,35 @@ common_init(ConfigT) ->
end. end.
dynamo_config(BridgeType, Config) -> dynamo_config(BridgeType, Config) ->
Port = integer_to_list(?GET_CONFIG(port, Config)), Host = ?config(host, Config),
Url = "http://" ++ ?GET_CONFIG(host, Config) ++ ":" ++ Port, Port = ?config(port, Config),
Name = atom_to_binary(?MODULE), Name = atom_to_binary(?MODULE),
BatchSize = ?GET_CONFIG(batch_size, Config), BatchSize = ?config(batch_size, Config),
QueryMode = ?GET_CONFIG(query_mode, Config), QueryMode = ?config(query_mode, Config),
SecretFile = ?config(dynamo_secretfile, Config),
ConfigString = ConfigString =
io_lib:format( io_lib:format(
"bridges.~s.~s {\n" "bridges.~s.~s {"
" enable = true\n" "\n enable = true"
" url = ~p\n" "\n url = \"http://~s:~p\""
" table = ~p\n" "\n table = ~p"
" aws_access_key_id = ~p\n" "\n aws_access_key_id = ~p"
" aws_secret_access_key = ~p\n" "\n aws_secret_access_key = ~p"
" resource_opts = {\n" "\n resource_opts = {"
" request_ttl = 500ms\n" "\n request_ttl = 500ms"
" batch_size = ~b\n" "\n batch_size = ~b"
" query_mode = ~s\n" "\n query_mode = ~s"
" }\n" "\n }"
"}", "\n }",
[ [
BridgeType, BridgeType,
Name, Name,
Url, Host,
Port,
?TABLE, ?TABLE,
?ACCESS_KEY_ID, ?ACCESS_KEY_ID,
?SECRET_ACCESS_KEY, %% NOTE: using file-based secrets with HOCON configs
"file://" ++ SecretFile,
BatchSize, BatchSize,
QueryMode QueryMode
] ]
@ -252,8 +255,8 @@ delete_table(_Config) ->
erlcloud_ddb2:delete_table(?TABLE_BIN). erlcloud_ddb2:delete_table(?TABLE_BIN).
setup_dynamo(Config) -> setup_dynamo(Config) ->
Host = ?GET_CONFIG(host, Config), Host = ?config(host, Config),
Port = ?GET_CONFIG(port, Config), Port = ?config(port, Config),
erlcloud_ddb2:configure(?ACCESS_KEY_ID, ?SECRET_ACCESS_KEY, Host, Port, ?SCHEMA). erlcloud_ddb2:configure(?ACCESS_KEY_ID, ?SECRET_ACCESS_KEY, Host, Port, ?SCHEMA).
directly_setup_dynamo() -> directly_setup_dynamo() ->
@ -313,7 +316,9 @@ t_setup_via_http_api_and_publish(Config) ->
PgsqlConfig0 = ?config(dynamo_config, Config), PgsqlConfig0 = ?config(dynamo_config, Config),
PgsqlConfig = PgsqlConfig0#{ PgsqlConfig = PgsqlConfig0#{
<<"name">> => Name, <<"name">> => Name,
<<"type">> => BridgeType <<"type">> => BridgeType,
%% NOTE: using literal secret with HTTP API requests.
<<"aws_secret_access_key">> => <<?SECRET_ACCESS_KEY>>
}, },
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
@ -400,7 +405,7 @@ t_simple_query(Config) ->
), ),
Request = {get_item, {<<"id">>, <<"not_exists">>}}, Request = {get_item, {<<"id">>, <<"not_exists">>}},
Result = query_resource(Config, Request), Result = query_resource(Config, Request),
case ?GET_CONFIG(batch_size, Config) of case ?config(batch_size, Config) of
?BATCH_SIZE -> ?BATCH_SIZE ->
?assertMatch({error, {unrecoverable_error, {invalid_request, _}}}, Result); ?assertMatch({error, {unrecoverable_error, {invalid_request, _}}}, Result);
1 -> 1 ->

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_greptimedb, [ {application, emqx_bridge_greptimedb, [
{description, "EMQX GreptimeDB Bridge"}, {description, "EMQX GreptimeDB Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -147,13 +147,7 @@ fields(greptimedb) ->
[ [
{dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})}, {dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})},
{username, mk(binary(), #{desc => ?DESC("username")})}, {username, mk(binary(), #{desc => ?DESC("username")})},
{password, {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}
mk(binary(), #{
desc => ?DESC("password"),
format => <<"password">>,
sensitive => true,
converter => fun emqx_schema:password_converter/2
})}
] ++ emqx_connector_schema_lib:ssl_fields(). ] ++ emqx_connector_schema_lib:ssl_fields().
server() -> server() ->
@ -302,7 +296,8 @@ ssl_config(SSL = #{enable := true}) ->
auth(#{username := Username, password := Password}) -> auth(#{username := Username, password := Password}) ->
[ [
{auth, {basic, #{username => str(Username), password => str(Password)}}} %% TODO: teach `greptimedb` to accept 0-arity closures as passwords.
{auth, {basic, #{username => str(Username), password => emqx_secret:unwrap(Password)}}}
]; ];
auth(_) -> auth(_) ->
[]. [].

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_influxdb, [ {application, emqx_bridge_influxdb, [
{description, "EMQX Enterprise InfluxDB Bridge"}, {description, "EMQX Enterprise InfluxDB Bridge"},
{vsn, "0.1.5"}, {vsn, "0.1.6"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -192,20 +192,14 @@ fields(influxdb_api_v1) ->
[ [
{database, mk(binary(), #{required => true, desc => ?DESC("database")})}, {database, mk(binary(), #{required => true, desc => ?DESC("database")})},
{username, mk(binary(), #{desc => ?DESC("username")})}, {username, mk(binary(), #{desc => ?DESC("username")})},
{password, {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}
mk(binary(), #{
desc => ?DESC("password"),
format => <<"password">>,
sensitive => true,
converter => fun emqx_schema:password_converter/2
})}
] ++ emqx_connector_schema_lib:ssl_fields(); ] ++ emqx_connector_schema_lib:ssl_fields();
fields(influxdb_api_v2) -> fields(influxdb_api_v2) ->
fields(common) ++ fields(common) ++
[ [
{bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})},
{org, mk(binary(), #{required => true, desc => ?DESC("org")})}, {org, mk(binary(), #{required => true, desc => ?DESC("org")})},
{token, mk(binary(), #{required => true, desc => ?DESC("token")})} {token, emqx_schema_secret:mk(#{required => true, desc => ?DESC("token")})}
] ++ emqx_connector_schema_lib:ssl_fields(). ] ++ emqx_connector_schema_lib:ssl_fields().
server() -> server() ->
@ -363,7 +357,8 @@ protocol_config(#{
{version, v2}, {version, v2},
{bucket, str(Bucket)}, {bucket, str(Bucket)},
{org, str(Org)}, {org, str(Org)},
{token, Token} %% TODO: teach `influxdb` to accept 0-arity closures as passwords.
{token, emqx_secret:unwrap(Token)}
] ++ ssl_config(SSL). ] ++ ssl_config(SSL).
ssl_config(#{enable := false}) -> ssl_config(#{enable := false}) ->
@ -383,7 +378,8 @@ username(_) ->
[]. [].
password(#{password := Password}) -> password(#{password := Password}) ->
[{password, str(Password)}]; %% TODO: teach `influxdb` to accept 0-arity closures as passwords.
[{password, str(emqx_secret:unwrap(Password))}];
password(_) -> password(_) ->
[]. [].

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_iotdb, [ {application, emqx_bridge_iotdb, [
{description, "EMQX Enterprise Apache IoTDB Bridge"}, {description, "EMQX Enterprise Apache IoTDB Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{modules, [ {modules, [
emqx_bridge_iotdb, emqx_bridge_iotdb,
emqx_bridge_iotdb_impl emqx_bridge_iotdb_impl

View File

@ -51,12 +51,9 @@ fields(auth_basic) ->
[ [
{username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})}, {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})},
{password, {password,
mk(binary(), #{ emqx_schema_secret:mk(#{
required => true, required => true,
desc => ?DESC("config_auth_basic_password"), desc => ?DESC("config_auth_basic_password")
format => <<"password">>,
sensitive => true,
converter => fun emqx_schema:password_converter/2
})} })}
]. ].

View File

@ -283,11 +283,9 @@ fields(auth_username_password) ->
})}, })},
{username, mk(binary(), #{required => true, desc => ?DESC(auth_sasl_username)})}, {username, mk(binary(), #{required => true, desc => ?DESC(auth_sasl_username)})},
{password, {password,
mk(binary(), #{ emqx_connector_schema_lib:password_field(#{
required => true, required => true,
sensitive => true, desc => ?DESC(auth_sasl_password)
desc => ?DESC(auth_sasl_password),
converter => fun emqx_schema:password_converter/2
})} })}
]; ];
fields(auth_gssapi_kerberos) -> fields(auth_gssapi_kerberos) ->

View File

@ -31,8 +31,8 @@ make_client_id(BridgeType0, BridgeName0) ->
sasl(none) -> sasl(none) ->
undefined; undefined;
sasl(#{mechanism := Mechanism, username := Username, password := Password}) -> sasl(#{mechanism := Mechanism, username := Username, password := Secret}) ->
{Mechanism, Username, emqx_secret:wrap(Password)}; {Mechanism, Username, Secret};
sasl(#{ sasl(#{
kerberos_principal := Principal, kerberos_principal := Principal,
kerberos_keytab_file := KeyTabFile kerberos_keytab_file := KeyTabFile

View File

@ -30,29 +30,41 @@ all() ->
]. ].
groups() -> groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE), SASLGroups = [
SASLAuths = [ {sasl_auth_plain, testcases(sasl)},
sasl_auth_plain, {sasl_auth_scram256, testcases(sasl)},
sasl_auth_scram256, {sasl_auth_scram512, testcases(sasl)},
sasl_auth_scram512, {sasl_auth_kerberos, testcases(sasl_auth_kerberos)}
sasl_auth_kerberos
], ],
SASLAuthGroups = [{group, Type} || Type <- SASLAuths], SASLAuthGroups = [{group, Group} || {Group, _} <- SASLGroups],
OnlyOnceTCs = only_once_tests(),
MatrixTCs = AllTCs -- OnlyOnceTCs,
SASLTests = [{Group, MatrixTCs} || Group <- SASLAuths],
[ [
{plain, MatrixTCs ++ OnlyOnceTCs}, {plain, testcases(plain)},
{ssl, MatrixTCs}, {ssl, testcases(common)},
{sasl_plain, SASLAuthGroups}, {sasl_plain, SASLAuthGroups},
{sasl_ssl, SASLAuthGroups} {sasl_ssl, SASLAuthGroups}
] ++ SASLTests. | SASLGroups
].
sasl_only_tests() -> testcases(all) ->
[t_failed_creation_then_fixed]. emqx_common_test_helpers:all(?MODULE);
testcases(plain) ->
%% tests that do not need to be run on all groups %% NOTE: relevant only for a subset of SASL testcases
only_once_tests() -> Exclude = [t_failed_creation_then_fixed],
testcases(all) -- Exclude;
testcases(common) ->
testcases(plain) -- testcases(once);
testcases(sasl) ->
testcases(all) -- testcases(once);
testcases(sasl_auth_kerberos) ->
%% NOTE: need a proxy to run these tests
Exclude = [
t_failed_creation_then_fixed,
t_on_get_status,
t_receive_after_recovery
],
testcases(sasl) -- Exclude;
testcases(once) ->
%% tests that do not need to be run on all groups
[ [
t_begin_offset_earliest, t_begin_offset_earliest,
t_bridge_rule_action_source, t_bridge_rule_action_source,
@ -220,7 +232,7 @@ init_per_group(sasl_auth_kerberos, Config0) ->
(KV) -> (KV) ->
KV KV
end, end,
[{has_proxy, false}, {sasl_auth_mechanism, kerberos} | Config0] [{sasl_auth_mechanism, kerberos} | Config0]
), ),
Config; Config;
init_per_group(_Group, Config) -> init_per_group(_Group, Config) ->
@ -264,43 +276,6 @@ end_per_group(Group, Config) when
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.
init_per_testcase(TestCase, Config) when
TestCase =:= t_failed_creation_then_fixed
->
KafkaType = ?config(kafka_type, Config),
AuthMechanism = ?config(sasl_auth_mechanism, Config),
IsSASL = lists:member(KafkaType, [sasl_plain, sasl_ssl]),
case {IsSASL, AuthMechanism} of
{true, kerberos} ->
[{skip_does_not_apply, true}];
{true, _} ->
common_init_per_testcase(TestCase, Config);
{false, _} ->
[{skip_does_not_apply, true}]
end;
init_per_testcase(TestCase, Config) when
TestCase =:= t_failed_creation_then_fixed
->
%% test with one partiton only for this case because
%% the wait probe may not be always sent to the same partition
HasProxy = proplists:get_value(has_proxy, Config, true),
case HasProxy of
false ->
[{skip_does_not_apply, true}];
true ->
common_init_per_testcase(TestCase, [{num_partitions, 1} | Config])
end;
init_per_testcase(TestCase, Config) when
TestCase =:= t_on_get_status;
TestCase =:= t_receive_after_recovery
->
HasProxy = proplists:get_value(has_proxy, Config, true),
case HasProxy of
false ->
[{skip_does_not_apply, true}];
true ->
common_init_per_testcase(TestCase, Config)
end;
init_per_testcase(t_cluster_group = TestCase, Config0) -> init_per_testcase(t_cluster_group = TestCase, Config0) ->
Config = emqx_utils:merge_opts(Config0, [{num_partitions, 6}]), Config = emqx_utils:merge_opts(Config0, [{num_partitions, 6}]),
common_init_per_testcase(TestCase, Config); common_init_per_testcase(TestCase, Config);
@ -393,30 +368,24 @@ common_init_per_testcase(TestCase, Config0) ->
]. ].
end_per_testcase(_Testcase, Config) -> end_per_testcase(_Testcase, Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of ProxyHost = ?config(proxy_host, Config),
true -> ProxyPort = ?config(proxy_port, Config),
ok; ProducersConfigs = ?config(kafka_producers, Config),
false -> emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
ProxyHost = ?config(proxy_host, Config), delete_all_bridges(),
ProxyPort = ?config(proxy_port, Config), #{clientid := KafkaProducerClientId, producers := ProducersMapping} =
ProducersConfigs = ?config(kafka_producers, Config), ProducersConfigs,
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), lists:foreach(
delete_all_bridges(), fun(Producers) ->
#{clientid := KafkaProducerClientId, producers := ProducersMapping} = ok = wolff:stop_and_delete_supervised_producers(Producers)
ProducersConfigs, end,
lists:foreach( maps:values(ProducersMapping)
fun(Producers) -> ),
ok = wolff:stop_and_delete_supervised_producers(Producers) ok = wolff:stop_and_delete_supervised_client(KafkaProducerClientId),
end, %% in CI, apparently this needs more time since the
maps:values(ProducersMapping) %% machines struggle with all the containers running...
), emqx_common_test_helpers:call_janitor(60_000),
ok = wolff:stop_and_delete_supervised_client(KafkaProducerClientId), ok = snabbkaffe:stop().
%% in CI, apparently this needs more time since the
%% machines struggle with all the containers running...
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop(),
ok
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helper fns %% Helper fns
@ -1391,14 +1360,6 @@ t_multiple_topic_mappings(Config) ->
ok. ok.
t_on_get_status(Config) -> t_on_get_status(Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
do_t_on_get_status(Config)
end.
do_t_on_get_status(Config) ->
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config), ProxyName = ?config(proxy_name, Config),
@ -1421,14 +1382,6 @@ do_t_on_get_status(Config) ->
%% ensure that we can create and use the bridge successfully after %% ensure that we can create and use the bridge successfully after
%% creating it with bad config. %% creating it with bad config.
t_failed_creation_then_fixed(Config) -> t_failed_creation_then_fixed(Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
?check_trace(do_t_failed_creation_then_fixed(Config), [])
end.
do_t_failed_creation_then_fixed(Config) ->
ct:timetrap({seconds, 180}), ct:timetrap({seconds, 180}),
MQTTTopic = ?config(mqtt_topic, Config), MQTTTopic = ?config(mqtt_topic, Config),
MQTTQoS = ?config(mqtt_qos, Config), MQTTQoS = ?config(mqtt_qos, Config),
@ -1516,14 +1469,6 @@ do_t_failed_creation_then_fixed(Config) ->
%% recovering from a network partition will make the subscribers %% recovering from a network partition will make the subscribers
%% consume the messages produced during the down time. %% consume the messages produced during the down time.
t_receive_after_recovery(Config) -> t_receive_after_recovery(Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
do_t_receive_after_recovery(Config)
end.
do_t_receive_after_recovery(Config) ->
ct:timetrap(120_000), ct:timetrap(120_000),
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),

View File

@ -28,13 +28,8 @@
). ).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx_dashboard/include/emqx_dashboard.hrl").
-define(HOST, "http://127.0.0.1:18083"). -define(HOST, "http://127.0.0.1:18083").
%% -define(API_VERSION, "v5").
-define(BASE_PATH, "/api/v5"). -define(BASE_PATH, "/api/v5").
%% NOTE: it's "kafka", but not "kafka_producer" %% NOTE: it's "kafka", but not "kafka_producer"
@ -48,13 +43,6 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
all() -> all() ->
case code:get_object_code(cthr) of
{Module, Code, Filename} ->
{module, Module} = code:load_binary(Module, Filename, Code),
ok;
error ->
error
end,
All0 = emqx_common_test_helpers:all(?MODULE), All0 = emqx_common_test_helpers:all(?MODULE),
All = All0 -- matrix_cases(), All = All0 -- matrix_cases(),
Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()), Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
@ -105,23 +93,12 @@ init_per_suite(Config0) ->
emqx_connector, emqx_connector,
emqx_bridge_kafka, emqx_bridge_kafka,
emqx_bridge, emqx_bridge,
emqx_rule_engine emqx_rule_engine,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
], ],
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Config)}
), ),
emqx_mgmt_api_test_util:init_suite(),
wait_until_kafka_is_up(), wait_until_kafka_is_up(),
%% Wait until bridges API is up
(fun WaitUntilRestApiUp() ->
case http_get(["bridges"]) of
{ok, 200, _Res} ->
ok;
Val ->
ct:pal("REST API for bridges not up. Wait and try again. Response: ~p", [Val]),
timer:sleep(1000),
WaitUntilRestApiUp()
end
end)(),
[{apps, Apps} | Config]. [{apps, Apps} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->
@ -183,6 +160,7 @@ t_query_mode_async(CtConfig) ->
t_publish(matrix) -> t_publish(matrix) ->
{publish, [ {publish, [
[tcp, none, key_dispatch, sync], [tcp, none, key_dispatch, sync],
[ssl, plain_passfile, random, sync],
[ssl, scram_sha512, random, async], [ssl, scram_sha512, random, async],
[ssl, kerberos, random, sync] [ssl, kerberos, random, sync]
]}; ]};
@ -200,9 +178,15 @@ t_publish(Config) ->
end, end,
Auth1 = Auth1 =
case Auth of case Auth of
none -> "none"; none ->
scram_sha512 -> valid_sasl_scram512_settings(); "none";
kerberos -> valid_sasl_kerberos_settings() plain_passfile ->
Passfile = filename:join(?config(priv_dir, Config), "passfile"),
valid_sasl_plain_passfile_settings(Passfile);
scram_sha512 ->
valid_sasl_scram512_settings();
kerberos ->
valid_sasl_kerberos_settings()
end, end,
ConnCfg = #{ ConnCfg = #{
"bootstrap_hosts" => Hosts, "bootstrap_hosts" => Hosts,
@ -1018,112 +1002,89 @@ hocon_config(Args, ConfigTemplateFun) ->
), ),
Hocon. Hocon.
%% erlfmt-ignore
hocon_config_template() -> hocon_config_template() ->
""" "bridges.kafka.{{ bridge_name }} {"
bridges.kafka.{{ bridge_name }} { "\n bootstrap_hosts = \"{{ kafka_hosts_string }}\""
bootstrap_hosts = \"{{ kafka_hosts_string }}\" "\n enable = true"
enable = true "\n authentication = {{{ authentication }}}"
authentication = {{{ authentication }}} "\n ssl = {{{ ssl }}}"
ssl = {{{ ssl }}} "\n local_topic = \"{{ local_topic }}\""
local_topic = \"{{ local_topic }}\" "\n kafka = {"
kafka = { "\n message = {"
message = { "\n key = \"${clientid}\""
key = \"${clientid}\" "\n value = \"${.payload}\""
value = \"${.payload}\" "\n timestamp = \"${timestamp}\""
timestamp = \"${timestamp}\" "\n }"
} "\n buffer = {"
buffer = { "\n memory_overload_protection = false"
memory_overload_protection = false "\n }"
} "\n partition_strategy = {{ partition_strategy }}"
partition_strategy = {{ partition_strategy }} "\n topic = \"{{ kafka_topic }}\""
topic = \"{{ kafka_topic }}\" "\n query_mode = {{ query_mode }}"
query_mode = {{ query_mode }} "\n }"
} "\n metadata_request_timeout = 5s"
metadata_request_timeout = 5s "\n min_metadata_refresh_interval = 3s"
min_metadata_refresh_interval = 3s "\n socket_opts {"
socket_opts { "\n nodelay = true"
nodelay = true "\n }"
} "\n connect_timeout = 5s"
connect_timeout = 5s "\n }".
}
""".
%% erlfmt-ignore
hocon_config_template_with_headers() -> hocon_config_template_with_headers() ->
""" "bridges.kafka.{{ bridge_name }} {"
bridges.kafka.{{ bridge_name }} { "\n bootstrap_hosts = \"{{ kafka_hosts_string }}\""
bootstrap_hosts = \"{{ kafka_hosts_string }}\" "\n enable = true"
enable = true "\n authentication = {{{ authentication }}}"
authentication = {{{ authentication }}} "\n ssl = {{{ ssl }}}"
ssl = {{{ ssl }}} "\n local_topic = \"{{ local_topic }}\""
local_topic = \"{{ local_topic }}\" "\n kafka = {"
kafka = { "\n message = {"
message = { "\n key = \"${clientid}\""
key = \"${clientid}\" "\n value = \"${.payload}\""
value = \"${.payload}\" "\n timestamp = \"${timestamp}\""
timestamp = \"${timestamp}\" "\n }"
} "\n buffer = {"
buffer = { "\n memory_overload_protection = false"
memory_overload_protection = false "\n }"
} "\n kafka_headers = \"{{ kafka_headers }}\""
kafka_headers = \"{{ kafka_headers }}\" "\n kafka_header_value_encode_mode: json"
kafka_header_value_encode_mode: json "\n kafka_ext_headers: {{{ kafka_ext_headers }}}"
kafka_ext_headers: {{{ kafka_ext_headers }}} "\n partition_strategy = {{ partition_strategy }}"
partition_strategy = {{ partition_strategy }} "\n topic = \"{{ kafka_topic }}\""
topic = \"{{ kafka_topic }}\" "\n query_mode = {{ query_mode }}"
query_mode = {{ query_mode }} "\n }"
} "\n metadata_request_timeout = 5s"
metadata_request_timeout = 5s "\n min_metadata_refresh_interval = 3s"
min_metadata_refresh_interval = 3s "\n socket_opts {"
socket_opts { "\n nodelay = true"
nodelay = true "\n }"
} "\n connect_timeout = 5s"
connect_timeout = 5s "\n }".
}
""".
%% erlfmt-ignore
hocon_config_template_authentication("none") -> hocon_config_template_authentication("none") ->
"none"; "none";
hocon_config_template_authentication(#{"mechanism" := _}) -> hocon_config_template_authentication(#{"mechanism" := _}) ->
""" "{"
{ "\n mechanism = {{ mechanism }}"
mechanism = {{ mechanism }} "\n password = \"{{ password }}\""
password = {{ password }} "\n username = \"{{ username }}\""
username = {{ username }} "\n }";
}
""";
hocon_config_template_authentication(#{"kerberos_principal" := _}) -> hocon_config_template_authentication(#{"kerberos_principal" := _}) ->
""" "{"
{ "\n kerberos_principal = \"{{ kerberos_principal }}\""
kerberos_principal = \"{{ kerberos_principal }}\" "\n kerberos_keytab_file = \"{{ kerberos_keytab_file }}\""
kerberos_keytab_file = \"{{ kerberos_keytab_file }}\" "\n }".
}
""".
%% erlfmt-ignore
hocon_config_template_ssl(Map) when map_size(Map) =:= 0 -> hocon_config_template_ssl(Map) when map_size(Map) =:= 0 ->
""" "{ enable = false }";
{
enable = false
}
""";
hocon_config_template_ssl(#{"enable" := "false"}) -> hocon_config_template_ssl(#{"enable" := "false"}) ->
""" "{ enable = false }";
{
enable = false
}
""";
hocon_config_template_ssl(#{"enable" := "true"}) -> hocon_config_template_ssl(#{"enable" := "true"}) ->
""" "{ enable = true"
{ "\n cacertfile = \"{{{cacertfile}}}\""
enable = true "\n certfile = \"{{{certfile}}}\""
cacertfile = \"{{{cacertfile}}}\" "\n keyfile = \"{{{keyfile}}}\""
certfile = \"{{{certfile}}}\" "\n }".
keyfile = \"{{{keyfile}}}\"
}
""".
kafka_hosts_string(tcp, none) -> kafka_hosts_string(tcp, none) ->
kafka_hosts_string(); kafka_hosts_string();
@ -1197,6 +1158,13 @@ valid_sasl_kerberos_settings() ->
"kerberos_keytab_file" => shared_secret(rig_keytab) "kerberos_keytab_file" => shared_secret(rig_keytab)
}. }.
valid_sasl_plain_passfile_settings(Passfile) ->
Auth = valid_sasl_plain_settings(),
ok = file:write_file(Passfile, maps:get("password", Auth)),
Auth#{
"password" := "file://" ++ Passfile
}.
kafka_hosts() -> kafka_hosts() ->
kpro:parse_endpoints(kafka_hosts_string()). kpro:parse_endpoints(kafka_hosts_string()).

View File

@ -223,144 +223,136 @@ check_atom_key(Conf) when is_map(Conf) ->
%% Data section %% Data section
%%=========================================================================== %%===========================================================================
%% erlfmt-ignore
kafka_producer_old_hocon(_WithLocalTopic = true) -> kafka_producer_old_hocon(_WithLocalTopic = true) ->
kafka_producer_old_hocon("mqtt {topic = \"mqtt/local\"}\n"); kafka_producer_old_hocon("mqtt {topic = \"mqtt/local\"}\n");
kafka_producer_old_hocon(_WithLocalTopic = false) -> kafka_producer_old_hocon(_WithLocalTopic = false) ->
kafka_producer_old_hocon("mqtt {}\n"); kafka_producer_old_hocon("mqtt {}\n");
kafka_producer_old_hocon(MQTTConfig) when is_list(MQTTConfig) -> kafka_producer_old_hocon(MQTTConfig) when is_list(MQTTConfig) ->
""" [
bridges.kafka { "bridges.kafka {"
myproducer { "\n myproducer {"
authentication = \"none\" "\n authentication = \"none\""
bootstrap_hosts = \"toxiproxy:9292\" "\n bootstrap_hosts = \"toxiproxy:9292\""
connect_timeout = \"5s\" "\n connect_timeout = \"5s\""
metadata_request_timeout = \"5s\" "\n metadata_request_timeout = \"5s\""
min_metadata_refresh_interval = \"3s\" "\n min_metadata_refresh_interval = \"3s\""
producer { "\n producer {"
kafka { "\n kafka {"
buffer { "\n buffer {"
memory_overload_protection = false "\n memory_overload_protection = false"
mode = \"memory\" "\n mode = \"memory\""
per_partition_limit = \"2GB\" "\n per_partition_limit = \"2GB\""
segment_bytes = \"100MB\" "\n segment_bytes = \"100MB\""
} "\n }"
compression = \"no_compression\" "\n compression = \"no_compression\""
max_batch_bytes = \"896KB\" "\n max_batch_bytes = \"896KB\""
max_inflight = 10 "\n max_inflight = 10"
message { "\n message {"
key = \"${.clientid}\" "\n key = \"${.clientid}\""
timestamp = \"${.timestamp}\" "\n timestamp = \"${.timestamp}\""
value = \"${.}\" "\n value = \"${.}\""
} "\n }"
partition_count_refresh_interval = \"60s\" "\n partition_count_refresh_interval = \"60s\""
partition_strategy = \"random\" "\n partition_strategy = \"random\""
required_acks = \"all_isr\" "\n required_acks = \"all_isr\""
topic = \"test-topic-two-partitions\" "\n topic = \"test-topic-two-partitions\""
} "\n }",
""" ++ MQTTConfig ++ MQTTConfig,
""" "\n }"
} "\n socket_opts {"
socket_opts { "\n nodelay = true"
nodelay = true "\n recbuf = \"1024KB\""
recbuf = \"1024KB\" "\n sndbuf = \"1024KB\""
sndbuf = \"1024KB\" "\n }"
} "\n ssl {enable = false, verify = \"verify_peer\"}"
ssl {enable = false, verify = \"verify_peer\"} "\n }"
} "\n}"
} ].
""".
kafka_producer_new_hocon() -> kafka_producer_new_hocon() ->
"" "bridges.kafka {"
"\n" "\n myproducer {"
"bridges.kafka {\n" "\n authentication = \"none\""
" myproducer {\n" "\n bootstrap_hosts = \"toxiproxy:9292\""
" authentication = \"none\"\n" "\n connect_timeout = \"5s\""
" bootstrap_hosts = \"toxiproxy:9292\"\n" "\n metadata_request_timeout = \"5s\""
" connect_timeout = \"5s\"\n" "\n min_metadata_refresh_interval = \"3s\""
" metadata_request_timeout = \"5s\"\n" "\n kafka {"
" min_metadata_refresh_interval = \"3s\"\n" "\n buffer {"
" kafka {\n" "\n memory_overload_protection = false"
" buffer {\n" "\n mode = \"memory\""
" memory_overload_protection = false\n" "\n per_partition_limit = \"2GB\""
" mode = \"memory\"\n" "\n segment_bytes = \"100MB\""
" per_partition_limit = \"2GB\"\n" "\n }"
" segment_bytes = \"100MB\"\n" "\n compression = \"no_compression\""
" }\n" "\n max_batch_bytes = \"896KB\""
" compression = \"no_compression\"\n" "\n max_inflight = 10"
" max_batch_bytes = \"896KB\"\n" "\n message {"
" max_inflight = 10\n" "\n key = \"${.clientid}\""
" message {\n" "\n timestamp = \"${.timestamp}\""
" key = \"${.clientid}\"\n" "\n value = \"${.}\""
" timestamp = \"${.timestamp}\"\n" "\n }"
" value = \"${.}\"\n" "\n partition_count_refresh_interval = \"60s\""
" }\n" "\n partition_strategy = \"random\""
" partition_count_refresh_interval = \"60s\"\n" "\n required_acks = \"all_isr\""
" partition_strategy = \"random\"\n" "\n topic = \"test-topic-two-partitions\""
" required_acks = \"all_isr\"\n" "\n }"
" topic = \"test-topic-two-partitions\"\n" "\n local_topic = \"mqtt/local\""
" }\n" "\n socket_opts {"
" local_topic = \"mqtt/local\"\n" "\n nodelay = true"
" socket_opts {\n" "\n recbuf = \"1024KB\""
" nodelay = true\n" "\n sndbuf = \"1024KB\""
" recbuf = \"1024KB\"\n" "\n }"
" sndbuf = \"1024KB\"\n" "\n ssl {enable = false, verify = \"verify_peer\"}"
" }\n" "\n resource_opts {"
" ssl {enable = false, verify = \"verify_peer\"}\n" "\n health_check_interval = 10s"
" resource_opts {\n" "\n }"
" health_check_interval = 10s\n" "\n }"
" }\n" "\n}".
" }\n"
"}\n"
"".
%% erlfmt-ignore
kafka_consumer_hocon() -> kafka_consumer_hocon() ->
""" "bridges.kafka_consumer.my_consumer {"
bridges.kafka_consumer.my_consumer { "\n enable = true"
enable = true "\n bootstrap_hosts = \"kafka-1.emqx.net:9292\""
bootstrap_hosts = \"kafka-1.emqx.net:9292\" "\n connect_timeout = 5s"
connect_timeout = 5s "\n min_metadata_refresh_interval = 3s"
min_metadata_refresh_interval = 3s "\n metadata_request_timeout = 5s"
metadata_request_timeout = 5s "\n authentication = {"
authentication = { "\n mechanism = plain"
mechanism = plain "\n username = emqxuser"
username = emqxuser "\n password = password"
password = password "\n }"
} "\n kafka {"
kafka { "\n max_batch_bytes = 896KB"
max_batch_bytes = 896KB "\n max_rejoin_attempts = 5"
max_rejoin_attempts = 5 "\n offset_commit_interval_seconds = 3s"
offset_commit_interval_seconds = 3s "\n offset_reset_policy = latest"
offset_reset_policy = latest "\n }"
} "\n topic_mapping = ["
topic_mapping = [ "\n {"
{ "\n kafka_topic = \"kafka-topic-1\""
kafka_topic = \"kafka-topic-1\" "\n mqtt_topic = \"mqtt/topic/1\""
mqtt_topic = \"mqtt/topic/1\" "\n qos = 1"
qos = 1 "\n payload_template = \"${.}\""
payload_template = \"${.}\" "\n },"
}, "\n {"
{ "\n kafka_topic = \"kafka-topic-2\""
kafka_topic = \"kafka-topic-2\" "\n mqtt_topic = \"mqtt/topic/2\""
mqtt_topic = \"mqtt/topic/2\" "\n qos = 2"
qos = 2 "\n payload_template = \"v = ${.value}\""
payload_template = \"v = ${.value}\" "\n }"
} "\n ]"
] "\n key_encoding_mode = none"
key_encoding_mode = none "\n value_encoding_mode = none"
value_encoding_mode = none "\n ssl {"
ssl { "\n enable = false"
enable = false "\n verify = verify_none"
verify = verify_none "\n server_name_indication = \"auto\""
server_name_indication = \"auto\" "\n }"
} "\n resource_opts {"
resource_opts { "\n health_check_interval = 10s"
health_check_interval = 10s "\n }"
} "\n }".
}
""".
%% assert compatibility %% assert compatibility
bridge_schema_json_test() -> bridge_schema_json_test() ->

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_kinesis, [ {application, emqx_bridge_kinesis, [
{description, "EMQX Enterprise Amazon Kinesis Bridge"}, {description, "EMQX Enterprise Amazon Kinesis Bridge"},
{vsn, "0.1.2"}, {vsn, "0.1.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -62,12 +62,10 @@ fields(connector_config) ->
} }
)}, )},
{aws_secret_access_key, {aws_secret_access_key,
mk( emqx_schema_secret:mk(
binary(),
#{ #{
required => true, required => true,
desc => ?DESC("aws_secret_access_key"), desc => ?DESC("aws_secret_access_key")
sensitive => true
} }
)}, )},
{endpoint, {endpoint,

View File

@ -97,7 +97,13 @@ init(#{
partition_key => PartitionKey, partition_key => PartitionKey,
stream_name => StreamName stream_name => StreamName
}, },
New = %% TODO: teach `erlcloud` to to accept 0-arity closures as passwords.
ok = erlcloud_config:configure(
to_str(AwsAccessKey),
to_str(emqx_secret:unwrap(AwsSecretAccessKey)),
Host,
Port,
Scheme,
fun(AccessKeyID, SecretAccessKey, HostAddr, HostPort, ConnectionScheme) -> fun(AccessKeyID, SecretAccessKey, HostAddr, HostPort, ConnectionScheme) ->
Config0 = erlcloud_kinesis:new( Config0 = erlcloud_kinesis:new(
AccessKeyID, AccessKeyID,
@ -107,9 +113,7 @@ init(#{
ConnectionScheme ++ "://" ConnectionScheme ++ "://"
), ),
Config0#aws_config{retry_num = MaxRetries} Config0#aws_config{retry_num = MaxRetries}
end, end
erlcloud_config:configure(
to_str(AwsAccessKey), to_str(AwsSecretAccessKey), Host, Port, Scheme, New
), ),
% check the connection % check the connection
case erlcloud_kinesis:list_streams() of case erlcloud_kinesis:list_streams() of

View File

@ -15,7 +15,7 @@
-type config() :: #{ -type config() :: #{
aws_access_key_id := binary(), aws_access_key_id := binary(),
aws_secret_access_key := binary(), aws_secret_access_key := emqx_secret:t(binary()),
endpoint := binary(), endpoint := binary(),
stream_name := binary(), stream_name := binary(),
partition_key := binary(), partition_key := binary(),

View File

@ -11,10 +11,11 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(PRODUCER, emqx_bridge_kinesis_impl_producer).
-define(BRIDGE_TYPE, kinesis_producer). -define(BRIDGE_TYPE, kinesis_producer).
-define(BRIDGE_TYPE_BIN, <<"kinesis_producer">>). -define(BRIDGE_TYPE_BIN, <<"kinesis_producer">>).
-define(KINESIS_PORT, 4566). -define(KINESIS_PORT, 4566).
-define(KINESIS_ACCESS_KEY, "aws_access_key_id").
-define(KINESIS_SECRET_KEY, "aws_secret_access_key").
-define(TOPIC, <<"t/topic">>). -define(TOPIC, <<"t/topic">>).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -38,6 +39,8 @@ init_per_suite(Config) ->
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy.emqx.net"), ProxyHost = os:getenv("PROXY_HOST", "toxiproxy.emqx.net"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
ProxyName = "kinesis", ProxyName = "kinesis",
SecretFile = filename:join(?config(priv_dir, Config), "secret"),
ok = file:write_file(SecretFile, <<?KINESIS_SECRET_KEY>>),
ok = emqx_common_test_helpers:start_apps([emqx_conf]), ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]), ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
{ok, _} = application:ensure_all_started(emqx_connector), {ok, _} = application:ensure_all_started(emqx_connector),
@ -46,6 +49,7 @@ init_per_suite(Config) ->
{proxy_host, ProxyHost}, {proxy_host, ProxyHost},
{proxy_port, ProxyPort}, {proxy_port, ProxyPort},
{kinesis_port, ?KINESIS_PORT}, {kinesis_port, ?KINESIS_PORT},
{kinesis_secretfile, SecretFile},
{proxy_name, ProxyName} {proxy_name, ProxyName}
| Config | Config
]. ].
@ -130,6 +134,7 @@ kinesis_config(Config) ->
Scheme = proplists:get_value(connection_scheme, Config, "http"), Scheme = proplists:get_value(connection_scheme, Config, "http"),
ProxyHost = proplists:get_value(proxy_host, Config), ProxyHost = proplists:get_value(proxy_host, Config),
KinesisPort = proplists:get_value(kinesis_port, Config), KinesisPort = proplists:get_value(kinesis_port, Config),
SecretFile = proplists:get_value(kinesis_secretfile, Config),
BatchSize = proplists:get_value(batch_size, Config, 100), BatchSize = proplists:get_value(batch_size, Config, 100),
BatchTime = proplists:get_value(batch_time, Config, <<"500ms">>), BatchTime = proplists:get_value(batch_time, Config, <<"500ms">>),
PayloadTemplate = proplists:get_value(payload_template, Config, "${payload}"), PayloadTemplate = proplists:get_value(payload_template, Config, "${payload}"),
@ -140,29 +145,32 @@ kinesis_config(Config) ->
Name = <<(atom_to_binary(?MODULE))/binary, (GUID)/binary>>, Name = <<(atom_to_binary(?MODULE))/binary, (GUID)/binary>>,
ConfigString = ConfigString =
io_lib:format( io_lib:format(
"bridges.kinesis_producer.~s {\n" "bridges.kinesis_producer.~s {"
" enable = true\n" "\n enable = true"
" aws_access_key_id = \"aws_access_key_id\"\n" "\n aws_access_key_id = ~p"
" aws_secret_access_key = \"aws_secret_access_key\"\n" "\n aws_secret_access_key = ~p"
" endpoint = \"~s://~s:~b\"\n" "\n endpoint = \"~s://~s:~b\""
" stream_name = \"~s\"\n" "\n stream_name = \"~s\""
" partition_key = \"~s\"\n" "\n partition_key = \"~s\""
" payload_template = \"~s\"\n" "\n payload_template = \"~s\""
" max_retries = ~b\n" "\n max_retries = ~b"
" pool_size = 1\n" "\n pool_size = 1"
" resource_opts = {\n" "\n resource_opts = {"
" health_check_interval = \"3s\"\n" "\n health_check_interval = \"3s\""
" request_ttl = 30s\n" "\n request_ttl = 30s"
" resume_interval = 1s\n" "\n resume_interval = 1s"
" metrics_flush_interval = \"700ms\"\n" "\n metrics_flush_interval = \"700ms\""
" worker_pool_size = 1\n" "\n worker_pool_size = 1"
" query_mode = ~s\n" "\n query_mode = ~s"
" batch_size = ~b\n" "\n batch_size = ~b"
" batch_time = \"~s\"\n" "\n batch_time = \"~s\""
" }\n" "\n }"
"}\n", "\n }",
[ [
Name, Name,
?KINESIS_ACCESS_KEY,
%% NOTE: using file-based secrets with HOCON configs.
"file://" ++ SecretFile,
Scheme, Scheme,
ProxyHost, ProxyHost,
KinesisPort, KinesisPort,
@ -203,9 +211,6 @@ delete_bridge(Config) ->
ct:pal("deleting bridge ~p", [{Type, Name}]), ct:pal("deleting bridge ~p", [{Type, Name}]),
emqx_bridge:remove(Type, Name). emqx_bridge:remove(Type, Name).
create_bridge_http(Config) ->
create_bridge_http(Config, _KinesisConfigOverrides = #{}).
create_bridge_http(Config, KinesisConfigOverrides) -> create_bridge_http(Config, KinesisConfigOverrides) ->
TypeBin = ?BRIDGE_TYPE_BIN, TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(kinesis_name, Config), Name = ?config(kinesis_name, Config),
@ -489,7 +494,11 @@ to_bin(Str) when is_list(Str) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
t_create_via_http(Config) -> t_create_via_http(Config) ->
?assertMatch({ok, _}, create_bridge_http(Config)), Overrides = #{
%% NOTE: using literal secret with HTTP API requests.
<<"aws_secret_access_key">> => <<?KINESIS_SECRET_KEY>>
},
?assertMatch({ok, _}, create_bridge_http(Config, Overrides)),
ok. ok.
t_start_failed_then_fix(Config) -> t_start_failed_then_fix(Config) ->

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_mongodb, [ {application, emqx_bridge_mongodb, [
{description, "EMQX Enterprise MongoDB Bridge"}, {description, "EMQX Enterprise MongoDB Bridge"},
{vsn, "0.2.1"}, {vsn, "0.2.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -6,9 +6,6 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
-include_lib("emqx_connector/include/emqx_connector_tables.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").

View File

@ -11,6 +11,8 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-import(emqx_utils_conv, [bin/1]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% CT boilerplate %% CT boilerplate
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -96,14 +98,27 @@ init_per_group(Type = single, Config) ->
true -> true ->
ok = start_apps(), ok = start_apps(),
emqx_mgmt_api_test_util:init_suite(), emqx_mgmt_api_test_util:init_suite(),
{Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config), %% NOTE: `mongo-single` has auth enabled, see `credentials.env`.
AuthSource = bin(os:getenv("MONGO_AUTHSOURCE", "admin")),
Username = bin(os:getenv("MONGO_USERNAME", "")),
Password = bin(os:getenv("MONGO_PASSWORD", "")),
Passfile = filename:join(?config(priv_dir, Config), "passfile"),
ok = file:write_file(Passfile, Password),
NConfig = [
{mongo_authsource, AuthSource},
{mongo_username, Username},
{mongo_password, Password},
{mongo_passfile, Passfile}
| Config
],
{Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, NConfig),
[ [
{mongo_host, MongoHost}, {mongo_host, MongoHost},
{mongo_port, MongoPort}, {mongo_port, MongoPort},
{mongo_config, MongoConfig}, {mongo_config, MongoConfig},
{mongo_type, Type}, {mongo_type, Type},
{mongo_name, Name} {mongo_name, Name}
| Config | NConfig
]; ];
false -> false ->
{skip, no_mongo} {skip, no_mongo}
@ -121,13 +136,13 @@ end_per_suite(_Config) ->
ok. ok.
init_per_testcase(_Testcase, Config) -> init_per_testcase(_Testcase, Config) ->
catch clear_db(Config), clear_db(Config),
delete_bridge(Config), delete_bridge(Config),
snabbkaffe:start_trace(), snabbkaffe:start_trace(),
Config. Config.
end_per_testcase(_Testcase, Config) -> end_per_testcase(_Testcase, Config) ->
catch clear_db(Config), clear_db(Config),
delete_bridge(Config), delete_bridge(Config),
snabbkaffe:stop(), snabbkaffe:stop(),
ok. ok.
@ -175,19 +190,19 @@ mongo_config(MongoHost, MongoPort0, rs = Type, Config) ->
Name = atom_to_binary(?MODULE), Name = atom_to_binary(?MODULE),
ConfigString = ConfigString =
io_lib:format( io_lib:format(
"bridges.mongodb_rs.~s {\n" "bridges.mongodb_rs.~s {"
" enable = true\n" "\n enable = true"
" collection = mycol\n" "\n collection = mycol"
" replica_set_name = rs0\n" "\n replica_set_name = rs0"
" servers = [~p]\n" "\n servers = [~p]"
" w_mode = safe\n" "\n w_mode = safe"
" use_legacy_protocol = auto\n" "\n use_legacy_protocol = auto"
" database = mqtt\n" "\n database = mqtt"
" resource_opts = {\n" "\n resource_opts = {"
" query_mode = ~s\n" "\n query_mode = ~s"
" worker_pool_size = 1\n" "\n worker_pool_size = 1"
" }\n" "\n }"
"}", "\n }",
[ [
Name, Name,
Servers, Servers,
@ -202,18 +217,18 @@ mongo_config(MongoHost, MongoPort0, sharded = Type, Config) ->
Name = atom_to_binary(?MODULE), Name = atom_to_binary(?MODULE),
ConfigString = ConfigString =
io_lib:format( io_lib:format(
"bridges.mongodb_sharded.~s {\n" "bridges.mongodb_sharded.~s {"
" enable = true\n" "\n enable = true"
" collection = mycol\n" "\n collection = mycol"
" servers = [~p]\n" "\n servers = [~p]"
" w_mode = safe\n" "\n w_mode = safe"
" use_legacy_protocol = auto\n" "\n use_legacy_protocol = auto"
" database = mqtt\n" "\n database = mqtt"
" resource_opts = {\n" "\n resource_opts = {"
" query_mode = ~s\n" "\n query_mode = ~s"
" worker_pool_size = 1\n" "\n worker_pool_size = 1"
" }\n" "\n }"
"}", "\n }",
[ [
Name, Name,
Servers, Servers,
@ -228,21 +243,27 @@ mongo_config(MongoHost, MongoPort0, single = Type, Config) ->
Name = atom_to_binary(?MODULE), Name = atom_to_binary(?MODULE),
ConfigString = ConfigString =
io_lib:format( io_lib:format(
"bridges.mongodb_single.~s {\n" "bridges.mongodb_single.~s {"
" enable = true\n" "\n enable = true"
" collection = mycol\n" "\n collection = mycol"
" server = ~p\n" "\n server = ~p"
" w_mode = safe\n" "\n w_mode = safe"
" use_legacy_protocol = auto\n" "\n use_legacy_protocol = auto"
" database = mqtt\n" "\n database = mqtt"
" resource_opts = {\n" "\n auth_source = ~s"
" query_mode = ~s\n" "\n username = ~s"
" worker_pool_size = 1\n" "\n password = \"file://~s\""
" }\n" "\n resource_opts = {"
"}", "\n query_mode = ~s"
"\n worker_pool_size = 1"
"\n }"
"\n }",
[ [
Name, Name,
Server, Server,
?config(mongo_authsource, Config),
?config(mongo_username, Config),
?config(mongo_passfile, Config),
QueryMode QueryMode
] ]
), ),
@ -284,8 +305,24 @@ clear_db(Config) ->
Host = ?config(mongo_host, Config), Host = ?config(mongo_host, Config),
Port = ?config(mongo_port, Config), Port = ?config(mongo_port, Config),
Server = Host ++ ":" ++ integer_to_list(Port), Server = Host ++ ":" ++ integer_to_list(Port),
#{<<"database">> := Db, <<"collection">> := Collection} = ?config(mongo_config, Config), #{
{ok, Client} = mongo_api:connect(Type, [Server], [], [{database, Db}, {w_mode, unsafe}]), <<"database">> := Db,
<<"collection">> := Collection
} = ?config(mongo_config, Config),
WorkerOpts = [
{database, Db},
{w_mode, unsafe}
| lists:flatmap(
fun
({mongo_authsource, AS}) -> [{auth_source, AS}];
({mongo_username, User}) -> [{login, User}];
({mongo_password, Pass}) -> [{password, Pass}];
(_) -> []
end,
Config
)
],
{ok, Client} = mongo_api:connect(Type, [Server], [], WorkerOpts),
{true, _} = mongo_api:delete(Client, Collection, _Selector = #{}), {true, _} = mongo_api:delete(Client, Collection, _Selector = #{}),
mongo_api:disconnect(Client). mongo_api:disconnect(Client).
@ -386,13 +423,21 @@ t_setup_via_config_and_publish(Config) ->
ok. ok.
t_setup_via_http_api_and_publish(Config) -> t_setup_via_http_api_and_publish(Config) ->
Type = mongo_type_bin(?config(mongo_type, Config)), Type = ?config(mongo_type, Config),
Name = ?config(mongo_name, Config), Name = ?config(mongo_name, Config),
MongoConfig0 = ?config(mongo_config, Config), MongoConfig0 = ?config(mongo_config, Config),
MongoConfig = MongoConfig0#{ MongoConfig1 = MongoConfig0#{
<<"name">> => Name, <<"name">> => Name,
<<"type">> => Type <<"type">> => mongo_type_bin(Type)
}, },
MongoConfig =
case Type of
single ->
%% NOTE: using literal password with HTTP API requests.
MongoConfig1#{<<"password">> => ?config(mongo_password, Config)};
_ ->
MongoConfig1
end,
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
create_bridge_http(MongoConfig) create_bridge_http(MongoConfig)

View File

@ -21,7 +21,6 @@
"DEFAULT CHARSET=utf8MB4;" "DEFAULT CHARSET=utf8MB4;"
). ).
-define(SQL_DROP_TABLE, "DROP TABLE mqtt_test"). -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test").
-define(SQL_DELETE, "DELETE from mqtt_test").
-define(SQL_SELECT, "SELECT payload FROM mqtt_test"). -define(SQL_SELECT, "SELECT payload FROM mqtt_test").
% DB defaults % DB defaults
@ -112,8 +111,8 @@ end_per_suite(_Config) ->
ok. ok.
init_per_testcase(_Testcase, Config) -> init_per_testcase(_Testcase, Config) ->
connect_and_drop_table(Config),
connect_and_create_table(Config), connect_and_create_table(Config),
connect_and_clear_table(Config),
delete_bridge(Config), delete_bridge(Config),
snabbkaffe:start_trace(), snabbkaffe:start_trace(),
Config. Config.
@ -122,9 +121,7 @@ end_per_testcase(_Testcase, Config) ->
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
connect_and_clear_table(Config),
ok = snabbkaffe:stop(), ok = snabbkaffe:stop(),
delete_bridge(Config),
emqx_common_test_helpers:call_janitor(), emqx_common_test_helpers:call_janitor(),
ok. ok.
@ -323,9 +320,6 @@ connect_and_create_table(Config) ->
connect_and_drop_table(Config) -> connect_and_drop_table(Config) ->
query_direct_mysql(Config, ?SQL_DROP_TABLE). query_direct_mysql(Config, ?SQL_DROP_TABLE).
connect_and_clear_table(Config) ->
query_direct_mysql(Config, ?SQL_DELETE).
connect_and_get_payload(Config) -> connect_and_get_payload(Config) ->
query_direct_mysql(Config, ?SQL_SELECT). query_direct_mysql(Config, ?SQL_SELECT).
@ -777,28 +771,21 @@ t_table_removed(Config) ->
Name = ?config(mysql_name, Config), Name = ?config(mysql_name, Config),
BridgeType = ?config(mysql_bridge_type, Config), BridgeType = ?config(mysql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?check_trace( connect_and_create_table(Config),
begin ?assertMatch({ok, _}, create_bridge(Config)),
connect_and_create_table(Config), ?retry(
?assertMatch({ok, _}, create_bridge(Config)), _Sleep = 1_000,
?retry( _Attempts = 20,
_Sleep = 1_000, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
_Attempts = 20, ),
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) connect_and_drop_table(Config),
), Val = integer_to_binary(erlang:unique_integer()),
connect_and_drop_table(Config), SentData = #{payload => Val, timestamp => 1668602148000},
Val = integer_to_binary(erlang:unique_integer()), Timeout = 1000,
SentData = #{payload => Val, timestamp => 1668602148000}, ?assertMatch(
Timeout = 1000, {error,
?assertMatch( {unrecoverable_error, {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}},
{error, sync_query_resource(Config, {send_message, SentData, [], Timeout})
{unrecoverable_error,
{1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}},
sync_query_resource(Config, {send_message, SentData, [], Timeout})
),
ok
end,
[]
), ),
ok. ok.
@ -807,38 +794,31 @@ t_nested_payload_template(Config) ->
BridgeType = ?config(mysql_bridge_type, Config), BridgeType = ?config(mysql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
Value = integer_to_binary(erlang:unique_integer()), Value = integer_to_binary(erlang:unique_integer()),
?check_trace( {ok, _} = create_bridge(
begin Config,
connect_and_create_table(Config), #{
{ok, _} = create_bridge( <<"sql">> =>
Config, "INSERT INTO mqtt_test(payload, arrived) "
#{ "VALUES (${payload.value}, FROM_UNIXTIME(${timestamp}/1000))"
<<"sql">> => }
"INSERT INTO mqtt_test(payload, arrived) " ),
"VALUES (${payload.value}, FROM_UNIXTIME(${timestamp}/1000))" {ok, #{<<"from">> := [Topic]}} = create_rule_and_action_http(Config),
} ?retry(
), _Sleep = 1_000,
{ok, #{<<"from">> := [Topic]}} = create_rule_and_action_http(Config), _Attempts = 20,
?retry( ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
_Sleep = 1_000, ),
_Attempts = 20, %% send message via rule action
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) Payload = emqx_utils_json:encode(#{value => Value}),
), Message = emqx_message:make(Topic, Payload),
%% send message via rule action {_, {ok, _}} =
Payload = emqx_utils_json:encode(#{value => Value}), ?wait_async_action(
Message = emqx_message:make(Topic, Payload), emqx:publish(Message),
{_, {ok, _}} = #{?snk_kind := mysql_connector_query_return},
?wait_async_action( 10_000
emqx:publish(Message), ),
#{?snk_kind := mysql_connector_query_return}, ?assertEqual(
10_000 {ok, [<<"payload">>], [[Value]]},
), connect_and_get_payload(Config)
?assertEqual(
{ok, [<<"payload">>], [[Value]]},
connect_and_get_payload(Config)
),
ok
end,
[]
), ),
ok. ok.

View File

@ -16,7 +16,6 @@
-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_oracle, emqx_bridge_oracle]). -define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_oracle, emqx_bridge_oracle]).
-define(SID, "XE"). -define(SID, "XE").
-define(RULE_TOPIC, "mqtt/rule"). -define(RULE_TOPIC, "mqtt/rule").
% -define(RULE_TOPIC_BIN, <<?RULE_TOPIC>>).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% CT boilerplate %% CT boilerplate
@ -33,9 +32,6 @@ groups() ->
{plain, AllTCs} {plain, AllTCs}
]. ].
only_once_tests() ->
[t_create_via_http].
init_per_suite(Config) -> init_per_suite(Config) ->
Config. Config.

View File

@ -183,31 +183,33 @@ pgsql_config(BridgeType, Config) ->
end, end,
QueryMode = ?config(query_mode, Config), QueryMode = ?config(query_mode, Config),
TlsEnabled = ?config(enable_tls, Config), TlsEnabled = ?config(enable_tls, Config),
%% NOTE: supplying password through a file here, to verify that it works.
Password = create_passfile(BridgeType, Config),
ConfigString = ConfigString =
io_lib:format( io_lib:format(
"bridges.~s.~s {\n" "bridges.~s.~s {"
" enable = true\n" "\n enable = true"
" server = ~p\n" "\n server = ~p"
" database = ~p\n" "\n database = ~p"
" username = ~p\n" "\n username = ~p"
" password = ~p\n" "\n password = ~p"
" sql = ~p\n" "\n sql = ~p"
" resource_opts = {\n" "\n resource_opts = {"
" request_ttl = 500ms\n" "\n request_ttl = 500ms"
" batch_size = ~b\n" "\n batch_size = ~b"
" query_mode = ~s\n" "\n query_mode = ~s"
" }\n" "\n }"
" ssl = {\n" "\n ssl = {"
" enable = ~w\n" "\n enable = ~w"
" }\n" "\n }"
"}", "\n }",
[ [
BridgeType, BridgeType,
Name, Name,
Server, Server,
?PGSQL_DATABASE, ?PGSQL_DATABASE,
?PGSQL_USERNAME, ?PGSQL_USERNAME,
?PGSQL_PASSWORD, Password,
?SQL_BRIDGE, ?SQL_BRIDGE,
BatchSize, BatchSize,
QueryMode, QueryMode,
@ -216,6 +218,12 @@ pgsql_config(BridgeType, Config) ->
), ),
{Name, parse_and_check(ConfigString, BridgeType, Name)}. {Name, parse_and_check(ConfigString, BridgeType, Name)}.
create_passfile(BridgeType, Config) ->
Filename = binary_to_list(BridgeType) ++ ".passfile",
Filepath = filename:join(?config(priv_dir, Config), Filename),
ok = file:write_file(Filepath, ?PGSQL_PASSWORD),
"file://" ++ Filepath.
parse_and_check(ConfigString, BridgeType, Name) -> parse_and_check(ConfigString, BridgeType, Name) ->
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}), {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
@ -379,7 +387,9 @@ t_setup_via_http_api_and_publish(Config) ->
QueryMode = ?config(query_mode, Config), QueryMode = ?config(query_mode, Config),
PgsqlConfig = PgsqlConfig0#{ PgsqlConfig = PgsqlConfig0#{
<<"name">> => Name, <<"name">> => Name,
<<"type">> => BridgeType <<"type">> => BridgeType,
%% NOTE: using literal passwords with HTTP API requests.
<<"password">> => <<?PGSQL_PASSWORD>>
}, },
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},

View File

@ -170,21 +170,17 @@ fields(auth_basic) ->
[ [
{username, mk(binary(), #{required => true, desc => ?DESC("auth_basic_username")})}, {username, mk(binary(), #{required => true, desc => ?DESC("auth_basic_username")})},
{password, {password,
mk(binary(), #{ emqx_schema_secret:mk(#{
required => true, required => true,
desc => ?DESC("auth_basic_password"), desc => ?DESC("auth_basic_password")
sensitive => true,
converter => fun emqx_schema:password_converter/2
})} })}
]; ];
fields(auth_token) -> fields(auth_token) ->
[ [
{jwt, {jwt,
mk(binary(), #{ emqx_schema_secret:mk(#{
required => true, required => true,
desc => ?DESC("auth_token_jwt"), desc => ?DESC("auth_token_jwt")
sensitive => true,
converter => fun emqx_schema:password_converter/2
})} })}
]; ];
fields("get_" ++ Type) -> fields("get_" ++ Type) ->

View File

@ -78,7 +78,6 @@ query_mode(_Config) ->
-spec on_start(resource_id(), config()) -> {ok, state()}. -spec on_start(resource_id(), config()) -> {ok, state()}.
on_start(InstanceId, Config) -> on_start(InstanceId, Config) ->
#{ #{
authentication := _Auth,
bridge_name := BridgeName, bridge_name := BridgeName,
servers := Servers0, servers := Servers0,
ssl := SSL ssl := SSL
@ -263,12 +262,14 @@ conn_opts(#{authentication := none}) ->
#{}; #{};
conn_opts(#{authentication := #{username := Username, password := Password}}) -> conn_opts(#{authentication := #{username := Username, password := Password}}) ->
#{ #{
auth_data => iolist_to_binary([Username, <<":">>, Password]), %% TODO: teach `pulsar` to accept 0-arity closures as passwords.
auth_data => iolist_to_binary([Username, <<":">>, emqx_secret:unwrap(Password)]),
auth_method_name => <<"basic">> auth_method_name => <<"basic">>
}; };
conn_opts(#{authentication := #{jwt := JWT}}) -> conn_opts(#{authentication := #{jwt := JWT}}) ->
#{ #{
auth_data => JWT, %% TODO: teach `pulsar` to accept 0-arity closures as passwords.
auth_data => emqx_secret:unwrap(JWT),
auth_method_name => <<"token">> auth_method_name => <<"token">>
}. }.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rabbitmq, [ {application, emqx_bridge_rabbitmq, [
{description, "EMQX Enterprise RabbitMQ Bridge"}, {description, "EMQX Enterprise RabbitMQ Bridge"},
{vsn, "0.1.5"}, {vsn, "0.1.6"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -72,7 +72,7 @@ fields(config) ->
desc => ?DESC("username") desc => ?DESC("username")
} }
)}, )},
{password, fun emqx_connector_schema_lib:password_required/1}, {password, emqx_connector_schema_lib:password_field(#{required => true})},
{pool_size, {pool_size,
hoconsc:mk( hoconsc:mk(
typerefl:pos_integer(), typerefl:pos_integer(),
@ -194,7 +194,6 @@ on_start(
#{ #{
pool_size := PoolSize, pool_size := PoolSize,
payload_template := PayloadTemplate, payload_template := PayloadTemplate,
password := Password,
delivery_mode := InitialDeliveryMode delivery_mode := InitialDeliveryMode
} = InitialConfig } = InitialConfig
) -> ) ->
@ -204,7 +203,6 @@ on_start(
persistent -> 2 persistent -> 2
end, end,
Config = InitialConfig#{ Config = InitialConfig#{
password => emqx_secret:wrap(Password),
delivery_mode => DeliveryMode delivery_mode => DeliveryMode
}, },
?SLOG(info, #{ ?SLOG(info, #{
@ -240,13 +238,11 @@ on_start(
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
LogMessage = ?SLOG(info, #{
#{ msg => "rabbitmq_connector_start_failed",
msg => "rabbitmq_connector_start_failed", error_reason => Reason,
error_reason => Reason, config => emqx_utils:redact(Config)
config => emqx_utils:redact(Config) }),
},
?SLOG(info, LogMessage),
{error, Reason} {error, Reason}
end. end.
@ -319,6 +315,7 @@ create_rabbitmq_connection_and_channel(Config) ->
heartbeat := Heartbeat, heartbeat := Heartbeat,
wait_for_publish_confirmations := WaitForPublishConfirmations wait_for_publish_confirmations := WaitForPublishConfirmations
} = Config, } = Config,
%% TODO: teach `amqp` to accept 0-arity closures as passwords.
Password = emqx_secret:unwrap(WrappedPassword), Password = emqx_secret:unwrap(WrappedPassword),
SSLOptions = SSLOptions =
case maps:get(ssl, Config, #{}) of case maps:get(ssl, Config, #{}) of

View File

@ -10,6 +10,7 @@
-include("emqx_connector.hrl"). -include("emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp_client/include/amqp_client.hrl").
%% This test SUITE requires a running RabbitMQ instance. If you don't want to %% This test SUITE requires a running RabbitMQ instance. If you don't want to
@ -26,6 +27,9 @@ rabbit_mq_host() ->
rabbit_mq_port() -> rabbit_mq_port() ->
5672. 5672.
rabbit_mq_password() ->
<<"guest">>.
rabbit_mq_exchange() -> rabbit_mq_exchange() ->
<<"test_exchange">>. <<"test_exchange">>.
@ -45,12 +49,12 @@ init_per_suite(Config) ->
) )
of of
true -> true ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]), Apps = emqx_cth_suite:start(
ok = emqx_connector_test_helpers:start_apps([emqx_resource]), [emqx_conf, emqx_connector, emqx_bridge_rabbitmq],
{ok, _} = application:ensure_all_started(emqx_connector), #{work_dir => emqx_cth_suite:work_dir(Config)}
{ok, _} = application:ensure_all_started(amqp_client), ),
ChannelConnection = setup_rabbit_mq_exchange_and_queue(), ChannelConnection = setup_rabbit_mq_exchange_and_queue(),
[{channel_connection, ChannelConnection} | Config]; [{channel_connection, ChannelConnection}, {suite_apps, Apps} | Config];
false -> false ->
case os:getenv("IS_CI") of case os:getenv("IS_CI") of
"yes" -> "yes" ->
@ -106,13 +110,11 @@ end_per_suite(Config) ->
connection := Connection, connection := Connection,
channel := Channel channel := Channel
} = get_channel_connection(Config), } = get_channel_connection(Config),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector),
%% Close the channel %% Close the channel
ok = amqp_channel:close(Channel), ok = amqp_channel:close(Channel),
%% Close the connection %% Close the connection
ok = amqp_connection:close(Connection). ok = amqp_connection:close(Connection),
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
% %%------------------------------------------------------------------------------ % %%------------------------------------------------------------------------------
% %% Testcases % %% Testcases
@ -125,23 +127,31 @@ t_lifecycle(Config) ->
Config Config
). ).
t_start_passfile(Config) ->
ResourceID = atom_to_binary(?FUNCTION_NAME),
PasswordFilename = filename:join(?config(priv_dir, Config), "passfile"),
ok = file:write_file(PasswordFilename, rabbit_mq_password()),
InitialConfig = rabbitmq_config(#{
password => iolist_to_binary(["file://", PasswordFilename])
}),
?assertMatch(
#{status := connected},
create_local_resource(ResourceID, check_config(InitialConfig))
),
?assertEqual(
ok,
emqx_resource:remove_local(ResourceID)
).
perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) -> perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
#{ #{
channel := Channel channel := Channel
} = get_channel_connection(TestConfig), } = get_channel_connection(TestConfig),
{ok, #{config := CheckedConfig}} = CheckedConfig = check_config(InitialConfig),
emqx_resource:check_config(emqx_bridge_rabbitmq_connector, InitialConfig), #{
{ok, #{
state := #{poolname := PoolName} = State, state := #{poolname := PoolName} = State,
status := InitialStatus status := InitialStatus
}} = } = create_local_resource(ResourceID, CheckedConfig),
emqx_resource:create_local(
ResourceID,
?CONNECTOR_RESOURCE_GROUP,
emqx_bridge_rabbitmq_connector,
CheckedConfig,
#{}
),
?assertEqual(InitialStatus, connected), ?assertEqual(InitialStatus, connected),
%% Instance should match the state and status of the just started resource %% Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{ {ok, ?CONNECTOR_RESOURCE_GROUP, #{
@ -184,6 +194,21 @@ perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
% %% Helpers % %% Helpers
% %%------------------------------------------------------------------------------ % %%------------------------------------------------------------------------------
check_config(Config) ->
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(emqx_bridge_rabbitmq_connector, Config),
CheckedConfig.
create_local_resource(ResourceID, CheckedConfig) ->
{ok, Bridge} = emqx_resource:create_local(
ResourceID,
?CONNECTOR_RESOURCE_GROUP,
emqx_bridge_rabbitmq_connector,
CheckedConfig,
#{}
),
Bridge.
perform_query(PoolName, Channel) -> perform_query(PoolName, Channel) ->
%% Send message to queue: %% Send message to queue:
ok = emqx_resource:query(PoolName, {query, test_data()}), ok = emqx_resource:query(PoolName, {query, test_data()}),
@ -216,16 +241,19 @@ receive_simple_test_message(Channel) ->
end. end.
rabbitmq_config() -> rabbitmq_config() ->
rabbitmq_config(#{}).
rabbitmq_config(Overrides) ->
Config = Config =
#{ #{
server => rabbit_mq_host(), server => rabbit_mq_host(),
port => 5672, port => 5672,
username => <<"guest">>, username => <<"guest">>,
password => <<"guest">>, password => rabbit_mq_password(),
exchange => rabbit_mq_exchange(), exchange => rabbit_mq_exchange(),
routing_key => rabbit_mq_routing_key() routing_key => rabbit_mq_routing_key()
}, },
#{<<"config">> => Config}. #{<<"config">> => maps:merge(Config, Overrides)}.
test_data() -> test_data() ->
#{<<"msg_field">> => <<"Hello">>}. #{<<"msg_field">> => <<"Hello">>}.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rocketmq, [ {application, emqx_bridge_rocketmq, [
{description, "EMQX Enterprise RocketMQ Bridge"}, {description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource, rocketmq]}, {applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, []}, {env, []},

View File

@ -48,13 +48,8 @@ fields(config) ->
binary(), binary(),
#{default => <<>>, desc => ?DESC("access_key")} #{default => <<>>, desc => ?DESC("access_key")}
)}, )},
{secret_key, {secret_key, emqx_schema_secret:mk(#{default => <<>>, desc => ?DESC("secret_key")})},
mk( {security_token, emqx_schema_secret:mk(#{default => <<>>, desc => ?DESC(security_token)})},
binary(),
#{default => <<>>, desc => ?DESC("secret_key"), sensitive => true}
)},
{security_token,
mk(binary(), #{default => <<>>, desc => ?DESC(security_token), sensitive => true})},
{sync_timeout, {sync_timeout,
mk( mk(
emqx_schema:timeout_duration(), emqx_schema:timeout_duration(),
@ -294,21 +289,19 @@ make_producer_opts(
acl_info => emqx_secret:wrap(ACLInfo) acl_info => emqx_secret:wrap(ACLInfo)
}. }.
acl_info(<<>>, <<>>, <<>>) -> acl_info(<<>>, _, _) ->
#{}; #{};
acl_info(AccessKey, SecretKey, <<>>) when is_binary(AccessKey), is_binary(SecretKey) -> acl_info(AccessKey, SecretKey, SecurityToken) when is_binary(AccessKey) ->
#{ Info = #{
access_key => AccessKey, access_key => AccessKey,
secret_key => SecretKey secret_key => emqx_maybe:define(emqx_secret:unwrap(SecretKey), <<>>)
}; },
acl_info(AccessKey, SecretKey, SecurityToken) when case emqx_maybe:define(emqx_secret:unwrap(SecurityToken), <<>>) of
is_binary(AccessKey), is_binary(SecretKey), is_binary(SecurityToken) <<>> ->
-> Info;
#{ Token ->
access_key => AccessKey, Info#{security_token => Token}
secret_key => SecretKey, end;
security_token => SecurityToken
};
acl_info(_, _, _) -> acl_info(_, _, _) ->
#{}. #{}.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_sqlserver, [ {application, emqx_bridge_sqlserver, [
{description, "EMQX Enterprise SQL Server Bridge"}, {description, "EMQX Enterprise SQL Server Bridge"},
{vsn, "0.1.4"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource, odbc]}, {applications, [kernel, stdlib, emqx_resource, odbc]},
{env, []}, {env, []},

View File

@ -199,7 +199,7 @@ on_start(
Options = [ Options = [
{server, to_bin(Server)}, {server, to_bin(Server)},
{username, Username}, {username, Username},
{password, emqx_secret:wrap(maps:get(password, Config, ""))}, {password, maps:get(password, Config, emqx_secret:wrap(""))},
{driver, Driver}, {driver, Driver},
{database, Database}, {database, Database},
{pool_size, PoolSize} {pool_size, PoolSize}

View File

@ -130,7 +130,9 @@ end_per_group(_Group, _Config) ->
ok. ok.
init_per_suite(Config) -> init_per_suite(Config) ->
Config. Passfile = filename:join(?config(priv_dir, Config), "passfile"),
ok = file:write_file(Passfile, <<?SQL_SERVER_PASSWORD>>),
[{sqlserver_passfile, Passfile} | Config].
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(), emqx_mgmt_api_test_util:end_suite(),
@ -193,7 +195,9 @@ t_setup_via_http_api_and_publish(Config) ->
SQLServerConfig0 = ?config(sqlserver_config, Config), SQLServerConfig0 = ?config(sqlserver_config, Config),
SQLServerConfig = SQLServerConfig0#{ SQLServerConfig = SQLServerConfig0#{
<<"name">> => Name, <<"name">> => Name,
<<"type">> => BridgeType <<"type">> => BridgeType,
%% NOTE: using literal password with HTTP API requests.
<<"password">> => <<?SQL_SERVER_PASSWORD>>
}, },
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
@ -449,6 +453,7 @@ sqlserver_config(BridgeType, Config) ->
Name = atom_to_binary(?MODULE), Name = atom_to_binary(?MODULE),
BatchSize = batch_size(Config), BatchSize = batch_size(Config),
QueryMode = ?config(query_mode, Config), QueryMode = ?config(query_mode, Config),
Passfile = ?config(sqlserver_passfile, Config),
ConfigString = ConfigString =
io_lib:format( io_lib:format(
"bridges.~s.~s {\n" "bridges.~s.~s {\n"
@ -472,7 +477,7 @@ sqlserver_config(BridgeType, Config) ->
Server, Server,
?SQL_SERVER_DATABASE, ?SQL_SERVER_DATABASE,
?SQL_SERVER_USERNAME, ?SQL_SERVER_USERNAME,
?SQL_SERVER_PASSWORD, "file://" ++ Passfile,
?SQL_BRIDGE, ?SQL_BRIDGE,
?SQL_SERVER_DRIVER, ?SQL_SERVER_DRIVER,
BatchSize, BatchSize,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_tdengine, [ {application, emqx_bridge_tdengine, [
{description, "EMQX Enterprise TDEngine Bridge"}, {description, "EMQX Enterprise TDEngine Bridge"},
{vsn, "0.1.5"}, {vsn, "0.1.6"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -6,7 +6,6 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -48,8 +47,8 @@ adjust_fields(Fields) ->
fun fun
({username, OrigUsernameFn}) -> ({username, OrigUsernameFn}) ->
{username, add_default_fn(OrigUsernameFn, <<"root">>)}; {username, add_default_fn(OrigUsernameFn, <<"root">>)};
({password, OrigPasswordFn}) -> ({password, _}) ->
{password, make_required_fn(OrigPasswordFn)}; {password, emqx_connector_schema_lib:password_field(#{required => true})};
(Field) -> (Field) ->
Field Field
end, end,
@ -62,12 +61,6 @@ add_default_fn(OrigFn, Default) ->
(Field) -> OrigFn(Field) (Field) -> OrigFn(Field)
end. end.
make_required_fn(OrigFn) ->
fun
(required) -> true;
(Field) -> OrigFn(Field)
end.
server() -> server() ->
Meta = #{desc => ?DESC("server")}, Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?TD_HOST_OPTIONS). emqx_schema:servers_sc(Meta, ?TD_HOST_OPTIONS).
@ -223,7 +216,10 @@ aggregate_query(BatchTks, BatchReqs, Acc) ->
). ).
connect(Opts) -> connect(Opts) ->
tdengine:start_link(Opts). %% TODO: teach `tdengine` to accept 0-arity closures as passwords.
{value, {password, Secret}, OptsRest} = lists:keytake(password, 1, Opts),
NOpts = [{password, emqx_secret:unwrap(Secret)} | OptsRest],
tdengine:start_link(NOpts).
query_opts(#{database := Database} = _Opts) -> query_opts(#{database := Database} = _Opts) ->
[{db_name, Database}]. [{db_name, Database}].

View File

@ -22,15 +22,15 @@
-export([ -export([
relational_db_fields/0, relational_db_fields/0,
ssl_fields/0, ssl_fields/0,
prepare_statement_fields/0 prepare_statement_fields/0,
password_field/0,
password_field/1
]). ]).
-export([ -export([
pool_size/1, pool_size/1,
database/1, database/1,
username/1, username/1,
password/1,
password_required/1,
auto_reconnect/1 auto_reconnect/1
]). ]).
@ -68,10 +68,19 @@ relational_db_fields() ->
%% See emqx_resource.hrl %% See emqx_resource.hrl
{pool_size, fun pool_size/1}, {pool_size, fun pool_size/1},
{username, fun username/1}, {username, fun username/1},
{password, fun password/1}, {password, password_field()},
{auto_reconnect, fun auto_reconnect/1} {auto_reconnect, fun auto_reconnect/1}
]. ].
-spec password_field() -> hocon_schema:field_schema().
password_field() ->
password_field(#{}).
-spec password_field(#{atom() => _}) -> hocon_schema:field_schema().
password_field(Overrides) ->
Base = #{desc => ?DESC("password")},
emqx_schema_secret:mk(maps:merge(Base, Overrides)).
prepare_statement_fields() -> prepare_statement_fields() ->
[{prepare_statement, fun prepare_statement/1}]. [{prepare_statement, fun prepare_statement/1}].
@ -97,22 +106,6 @@ username(desc) -> ?DESC("username");
username(required) -> false; username(required) -> false;
username(_) -> undefined. username(_) -> undefined.
password(type) -> binary();
password(desc) -> ?DESC("password");
password(required) -> false;
password(format) -> <<"password">>;
password(sensitive) -> true;
password(converter) -> fun emqx_schema:password_converter/2;
password(_) -> undefined.
password_required(type) -> binary();
password_required(desc) -> ?DESC("password");
password_required(required) -> true;
password_required(format) -> <<"password">>;
password_required(sensitive) -> true;
password_required(converter) -> fun emqx_schema:password_converter/2;
password_required(_) -> undefined.
auto_reconnect(type) -> boolean(); auto_reconnect(type) -> boolean();
auto_reconnect(desc) -> ?DESC("auto_reconnect"); auto_reconnect(desc) -> ?DESC("auto_reconnect");
auto_reconnect(default) -> true; auto_reconnect(default) -> true;

View File

@ -204,7 +204,7 @@ backend(get, #{bindings := #{backend := Type}}) ->
undefined -> undefined ->
{404, #{code => ?BACKEND_NOT_FOUND, message => <<"Backend not found">>}}; {404, #{code => ?BACKEND_NOT_FOUND, message => <<"Backend not found">>}};
Backend -> Backend ->
{200, to_json(Backend)} {200, to_redacted_json(Backend)}
end; end;
backend(put, #{bindings := #{backend := Backend}, body := Config}) -> backend(put, #{bindings := #{backend := Backend}, body := Config}) ->
?SLOG(info, #{ ?SLOG(info, #{
@ -264,9 +264,9 @@ valid_config(_, _, _) ->
{error, invalid_config}. {error, invalid_config}.
handle_backend_update_result({ok, #{backend := saml} = State}, _Config) -> handle_backend_update_result({ok, #{backend := saml} = State}, _Config) ->
{200, to_json(maps:without([idp_meta, sp], State))}; {200, to_redacted_json(maps:without([idp_meta, sp], State))};
handle_backend_update_result({ok, _State}, Config) -> handle_backend_update_result({ok, _State}, Config) ->
{200, to_json(Config)}; {200, to_redacted_json(Config)};
handle_backend_update_result(ok, _) -> handle_backend_update_result(ok, _) ->
204; 204;
handle_backend_update_result({error, not_exists}, _) -> handle_backend_update_result({error, not_exists}, _) ->
@ -278,9 +278,9 @@ handle_backend_update_result({error, Reason}, _) when is_binary(Reason) ->
handle_backend_update_result({error, Reason}, _) -> handle_backend_update_result({error, Reason}, _) ->
{400, #{code => ?BAD_REQUEST, message => emqx_dashboard_sso:format(["Reason: ", Reason])}}. {400, #{code => ?BAD_REQUEST, message => emqx_dashboard_sso:format(["Reason: ", Reason])}}.
to_json(Data) -> to_redacted_json(Data) ->
emqx_utils_maps:jsonable_map( emqx_utils_maps:jsonable_map(
Data, emqx_utils:redact(Data),
fun(K, V) -> fun(K, V) ->
{K, emqx_utils_maps:binary_string(V)} {K, emqx_utils_maps:binary_string(V)}
end end

View File

@ -10,9 +10,11 @@
-include_lib("emqx_dashboard/include/emqx_dashboard.hrl"). -include_lib("emqx_dashboard/include/emqx_dashboard.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(LDAP_HOST, "ldap"). -define(LDAP_HOST, "ldap").
-define(LDAP_DEFAULT_PORT, 389). -define(LDAP_DEFAULT_PORT, 389).
-define(LDAP_PASSWORD, <<"public">>).
-define(LDAP_USER, <<"viewer1">>). -define(LDAP_USER, <<"viewer1">>).
-define(LDAP_USER_PASSWORD, <<"viewer1">>). -define(LDAP_USER_PASSWORD, <<"viewer1">>).
-define(LDAP_BASE_DN, <<"ou=dashboard,dc=emqx,dc=io">>). -define(LDAP_BASE_DN, <<"ou=dashboard,dc=emqx,dc=io">>).
@ -128,9 +130,19 @@ t_update({init, Config}) ->
Config; Config;
t_update({'end', _Config}) -> t_update({'end', _Config}) ->
ok; ok;
t_update(_) -> t_update(Config) ->
Path = uri(["sso", "ldap"]), Path = uri(["sso", "ldap"]),
{ok, 200, Result} = request(put, Path, ldap_config(#{<<"enable">> => <<"true">>})), %% NOTE: this time verify that supplying password through file-based secret works.
PasswordFilename = filename:join([?config(priv_dir, Config), "passfile"]),
ok = file:write_file(PasswordFilename, ?LDAP_PASSWORD),
{ok, 200, Result} = request(
put,
Path,
ldap_config(#{
<<"enable">> => <<"true">>,
<<"password">> => iolist_to_binary(["file://", PasswordFilename])
})
),
check_running([<<"ldap">>]), check_running([<<"ldap">>]),
?assertMatch(#{backend := <<"ldap">>, enable := true}, decode_json(Result)), ?assertMatch(#{backend := <<"ldap">>, enable := true}, decode_json(Result)),
?assertMatch([#{backend := <<"ldap">>, enable := true}], get_sso()), ?assertMatch([#{backend := <<"ldap">>, enable := true}], get_sso()),
@ -287,7 +299,7 @@ ldap_config(Override) ->
<<"base_dn">> => ?LDAP_BASE_DN, <<"base_dn">> => ?LDAP_BASE_DN,
<<"filter">> => ?LDAP_FILTER_WITH_UID, <<"filter">> => ?LDAP_FILTER_WITH_UID,
<<"username">> => <<"cn=root,dc=emqx,dc=io">>, <<"username">> => <<"cn=root,dc=emqx,dc=io">>,
<<"password">> => <<"public">>, <<"password">> => ?LDAP_PASSWORD,
<<"pool_size">> => 8 <<"pool_size">> => 8
}, },
Override Override

View File

@ -53,8 +53,6 @@
filter_tokens := params_tokens() filter_tokens := params_tokens()
}. }.
-define(ECS, emqx_connector_schema_lib).
%%===================================================================== %%=====================================================================
%% Hocon schema %% Hocon schema
roots() -> roots() ->
@ -63,9 +61,9 @@ roots() ->
fields(config) -> fields(config) ->
[ [
{server, server()}, {server, server()},
{pool_size, fun ?ECS:pool_size/1}, {pool_size, fun emqx_connector_schema_lib:pool_size/1},
{username, fun ensure_username/1}, {username, fun ensure_username/1},
{password, fun ?ECS:password/1}, {password, emqx_connector_schema_lib:password_field()},
{base_dn, {base_dn,
?HOCON(binary(), #{ ?HOCON(binary(), #{
desc => ?DESC(base_dn), desc => ?DESC(base_dn),
@ -124,7 +122,7 @@ server() ->
ensure_username(required) -> ensure_username(required) ->
true; true;
ensure_username(Field) -> ensure_username(Field) ->
?ECS:username(Field). emqx_connector_schema_lib:username(Field).
%% =================================================================== %% ===================================================================
callback_mode() -> always_sync. callback_mode() -> always_sync.
@ -223,7 +221,8 @@ connect(Options) ->
OpenOpts = maps:to_list(maps:with([port, sslopts], Conf)), OpenOpts = maps:to_list(maps:with([port, sslopts], Conf)),
case eldap:open([Host], [{log, fun log/3}, {timeout, RequestTimeout} | OpenOpts]) of case eldap:open([Host], [{log, fun log/3}, {timeout, RequestTimeout} | OpenOpts]) of
{ok, Handle} = Ret -> {ok, Handle} = Ret ->
case eldap:simple_bind(Handle, Username, Password) of %% TODO: teach `eldap` to accept 0-arity closures as passwords.
case eldap:simple_bind(Handle, Username, emqx_secret:unwrap(Password)) of
ok -> Ret; ok -> Ret;
Error -> Error Error -> Error
end; end;
@ -320,13 +319,13 @@ log(Level, Format, Args) ->
). ).
prepare_template(Config, State) -> prepare_template(Config, State) ->
do_prepare_template(maps:to_list(maps:with([base_dn, filter], Config)), State). maps:fold(fun prepare_template/3, State, Config).
do_prepare_template([{base_dn, V} | T], State) -> prepare_template(base_dn, V, State) ->
do_prepare_template(T, State#{base_tokens => emqx_placeholder:preproc_tmpl(V)}); State#{base_tokens => emqx_placeholder:preproc_tmpl(V)};
do_prepare_template([{filter, V} | T], State) -> prepare_template(filter, V, State) ->
do_prepare_template(T, State#{filter_tokens => emqx_placeholder:preproc_tmpl(V)}); State#{filter_tokens => emqx_placeholder:preproc_tmpl(V)};
do_prepare_template([], State) -> prepare_template(_Entry, _, State) ->
State. State.
filter_escape(Binary) when is_binary(Binary) -> filter_escape(Binary) when is_binary(Binary) ->

View File

@ -1,6 +1,6 @@
{application, emqx_mongodb, [ {application, emqx_mongodb, [
{description, "EMQX MongoDB Connector"}, {description, "EMQX MongoDB Connector"},
{vsn, "0.1.2"}, {vsn, "0.1.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -140,7 +140,7 @@ mongo_fields() ->
{srv_record, fun srv_record/1}, {srv_record, fun srv_record/1},
{pool_size, fun emqx_connector_schema_lib:pool_size/1}, {pool_size, fun emqx_connector_schema_lib:pool_size/1},
{username, fun emqx_connector_schema_lib:username/1}, {username, fun emqx_connector_schema_lib:username/1},
{password, fun emqx_connector_schema_lib:password/1}, {password, emqx_connector_schema_lib:password_field()},
{use_legacy_protocol, {use_legacy_protocol,
hoconsc:mk(hoconsc:enum([auto, true, false]), #{ hoconsc:mk(hoconsc:enum([auto, true, false]), #{
default => auto, default => auto,
@ -428,8 +428,8 @@ init_worker_options([{auth_source, V} | R], Acc) ->
init_worker_options(R, [{auth_source, V} | Acc]); init_worker_options(R, [{auth_source, V} | Acc]);
init_worker_options([{username, V} | R], Acc) -> init_worker_options([{username, V} | R], Acc) ->
init_worker_options(R, [{login, V} | Acc]); init_worker_options(R, [{login, V} | Acc]);
init_worker_options([{password, V} | R], Acc) -> init_worker_options([{password, Secret} | R], Acc) ->
init_worker_options(R, [{password, emqx_secret:wrap(V)} | Acc]); init_worker_options(R, [{password, Secret} | Acc]);
init_worker_options([{w_mode, V} | R], Acc) -> init_worker_options([{w_mode, V} | R], Acc) ->
init_worker_options(R, [{w_mode, V} | Acc]); init_worker_options(R, [{w_mode, V} | Acc]);
init_worker_options([{r_mode, V} | R], Acc) -> init_worker_options([{r_mode, V} | R], Acc) ->

View File

@ -20,6 +20,7 @@
-include("emqx_connector.hrl"). -include("emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
@ -65,27 +66,36 @@ t_lifecycle(_Config) ->
mongo_config() mongo_config()
). ).
t_start_passfile(Config) ->
ResourceID = atom_to_binary(?FUNCTION_NAME),
PasswordFilename = filename:join(?config(priv_dir, Config), "passfile"),
ok = file:write_file(PasswordFilename, mongo_password()),
InitialConfig = emqx_utils_maps:deep_merge(mongo_config(), #{
<<"config">> => #{
<<"password">> => iolist_to_binary(["file://", PasswordFilename])
}
}),
?assertMatch(
#{status := connected},
create_local_resource(ResourceID, check_config(InitialConfig))
),
?assertEqual(
ok,
emqx_resource:remove_local(ResourceID)
).
perform_lifecycle_check(ResourceId, InitialConfig) -> perform_lifecycle_check(ResourceId, InitialConfig) ->
{ok, #{config := CheckedConfig}} = CheckedConfig = check_config(InitialConfig),
emqx_resource:check_config(?MONGO_RESOURCE_MOD, InitialConfig), #{
{ok, #{
state := #{pool_name := PoolName} = State, state := #{pool_name := PoolName} = State,
status := InitialStatus status := InitialStatus
}} = } = create_local_resource(ResourceId, CheckedConfig),
emqx_resource:create_local(
ResourceId,
?CONNECTOR_RESOURCE_GROUP,
?MONGO_RESOURCE_MOD,
CheckedConfig,
#{}
),
?assertEqual(InitialStatus, connected), ?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource % Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{ {ok, ?CONNECTOR_RESOURCE_GROUP, #{
state := State, state := State,
status := InitialStatus status := InitialStatus
}} = }} = emqx_resource:get_instance(ResourceId),
emqx_resource:get_instance(ResourceId),
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)), ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)),
% % Perform query as further check that the resource is working as expected % % Perform query as further check that the resource is working as expected
?assertMatch({ok, []}, emqx_resource:query(ResourceId, test_query_find())), ?assertMatch({ok, []}, emqx_resource:query(ResourceId, test_query_find())),
@ -123,24 +133,52 @@ perform_lifecycle_check(ResourceId, InitialConfig) ->
% %% Helpers % %% Helpers
% %%------------------------------------------------------------------------------ % %%------------------------------------------------------------------------------
check_config(Config) ->
{ok, #{config := CheckedConfig}} = emqx_resource:check_config(?MONGO_RESOURCE_MOD, Config),
CheckedConfig.
create_local_resource(ResourceId, CheckedConfig) ->
{ok, Bridge} = emqx_resource:create_local(
ResourceId,
?CONNECTOR_RESOURCE_GROUP,
?MONGO_RESOURCE_MOD,
CheckedConfig,
#{}
),
Bridge.
mongo_config() -> mongo_config() ->
RawConfig = list_to_binary( RawConfig = list_to_binary(
io_lib:format( io_lib:format(
"" "\n mongo_type = single"
"\n" "\n database = mqtt"
" mongo_type = single\n" "\n pool_size = 8"
" database = mqtt\n" "\n server = \"~s:~b\""
" pool_size = 8\n" "\n auth_source = ~p"
" server = \"~s:~b\"\n" "\n username = ~p"
" " "\n password = ~p"
"", "\n",
[?MONGO_HOST, ?MONGO_DEFAULT_PORT] [
?MONGO_HOST,
?MONGO_DEFAULT_PORT,
mongo_authsource(),
mongo_username(),
mongo_password()
]
) )
), ),
{ok, Config} = hocon:binary(RawConfig), {ok, Config} = hocon:binary(RawConfig),
#{<<"config">> => Config}. #{<<"config">> => Config}.
mongo_authsource() ->
os:getenv("MONGO_AUTHSOURCE", "admin").
mongo_username() ->
os:getenv("MONGO_USERNAME", "").
mongo_password() ->
os:getenv("MONGO_PASSWORD", "").
test_query_find() -> test_query_find() ->
{find, <<"foo">>, #{}, #{}}. {find, <<"foo">>, #{}, #{}}.

View File

@ -280,7 +280,10 @@ do_check_prepares(#{prepares := {error, _}} = State) ->
%% =================================================================== %% ===================================================================
connect(Options) -> connect(Options) ->
mysql:start_link(Options). %% TODO: teach `tdengine` to accept 0-arity closures as passwords.
{value, {password, Secret}, Rest} = lists:keytake(password, 1, Options),
NOptions = [{password, emqx_secret:unwrap(Secret)} | Rest],
mysql:start_link(NOptions).
init_prepare(State = #{query_templates := Templates}) -> init_prepare(State = #{query_templates := Templates}) ->
case maps:size(Templates) of case maps:size(Templates) of

View File

@ -1,6 +1,6 @@
{application, emqx_oracle, [ {application, emqx_oracle, [
{description, "EMQX Enterprise Oracle Database Connector"}, {description, "EMQX Enterprise Oracle Database Connector"},
{vsn, "0.1.7"}, {vsn, "0.1.8"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -95,7 +95,7 @@ on_start(
{host, Host}, {host, Host},
{port, Port}, {port, Port},
{user, emqx_utils_conv:str(User)}, {user, emqx_utils_conv:str(User)},
{password, jamdb_secret:wrap(maps:get(password, Config, ""))}, {password, maps:get(password, Config, "")},
{sid, emqx_utils_conv:str(Sid)}, {sid, emqx_utils_conv:str(Sid)},
{service_name, ServiceName}, {service_name, ServiceName},
{pool_size, maps:get(pool_size, Config, ?DEFAULT_POOL_SIZE)}, {pool_size, maps:get(pool_size, Config, ?DEFAULT_POOL_SIZE)},

View File

@ -131,7 +131,7 @@ on_start(
{host, Host}, {host, Host},
{port, Port}, {port, Port},
{username, User}, {username, User},
{password, emqx_secret:wrap(maps:get(password, Config, ""))}, {password, maps:get(password, Config, emqx_secret:wrap(""))},
{database, DB}, {database, DB},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize} {pool_size, PoolSize}
@ -357,6 +357,7 @@ validate_table_existence([], _SQL) ->
connect(Opts) -> connect(Opts) ->
Host = proplists:get_value(host, Opts), Host = proplists:get_value(host, Opts),
Username = proplists:get_value(username, Opts), Username = proplists:get_value(username, Opts),
%% TODO: teach `epgsql` to accept 0-arity closures as passwords.
Password = emqx_secret:unwrap(proplists:get_value(password, Opts)), Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
{ok, _Conn} = Ok -> {ok, _Conn} = Ok ->

View File

@ -147,7 +147,7 @@ on_start(
[ [
{pool_size, PoolSize}, {pool_size, PoolSize},
{username, maps:get(username, Config, undefined)}, {username, maps:get(username, Config, undefined)},
{password, eredis_secret:wrap(maps:get(password, Config, ""))}, {password, maps:get(password, Config, "")},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL} {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
] ++ Database ++ Servers, ] ++ Database ++ Servers,
Options = Options =
@ -296,7 +296,7 @@ redis_fields() ->
[ [
{pool_size, fun emqx_connector_schema_lib:pool_size/1}, {pool_size, fun emqx_connector_schema_lib:pool_size/1},
{username, fun emqx_connector_schema_lib:username/1}, {username, fun emqx_connector_schema_lib:username/1},
{password, fun emqx_connector_schema_lib:password/1}, {password, emqx_connector_schema_lib:password_field()},
{database, #{ {database, #{
type => non_neg_integer(), type => non_neg_integer(),
default => 0, default => 0,

View File

@ -0,0 +1 @@
Support configuring authentication-related sensitive fields in bridges (i.e. passwords, tokens, secret keys) via secrets stored as files in the file system, through special `file://` prefix.

View File

@ -40,6 +40,7 @@ ATTACH='no'
STOP='no' STOP='no'
IS_CI='no' IS_CI='no'
ODBC_REQUEST='no' ODBC_REQUEST='no'
UP='up'
while [ "$#" -gt 0 ]; do while [ "$#" -gt 0 ]; do
case $1 in case $1 in
-h|--help) -h|--help)
@ -72,6 +73,7 @@ while [ "$#" -gt 0 ]; do
;; ;;
--ci) --ci)
IS_CI='yes' IS_CI='yes'
UP='up --quiet-pull'
shift 1 shift 1
;; ;;
--) --)
@ -254,10 +256,8 @@ else
INSTALL_ODBC="echo 'msodbc driver not requested'" INSTALL_ODBC="echo 'msodbc driver not requested'"
fi fi
F_OPTIONS=""
for file in "${FILES[@]}"; do for file in "${FILES[@]}"; do
F_OPTIONS="$F_OPTIONS -f $file" DC="$DC -f $file"
done done
DOCKER_USER="$(id -u)" DOCKER_USER="$(id -u)"
@ -275,15 +275,14 @@ if [ "$STOP" = 'no' ]; then
# some left-over log file has to be deleted before a new docker-compose up # some left-over log file has to be deleted before a new docker-compose up
rm -f '.ci/docker-compose-file/redis/*.log' rm -f '.ci/docker-compose-file/redis/*.log'
set +e set +e
# shellcheck disable=2086 # no quotes for F_OPTIONS # shellcheck disable=2086 # no quotes for UP
$DC $F_OPTIONS up -d --build --remove-orphans $DC $UP -d --build --remove-orphans
RESULT=$? RESULT=$?
if [ $RESULT -ne 0 ]; then if [ $RESULT -ne 0 ]; then
mkdir -p _build/test/logs mkdir -p _build/test/logs
LOG='_build/test/logs/docker-compose.log' LOG='_build/test/logs/docker-compose.log'
echo "Dumping docker-compose log to $LOG" echo "Dumping docker-compose log to $LOG"
# shellcheck disable=2086 # no quotes for F_OPTIONS $DC logs --no-color --timestamps > "$LOG"
$DC $F_OPTIONS logs --no-color --timestamps > "$LOG"
exit 1 exit 1
fi fi
set -e set -e
@ -309,8 +308,7 @@ fi
set +e set +e
if [ "$STOP" = 'yes' ]; then if [ "$STOP" = 'yes' ]; then
# shellcheck disable=2086 # no quotes for F_OPTIONS $DC down --remove-orphans
$DC $F_OPTIONS down --remove-orphans
elif [ "$ATTACH" = 'yes' ]; then elif [ "$ATTACH" = 'yes' ]; then
docker exec -it "$ERLANG_CONTAINER" bash docker exec -it "$ERLANG_CONTAINER" bash
elif [ "$CONSOLE" = 'yes' ]; then elif [ "$CONSOLE" = 'yes' ]; then
@ -335,12 +333,10 @@ else
if [ "$RESULT" -ne 0 ]; then if [ "$RESULT" -ne 0 ]; then
LOG='_build/test/logs/docker-compose.log' LOG='_build/test/logs/docker-compose.log'
echo "Dumping docker-compose log to $LOG" echo "Dumping docker-compose log to $LOG"
# shellcheck disable=2086 # no quotes for F_OPTIONS $DC logs --no-color --timestamps > "$LOG"
$DC $F_OPTIONS logs --no-color --timestamps > "$LOG"
fi fi
if [ "$KEEP_UP" != 'yes' ]; then if [ "$KEEP_UP" != 'yes' ]; then
# shellcheck disable=2086 # no quotes for F_OPTIONS $DC down
$DC $F_OPTIONS down
fi fi
exit "$RESULT" exit "$RESULT"
fi fi