diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl index 3a3963786..6b7239a76 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl @@ -7,7 +7,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --import(hoconsc, [mk/2, enum/1, ref/2]). +-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]). -export([ conn_bridge_examples/1 @@ -80,13 +80,20 @@ values(common, RedisType, SpecificOpts) -> pool_size => 8, password => <<"secret">>, command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>], - resource_opts => #{ + resource_opts => values(resource_opts, RedisType, #{}), + ssl => #{enable => false} + }, + maps:merge(Config, SpecificOpts); +values(resource_opts, "cluster", SpecificOpts) -> + SpecificOpts; +values(resource_opts, _RedisType, SpecificOpts) -> + maps:merge( + #{ batch_size => 1, batch_time => <<"20ms">> }, - ssl => #{enable => false} - }, - maps:merge(Config, SpecificOpts). + SpecificOpts + ). %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions @@ -115,29 +122,31 @@ fields("get_cluster") -> fields(Type) when Type == redis_single orelse Type == redis_sentinel orelse Type == redis_cluster -> - redis_bridge_common_fields() ++ - connector_fields(Type). + redis_bridge_common_fields(Type) ++ + connector_fields(Type); +fields("creation_opts_" ++ Type) -> + resource_creation_fields(Type). method_fileds(post, ConnectorType) -> - redis_bridge_common_fields() ++ + redis_bridge_common_fields(ConnectorType) ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType); method_fileds(get, ConnectorType) -> - redis_bridge_common_fields() ++ + redis_bridge_common_fields(ConnectorType) ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType) ++ emqx_bridge_schema:status_fields(); method_fileds(put, ConnectorType) -> - redis_bridge_common_fields() ++ + redis_bridge_common_fields(ConnectorType) ++ connector_fields(ConnectorType). -redis_bridge_common_fields() -> +redis_bridge_common_fields(Type) -> emqx_bridge_schema:common_bridge_fields() ++ [ {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, {command_template, fun command_template/1} ] ++ - emqx_resource_schema:fields("resource_opts"). + resource_fields(Type). connector_fields(Type) -> RedisType = bridge_type_to_redis_conn_type(Type), @@ -156,6 +165,27 @@ type_name_fields(Type) -> {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} ]. +resource_fields(Type) -> + [ + {resource_opts, + mk( + ref("creation_opts_" ++ atom_to_list(Type)), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ]. + +resource_creation_fields("redis_cluster") -> + % TODO + % Cluster bridge is currently incompatible with batching. + Fields = emqx_resource_schema:fields("creation_opts"), + lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time]); +resource_creation_fields(_) -> + emqx_resource_schema:fields("creation_opts"). + desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> @@ -166,6 +196,8 @@ desc(redis_sentinel) -> ?DESC(emqx_connector_redis, "sentinel"); desc(redis_cluster) -> ?DESC(emqx_connector_redis, "cluster"); +desc("creation_opts_" ++ _Type) -> + ?DESC(emqx_resource_schema, "creation_opts"); desc(_) -> undefined. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 8435204fd..31c75ede4 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -142,12 +142,13 @@ end_per_suite(_Config) -> init_per_testcase(_Testcase, Config) -> ok = delete_all_rules(), ok = delete_all_bridges(), - case ?config(connector_type, Config) of - undefined -> + case {?config(connector_type, Config), ?config(batch_mode, Config)} of + {undefined, _} -> Config; - RedisType -> + {redis_cluster, batch_on} -> + {skip, "Batching is not supported by 'redis_cluster' bridge type"}; + {RedisType, BatchMode} -> Transport = ?config(transport, Config), - BatchMode = ?config(batch_mode, Config), #{RedisType := #{Transport := RedisConnConfig}} = redis_connect_configs(), #{BatchMode := ResourceConfig} = resource_configs(), IsBatch = (BatchMode =:= batch_on), @@ -522,7 +523,6 @@ invalid_command_bridge_config() -> Conf1#{ <<"resource_opts">> => #{ <<"query_mode">> => <<"sync">>, - <<"batch_size">> => <<"1">>, <<"worker_pool_size">> => <<"1">>, <<"start_timeout">> => <<"15s">> }, @@ -533,7 +533,6 @@ resource_configs() -> #{ batch_off => #{ <<"query_mode">> => <<"sync">>, - <<"batch_size">> => <<"1">>, <<"start_timeout">> => <<"15s">> }, batch_on => #{