feat(rabbitmq_bridge): add TLS support
Fixes https://emqx.atlassian.net/browse/EMQX-10605
This commit is contained in:
parent
74f4fce9ed
commit
399f849f7b
|
@ -13,5 +13,10 @@ services:
|
||||||
# ports:
|
# ports:
|
||||||
# - "15672:15672"
|
# - "15672:15672"
|
||||||
# - "5672:5672"
|
# - "5672:5672"
|
||||||
|
volumes:
|
||||||
|
- ./certs/ca.crt:/opt/certs/ca.crt
|
||||||
|
- ./certs/server.crt:/opt/certs/server.crt
|
||||||
|
- ./certs/server.key:/opt/certs/server.key
|
||||||
|
- ./rabbitmq/20-tls.conf:/etc/rabbitmq/conf.d/20-tls.conf
|
||||||
networks:
|
networks:
|
||||||
- emqx_bridge
|
- emqx_bridge
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
listeners.ssl.default = 5671
|
||||||
|
|
||||||
|
ssl_options.cacertfile = /opt/certs/ca.crt
|
||||||
|
ssl_options.certfile = /opt/certs/server.crt
|
||||||
|
ssl_options.keyfile = /opt/certs/server.key
|
||||||
|
ssl_options.verify = verify_peer
|
||||||
|
ssl_options.fail_if_no_peer_cert = true
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_rabbitmq, [
|
{application, emqx_bridge_rabbitmq, [
|
||||||
{description, "EMQX Enterprise RabbitMQ Bridge"},
|
{description, "EMQX Enterprise RabbitMQ Bridge"},
|
||||||
{vsn, "0.1.3"},
|
{vsn, "0.1.4"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
|
|
|
@ -155,7 +155,7 @@ fields(config) ->
|
||||||
desc => ?DESC("payload_template")
|
desc => ?DESC("payload_template")
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
].
|
] ++ emqx_connector_schema_lib:ssl_fields().
|
||||||
|
|
||||||
values(post) ->
|
values(post) ->
|
||||||
maps:merge(values(put), #{name => <<"connector">>});
|
maps:merge(values(put), #{name => <<"connector">>});
|
||||||
|
@ -320,10 +320,18 @@ create_rabbitmq_connection_and_channel(Config) ->
|
||||||
wait_for_publish_confirmations := WaitForPublishConfirmations
|
wait_for_publish_confirmations := WaitForPublishConfirmations
|
||||||
} = Config,
|
} = Config,
|
||||||
Password = emqx_secret:unwrap(WrappedPassword),
|
Password = emqx_secret:unwrap(WrappedPassword),
|
||||||
|
SSLOptions =
|
||||||
|
case maps:get(ssl, Config, #{}) of
|
||||||
|
#{enable := true} = SSLOpts ->
|
||||||
|
emqx_tls_lib:to_client_opts(SSLOpts);
|
||||||
|
_ ->
|
||||||
|
none
|
||||||
|
end,
|
||||||
RabbitMQConnectionOptions =
|
RabbitMQConnectionOptions =
|
||||||
#amqp_params_network{
|
#amqp_params_network{
|
||||||
host = erlang:binary_to_list(Host),
|
host = erlang:binary_to_list(Host),
|
||||||
port = Port,
|
port = Port,
|
||||||
|
ssl_options = SSLOptions,
|
||||||
username = Username,
|
username = Username,
|
||||||
password = Password,
|
password = Password,
|
||||||
connection_timeout = Timeout,
|
connection_timeout = Timeout,
|
||||||
|
|
|
@ -38,22 +38,34 @@ get_channel_connection(Config) ->
|
||||||
%% Common Test Setup, Teardown and Testcase List
|
%% Common Test Setup, Teardown and Testcase List
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
[
|
||||||
|
{group, tcp},
|
||||||
|
{group, tls}
|
||||||
|
].
|
||||||
|
|
||||||
|
groups() ->
|
||||||
|
AllTCs = emqx_common_test_helpers:all(?MODULE),
|
||||||
|
[
|
||||||
|
{tcp, AllTCs},
|
||||||
|
{tls, AllTCs}
|
||||||
|
].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
% snabbkaffe:fix_ct_logging(),
|
Config.
|
||||||
case
|
|
||||||
emqx_common_test_helpers:is_tcp_server_available(
|
end_per_suite(_Config) ->
|
||||||
erlang:binary_to_list(rabbit_mq_host()), rabbit_mq_port()
|
ok.
|
||||||
)
|
|
||||||
of
|
init_per_group(tcp, Config) ->
|
||||||
|
RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"),
|
||||||
|
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")),
|
||||||
|
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
|
||||||
true ->
|
true ->
|
||||||
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
Config1 = common_init_per_group(#{
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
host => RabbitMQHost, port => RabbitMQPort, tls => false
|
||||||
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
}),
|
||||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
Config1 ++ Config;
|
||||||
{ok, _} = application:ensure_all_started(amqp_client),
|
|
||||||
emqx_mgmt_api_test_util:init_suite(),
|
|
||||||
ChannelConnection = setup_rabbit_mq_exchange_and_queue(),
|
|
||||||
[{channel_connection, ChannelConnection} | Config];
|
|
||||||
false ->
|
false ->
|
||||||
case os:getenv("IS_CI") of
|
case os:getenv("IS_CI") of
|
||||||
"yes" ->
|
"yes" ->
|
||||||
|
@ -61,14 +73,64 @@ init_per_suite(Config) ->
|
||||||
_ ->
|
_ ->
|
||||||
{skip, no_rabbitmq}
|
{skip, no_rabbitmq}
|
||||||
end
|
end
|
||||||
end.
|
end;
|
||||||
|
init_per_group(tls, Config) ->
|
||||||
|
RabbitMQHost = os:getenv("RABBITMQ_TLS_HOST", "rabbitmq"),
|
||||||
|
RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_TLS_PORT", "5671")),
|
||||||
|
case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of
|
||||||
|
true ->
|
||||||
|
Config1 = common_init_per_group(#{
|
||||||
|
host => RabbitMQHost, port => RabbitMQPort, tls => true
|
||||||
|
}),
|
||||||
|
Config1 ++ Config;
|
||||||
|
false ->
|
||||||
|
case os:getenv("IS_CI") of
|
||||||
|
"yes" ->
|
||||||
|
throw(no_rabbitmq);
|
||||||
|
_ ->
|
||||||
|
{skip, no_rabbitmq}
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
init_per_group(_Group, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
setup_rabbit_mq_exchange_and_queue() ->
|
common_init_per_group(Opts) ->
|
||||||
|
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
||||||
|
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
||||||
|
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
||||||
|
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||||
|
{ok, _} = application:ensure_all_started(amqp_client),
|
||||||
|
emqx_mgmt_api_test_util:init_suite(),
|
||||||
|
ChannelConnection = setup_rabbit_mq_exchange_and_queue(Opts),
|
||||||
|
[{channel_connection, ChannelConnection}].
|
||||||
|
|
||||||
|
setup_rabbit_mq_exchange_and_queue(#{host := RabbitMQHost, port := RabbitMQPort, tls := UseTLS}) ->
|
||||||
|
SSLOptions =
|
||||||
|
case UseTLS of
|
||||||
|
false ->
|
||||||
|
none;
|
||||||
|
true ->
|
||||||
|
CertsDir = filename:join([
|
||||||
|
emqx_common_test_helpers:proj_root(),
|
||||||
|
".ci",
|
||||||
|
"docker-compose-file",
|
||||||
|
"certs"
|
||||||
|
]),
|
||||||
|
emqx_tls_lib:to_client_opts(
|
||||||
|
#{
|
||||||
|
enable => true,
|
||||||
|
cacertfile => filename:join([CertsDir, "ca.crt"]),
|
||||||
|
certfile => filename:join([CertsDir, "client.pem"]),
|
||||||
|
keyfile => filename:join([CertsDir, "client.key"])
|
||||||
|
}
|
||||||
|
)
|
||||||
|
end,
|
||||||
%% Create an exachange and a queue
|
%% Create an exachange and a queue
|
||||||
{ok, Connection} =
|
{ok, Connection} =
|
||||||
amqp_connection:start(#amqp_params_network{
|
amqp_connection:start(#amqp_params_network{
|
||||||
host = erlang:binary_to_list(rabbit_mq_host()),
|
host = RabbitMQHost,
|
||||||
port = rabbit_mq_port()
|
port = RabbitMQPort,
|
||||||
|
ssl_options = SSLOptions
|
||||||
}),
|
}),
|
||||||
{ok, Channel} = amqp_connection:open_channel(Connection),
|
{ok, Channel} = amqp_connection:open_channel(Connection),
|
||||||
%% Create an exchange
|
%% Create an exchange
|
||||||
|
@ -101,7 +163,7 @@ setup_rabbit_mq_exchange_and_queue() ->
|
||||||
channel => Channel
|
channel => Channel
|
||||||
}.
|
}.
|
||||||
|
|
||||||
end_per_suite(Config) ->
|
end_per_group(_Group, Config) ->
|
||||||
#{
|
#{
|
||||||
connection := Connection,
|
connection := Connection,
|
||||||
channel := Channel
|
channel := Channel
|
||||||
|
@ -122,9 +184,6 @@ init_per_testcase(_, Config) ->
|
||||||
end_per_testcase(_, _Config) ->
|
end_per_testcase(_, _Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
all() ->
|
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
|
||||||
|
|
||||||
rabbitmq_config(Config) ->
|
rabbitmq_config(Config) ->
|
||||||
%%SQL = maps:get(sql, Config, sql_insert_template_for_bridge()),
|
%%SQL = maps:get(sql, Config, sql_insert_template_for_bridge()),
|
||||||
BatchSize = maps:get(batch_size, Config, 1),
|
BatchSize = maps:get(batch_size, Config, 1),
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Added TLS connection support to RabbitMQ bridge.
|
Loading…
Reference in New Issue