Merge pull request #10461 from lafirest/fix/dynamo_db_svrs

fix(rocketmq): allow setting multiple addresses in RocketMQ bridge
This commit is contained in:
lafirest 2023-04-21 14:37:37 +08:00 committed by GitHub
commit 48402d0476
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 6 additions and 7 deletions

View File

@ -136,7 +136,7 @@ rocketmq_config(BridgeType, Config) ->
io_lib:format(
"bridges.~s.~s {\n"
" enable = true\n"
" server = ~p\n"
" servers = ~p\n"
" topic = ~p\n"
" resource_opts = {\n"
" request_timeout = 1500ms\n"

View File

@ -38,7 +38,7 @@ roots() ->
fields(config) ->
[
{server, server()},
{servers, servers()},
{topic,
mk(
binary(),
@ -75,7 +75,7 @@ add_default_fn(OrigFn, Default) ->
(Field) -> OrigFn(Field)
end.
server() ->
servers() ->
Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS).
@ -97,7 +97,7 @@ is_buffer_supported() -> false.
on_start(
InstanceId,
#{server := Server, topic := Topic} = Config1
#{servers := BinServers, topic := Topic} = Config1
) ->
?SLOG(info, #{
msg => "starting_rocketmq_connector",
@ -105,9 +105,8 @@ on_start(
config => redact(Config1)
}),
Config = maps:merge(default_security_info(), Config1),
{Host, Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS),
Servers = emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS),
Server1 = [{Host, Port}],
ClientId = client_id(InstanceId),
ClientCfg = #{acl_info => #{}},
@ -124,7 +123,7 @@ on_start(
producers_opts => ProducerOpts
},
case rocketmq:ensure_supervised_client(ClientId, Server1, ClientCfg) of
case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of
{ok, _Pid} ->
{ok, State};
{error, _Reason} = Error ->