Merge pull request #13274 from kjellwinblad/kjell/add_ssl_to_rocksdb/EMQX-12289

feat: add SSL support to RocketMQ connector
This commit is contained in:
Kjell Winblad 2024-06-18 09:38:28 +02:00 committed by GitHub
commit 3f7723b2dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 170 additions and 15 deletions

View File

@ -0,0 +1,41 @@
version: '3.9'
services:
mqnamesrvssl:
image: apache/rocketmq:4.9.4
container_name: rocketmq_namesrv_ssl
# ports:
# - 9876:9876
volumes:
- ./rocketmq/logs_ssl:/opt/logs
- ./rocketmq/store_ssl:/opt/store
environment:
JAVA_OPT: "-Dtls.server.mode=enforcing"
command: ./mqnamesrv
networks:
- emqx_bridge
mqbrokerssl:
image: apache/rocketmq:4.9.4
container_name: rocketmq_broker_ssl
# ports:
# - 10909:10909
# - 10911:10911
volumes:
- ./rocketmq/logs_ssl:/opt/logs
- ./rocketmq/store_ssl:/opt/store
- ./rocketmq/conf_ssl/broker.conf:/etc/rocketmq/broker.conf
- ./rocketmq/conf_ssl/plain_acl.yml:/home/rocketmq/rocketmq-4.9.4/conf/plain_acl.yml
environment:
NAMESRV_ADDR: "rocketmq_namesrv_ssl:9876"
JAVA_OPTS: " -Duser.home=/opt -Drocketmq.broker.diskSpaceWarningLevelRatio=0.99"
JAVA_OPT_EXT: "-server -Xms512m -Xmx512m -Xmn512m -Dtls.server.mode=enforcing"
command: ./mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- mqnamesrvssl
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -0,0 +1,24 @@
brokerClusterName=DefaultClusterSSL
brokerName=broker-a
brokerId=0
brokerIP1=rocketmq_broker_ssl
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=100
maxMessageSize=65536
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
aclEnable=true

View File

@ -0,0 +1,12 @@
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: PUB|SUB
topicPerms:
- TopicTest=PUB|SUB
- Topic2=PUB|SUB

View File

@ -3,7 +3,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.5.3"}}}, {rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.6.1"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}} {emqx_bridge, {path, "../../apps/emqx_bridge"}}

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rocketmq, [ {application, emqx_bridge_rocketmq, [
{description, "EMQX Enterprise RocketMQ Bridge"}, {description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.2.0"}, {vsn, "0.2.1"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource, rocketmq]}, {applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, [ {env, [

View File

@ -178,6 +178,7 @@ fields(action_parameters) ->
Parameters, Parameters,
[ [
servers, servers,
ssl,
namespace, namespace,
pool_size, pool_size,
auto_reconnect, auto_reconnect,

View File

@ -80,7 +80,7 @@ fields(config) ->
{pool_size, fun emqx_connector_schema_lib:pool_size/1}, {pool_size, fun emqx_connector_schema_lib:pool_size/1},
{auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
]. ] ++ emqx_connector_schema_lib:ssl_fields().
servers() -> servers() ->
Meta = #{desc => ?DESC("servers")}, Meta = #{desc => ?DESC("servers")},
@ -98,7 +98,8 @@ on_start(
servers := BinServers, servers := BinServers,
access_key := AccessKey, access_key := AccessKey,
secret_key := SecretKey, secret_key := SecretKey,
security_token := SecurityToken security_token := SecurityToken,
ssl := SSLOptsMap
} = Config } = Config
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
@ -113,13 +114,15 @@ on_start(
ClientId = client_id(InstanceId), ClientId = client_id(InstanceId),
ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken), ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
Namespace = maps:get(namespace, Config, <<>>), Namespace = maps:get(namespace, Config, <<>>),
ClientCfg = #{acl_info => ACLInfo, namespace => Namespace}, ClientCfg0 = #{acl_info => ACLInfo, namespace => Namespace},
SSLOpts = emqx_tls_lib:to_client_opts(SSLOptsMap),
ClientCfg = emqx_utils_maps:put_if(ClientCfg0, ssl_opts, SSLOpts, SSLOpts =/= []),
State = #{ State = #{
client_id => ClientId, client_id => ClientId,
acl_info => ACLInfo, acl_info => ACLInfo,
namespace => Namespace, namespace => Namespace,
installed_channels => #{} installed_channels => #{},
ssl_opts => SSLOpts
}, },
ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId), ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId),
@ -142,12 +145,13 @@ on_add_channel(
#{ #{
installed_channels := InstalledChannels, installed_channels := InstalledChannels,
namespace := Namespace, namespace := Namespace,
acl_info := ACLInfo acl_info := ACLInfo,
ssl_opts := SSLOpts
} = OldState, } = OldState,
ChannelId, ChannelId,
ChannelConfig ChannelConfig
) -> ) ->
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace), {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace, SSLOpts),
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state %% Update state
NewState = OldState#{installed_channels => NewInstalledChannels}, NewState = OldState#{installed_channels => NewInstalledChannels},
@ -156,7 +160,8 @@ on_add_channel(
create_channel_state( create_channel_state(
#{parameters := Conf} = _ChannelConfig, #{parameters := Conf} = _ChannelConfig,
ACLInfo, ACLInfo,
Namespace Namespace,
SSLOpts
) -> ) ->
#{ #{
topic := Topic, topic := Topic,
@ -164,7 +169,7 @@ create_channel_state(
strategy := Strategy strategy := Strategy
} = Conf, } = Conf,
TopicTks = emqx_placeholder:preproc_tmpl(Topic), TopicTks = emqx_placeholder:preproc_tmpl(Topic),
ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy), ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy, SSLOpts),
Templates = parse_template(Conf), Templates = parse_template(Conf),
DispatchStrategy = parse_dispatch_strategy(Strategy), DispatchStrategy = parse_dispatch_strategy(Strategy),
State = #{ State = #{
@ -407,9 +412,10 @@ make_producer_opts(
}, },
ACLInfo, ACLInfo,
Namespace, Namespace,
Strategy Strategy,
SSLOpts
) -> ) ->
#{ ProducerOpts = #{
tcp_opts => [{sndbuf, SendBuff}], tcp_opts => [{sndbuf, SendBuff}],
ref_topic_route_interval => RefreshInterval, ref_topic_route_interval => RefreshInterval,
acl_info => emqx_secret:wrap(ACLInfo), acl_info => emqx_secret:wrap(ACLInfo),
@ -419,7 +425,8 @@ make_producer_opts(
roundrobin -> roundrobin; roundrobin -> roundrobin;
_ -> key_dispatch _ -> key_dispatch
end end
}. },
emqx_utils_maps:put_if(ProducerOpts, ssl_opts, SSLOpts, SSLOpts =/= []).
acl_info(<<>>, _, _) -> acl_info(<<>>, _, _) ->
#{}; #{};

View File

@ -116,9 +116,17 @@ common_init(ConfigT) ->
_ = emqx_bridge_enterprise:module_info(), _ = emqx_bridge_enterprise:module_info(),
emqx_mgmt_api_test_util:init_suite(), emqx_mgmt_api_test_util:init_suite(),
{Name, RocketMQConf} = rocketmq_config(BridgeType, Config0), {Name, RocketMQConf} = rocketmq_config(BridgeType, Config0),
RocketMQSSLConf = RocketMQConf#{
<<"servers">> => <<"rocketmq_namesrv_ssl:9876">>,
<<"ssl">> => #{
<<"enable">> => true,
<<"verify">> => verify_none
}
},
Config = Config =
[ [
{rocketmq_config, RocketMQConf}, {rocketmq_config, RocketMQConf},
{rocketmq_config_ssl, RocketMQSSLConf},
{rocketmq_bridge_type, BridgeType}, {rocketmq_bridge_type, BridgeType},
{rocketmq_name, Name}, {rocketmq_name, Name},
{proxy_host, ProxyHost}, {proxy_host, ProxyHost},
@ -180,6 +188,28 @@ create_bridge(Config) ->
RocketMQConf = ?GET_CONFIG(rocketmq_config, Config), RocketMQConf = ?GET_CONFIG(rocketmq_config, Config),
emqx_bridge:create(BridgeType, Name, RocketMQConf). emqx_bridge:create(BridgeType, Name, RocketMQConf).
create_bridge_ssl(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
RocketMQConf = ?GET_CONFIG(rocketmq_config_ssl, Config),
emqx_bridge:create(BridgeType, Name, RocketMQConf).
create_bridge_ssl_bad_ssl_opts(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
RocketMQConf0 = ?GET_CONFIG(rocketmq_config_ssl, Config),
%% This config is wrong because we use verify_peer without
%% a cert that can be used in the verification.
RocketMQConf1 = maps:put(
<<"ssl">>,
#{
<<"enable">> => true,
<<"verify">> => verify_peer
},
RocketMQConf0
),
emqx_bridge:create(BridgeType, Name, RocketMQConf1).
delete_bridge(Config) -> delete_bridge(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config), Name = ?GET_CONFIG(rocketmq_name, Config),
@ -233,6 +263,44 @@ t_setup_via_config_and_publish(Config) ->
), ),
ok. ok.
t_setup_via_config_and_publish_ssl(Config) ->
?assertMatch(
{ok, _},
create_bridge_ssl(Config)
),
SentData = #{payload => ?PAYLOAD},
?check_trace(
begin
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := rocketmq_connector_query_return},
10_000
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(rocketmq_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace),
ok
end
),
ok.
%% Check that we can not connect to the SSL only RocketMQ instance
%% with incorrect SSL options
t_setup_via_config_ssl_host_bad_ssl_opts(Config) ->
?assertMatch(
{ok, _},
create_bridge_ssl_bad_ssl_opts(Config)
),
Name = ?GET_CONFIG(rocketmq_name, Config),
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceID)),
?assertMatch(#{status := disconnected}, emqx_bridge_v2:health_check(BridgeType, Name)),
ok.
t_setup_via_http_api_and_publish(Config) -> t_setup_via_http_api_and_publish(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config), Name = ?GET_CONFIG(rocketmq_name, Config),

View File

@ -0,0 +1 @@
The RocketMQ connector has got support for configuring SSL settings.

View File

@ -200,7 +200,8 @@ for dep in ${CT_DEPS}; do
FILES+=( '.ci/docker-compose-file/docker-compose-dynamo.yaml' ) FILES+=( '.ci/docker-compose-file/docker-compose-dynamo.yaml' )
;; ;;
rocketmq) rocketmq)
FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml' ) FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml'
'.ci/docker-compose-file/docker-compose-rocketmq-ssl.yaml' )
;; ;;
cassandra) cassandra)
FILES+=( '.ci/docker-compose-file/docker-compose-cassandra.yaml' ) FILES+=( '.ci/docker-compose-file/docker-compose-cassandra.yaml' )