From 6e12abff39189f4cb1eb82a7ac32b1620a64dda4 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 20 Apr 2023 17:58:12 +0800 Subject: [PATCH] fix(rocketmq): allow setting multiple addresses in RocketMQ bridge --- .../src/emqx_ee_connector_rocketmq.erl | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index 205359bb8..389e1e366 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -38,7 +38,7 @@ roots() -> fields(config) -> [ - {server, server()}, + {servers, servers()}, {topic, mk( binary(), @@ -75,7 +75,7 @@ add_default_fn(OrigFn, Default) -> (Field) -> OrigFn(Field) end. -server() -> +servers() -> Meta = #{desc => ?DESC("server")}, emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS). @@ -97,7 +97,7 @@ is_buffer_supported() -> false. on_start( InstanceId, - #{server := Server, topic := Topic} = Config1 + #{servers := BinServers, topic := Topic} = Config1 ) -> ?SLOG(info, #{ msg => "starting_rocketmq_connector", @@ -105,9 +105,8 @@ on_start( config => redact(Config1) }), Config = maps:merge(default_security_info(), Config1), - {Host, Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS), + Servers = emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS), - Server1 = [{Host, Port}], ClientId = client_id(InstanceId), ClientCfg = #{acl_info => #{}}, @@ -124,7 +123,7 @@ on_start( producers_opts => ProducerOpts }, - case rocketmq:ensure_supervised_client(ClientId, Server1, ClientCfg) of + case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of {ok, _Pid} -> {ok, State}; {error, _Reason} = Error ->