feat(rabbitmq): accept wrapped secrets as passwords

This commit is contained in:
Andrew Mayorov 2023-11-07 21:11:25 +07:00
parent a69a78d024
commit 34aeeab041
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 58 additions and 33 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rabbitmq, [ {application, emqx_bridge_rabbitmq, [
{description, "EMQX Enterprise RabbitMQ Bridge"}, {description, "EMQX Enterprise RabbitMQ Bridge"},
{vsn, "0.1.5"}, {vsn, "0.1.6"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -72,7 +72,7 @@ fields(config) ->
desc => ?DESC("username") desc => ?DESC("username")
} }
)}, )},
{password, fun emqx_connector_schema_lib:password_required/1}, {password, emqx_connector_schema_lib:password_field(#{required => true})},
{pool_size, {pool_size,
hoconsc:mk( hoconsc:mk(
typerefl:pos_integer(), typerefl:pos_integer(),
@ -194,7 +194,6 @@ on_start(
#{ #{
pool_size := PoolSize, pool_size := PoolSize,
payload_template := PayloadTemplate, payload_template := PayloadTemplate,
password := Password,
delivery_mode := InitialDeliveryMode delivery_mode := InitialDeliveryMode
} = InitialConfig } = InitialConfig
) -> ) ->
@ -204,7 +203,6 @@ on_start(
persistent -> 2 persistent -> 2
end, end,
Config = InitialConfig#{ Config = InitialConfig#{
password => emqx_secret:wrap(Password),
delivery_mode => DeliveryMode delivery_mode => DeliveryMode
}, },
?SLOG(info, #{ ?SLOG(info, #{
@ -240,13 +238,11 @@ on_start(
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
LogMessage = ?SLOG(info, #{
#{
msg => "rabbitmq_connector_start_failed", msg => "rabbitmq_connector_start_failed",
error_reason => Reason, error_reason => Reason,
config => emqx_utils:redact(Config) config => emqx_utils:redact(Config)
}, }),
?SLOG(info, LogMessage),
{error, Reason} {error, Reason}
end. end.
@ -319,6 +315,7 @@ create_rabbitmq_connection_and_channel(Config) ->
heartbeat := Heartbeat, heartbeat := Heartbeat,
wait_for_publish_confirmations := WaitForPublishConfirmations wait_for_publish_confirmations := WaitForPublishConfirmations
} = Config, } = Config,
%% TODO: teach `amqp` to accept 0-arity closures as passwords.
Password = emqx_secret:unwrap(WrappedPassword), Password = emqx_secret:unwrap(WrappedPassword),
SSLOptions = SSLOptions =
case maps:get(ssl, Config, #{}) of case maps:get(ssl, Config, #{}) of

View File

@ -10,6 +10,7 @@
-include("emqx_connector.hrl"). -include("emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp_client/include/amqp_client.hrl").
%% This test SUITE requires a running RabbitMQ instance. If you don't want to %% This test SUITE requires a running RabbitMQ instance. If you don't want to
@ -26,6 +27,9 @@ rabbit_mq_host() ->
rabbit_mq_port() -> rabbit_mq_port() ->
5672. 5672.
rabbit_mq_password() ->
<<"guest">>.
rabbit_mq_exchange() -> rabbit_mq_exchange() ->
<<"test_exchange">>. <<"test_exchange">>.
@ -45,12 +49,12 @@ init_per_suite(Config) ->
) )
of of
true -> true ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]), Apps = emqx_cth_suite:start(
ok = emqx_connector_test_helpers:start_apps([emqx_resource]), [emqx_conf, emqx_connector, emqx_bridge_rabbitmq],
{ok, _} = application:ensure_all_started(emqx_connector), #{work_dir => emqx_cth_suite:work_dir(Config)}
{ok, _} = application:ensure_all_started(amqp_client), ),
ChannelConnection = setup_rabbit_mq_exchange_and_queue(), ChannelConnection = setup_rabbit_mq_exchange_and_queue(),
[{channel_connection, ChannelConnection} | Config]; [{channel_connection, ChannelConnection}, {suite_apps, Apps} | Config];
false -> false ->
case os:getenv("IS_CI") of case os:getenv("IS_CI") of
"yes" -> "yes" ->
@ -106,13 +110,11 @@ end_per_suite(Config) ->
connection := Connection, connection := Connection,
channel := Channel channel := Channel
} = get_channel_connection(Config), } = get_channel_connection(Config),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector),
%% Close the channel %% Close the channel
ok = amqp_channel:close(Channel), ok = amqp_channel:close(Channel),
%% Close the connection %% Close the connection
ok = amqp_connection:close(Connection). ok = amqp_connection:close(Connection),
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
% %%------------------------------------------------------------------------------ % %%------------------------------------------------------------------------------
% %% Testcases % %% Testcases
@ -125,23 +127,31 @@ t_lifecycle(Config) ->
Config Config
). ).
t_start_passfile(Config) ->
ResourceID = atom_to_binary(?FUNCTION_NAME),
PasswordFilename = filename:join(?config(priv_dir, Config), "passfile"),
ok = file:write_file(PasswordFilename, rabbit_mq_password()),
InitialConfig = rabbitmq_config(#{
password => iolist_to_binary(["file://", PasswordFilename])
}),
?assertMatch(
#{status := connected},
create_local_resource(ResourceID, check_config(InitialConfig))
),
?assertEqual(
ok,
emqx_resource:remove_local(ResourceID)
).
perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) -> perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
#{ #{
channel := Channel channel := Channel
} = get_channel_connection(TestConfig), } = get_channel_connection(TestConfig),
{ok, #{config := CheckedConfig}} = CheckedConfig = check_config(InitialConfig),
emqx_resource:check_config(emqx_bridge_rabbitmq_connector, InitialConfig), #{
{ok, #{
state := #{poolname := PoolName} = State, state := #{poolname := PoolName} = State,
status := InitialStatus status := InitialStatus
}} = } = create_local_resource(ResourceID, CheckedConfig),
emqx_resource:create_local(
ResourceID,
?CONNECTOR_RESOURCE_GROUP,
emqx_bridge_rabbitmq_connector,
CheckedConfig,
#{}
),
?assertEqual(InitialStatus, connected), ?assertEqual(InitialStatus, connected),
%% Instance should match the state and status of the just started resource %% Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{ {ok, ?CONNECTOR_RESOURCE_GROUP, #{
@ -184,6 +194,21 @@ perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) ->
% %% Helpers % %% Helpers
% %%------------------------------------------------------------------------------ % %%------------------------------------------------------------------------------
check_config(Config) ->
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(emqx_bridge_rabbitmq_connector, Config),
CheckedConfig.
create_local_resource(ResourceID, CheckedConfig) ->
{ok, Bridge} = emqx_resource:create_local(
ResourceID,
?CONNECTOR_RESOURCE_GROUP,
emqx_bridge_rabbitmq_connector,
CheckedConfig,
#{}
),
Bridge.
perform_query(PoolName, Channel) -> perform_query(PoolName, Channel) ->
%% Send message to queue: %% Send message to queue:
ok = emqx_resource:query(PoolName, {query, test_data()}), ok = emqx_resource:query(PoolName, {query, test_data()}),
@ -216,16 +241,19 @@ receive_simple_test_message(Channel) ->
end. end.
rabbitmq_config() -> rabbitmq_config() ->
rabbitmq_config(#{}).
rabbitmq_config(Overrides) ->
Config = Config =
#{ #{
server => rabbit_mq_host(), server => rabbit_mq_host(),
port => 5672, port => 5672,
username => <<"guest">>, username => <<"guest">>,
password => <<"guest">>, password => rabbit_mq_password(),
exchange => rabbit_mq_exchange(), exchange => rabbit_mq_exchange(),
routing_key => rabbit_mq_routing_key() routing_key => rabbit_mq_routing_key()
}, },
#{<<"config">> => Config}. #{<<"config">> => maps:merge(Config, Overrides)}.
test_data() -> test_data() ->
#{<<"msg_field">> => <<"Hello">>}. #{<<"msg_field">> => <<"Hello">>}.