diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 951fb5ef5..d74ff40a1 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -548,6 +548,8 @@ fields(consumer_kafka_opts) -> #{default => <<"5s">>, desc => ?DESC(consumer_offset_commit_interval_seconds)} )} ]; +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields(resource_opts) -> SupportedFields = [health_check_interval], CreationOpts = emqx_bridge_v2_schema:resource_opts_fields(), @@ -568,6 +570,8 @@ desc("config_connector") -> ?DESC("desc_config"); desc(resource_opts) -> ?DESC(emqx_resource_schema, "resource_opts"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc("get_" ++ Type) when Type =:= "consumer"; Type =:= "producer"; Type =:= "connector"; Type =:= "bridge_v2" -> @@ -626,7 +630,7 @@ kafka_connector_config_fields() -> })}, {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})}, {ssl, mk(ref(ssl_client_opts), #{})} - ] ++ [resource_opts()]. + ] ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts). producer_opts(ActionOrBridgeV1) -> [