From 7704995279935c99f74bbbbfa2e66e5d47df9262 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 21 Apr 2023 15:10:25 +0800 Subject: [PATCH] fix(rocketmq): expose the driver parameter `sync_timeout` into the RocketMQ bridge configuration --- .../src/emqx_ee_connector_rocketmq.erl | 15 +++++++++++---- rel/i18n/emqx_ee_connector_rocketmq.hocon | 13 ++++++++++++- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index 285fa98b4..29f8ef84d 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -44,6 +44,11 @@ fields(config) -> binary(), #{default => <<"TopicTest">>, desc => ?DESC(topic)} )}, + {sync_timeout, + mk( + emqx_schema:duration(), + #{default => <<"3s">>, desc => ?DESC(sync_timeout)} + )}, {refresh_interval, mk( emqx_schema:duration(), @@ -76,7 +81,7 @@ add_default_fn(OrigFn, Default) -> end. servers() -> - Meta = #{desc => ?DESC("server")}, + Meta = #{desc => ?DESC("servers")}, emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS). relational_fields() -> @@ -97,7 +102,7 @@ is_buffer_supported() -> false. on_start( InstanceId, - #{servers := BinServers, topic := Topic} = Config1 + #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config1 ) -> ?SLOG(info, #{ msg => "starting_rocketmq_connector", @@ -116,8 +121,9 @@ on_start( ProducersMapPID = create_producers_map(ClientId), State = #{ client_id => ClientId, + topic => Topic, topic_tokens => TopicTks, - config => Config, + sync_timeout => SyncTimeout, templates => Templates, producers_map_pid => ProducersMapPID, producers_opts => ProducerOpts @@ -173,9 +179,10 @@ do_query( #{ templates := Templates, client_id := ClientId, + topic := RawTopic, topic_tokens := TopicTks, producers_opts := ProducerOpts, - config := #{topic := RawTopic, resource_opts := #{request_timeout := RequestTimeout}} + sync_timeout := RequestTimeout } = State ) -> ?TRACE( diff --git a/rel/i18n/emqx_ee_connector_rocketmq.hocon b/rel/i18n/emqx_ee_connector_rocketmq.hocon index 44dda7931..7f786898e 100644 --- a/rel/i18n/emqx_ee_connector_rocketmq.hocon +++ b/rel/i18n/emqx_ee_connector_rocketmq.hocon @@ -1,6 +1,6 @@ emqx_ee_connector_rocketmq { - server { + servers { desc { en: """The IPv4 or IPv6 address or the hostname to connect to.
A host entry has the following form: `Host[:Port]`.
@@ -26,6 +26,17 @@ The RocketMQ default port 9876 is used if `[:Port]` is not specified.""" } } + sync_timeout { + desc { + en: """Timeout of RocketMQ driver synchronous call.""" + zh: """RocketMQ 驱动同步调用的超时时间。""" + } + label: { + en: "Sync Timeout" + zh: "同步调用超时时间" + } + } + refresh_interval { desc { en: """RocketMQ Topic Route Refresh Interval."""