Merge pull request #11363 from thalesmg/rabbit-tls-20230727
feat(rabbitmq_bridge): add TLS support
This commit is contained in:
commit
d6344ab709
|
@ -13,5 +13,10 @@ services:
|
|||
# ports:
|
||||
# - "15672:15672"
|
||||
# - "5672:5672"
|
||||
volumes:
|
||||
- ./certs/ca.crt:/opt/certs/ca.crt
|
||||
- ./certs/server.crt:/opt/certs/server.crt
|
||||
- ./certs/server.key:/opt/certs/server.key
|
||||
- ./rabbitmq/20-tls.conf:/etc/rabbitmq/conf.d/20-tls.conf
|
||||
networks:
|
||||
- emqx_bridge
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
listeners.ssl.default = 5671
|
||||
|
||||
ssl_options.cacertfile = /opt/certs/ca.crt
|
||||
ssl_options.certfile = /opt/certs/server.crt
|
||||
ssl_options.keyfile = /opt/certs/server.key
|
||||
ssl_options.verify = verify_peer
|
||||
ssl_options.fail_if_no_peer_cert = true
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_bridge_rabbitmq, [
|
||||
{description, "EMQX Enterprise RabbitMQ Bridge"},
|
||||
{vsn, "0.1.3"},
|
||||
{vsn, "0.1.4"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
|
|
|
@ -155,7 +155,7 @@ fields(config) ->
|
|||
desc => ?DESC("payload_template")
|
||||
}
|
||||
)}
|
||||
].
|
||||
] ++ emqx_connector_schema_lib:ssl_fields().
|
||||
|
||||
values(post) ->
|
||||
maps:merge(values(put), #{name => <<"connector">>});
|
||||
|
@ -320,10 +320,18 @@ create_rabbitmq_connection_and_channel(Config) ->
|
|||
wait_for_publish_confirmations := WaitForPublishConfirmations
|
||||
} = Config,
|
||||
Password = emqx_secret:unwrap(WrappedPassword),
|
||||
SSLOptions =
|
||||
case maps:get(ssl, Config, #{}) of
|
||||
#{enable := true} = SSLOpts ->
|
||||
emqx_tls_lib:to_client_opts(SSLOpts);
|
||||
_ ->
|
||||
none
|
||||
end,
|
||||
RabbitMQConnectionOptions =
|
||||
#amqp_params_network{
|
||||
host = erlang:binary_to_list(Host),
|
||||
port = Port,
|
||||
ssl_options = SSLOptions,
|
||||
username = Username,
|
||||
password = Password,
|
||||
connection_timeout = Timeout,
|
||||
|
|
|
@ -38,22 +38,34 @@ get_channel_connection(Config) ->
|
|||
%% Common Test Setup, Teardown and Testcase List
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
all() ->
|
||||
[
|
||||
{group, tcp},
|
||||
{group, tls}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
AllTCs = emqx_common_test_helpers:all(?MODULE),
|
||||
[
|
||||
{tcp, AllTCs},
|
||||
{tls, AllTCs}
|
||||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
% snabbkaffe:fix_ct_logging(),
|
||||
case
|
||||
emqx_common_test_helpers:is_tcp_server_available(
|
||||
erlang:binary_to_list(rabbit_mq_host()), rabbit_mq_port()
|
||||
)
|
||||
of
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok.
|
||||
|
||||
init_per_group(tcp, Config) ->
|
||||
RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"),
|
||||
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")),
|
||||
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
|
||||
true ->
|
||||
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
{ok, _} = application:ensure_all_started(amqp_client),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
ChannelConnection = setup_rabbit_mq_exchange_and_queue(),
|
||||
[{channel_connection, ChannelConnection} | Config];
|
||||
Config1 = common_init_per_group(#{
|
||||
host => RabbitMQHost, port => RabbitMQPort, tls => false
|
||||
}),
|
||||
Config1 ++ Config;
|
||||
false ->
|
||||
case os:getenv("IS_CI") of
|
||||
"yes" ->
|
||||
|
@ -61,14 +73,64 @@ init_per_suite(Config) ->
|
|||
_ ->
|
||||
{skip, no_rabbitmq}
|
||||
end
|
||||
end.
|
||||
end;
|
||||
init_per_group(tls, Config) ->
|
||||
RabbitMQHost = os:getenv("RABBITMQ_TLS_HOST", "rabbitmq"),
|
||||
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_TLS_PORT", "5671")),
|
||||
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
|
||||
true ->
|
||||
Config1 = common_init_per_group(#{
|
||||
host => RabbitMQHost, port => RabbitMQPort, tls => true
|
||||
}),
|
||||
Config1 ++ Config;
|
||||
false ->
|
||||
case os:getenv("IS_CI") of
|
||||
"yes" ->
|
||||
throw(no_rabbitmq);
|
||||
_ ->
|
||||
{skip, no_rabbitmq}
|
||||
end
|
||||
end;
|
||||
init_per_group(_Group, Config) ->
|
||||
Config.
|
||||
|
||||
setup_rabbit_mq_exchange_and_queue() ->
|
||||
common_init_per_group(Opts) ->
|
||||
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
{ok, _} = application:ensure_all_started(amqp_client),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
ChannelConnection = setup_rabbit_mq_exchange_and_queue(Opts),
|
||||
[{channel_connection, ChannelConnection}].
|
||||
|
||||
setup_rabbit_mq_exchange_and_queue(#{host := RabbitMQHost, port := RabbitMQPort, tls := UseTLS}) ->
|
||||
SSLOptions =
|
||||
case UseTLS of
|
||||
false ->
|
||||
none;
|
||||
true ->
|
||||
CertsDir = filename:join([
|
||||
emqx_common_test_helpers:proj_root(),
|
||||
".ci",
|
||||
"docker-compose-file",
|
||||
"certs"
|
||||
]),
|
||||
emqx_tls_lib:to_client_opts(
|
||||
#{
|
||||
enable => true,
|
||||
cacertfile => filename:join([CertsDir, "ca.crt"]),
|
||||
certfile => filename:join([CertsDir, "client.pem"]),
|
||||
keyfile => filename:join([CertsDir, "client.key"])
|
||||
}
|
||||
)
|
||||
end,
|
||||
%% Create an exachange and a queue
|
||||
{ok, Connection} =
|
||||
amqp_connection:start(#amqp_params_network{
|
||||
host = erlang:binary_to_list(rabbit_mq_host()),
|
||||
port = rabbit_mq_port()
|
||||
host = RabbitMQHost,
|
||||
port = RabbitMQPort,
|
||||
ssl_options = SSLOptions
|
||||
}),
|
||||
{ok, Channel} = amqp_connection:open_channel(Connection),
|
||||
%% Create an exchange
|
||||
|
@ -101,7 +163,7 @@ setup_rabbit_mq_exchange_and_queue() ->
|
|||
channel => Channel
|
||||
}.
|
||||
|
||||
end_per_suite(Config) ->
|
||||
end_per_group(_Group, Config) ->
|
||||
#{
|
||||
connection := Connection,
|
||||
channel := Channel
|
||||
|
@ -122,9 +184,6 @@ init_per_testcase(_, Config) ->
|
|||
end_per_testcase(_, _Config) ->
|
||||
ok.
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
rabbitmq_config(Config) ->
|
||||
%%SQL = maps:get(sql, Config, sql_insert_template_for_bridge()),
|
||||
BatchSize = maps:get(batch_size, Config, 1),
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Added TLS connection support to RabbitMQ bridge.
|
Loading…
Reference in New Issue