diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 2b67787b2..8435204fd 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -16,6 +16,9 @@ %% CT boilerplate %%------------------------------------------------------------------------------ +-define(KEYSHARDS, 3). +-define(KEYPREFIX, "MSGS"). + -define(REDIS_TOXYPROXY_CONNECT_CONFIG, #{ <<"server">> => <<"toxiproxy:6379">>, <<"redis_type">> => <<"single">> @@ -23,7 +26,7 @@ -define(COMMON_REDIS_OPTS, #{ <<"password">> => <<"public">>, - <<"command_template">> => [<<"RPUSH">>, <<"MSGS">>, <<"${payload}">>], + <<"command_template">> => [<<"RPUSH">>, <>, <<"${payload}">>], <<"local_topic">> => <<"local_topic/#">> }). @@ -47,7 +50,7 @@ ) ). -all() -> [{group, transport_types}, {group, rest}]. +all() -> [{group, transports}, {group, rest}]. groups() -> ResourceSpecificTCs = [t_create_delete_bridge], @@ -63,7 +66,7 @@ groups() -> ], [ {rest, TCs}, - {transport_types, [ + {transports, [ {group, tcp}, {group, tls} ]}, @@ -79,7 +82,7 @@ groups() -> init_per_group(Group, Config) when Group =:= redis_single; Group =:= redis_sentinel; Group =:= redis_cluster -> - [{transport_type, Group} | Config]; + [{connector_type, Group} | Config]; init_per_group(Group, Config) when Group =:= tcp; Group =:= tls -> @@ -139,7 +142,7 @@ end_per_suite(_Config) -> init_per_testcase(_Testcase, Config) -> ok = delete_all_rules(), ok = delete_all_bridges(), - case ?config(transport_type, Config) of + case ?config(connector_type, Config) of undefined -> Config; RedisType -> @@ -162,7 +165,7 @@ end_per_testcase(_Testcase, Config) -> t_create_delete_bridge(Config) -> Name = <<"mybridge">>, - Type = ?config(transport_type, Config), + Type = ?config(connector_type, Config), BridgeConfig = ?config(bridge_config, Config), IsBatch = ?config(is_batch, Config), ?assertMatch( @@ -350,9 +353,7 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) -> ?wait_async_action( lists:foreach( fun(I) -> - IBin = integer_to_binary(I), - Topic = <>, - _ = publish_message(Topic, RandomPayload) + _ = publish_message(format_topic(BaseTopic, I), RandomPayload) end, lists:seq(1, N) ), @@ -360,7 +361,7 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) -> 5000 ), fun(Trace) -> - AddedMsgCount = length(added_msgs(ResourceId, RandomPayload)), + AddedMsgCount = length(added_msgs(ResourceId, BaseTopic, RandomPayload)), case IsBatch of true -> ?assertMatch( @@ -378,11 +379,23 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) -> end ). -added_msgs(ResourceId, Payload) -> - {ok, Results} = emqx_resource:simple_sync_query( - ResourceId, {cmd, [<<"LRANGE">>, <<"MSGS">>, <<"0">>, <<"-1">>]} - ), - [El || El <- Results, El =:= Payload]. +added_msgs(ResourceId, BaseTopic, Payload) -> + lists:flatmap( + fun(K) -> + {ok, Results} = emqx_resource:simple_sync_query( + ResourceId, + {cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]} + ), + [El || El <- Results, El =:= Payload] + end, + [format_redis_key(BaseTopic, S) || S <- lists:seq(0, ?KEYSHARDS - 1)] + ). + +format_topic(Base, I) -> + iolist_to_binary(io_lib:format("~s/~2..0B", [Base, I rem ?KEYSHARDS])). + +format_redis_key(Base, I) -> + iolist_to_binary([?KEYPREFIX, "/", format_topic(Base, I)]). conf_schema(StructName) -> #{