diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl
index 285fa98b4..29f8ef84d 100644
--- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl
+++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl
@@ -44,6 +44,11 @@ fields(config) ->
binary(),
#{default => <<"TopicTest">>, desc => ?DESC(topic)}
)},
+ {sync_timeout,
+ mk(
+ emqx_schema:duration(),
+ #{default => <<"3s">>, desc => ?DESC(sync_timeout)}
+ )},
{refresh_interval,
mk(
emqx_schema:duration(),
@@ -76,7 +81,7 @@ add_default_fn(OrigFn, Default) ->
end.
servers() ->
- Meta = #{desc => ?DESC("server")},
+ Meta = #{desc => ?DESC("servers")},
emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS).
relational_fields() ->
@@ -97,7 +102,7 @@ is_buffer_supported() -> false.
on_start(
InstanceId,
- #{servers := BinServers, topic := Topic} = Config1
+ #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config1
) ->
?SLOG(info, #{
msg => "starting_rocketmq_connector",
@@ -116,8 +121,9 @@ on_start(
ProducersMapPID = create_producers_map(ClientId),
State = #{
client_id => ClientId,
+ topic => Topic,
topic_tokens => TopicTks,
- config => Config,
+ sync_timeout => SyncTimeout,
templates => Templates,
producers_map_pid => ProducersMapPID,
producers_opts => ProducerOpts
@@ -173,9 +179,10 @@ do_query(
#{
templates := Templates,
client_id := ClientId,
+ topic := RawTopic,
topic_tokens := TopicTks,
producers_opts := ProducerOpts,
- config := #{topic := RawTopic, resource_opts := #{request_timeout := RequestTimeout}}
+ sync_timeout := RequestTimeout
} = State
) ->
?TRACE(
diff --git a/rel/i18n/emqx_ee_connector_rocketmq.hocon b/rel/i18n/emqx_ee_connector_rocketmq.hocon
index 44dda7931..7f786898e 100644
--- a/rel/i18n/emqx_ee_connector_rocketmq.hocon
+++ b/rel/i18n/emqx_ee_connector_rocketmq.hocon
@@ -1,6 +1,6 @@
emqx_ee_connector_rocketmq {
- server {
+ servers {
desc {
en: """The IPv4 or IPv6 address or the hostname to connect to.
A host entry has the following form: `Host[:Port]`.
@@ -26,6 +26,17 @@ The RocketMQ default port 9876 is used if `[:Port]` is not specified."""
}
}
+ sync_timeout {
+ desc {
+ en: """Timeout of RocketMQ driver synchronous call."""
+ zh: """RocketMQ 驱动同步调用的超时时间。"""
+ }
+ label: {
+ en: "Sync Timeout"
+ zh: "同步调用超时时间"
+ }
+ }
+
refresh_interval {
desc {
en: """RocketMQ Topic Route Refresh Interval."""