From 34aeeab041dcb5653e019da5617b4a52826fa66d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 7 Nov 2023 21:11:25 +0700 Subject: [PATCH] feat(rabbitmq): accept wrapped secrets as passwords --- .../src/emqx_bridge_rabbitmq.app.src | 2 +- .../src/emqx_bridge_rabbitmq_connector.erl | 17 ++--- .../emqx_bridge_rabbitmq_connector_SUITE.erl | 72 +++++++++++++------ 3 files changed, 58 insertions(+), 33 deletions(-) diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src index c7d931c93..7e32b5a89 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rabbitmq, [ {description, "EMQX Enterprise RabbitMQ Bridge"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index ff439b676..04a93e08e 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -72,7 +72,7 @@ fields(config) -> desc => ?DESC("username") } )}, - {password, fun emqx_connector_schema_lib:password_required/1}, + {password, emqx_connector_schema_lib:password_field(#{required => true})}, {pool_size, hoconsc:mk( typerefl:pos_integer(), @@ -194,7 +194,6 @@ on_start( #{ pool_size := PoolSize, payload_template := PayloadTemplate, - password := Password, delivery_mode := InitialDeliveryMode } = InitialConfig ) -> @@ -204,7 +203,6 @@ on_start( persistent -> 2 end, Config = InitialConfig#{ - password => emqx_secret:wrap(Password), delivery_mode => DeliveryMode }, ?SLOG(info, #{ @@ -240,13 +238,11 @@ on_start( ok -> {ok, State}; {error, Reason} -> - LogMessage = - #{ - msg => "rabbitmq_connector_start_failed", - error_reason => Reason, - config => emqx_utils:redact(Config) - }, - ?SLOG(info, LogMessage), + ?SLOG(info, #{ + msg => "rabbitmq_connector_start_failed", + error_reason => Reason, + config => emqx_utils:redact(Config) + }), {error, Reason} end. @@ -319,6 +315,7 @@ create_rabbitmq_connection_and_channel(Config) -> heartbeat := Heartbeat, wait_for_publish_confirmations := WaitForPublishConfirmations } = Config, + %% TODO: teach `amqp` to accept 0-arity closures as passwords. Password = emqx_secret:unwrap(WrappedPassword), SSLOptions = case maps:get(ssl, Config, #{}) of diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl index 106a4d67b..689c39dc5 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl @@ -10,6 +10,7 @@ -include("emqx_connector.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("stdlib/include/assert.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). %% This test SUITE requires a running RabbitMQ instance. If you don't want to @@ -26,6 +27,9 @@ rabbit_mq_host() -> rabbit_mq_port() -> 5672. +rabbit_mq_password() -> + <<"guest">>. + rabbit_mq_exchange() -> <<"test_exchange">>. @@ -45,12 +49,12 @@ init_per_suite(Config) -> ) of true -> - ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_connector_test_helpers:start_apps([emqx_resource]), - {ok, _} = application:ensure_all_started(emqx_connector), - {ok, _} = application:ensure_all_started(amqp_client), + Apps = emqx_cth_suite:start( + [emqx_conf, emqx_connector, emqx_bridge_rabbitmq], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), ChannelConnection = setup_rabbit_mq_exchange_and_queue(), - [{channel_connection, ChannelConnection} | Config]; + [{channel_connection, ChannelConnection}, {suite_apps, Apps} | Config]; false -> case os:getenv("IS_CI") of "yes" -> @@ -106,13 +110,11 @@ end_per_suite(Config) -> connection := Connection, channel := Channel } = get_channel_connection(Config), - ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), - _ = application:stop(emqx_connector), %% Close the channel ok = amqp_channel:close(Channel), %% Close the connection - ok = amqp_connection:close(Connection). + ok = amqp_connection:close(Connection), + ok = emqx_cth_suite:stop(?config(suite_apps, Config)). % %%------------------------------------------------------------------------------ % %% Testcases @@ -125,23 +127,31 @@ t_lifecycle(Config) -> Config ). +t_start_passfile(Config) -> + ResourceID = atom_to_binary(?FUNCTION_NAME), + PasswordFilename = filename:join(?config(priv_dir, Config), "passfile"), + ok = file:write_file(PasswordFilename, rabbit_mq_password()), + InitialConfig = rabbitmq_config(#{ + password => iolist_to_binary(["file://", PasswordFilename]) + }), + ?assertMatch( + #{status := connected}, + create_local_resource(ResourceID, check_config(InitialConfig)) + ), + ?assertEqual( + ok, + emqx_resource:remove_local(ResourceID) + ). + perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) -> #{ channel := Channel } = get_channel_connection(TestConfig), - {ok, #{config := CheckedConfig}} = - emqx_resource:check_config(emqx_bridge_rabbitmq_connector, InitialConfig), - {ok, #{ + CheckedConfig = check_config(InitialConfig), + #{ state := #{poolname := PoolName} = State, status := InitialStatus - }} = - emqx_resource:create_local( - ResourceID, - ?CONNECTOR_RESOURCE_GROUP, - emqx_bridge_rabbitmq_connector, - CheckedConfig, - #{} - ), + } = create_local_resource(ResourceID, CheckedConfig), ?assertEqual(InitialStatus, connected), %% Instance should match the state and status of the just started resource {ok, ?CONNECTOR_RESOURCE_GROUP, #{ @@ -184,6 +194,21 @@ perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) -> % %% Helpers % %%------------------------------------------------------------------------------ +check_config(Config) -> + {ok, #{config := CheckedConfig}} = + emqx_resource:check_config(emqx_bridge_rabbitmq_connector, Config), + CheckedConfig. + +create_local_resource(ResourceID, CheckedConfig) -> + {ok, Bridge} = emqx_resource:create_local( + ResourceID, + ?CONNECTOR_RESOURCE_GROUP, + emqx_bridge_rabbitmq_connector, + CheckedConfig, + #{} + ), + Bridge. + perform_query(PoolName, Channel) -> %% Send message to queue: ok = emqx_resource:query(PoolName, {query, test_data()}), @@ -216,16 +241,19 @@ receive_simple_test_message(Channel) -> end. rabbitmq_config() -> + rabbitmq_config(#{}). + +rabbitmq_config(Overrides) -> Config = #{ server => rabbit_mq_host(), port => 5672, username => <<"guest">>, - password => <<"guest">>, + password => rabbit_mq_password(), exchange => rabbit_mq_exchange(), routing_key => rabbit_mq_routing_key() }, - #{<<"config">> => Config}. + #{<<"config">> => maps:merge(Config, Overrides)}. test_data() -> #{<<"msg_field">> => <<"Hello">>}.