feat: add SSL support to RocketMQ connector

This commit is contained in:
Kjell Winblad 2024-06-17 14:17:43 +02:00
parent b64f0c0ca7
commit c39c544c96
12 changed files with 180 additions and 14 deletions

View File

@ -0,0 +1,41 @@
version: '3.9'
services:
mqnamesrvssl:
image: apache/rocketmq:4.9.4
container_name: rocketmq_namesrv_ssl
# ports:
# - 9876:9876
volumes:
- ./rocketmq/logs_ssl:/opt/logs
- ./rocketmq/store_ssl:/opt/store
environment:
JAVA_OPT: "-Dtls.server.mode=enforcing"
command: ./mqnamesrv
networks:
- emqx_bridge
mqbrokerssl:
image: apache/rocketmq:4.9.4
container_name: rocketmq_broker_ssl
# ports:
# - 10909:10909
# - 10911:10911
volumes:
- ./rocketmq/logs_ssl:/opt/logs
- ./rocketmq/store_ssl:/opt/store
- ./rocketmq/conf_ssl/broker.conf:/etc/rocketmq/broker.conf
- ./rocketmq/conf_ssl/plain_acl.yml:/home/rocketmq/rocketmq-4.9.4/conf/plain_acl.yml
environment:
NAMESRV_ADDR: "rocketmq_namesrv_ssl:9876"
JAVA_OPTS: " -Duser.home=/opt -Drocketmq.broker.diskSpaceWarningLevelRatio=0.99"
JAVA_OPT_EXT: "-server -Xms512m -Xmx512m -Xmn512m -Dtls.server.mode=enforcing"
command: ./mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- mqnamesrvssl
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -0,0 +1,24 @@
brokerClusterName=DefaultClusterSSL
brokerName=broker-a
brokerId=0
brokerIP1=rocketmq_broker_ssl
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=100
maxMessageSize=65536
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
aclEnable=true

View File

@ -0,0 +1,12 @@
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: PUB|SUB
topicPerms:
- TopicTest=PUB|SUB
- Topic2=PUB|SUB

View File

@ -1,2 +1,3 @@
toxiproxy
rocketmq
rocketmq_ssl

View File

@ -3,7 +3,7 @@
{erl_opts, [debug_info]}.
{deps, [
{rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.5.3"}}},
{rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.6.1"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rocketmq, [
{description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, [

View File

@ -178,6 +178,7 @@ fields(action_parameters) ->
Parameters,
[
servers,
ssl,
namespace,
pool_size,
auto_reconnect,

View File

@ -80,7 +80,7 @@ fields(config) ->
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
{auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
].
] ++ emqx_connector_schema_lib:ssl_fields().
servers() ->
Meta = #{desc => ?DESC("servers")},
@ -98,7 +98,8 @@ on_start(
servers := BinServers,
access_key := AccessKey,
secret_key := SecretKey,
security_token := SecurityToken
security_token := SecurityToken,
ssl := SSLOptsMap
} = Config
) ->
?SLOG(info, #{
@ -113,13 +114,21 @@ on_start(
ClientId = client_id(InstanceId),
ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
Namespace = maps:get(namespace, Config, <<>>),
ClientCfg = #{acl_info => ACLInfo, namespace => Namespace},
ClientCfg0 = #{acl_info => ACLInfo, namespace => Namespace},
SSLOpts = emqx_tls_lib:to_client_opts(SSLOptsMap),
ClientCfg =
case SSLOpts of
[] ->
ClientCfg0;
SSLOpts ->
ClientCfg0#{ssl_opts => SSLOpts}
end,
State = #{
client_id => ClientId,
acl_info => ACLInfo,
namespace => Namespace,
installed_channels => #{}
installed_channels => #{},
ssl_opts => SSLOpts
},
ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId),
@ -142,12 +151,13 @@ on_add_channel(
#{
installed_channels := InstalledChannels,
namespace := Namespace,
acl_info := ACLInfo
acl_info := ACLInfo,
ssl_opts := SSLOpts
} = OldState,
ChannelId,
ChannelConfig
) ->
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace),
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace, SSLOpts),
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
@ -156,7 +166,8 @@ on_add_channel(
create_channel_state(
#{parameters := Conf} = _ChannelConfig,
ACLInfo,
Namespace
Namespace,
SSLOpts
) ->
#{
topic := Topic,
@ -164,7 +175,7 @@ create_channel_state(
strategy := Strategy
} = Conf,
TopicTks = emqx_placeholder:preproc_tmpl(Topic),
ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy),
ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy, SSLOpts),
Templates = parse_template(Conf),
DispatchStrategy = parse_dispatch_strategy(Strategy),
State = #{
@ -407,9 +418,10 @@ make_producer_opts(
},
ACLInfo,
Namespace,
Strategy
Strategy,
SSLOpts
) ->
#{
ProducerOpts = #{
tcp_opts => [{sndbuf, SendBuff}],
ref_topic_route_interval => RefreshInterval,
acl_info => emqx_secret:wrap(ACLInfo),
@ -419,7 +431,13 @@ make_producer_opts(
roundrobin -> roundrobin;
_ -> key_dispatch
end
}.
},
case SSLOpts of
[] ->
ProducerOpts;
_ ->
ProducerOpts#{ssl_opts => SSLOpts}
end.
acl_info(<<>>, _, _) ->
#{};

View File

@ -116,9 +116,17 @@ common_init(ConfigT) ->
_ = emqx_bridge_enterprise:module_info(),
emqx_mgmt_api_test_util:init_suite(),
{Name, RocketMQConf} = rocketmq_config(BridgeType, Config0),
RocketMQSSLConf = RocketMQConf#{
<<"servers">> => <<"rocketmq_namesrv_ssl:9876">>,
<<"ssl">> => #{
<<"enable">> => true,
<<"verify">> => verify_none
}
},
Config =
[
{rocketmq_config, RocketMQConf},
{rocketmq_config_ssl, RocketMQSSLConf},
{rocketmq_bridge_type, BridgeType},
{rocketmq_name, Name},
{proxy_host, ProxyHost},
@ -180,6 +188,26 @@ create_bridge(Config) ->
RocketMQConf = ?GET_CONFIG(rocketmq_config, Config),
emqx_bridge:create(BridgeType, Name, RocketMQConf).
create_bridge_ssl(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
RocketMQConf = ?GET_CONFIG(rocketmq_config_ssl, Config),
emqx_bridge:create(BridgeType, Name, RocketMQConf).
create_bridge_ssl_bad_ssl_opts(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
RocketMQConf0 = ?GET_CONFIG(rocketmq_config_ssl, Config),
RocketMQConf1 = maps:put(
<<"ssl">>,
#{
<<"enable">> => true,
<<"verify">> => verify_peer
},
RocketMQConf0
),
emqx_bridge:create(BridgeType, Name, RocketMQConf1).
delete_bridge(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
@ -233,6 +261,44 @@ t_setup_via_config_and_publish(Config) ->
),
ok.
t_setup_via_config_and_publish_ssl(Config) ->
?assertMatch(
{ok, _},
create_bridge_ssl(Config)
),
SentData = #{payload => ?PAYLOAD},
?check_trace(
begin
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := rocketmq_connector_query_return},
10_000
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(rocketmq_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace),
ok
end
),
ok.
%% Check that we can not connect to the SSL only RocketMQ instance
%% with incorrect SSL options
t_setup_via_config_ssl_host_bad_ssl_opts(Config) ->
?assertMatch(
{ok, _},
create_bridge_ssl_bad_ssl_opts(Config)
),
Name = ?GET_CONFIG(rocketmq_name, Config),
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceID)),
?assertMatch(#{status := disconnected}, emqx_bridge_v2:health_check(BridgeType, Name)),
ok.
t_setup_via_http_api_and_publish(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),

View File

@ -202,6 +202,9 @@ for dep in ${CT_DEPS}; do
rocketmq)
FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml' )
;;
rocketmq_ssl)
FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq-ssl.yaml' )
;;
cassandra)
FILES+=( '.ci/docker-compose-file/docker-compose-cassandra.yaml' )
;;