Merge pull request #10468 from lafirest/fix/rocketmq_sync_timeout

fix(rocketmq): expose the driver parameter `sync_timeout` into the RocketMQ bridge configuration
This commit is contained in:
lafirest 2023-04-21 19:52:35 +08:00 committed by GitHub
commit a664b74281
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 5 deletions

View File

@ -44,6 +44,11 @@ fields(config) ->
binary(), binary(),
#{default => <<"TopicTest">>, desc => ?DESC(topic)} #{default => <<"TopicTest">>, desc => ?DESC(topic)}
)}, )},
{sync_timeout,
mk(
emqx_schema:duration(),
#{default => <<"3s">>, desc => ?DESC(sync_timeout)}
)},
{refresh_interval, {refresh_interval,
mk( mk(
emqx_schema:duration(), emqx_schema:duration(),
@ -76,7 +81,7 @@ add_default_fn(OrigFn, Default) ->
end. end.
servers() -> servers() ->
Meta = #{desc => ?DESC("server")}, Meta = #{desc => ?DESC("servers")},
emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS). emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS).
relational_fields() -> relational_fields() ->
@ -97,7 +102,7 @@ is_buffer_supported() -> false.
on_start( on_start(
InstanceId, InstanceId,
#{servers := BinServers, topic := Topic} = Config1 #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config1
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_rocketmq_connector", msg => "starting_rocketmq_connector",
@ -116,8 +121,9 @@ on_start(
ProducersMapPID = create_producers_map(ClientId), ProducersMapPID = create_producers_map(ClientId),
State = #{ State = #{
client_id => ClientId, client_id => ClientId,
topic => Topic,
topic_tokens => TopicTks, topic_tokens => TopicTks,
config => Config, sync_timeout => SyncTimeout,
templates => Templates, templates => Templates,
producers_map_pid => ProducersMapPID, producers_map_pid => ProducersMapPID,
producers_opts => ProducerOpts producers_opts => ProducerOpts
@ -173,9 +179,10 @@ do_query(
#{ #{
templates := Templates, templates := Templates,
client_id := ClientId, client_id := ClientId,
topic := RawTopic,
topic_tokens := TopicTks, topic_tokens := TopicTks,
producers_opts := ProducerOpts, producers_opts := ProducerOpts,
config := #{topic := RawTopic, resource_opts := #{request_timeout := RequestTimeout}} sync_timeout := RequestTimeout
} = State } = State
) -> ) ->
?TRACE( ?TRACE(

View File

@ -1,6 +1,6 @@
emqx_ee_connector_rocketmq { emqx_ee_connector_rocketmq {
server { servers {
desc { desc {
en: """The IPv4 or IPv6 address or the hostname to connect to.<br/> en: """The IPv4 or IPv6 address or the hostname to connect to.<br/>
A host entry has the following form: `Host[:Port]`.<br/> A host entry has the following form: `Host[:Port]`.<br/>
@ -26,6 +26,17 @@ The RocketMQ default port 9876 is used if `[:Port]` is not specified."""
} }
} }
sync_timeout {
desc {
en: """Timeout of RocketMQ driver synchronous call."""
zh: """RocketMQ 驱动同步调用的超时时间。"""
}
label: {
en: "Sync Timeout"
zh: "同步调用超时时间"
}
}
refresh_interval { refresh_interval {
desc { desc {
en: """RocketMQ Topic Route Refresh Interval.""" en: """RocketMQ Topic Route Refresh Interval."""