From ea9b1e13d5b50d0ae5bc46b6b578d798bd7a0187 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 2 Jun 2023 11:19:06 +0800 Subject: [PATCH] test(rocketmq): add an ACL test case --- .../docker-compose-rocketmq.yaml | 1 + .../rocketmq/conf/broker.conf | 2 + .../rocketmq/conf/plain_acl.yml | 11 +++++ .../test/emqx_bridge_rocketmq_SUITE.erl | 44 +++++++++++++++++-- 4 files changed, 55 insertions(+), 3 deletions(-) create mode 100644 .ci/docker-compose-file/rocketmq/conf/plain_acl.yml diff --git a/.ci/docker-compose-file/docker-compose-rocketmq.yaml b/.ci/docker-compose-file/docker-compose-rocketmq.yaml index 7e5a2e42e..7c019e931 100644 --- a/.ci/docker-compose-file/docker-compose-rocketmq.yaml +++ b/.ci/docker-compose-file/docker-compose-rocketmq.yaml @@ -23,6 +23,7 @@ services: - ./rocketmq/logs:/opt/logs - ./rocketmq/store:/opt/store - ./rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf + - ./rocketmq/conf/plain_acl.yml:/home/rocketmq/rocketmq-4.9.4/conf/plain_acl.yml environment: NAMESRV_ADDR: "rocketmq_namesrv:9876" JAVA_OPTS: " -Duser.home=/opt -Drocketmq.broker.diskSpaceWarningLevelRatio=0.99" diff --git a/.ci/docker-compose-file/rocketmq/conf/broker.conf b/.ci/docker-compose-file/rocketmq/conf/broker.conf index c343090e4..fbd716e54 100644 --- a/.ci/docker-compose-file/rocketmq/conf/broker.conf +++ b/.ci/docker-compose-file/rocketmq/conf/broker.conf @@ -20,3 +20,5 @@ maxMessageSize=65536 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH + +aclEnable=true diff --git a/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml b/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml new file mode 100644 index 000000000..e2c41a87f --- /dev/null +++ b/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml @@ -0,0 +1,11 @@ +globalWhiteRemoteAddresses: + +accounts: + - accessKey: RocketMQ + secretKey: 12345678 + whiteRemoteAddress: + admin: false + defaultTopicPerm: DENY + defaultGroupPerm: PUB|SUB + topicPerms: + - TopicTest=PUB|SUB diff --git a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl index 90047e577..a80aee810 100644 --- a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl +++ b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl @@ -13,6 +13,9 @@ % Bridge defaults -define(TOPIC, "TopicTest"). +-define(DENY_TOPIC, "DENY_TOPIC"). +-define(ACCESS_KEY, "RocketMQ"). +-define(SECRET_KEY, "12345678"). -define(BATCH_SIZE, 10). -define(PAYLOAD, <<"HELLO">>). @@ -25,17 +28,19 @@ all() -> [ {group, async}, - {group, sync} + {group, sync}, + {group, acl} ]. groups() -> - TCs = emqx_common_test_helpers:all(?MODULE), + TCs = emqx_common_test_helpers:all(?MODULE) -- [t_acl_deny], BatchingGroups = [{group, with_batch}, {group, without_batch}], [ {async, BatchingGroups}, {sync, BatchingGroups}, {with_batch, TCs}, - {without_batch, TCs} + {without_batch, TCs}, + {acl, [t_acl_deny]} ]. init_per_group(async, Config) -> @@ -48,6 +53,9 @@ init_per_group(with_batch, Config0) -> init_per_group(without_batch, Config0) -> Config = [{batch_size, 1} | Config0], common_init(Config); +init_per_group(acl, Config0) -> + Config = [{batch_size, 1}, {query_mode, sync} | Config0], + common_init(Config); init_per_group(_Group, Config) -> Config. @@ -137,6 +145,8 @@ rocketmq_config(BridgeType, Config) -> "bridges.~s.~s {\n" " enable = true\n" " servers = ~p\n" + " access_key = ~p\n" + " secret_key = ~p\n" " topic = ~p\n" " resource_opts = {\n" " request_timeout = 1500ms\n" @@ -148,6 +158,8 @@ rocketmq_config(BridgeType, Config) -> BridgeType, Name, Server, + ?ACCESS_KEY, + ?SECRET_KEY, ?TOPIC, BatchSize, QueryMode @@ -271,3 +283,29 @@ t_simple_query(Config) -> Result = query_resource(Config, Request), ?assertEqual(ok, Result), ok. + +t_acl_deny(Config0) -> + RocketCfg = ?GET_CONFIG(rocketmq_config, Config0), + RocketCfg2 = RocketCfg#{<<"topic">> := ?DENY_TOPIC}, + Config = lists:keyreplace(rocketmq_config, 1, Config0, {rocketmq_config, RocketCfg2}), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + SentData = #{payload => ?PAYLOAD}, + ?check_trace( + begin + ?wait_async_action( + ?assertMatch({error, #{<<"code">> := 1}}, send_message(Config, SentData)), + #{?snk_kind := rocketmq_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(rocketmq_connector_query_return, Trace0), + ?assertMatch([#{error := #{<<"code">> := 1}}], Trace), + ok + end + ), + ok.