fix(rocketmq): allow setting multiple addresses in RocketMQ bridge

This commit is contained in:
firest 2023-04-20 17:58:12 +08:00
parent dfb089e6d5
commit 6e12abff39
1 changed files with 5 additions and 6 deletions

View File

@ -38,7 +38,7 @@ roots() ->
fields(config) ->
[
{server, server()},
{servers, servers()},
{topic,
mk(
binary(),
@ -75,7 +75,7 @@ add_default_fn(OrigFn, Default) ->
(Field) -> OrigFn(Field)
end.
server() ->
servers() ->
Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS).
@ -97,7 +97,7 @@ is_buffer_supported() -> false.
on_start(
InstanceId,
#{server := Server, topic := Topic} = Config1
#{servers := BinServers, topic := Topic} = Config1
) ->
?SLOG(info, #{
msg => "starting_rocketmq_connector",
@ -105,9 +105,8 @@ on_start(
config => redact(Config1)
}),
Config = maps:merge(default_security_info(), Config1),
{Host, Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS),
Servers = emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS),
Server1 = [{Host, Port}],
ClientId = client_id(InstanceId),
ClientCfg = #{acl_info => #{}},
@ -124,7 +123,7 @@ on_start(
producers_opts => ProducerOpts
},
case rocketmq:ensure_supervised_client(ClientId, Server1, ClientCfg) of
case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of
{ok, _Pid} ->
{ok, State};
{error, _Reason} = Error ->