From c39c544c968e30e986217e7cf2615b73af9badf3 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 17 Jun 2024 14:17:43 +0200 Subject: [PATCH] feat: add SSL support to RocketMQ connector --- .../docker-compose-rocketmq-ssl.yaml | 41 ++++++++++++ .../rocketmq/conf_ssl/broker.conf | 24 +++++++ .../rocketmq/conf_ssl/plain_acl.yml | 12 ++++ .../rocketmq/logs_ssl/.gitkeep | 0 .../rocketmq/store_ssl/.gitkeep | 0 apps/emqx_bridge_rocketmq/docker-ct | 1 + apps/emqx_bridge_rocketmq/rebar.config | 2 +- .../src/emqx_bridge_rocketmq.app.src | 2 +- .../src/emqx_bridge_rocketmq.erl | 1 + .../src/emqx_bridge_rocketmq_connector.erl | 42 ++++++++---- .../test/emqx_bridge_rocketmq_SUITE.erl | 66 +++++++++++++++++++ scripts/ct/run.sh | 3 + 12 files changed, 180 insertions(+), 14 deletions(-) create mode 100644 .ci/docker-compose-file/docker-compose-rocketmq-ssl.yaml create mode 100644 .ci/docker-compose-file/rocketmq/conf_ssl/broker.conf create mode 100644 .ci/docker-compose-file/rocketmq/conf_ssl/plain_acl.yml create mode 100644 .ci/docker-compose-file/rocketmq/logs_ssl/.gitkeep create mode 100644 .ci/docker-compose-file/rocketmq/store_ssl/.gitkeep diff --git a/.ci/docker-compose-file/docker-compose-rocketmq-ssl.yaml b/.ci/docker-compose-file/docker-compose-rocketmq-ssl.yaml new file mode 100644 index 000000000..8321a05a3 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-rocketmq-ssl.yaml @@ -0,0 +1,41 @@ +version: '3.9' + +services: + mqnamesrvssl: + image: apache/rocketmq:4.9.4 + container_name: rocketmq_namesrv_ssl +# ports: +# - 9876:9876 + volumes: + - ./rocketmq/logs_ssl:/opt/logs + - ./rocketmq/store_ssl:/opt/store + environment: + JAVA_OPT: "-Dtls.server.mode=enforcing" + command: ./mqnamesrv + networks: + - emqx_bridge + + mqbrokerssl: + image: apache/rocketmq:4.9.4 + container_name: rocketmq_broker_ssl +# ports: +# - 10909:10909 +# - 10911:10911 + volumes: + - ./rocketmq/logs_ssl:/opt/logs + - ./rocketmq/store_ssl:/opt/store + - ./rocketmq/conf_ssl/broker.conf:/etc/rocketmq/broker.conf + - ./rocketmq/conf_ssl/plain_acl.yml:/home/rocketmq/rocketmq-4.9.4/conf/plain_acl.yml + environment: + NAMESRV_ADDR: "rocketmq_namesrv_ssl:9876" + JAVA_OPTS: " -Duser.home=/opt -Drocketmq.broker.diskSpaceWarningLevelRatio=0.99" + JAVA_OPT_EXT: "-server -Xms512m -Xmx512m -Xmn512m -Dtls.server.mode=enforcing" + command: ./mqbroker -c /etc/rocketmq/broker.conf + depends_on: + - mqnamesrvssl + networks: + - emqx_bridge + +networks: + emqx_bridge: + driver: bridge diff --git a/.ci/docker-compose-file/rocketmq/conf_ssl/broker.conf b/.ci/docker-compose-file/rocketmq/conf_ssl/broker.conf new file mode 100644 index 000000000..3da6f655d --- /dev/null +++ b/.ci/docker-compose-file/rocketmq/conf_ssl/broker.conf @@ -0,0 +1,24 @@ +brokerClusterName=DefaultClusterSSL +brokerName=broker-a +brokerId=0 + +brokerIP1=rocketmq_broker_ssl + +defaultTopicQueueNums=4 +autoCreateTopicEnable=true +autoCreateSubscriptionGroup=true + +listenPort=10911 +deleteWhen=04 + +fileReservedTime=120 +mapedFileSizeCommitLog=1073741824 +mapedFileSizeConsumeQueue=300000 +diskMaxUsedSpaceRatio=100 +maxMessageSize=65536 + +brokerRole=ASYNC_MASTER + +flushDiskType=ASYNC_FLUSH + +aclEnable=true diff --git a/.ci/docker-compose-file/rocketmq/conf_ssl/plain_acl.yml b/.ci/docker-compose-file/rocketmq/conf_ssl/plain_acl.yml new file mode 100644 index 000000000..e78e47fe5 --- /dev/null +++ b/.ci/docker-compose-file/rocketmq/conf_ssl/plain_acl.yml @@ -0,0 +1,12 @@ +globalWhiteRemoteAddresses: + +accounts: + - accessKey: RocketMQ + secretKey: 12345678 + whiteRemoteAddress: + admin: false + defaultTopicPerm: DENY + defaultGroupPerm: PUB|SUB + topicPerms: + - TopicTest=PUB|SUB + - Topic2=PUB|SUB diff --git a/.ci/docker-compose-file/rocketmq/logs_ssl/.gitkeep b/.ci/docker-compose-file/rocketmq/logs_ssl/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/.ci/docker-compose-file/rocketmq/store_ssl/.gitkeep b/.ci/docker-compose-file/rocketmq/store_ssl/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/apps/emqx_bridge_rocketmq/docker-ct b/apps/emqx_bridge_rocketmq/docker-ct index 463a9eb66..a4cf08c48 100644 --- a/apps/emqx_bridge_rocketmq/docker-ct +++ b/apps/emqx_bridge_rocketmq/docker-ct @@ -1,2 +1,3 @@ toxiproxy rocketmq +rocketmq_ssl diff --git a/apps/emqx_bridge_rocketmq/rebar.config b/apps/emqx_bridge_rocketmq/rebar.config index be477d7a5..8e9f42730 100644 --- a/apps/emqx_bridge_rocketmq/rebar.config +++ b/apps/emqx_bridge_rocketmq/rebar.config @@ -3,7 +3,7 @@ {erl_opts, [debug_info]}. {deps, [ - {rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.5.3"}}}, + {rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.6.1"}}}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src index 561daae65..4b52f8dfc 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rocketmq, [ {description, "EMQX Enterprise RocketMQ Bridge"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, rocketmq]}, {env, [ diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl index f7e6d9b57..9fe0dd587 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl @@ -178,6 +178,7 @@ fields(action_parameters) -> Parameters, [ servers, + ssl, namespace, pool_size, auto_reconnect, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 5c62fd622..369307002 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -80,7 +80,7 @@ fields(config) -> {pool_size, fun emqx_connector_schema_lib:pool_size/1}, {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} - ]. + ] ++ emqx_connector_schema_lib:ssl_fields(). servers() -> Meta = #{desc => ?DESC("servers")}, @@ -98,7 +98,8 @@ on_start( servers := BinServers, access_key := AccessKey, secret_key := SecretKey, - security_token := SecurityToken + security_token := SecurityToken, + ssl := SSLOptsMap } = Config ) -> ?SLOG(info, #{ @@ -113,13 +114,21 @@ on_start( ClientId = client_id(InstanceId), ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken), Namespace = maps:get(namespace, Config, <<>>), - ClientCfg = #{acl_info => ACLInfo, namespace => Namespace}, - + ClientCfg0 = #{acl_info => ACLInfo, namespace => Namespace}, + SSLOpts = emqx_tls_lib:to_client_opts(SSLOptsMap), + ClientCfg = + case SSLOpts of + [] -> + ClientCfg0; + SSLOpts -> + ClientCfg0#{ssl_opts => SSLOpts} + end, State = #{ client_id => ClientId, acl_info => ACLInfo, namespace => Namespace, - installed_channels => #{} + installed_channels => #{}, + ssl_opts => SSLOpts }, ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId), @@ -142,12 +151,13 @@ on_add_channel( #{ installed_channels := InstalledChannels, namespace := Namespace, - acl_info := ACLInfo + acl_info := ACLInfo, + ssl_opts := SSLOpts } = OldState, ChannelId, ChannelConfig ) -> - {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace), + {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace, SSLOpts), NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), %% Update state NewState = OldState#{installed_channels => NewInstalledChannels}, @@ -156,7 +166,8 @@ on_add_channel( create_channel_state( #{parameters := Conf} = _ChannelConfig, ACLInfo, - Namespace + Namespace, + SSLOpts ) -> #{ topic := Topic, @@ -164,7 +175,7 @@ create_channel_state( strategy := Strategy } = Conf, TopicTks = emqx_placeholder:preproc_tmpl(Topic), - ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy), + ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy, SSLOpts), Templates = parse_template(Conf), DispatchStrategy = parse_dispatch_strategy(Strategy), State = #{ @@ -407,9 +418,10 @@ make_producer_opts( }, ACLInfo, Namespace, - Strategy + Strategy, + SSLOpts ) -> - #{ + ProducerOpts = #{ tcp_opts => [{sndbuf, SendBuff}], ref_topic_route_interval => RefreshInterval, acl_info => emqx_secret:wrap(ACLInfo), @@ -419,7 +431,13 @@ make_producer_opts( roundrobin -> roundrobin; _ -> key_dispatch end - }. + }, + case SSLOpts of + [] -> + ProducerOpts; + _ -> + ProducerOpts#{ssl_opts => SSLOpts} + end. acl_info(<<>>, _, _) -> #{}; diff --git a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl index 7af6c7eea..2a7f78aee 100644 --- a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl +++ b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl @@ -116,9 +116,17 @@ common_init(ConfigT) -> _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), {Name, RocketMQConf} = rocketmq_config(BridgeType, Config0), + RocketMQSSLConf = RocketMQConf#{ + <<"servers">> => <<"rocketmq_namesrv_ssl:9876">>, + <<"ssl">> => #{ + <<"enable">> => true, + <<"verify">> => verify_none + } + }, Config = [ {rocketmq_config, RocketMQConf}, + {rocketmq_config_ssl, RocketMQSSLConf}, {rocketmq_bridge_type, BridgeType}, {rocketmq_name, Name}, {proxy_host, ProxyHost}, @@ -180,6 +188,26 @@ create_bridge(Config) -> RocketMQConf = ?GET_CONFIG(rocketmq_config, Config), emqx_bridge:create(BridgeType, Name, RocketMQConf). +create_bridge_ssl(Config) -> + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + Name = ?GET_CONFIG(rocketmq_name, Config), + RocketMQConf = ?GET_CONFIG(rocketmq_config_ssl, Config), + emqx_bridge:create(BridgeType, Name, RocketMQConf). + +create_bridge_ssl_bad_ssl_opts(Config) -> + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + Name = ?GET_CONFIG(rocketmq_name, Config), + RocketMQConf0 = ?GET_CONFIG(rocketmq_config_ssl, Config), + RocketMQConf1 = maps:put( + <<"ssl">>, + #{ + <<"enable">> => true, + <<"verify">> => verify_peer + }, + RocketMQConf0 + ), + emqx_bridge:create(BridgeType, Name, RocketMQConf1). + delete_bridge(Config) -> BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), Name = ?GET_CONFIG(rocketmq_name, Config), @@ -233,6 +261,44 @@ t_setup_via_config_and_publish(Config) -> ), ok. +t_setup_via_config_and_publish_ssl(Config) -> + ?assertMatch( + {ok, _}, + create_bridge_ssl(Config) + ), + SentData = #{payload => ?PAYLOAD}, + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config, SentData)), + #{?snk_kind := rocketmq_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(rocketmq_connector_query_return, Trace0), + ?assertMatch([#{result := ok}], Trace), + ok + end + ), + ok. + +%% Check that we can not connect to the SSL only RocketMQ instance +%% with incorrect SSL options +t_setup_via_config_ssl_host_bad_ssl_opts(Config) -> + ?assertMatch( + {ok, _}, + create_bridge_ssl_bad_ssl_opts(Config) + ), + Name = ?GET_CONFIG(rocketmq_name, Config), + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceID)), + ?assertMatch(#{status := disconnected}, emqx_bridge_v2:health_check(BridgeType, Name)), + ok. + t_setup_via_http_api_and_publish(Config) -> BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), Name = ?GET_CONFIG(rocketmq_name, Config), diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index af04fd9ee..079c58035 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -202,6 +202,9 @@ for dep in ${CT_DEPS}; do rocketmq) FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml' ) ;; + rocketmq_ssl) + FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq-ssl.yaml' ) + ;; cassandra) FILES+=( '.ci/docker-compose-file/docker-compose-cassandra.yaml' ) ;;