Merge pull request #10921 from lafirest/test/rocketmq_acl

test(rocketmq): add an ACL test case
This commit is contained in:
lafirest 2023-06-02 15:37:46 +08:00 committed by GitHub
commit 0f8824b40f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 55 additions and 3 deletions

View File

@ -23,6 +23,7 @@ services:
- ./rocketmq/logs:/opt/logs
- ./rocketmq/store:/opt/store
- ./rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf
- ./rocketmq/conf/plain_acl.yml:/home/rocketmq/rocketmq-4.9.4/conf/plain_acl.yml
environment:
NAMESRV_ADDR: "rocketmq_namesrv:9876"
JAVA_OPTS: " -Duser.home=/opt -Drocketmq.broker.diskSpaceWarningLevelRatio=0.99"

View File

@ -20,3 +20,5 @@ maxMessageSize=65536
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
aclEnable=true

View File

@ -0,0 +1,11 @@
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: PUB|SUB
topicPerms:
- TopicTest=PUB|SUB

View File

@ -13,6 +13,9 @@
% Bridge defaults
-define(TOPIC, "TopicTest").
-define(DENY_TOPIC, "DENY_TOPIC").
-define(ACCESS_KEY, "RocketMQ").
-define(SECRET_KEY, "12345678").
-define(BATCH_SIZE, 10).
-define(PAYLOAD, <<"HELLO">>).
@ -25,17 +28,19 @@
all() ->
[
{group, async},
{group, sync}
{group, sync},
{group, acl}
].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
TCs = emqx_common_test_helpers:all(?MODULE) -- [t_acl_deny],
BatchingGroups = [{group, with_batch}, {group, without_batch}],
[
{async, BatchingGroups},
{sync, BatchingGroups},
{with_batch, TCs},
{without_batch, TCs}
{without_batch, TCs},
{acl, [t_acl_deny]}
].
init_per_group(async, Config) ->
@ -48,6 +53,9 @@ init_per_group(with_batch, Config0) ->
init_per_group(without_batch, Config0) ->
Config = [{batch_size, 1} | Config0],
common_init(Config);
init_per_group(acl, Config0) ->
Config = [{batch_size, 1}, {query_mode, sync} | Config0],
common_init(Config);
init_per_group(_Group, Config) ->
Config.
@ -137,6 +145,8 @@ rocketmq_config(BridgeType, Config) ->
"bridges.~s.~s {\n"
" enable = true\n"
" servers = ~p\n"
" access_key = ~p\n"
" secret_key = ~p\n"
" topic = ~p\n"
" resource_opts = {\n"
" request_timeout = 1500ms\n"
@ -148,6 +158,8 @@ rocketmq_config(BridgeType, Config) ->
BridgeType,
Name,
Server,
?ACCESS_KEY,
?SECRET_KEY,
?TOPIC,
BatchSize,
QueryMode
@ -271,3 +283,29 @@ t_simple_query(Config) ->
Result = query_resource(Config, Request),
?assertEqual(ok, Result),
ok.
t_acl_deny(Config0) ->
RocketCfg = ?GET_CONFIG(rocketmq_config, Config0),
RocketCfg2 = RocketCfg#{<<"topic">> := ?DENY_TOPIC},
Config = lists:keyreplace(rocketmq_config, 1, Config0, {rocketmq_config, RocketCfg2}),
?assertMatch(
{ok, _},
create_bridge(Config)
),
SentData = #{payload => ?PAYLOAD},
?check_trace(
begin
?wait_async_action(
?assertMatch({error, #{<<"code">> := 1}}, send_message(Config, SentData)),
#{?snk_kind := rocketmq_connector_query_return},
10_000
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(rocketmq_connector_query_return, Trace0),
?assertMatch([#{error := #{<<"code">> := 1}}], Trace),
ok
end
),
ok.