diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 8e349f4aa..d274d4ba2 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -89,6 +89,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise } } + query_mode_sync_only { + desc { + en: """Query mode. Only support 'sync'.""" + zh: """请求模式。目前只支持同步模式。""" + } + label { + en: """Query mode""" + zh: """请求模式""" + } + } + request_timeout { desc { en: """Timeout for requests. If query_mode is sync, calls to the resource will be blocked for this amount of time before timing out.""" diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 39513e28c..fdd65bc3c 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -30,16 +30,25 @@ namespace() -> "resource_schema". roots() -> []. +fields("resource_opts_sync_only") -> + [ + {resource_opts, + mk( + ref(?MODULE, "creation_opts_sync_only"), + resource_opts_meta() + )} + ]; +fields("creation_opts_sync_only") -> + Fields0 = fields("creation_opts"), + Fields1 = lists:keydelete(async_inflight_window, 1, Fields0), + QueryMod = {query_mode, fun query_mode_sync_only/1}, + lists:keyreplace(query_mode, 1, Fields1, QueryMod); fields("resource_opts") -> [ {resource_opts, mk( ref(?MODULE, "creation_opts"), - #{ - required => false, - default => #{}, - desc => ?DESC(<<"resource_opts">>) - } + resource_opts_meta() )} ]; fields("creation_opts") -> @@ -59,6 +68,13 @@ fields("creation_opts") -> {max_queue_bytes, fun max_queue_bytes/1} ]. +resource_opts_meta() -> + #{ + required => false, + default => #{}, + desc => ?DESC(<<"resource_opts">>) + }. + worker_pool_size(type) -> non_neg_integer(); worker_pool_size(desc) -> ?DESC("worker_pool_size"); worker_pool_size(default) -> ?WORKER_POOL_SIZE; @@ -95,6 +111,12 @@ query_mode(default) -> async; query_mode(required) -> false; query_mode(_) -> undefined. +query_mode_sync_only(type) -> enum([sync]); +query_mode_sync_only(desc) -> ?DESC("query_mode_sync_only"); +query_mode_sync_only(default) -> sync; +query_mode_sync_only(required) -> false; +query_mode_sync_only(_) -> undefined. + request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); request_timeout(desc) -> ?DESC("request_timeout"); request_timeout(default) -> <<"15s">>; @@ -139,4 +161,6 @@ max_queue_bytes(required) -> false; max_queue_bytes(_) -> undefined. desc("creation_opts") -> + ?DESC("creation_opts"); +desc("creation_opts_sync_only") -> ?DESC("creation_opts"). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl index b62871299..8312c081c 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl @@ -39,7 +39,7 @@ fields("config") -> {enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})}, {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})}, {payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})} - ] ++ fields("resource_opts"); + ] ++ emqx_resource_schema:fields("resource_opts_sync_only"); fields(mongodb_rs) -> emqx_connector_mongo:fields(rs) ++ fields("config"); fields(mongodb_sharded) -> @@ -69,32 +69,7 @@ fields("get_sharded") -> fields("get_single") -> emqx_bridge_schema:status_fields() ++ fields(mongodb_single) ++ - type_and_name_fields(mongodb_single); -fields("creation_opts") -> - lists:map( - fun - ({query_mode, _FieldSchema}) -> - {query_mode, - mk( - enum([sync, async]), - #{ - desc => ?DESC(emqx_resource_schema, "query_mode"), - default => sync - } - )}; - (Field) -> - Field - end, - emqx_resource_schema:fields("creation_opts") - ); -fields("resource_opts") -> - [ - {resource_opts, - mk( - ref(?MODULE, "creation_opts"), - #{default => #{}, desc => ?DESC(emqx_resource_schema, "resource_opts")} - )} - ]. + type_and_name_fields(mongodb_single). conn_bridge_examples(Method) -> [ diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index fadf05848..fd4d9bdd9 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -98,8 +98,7 @@ fields("config") -> (emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields()); fields("creation_opts") -> - Opts = emqx_resource_schema:fields("creation_opts"), - [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; + emqx_resource_schema:fields("creation_opts_sync_only"); fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") -> @@ -118,10 +117,6 @@ desc(_) -> %% ------------------------------------------------------------------------------------------------- %% internal -is_hidden_opts(Field) -> - lists:member(Field, [ - async_inflight_window - ]). type_field() -> {type, mk(enum([mysql]), #{required => true, desc => ?DESC("desc_type")})}. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl index 8bf7b1969..b592197f9 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl @@ -100,8 +100,7 @@ fields("config") -> (emqx_connector_pgsql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields()); fields("creation_opts") -> - Opts = emqx_resource_schema:fields("creation_opts"), - [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; + emqx_resource_schema:fields("creation_opts_sync_only"); fields("post") -> fields("post", pgsql); fields("put") -> @@ -122,11 +121,6 @@ desc(_) -> undefined. %% ------------------------------------------------------------------------------------------------- -%% internal -is_hidden_opts(Field) -> - lists:member(Field, [ - async_inflight_window - ]). type_field(Type) -> {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl index 6b7239a76..18822ba11 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl @@ -181,10 +181,10 @@ resource_fields(Type) -> resource_creation_fields("redis_cluster") -> % TODO % Cluster bridge is currently incompatible with batching. - Fields = emqx_resource_schema:fields("creation_opts"), - lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time]); + Fields = emqx_resource_schema:fields("creation_opts_sync_only"), + lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time, enable_batch]); resource_creation_fields(_) -> - emqx_resource_schema:fields("creation_opts"). + emqx_resource_schema:fields("creation_opts_sync_only"). desc("config") -> ?DESC("desc_config"); diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 1d4fa5da4..67a9b4a05 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -509,7 +509,7 @@ redis_connect_configs() -> toxiproxy_redis_bridge_config() -> Conf0 = ?REDIS_TOXYPROXY_CONNECT_CONFIG#{ <<"resource_opts">> => #{ - <<"query_mode">> => <<"async">>, + <<"query_mode">> => <<"sync">>, <<"worker_pool_size">> => <<"1">>, <<"batch_size">> => integer_to_binary(?BATCH_SIZE), <<"health_check_interval">> => <<"1s">>, @@ -537,7 +537,7 @@ resource_configs() -> <<"start_timeout">> => <<"15s">> }, batch_on => #{ - <<"query_mode">> => <<"async">>, + <<"query_mode">> => <<"sync">>, <<"worker_pool_size">> => <<"1">>, <<"batch_size">> => integer_to_binary(?BATCH_SIZE), <<"start_timeout">> => <<"15s">>