From 903a77b47188ce2e8c3db22982aace6c1d2233d0 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 25 Jan 2023 15:33:05 +0300 Subject: [PATCH 1/3] test(redis): ensure batch query hit different cluster shards This will inevitably fail: it's not generally possible to update different keys through the same cluster connection, one or more update will fail with `MOVED` status. This testcase should serve as a regression test later. --- .../test/emqx_ee_bridge_redis_SUITE.erl | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 2b67787b2..8435204fd 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -16,6 +16,9 @@ %% CT boilerplate %%------------------------------------------------------------------------------ +-define(KEYSHARDS, 3). +-define(KEYPREFIX, "MSGS"). + -define(REDIS_TOXYPROXY_CONNECT_CONFIG, #{ <<"server">> => <<"toxiproxy:6379">>, <<"redis_type">> => <<"single">> @@ -23,7 +26,7 @@ -define(COMMON_REDIS_OPTS, #{ <<"password">> => <<"public">>, - <<"command_template">> => [<<"RPUSH">>, <<"MSGS">>, <<"${payload}">>], + <<"command_template">> => [<<"RPUSH">>, <>, <<"${payload}">>], <<"local_topic">> => <<"local_topic/#">> }). @@ -47,7 +50,7 @@ ) ). -all() -> [{group, transport_types}, {group, rest}]. +all() -> [{group, transports}, {group, rest}]. groups() -> ResourceSpecificTCs = [t_create_delete_bridge], @@ -63,7 +66,7 @@ groups() -> ], [ {rest, TCs}, - {transport_types, [ + {transports, [ {group, tcp}, {group, tls} ]}, @@ -79,7 +82,7 @@ groups() -> init_per_group(Group, Config) when Group =:= redis_single; Group =:= redis_sentinel; Group =:= redis_cluster -> - [{transport_type, Group} | Config]; + [{connector_type, Group} | Config]; init_per_group(Group, Config) when Group =:= tcp; Group =:= tls -> @@ -139,7 +142,7 @@ end_per_suite(_Config) -> init_per_testcase(_Testcase, Config) -> ok = delete_all_rules(), ok = delete_all_bridges(), - case ?config(transport_type, Config) of + case ?config(connector_type, Config) of undefined -> Config; RedisType -> @@ -162,7 +165,7 @@ end_per_testcase(_Testcase, Config) -> t_create_delete_bridge(Config) -> Name = <<"mybridge">>, - Type = ?config(transport_type, Config), + Type = ?config(connector_type, Config), BridgeConfig = ?config(bridge_config, Config), IsBatch = ?config(is_batch, Config), ?assertMatch( @@ -350,9 +353,7 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) -> ?wait_async_action( lists:foreach( fun(I) -> - IBin = integer_to_binary(I), - Topic = <>, - _ = publish_message(Topic, RandomPayload) + _ = publish_message(format_topic(BaseTopic, I), RandomPayload) end, lists:seq(1, N) ), @@ -360,7 +361,7 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) -> 5000 ), fun(Trace) -> - AddedMsgCount = length(added_msgs(ResourceId, RandomPayload)), + AddedMsgCount = length(added_msgs(ResourceId, BaseTopic, RandomPayload)), case IsBatch of true -> ?assertMatch( @@ -378,11 +379,23 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) -> end ). -added_msgs(ResourceId, Payload) -> - {ok, Results} = emqx_resource:simple_sync_query( - ResourceId, {cmd, [<<"LRANGE">>, <<"MSGS">>, <<"0">>, <<"-1">>]} - ), - [El || El <- Results, El =:= Payload]. +added_msgs(ResourceId, BaseTopic, Payload) -> + lists:flatmap( + fun(K) -> + {ok, Results} = emqx_resource:simple_sync_query( + ResourceId, + {cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]} + ), + [El || El <- Results, El =:= Payload] + end, + [format_redis_key(BaseTopic, S) || S <- lists:seq(0, ?KEYSHARDS - 1)] + ). + +format_topic(Base, I) -> + iolist_to_binary(io_lib:format("~s/~2..0B", [Base, I rem ?KEYSHARDS])). + +format_redis_key(Base, I) -> + iolist_to_binary([?KEYPREFIX, "/", format_topic(Base, I)]). conf_schema(StructName) -> #{ From 26fcaecad78c52637c06b445d82b2aee63033c88 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 25 Jan 2023 15:41:52 +0300 Subject: [PATCH 2/3] fix(redis): disable batching in `redis_cluster` bridges Through configuration subsystem. --- .../src/emqx_ee_bridge_redis.erl | 56 +++++++++++++++---- .../test/emqx_ee_bridge_redis_SUITE.erl | 11 ++-- 2 files changed, 49 insertions(+), 18 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl index 3a3963786..6b7239a76 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl @@ -7,7 +7,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --import(hoconsc, [mk/2, enum/1, ref/2]). +-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]). -export([ conn_bridge_examples/1 @@ -80,13 +80,20 @@ values(common, RedisType, SpecificOpts) -> pool_size => 8, password => <<"secret">>, command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>], - resource_opts => #{ + resource_opts => values(resource_opts, RedisType, #{}), + ssl => #{enable => false} + }, + maps:merge(Config, SpecificOpts); +values(resource_opts, "cluster", SpecificOpts) -> + SpecificOpts; +values(resource_opts, _RedisType, SpecificOpts) -> + maps:merge( + #{ batch_size => 1, batch_time => <<"20ms">> }, - ssl => #{enable => false} - }, - maps:merge(Config, SpecificOpts). + SpecificOpts + ). %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions @@ -115,29 +122,31 @@ fields("get_cluster") -> fields(Type) when Type == redis_single orelse Type == redis_sentinel orelse Type == redis_cluster -> - redis_bridge_common_fields() ++ - connector_fields(Type). + redis_bridge_common_fields(Type) ++ + connector_fields(Type); +fields("creation_opts_" ++ Type) -> + resource_creation_fields(Type). method_fileds(post, ConnectorType) -> - redis_bridge_common_fields() ++ + redis_bridge_common_fields(ConnectorType) ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType); method_fileds(get, ConnectorType) -> - redis_bridge_common_fields() ++ + redis_bridge_common_fields(ConnectorType) ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType) ++ emqx_bridge_schema:status_fields(); method_fileds(put, ConnectorType) -> - redis_bridge_common_fields() ++ + redis_bridge_common_fields(ConnectorType) ++ connector_fields(ConnectorType). -redis_bridge_common_fields() -> +redis_bridge_common_fields(Type) -> emqx_bridge_schema:common_bridge_fields() ++ [ {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, {command_template, fun command_template/1} ] ++ - emqx_resource_schema:fields("resource_opts"). + resource_fields(Type). connector_fields(Type) -> RedisType = bridge_type_to_redis_conn_type(Type), @@ -156,6 +165,27 @@ type_name_fields(Type) -> {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} ]. +resource_fields(Type) -> + [ + {resource_opts, + mk( + ref("creation_opts_" ++ atom_to_list(Type)), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ]. + +resource_creation_fields("redis_cluster") -> + % TODO + % Cluster bridge is currently incompatible with batching. + Fields = emqx_resource_schema:fields("creation_opts"), + lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time]); +resource_creation_fields(_) -> + emqx_resource_schema:fields("creation_opts"). + desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> @@ -166,6 +196,8 @@ desc(redis_sentinel) -> ?DESC(emqx_connector_redis, "sentinel"); desc(redis_cluster) -> ?DESC(emqx_connector_redis, "cluster"); +desc("creation_opts_" ++ _Type) -> + ?DESC(emqx_resource_schema, "creation_opts"); desc(_) -> undefined. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 8435204fd..31c75ede4 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -142,12 +142,13 @@ end_per_suite(_Config) -> init_per_testcase(_Testcase, Config) -> ok = delete_all_rules(), ok = delete_all_bridges(), - case ?config(connector_type, Config) of - undefined -> + case {?config(connector_type, Config), ?config(batch_mode, Config)} of + {undefined, _} -> Config; - RedisType -> + {redis_cluster, batch_on} -> + {skip, "Batching is not supported by 'redis_cluster' bridge type"}; + {RedisType, BatchMode} -> Transport = ?config(transport, Config), - BatchMode = ?config(batch_mode, Config), #{RedisType := #{Transport := RedisConnConfig}} = redis_connect_configs(), #{BatchMode := ResourceConfig} = resource_configs(), IsBatch = (BatchMode =:= batch_on), @@ -522,7 +523,6 @@ invalid_command_bridge_config() -> Conf1#{ <<"resource_opts">> => #{ <<"query_mode">> => <<"sync">>, - <<"batch_size">> => <<"1">>, <<"worker_pool_size">> => <<"1">>, <<"start_timeout">> => <<"15s">> }, @@ -533,7 +533,6 @@ resource_configs() -> #{ batch_off => #{ <<"query_mode">> => <<"sync">>, - <<"batch_size">> => <<"1">>, <<"start_timeout">> => <<"15s">> }, batch_on => #{ From 2ee00b75a70bf957ad708ae71aeadfb61379fd12 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 25 Jan 2023 16:49:55 +0300 Subject: [PATCH 3/3] fix(redis): unwrap pipeline queries against redis cluster This is an additional safety measure in addition to the disabled batching on the bridge level. --- apps/emqx_connector/src/emqx_connector_redis.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 4bb46bca3..286f7dea6 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -222,6 +222,8 @@ is_unrecoverable_error(Results) when is_list(Results) -> lists:any(fun is_unrecoverable_error/1, Results); is_unrecoverable_error({error, <<"ERR unknown command ", _/binary>>}) -> true; +is_unrecoverable_error({error, invalid_cluster_command}) -> + true; is_unrecoverable_error(_) -> false. @@ -267,7 +269,9 @@ do_cmd(PoolName, cluster, {cmd, Command}) -> do_cmd(Conn, _Type, {cmd, Command}) -> eredis:q(Conn, Command); do_cmd(PoolName, cluster, {cmds, Commands}) -> - wrap_qp_result(eredis_cluster:qp(PoolName, Commands)); + % TODO + % Cluster mode is currently incompatible with batching. + wrap_qp_result([eredis_cluster:q(PoolName, Command) || Command <- Commands]); do_cmd(Conn, _Type, {cmds, Commands}) -> wrap_qp_result(eredis:qp(Conn, Commands)).