feat(cassa): accept wrapped secrets as passwords

This commit is contained in:
Andrew Mayorov 2023-11-07 22:02:02 +07:00
parent 8b4ac8eb4f
commit 9c5856029f
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 52 additions and 48 deletions

View File

@ -70,7 +70,7 @@ cassandra_db_fields() ->
{keyspace, fun keyspace/1}, {keyspace, fun keyspace/1},
{pool_size, fun emqx_connector_schema_lib:pool_size/1}, {pool_size, fun emqx_connector_schema_lib:pool_size/1},
{username, fun emqx_connector_schema_lib:username/1}, {username, fun emqx_connector_schema_lib:username/1},
{password, fun emqx_connector_schema_lib:password/1}, {password, emqx_connector_schema_lib:password_field()},
{auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
]. ].
@ -111,14 +111,14 @@ on_start(
emqx_schema:parse_servers(Servers0, ?DEFAULT_SERVER_OPTION) emqx_schema:parse_servers(Servers0, ?DEFAULT_SERVER_OPTION)
), ),
Options = [ Options =
maps:to_list(maps:with([username, password], Config)) ++
[
{nodes, Servers}, {nodes, Servers},
{keyspace, Keyspace}, {keyspace, Keyspace},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize} {pool_size, PoolSize}
], ],
Options1 = maybe_add_opt(username, Config, Options),
Options2 = maybe_add_opt(password, Config, Options1, _IsSensitive = true),
SslOpts = SslOpts =
case maps:get(enable, SSL) of case maps:get(enable, SSL) of
@ -131,7 +131,7 @@ on_start(
[] []
end, end,
State = parse_prepare_cql(Config), State = parse_prepare_cql(Config),
case emqx_resource_pool:start(InstId, ?MODULE, Options2 ++ SslOpts) of case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok -> ok ->
{ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})};
{error, Reason} -> {error, Reason} ->
@ -387,6 +387,7 @@ conn_opts(Opts) ->
conn_opts([], Acc) -> conn_opts([], Acc) ->
Acc; Acc;
conn_opts([{password, Password} | Opts], Acc) -> conn_opts([{password, Password} | Opts], Acc) ->
%% TODO: teach `ecql` to accept 0-arity closures as passwords.
conn_opts(Opts, [{password, emqx_secret:unwrap(Password)} | Acc]); conn_opts(Opts, [{password, emqx_secret:unwrap(Password)} | Acc]);
conn_opts([Opt | Opts], Acc) -> conn_opts([Opt | Opts], Acc) ->
conn_opts(Opts, [Opt | Acc]). conn_opts(Opts, [Opt | Acc]).
@ -512,19 +513,3 @@ maybe_assign_type(V) when is_integer(V) ->
maybe_assign_type(V) when is_float(V) -> {double, V}; maybe_assign_type(V) when is_float(V) -> {double, V};
maybe_assign_type(V) -> maybe_assign_type(V) ->
V. V.
maybe_add_opt(Key, Conf, Opts) ->
maybe_add_opt(Key, Conf, Opts, _IsSensitive = false).
maybe_add_opt(Key, Conf, Opts, IsSensitive) ->
case Conf of
#{Key := Val} ->
[{Key, maybe_wrap(IsSensitive, Val)} | Opts];
_ ->
Opts
end.
maybe_wrap(false = _IsSensitive, Val) ->
Val;
maybe_wrap(true, Val) ->
emqx_secret:wrap(Val).

View File

@ -40,10 +40,9 @@ all() ->
]. ].
groups() -> groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[ [
{auth, TCs}, {auth, [t_lifecycle, t_start_passfile]},
{noauth, TCs} {noauth, [t_lifecycle]}
]. ].
cassandra_servers(CassandraHost) -> cassandra_servers(CassandraHost) ->
@ -115,32 +114,37 @@ end_per_testcase(_, _Config) ->
t_lifecycle(Config) -> t_lifecycle(Config) ->
perform_lifecycle_check( perform_lifecycle_check(
<<"emqx_connector_cassandra_SUITE">>, <<?MODULE_STRING>>,
cassandra_config(Config) cassandra_config(Config)
). ).
show(X) -> t_start_passfile(Config) ->
erlang:display(X), ResourceID = atom_to_binary(?FUNCTION_NAME),
X. PasswordFilename = filename:join(?config(priv_dir, Config), "passfile"),
ok = file:write_file(PasswordFilename, ?CASSA_PASSWORD),
show(Label, What) -> InitialConfig = emqx_utils_maps:deep_merge(
erlang:display({Label, What}), cassandra_config(Config),
What. #{
<<"config">> => #{
password => iolist_to_binary(["file://", PasswordFilename])
}
}
),
?assertMatch(
#{status := connected},
create_local_resource(ResourceID, check_config(InitialConfig))
),
?assertEqual(
ok,
emqx_resource:remove_local(ResourceID)
).
perform_lifecycle_check(ResourceId, InitialConfig) -> perform_lifecycle_check(ResourceId, InitialConfig) ->
{ok, #{config := CheckedConfig}} = CheckedConfig = check_config(InitialConfig),
emqx_resource:check_config(?CASSANDRA_RESOURCE_MOD, InitialConfig), #{
{ok, #{
state := #{pool_name := PoolName} = State, state := #{pool_name := PoolName} = State,
status := InitialStatus status := InitialStatus
}} = } = create_local_resource(ResourceId, CheckedConfig),
emqx_resource:create_local(
ResourceId,
?CONNECTOR_RESOURCE_GROUP,
?CASSANDRA_RESOURCE_MOD,
CheckedConfig,
#{}
),
?assertEqual(InitialStatus, connected), ?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource % Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{ {ok, ?CONNECTOR_RESOURCE_GROUP, #{
@ -191,6 +195,21 @@ perform_lifecycle_check(ResourceId, InitialConfig) ->
%% utils %% utils
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
check_config(Config) ->
{ok, #{config := CheckedConfig}} = emqx_resource:check_config(?CASSANDRA_RESOURCE_MOD, Config),
CheckedConfig.
create_local_resource(ResourceId, CheckedConfig) ->
{ok, Bridge} =
emqx_resource:create_local(
ResourceId,
?CONNECTOR_RESOURCE_GROUP,
?CASSANDRA_RESOURCE_MOD,
CheckedConfig,
#{}
),
Bridge.
cassandra_config(Config) -> cassandra_config(Config) ->
Host = ?config(cassa_host, Config), Host = ?config(cassa_host, Config),
AuthOpts = maps:from_list(?config(cassa_auth_opts, Config)), AuthOpts = maps:from_list(?config(cassa_auth_opts, Config)),