diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl index 0cb14e5c3..33a83d2d8 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl @@ -136,7 +136,7 @@ rocketmq_config(BridgeType, Config) -> io_lib:format( "bridges.~s.~s {\n" " enable = true\n" - " server = ~p\n" + " servers = ~p\n" " topic = ~p\n" " resource_opts = {\n" " request_timeout = 1500ms\n" diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index 67f2d2562..285fa98b4 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -38,7 +38,7 @@ roots() -> fields(config) -> [ - {server, server()}, + {servers, servers()}, {topic, mk( binary(), @@ -75,7 +75,7 @@ add_default_fn(OrigFn, Default) -> (Field) -> OrigFn(Field) end. -server() -> +servers() -> Meta = #{desc => ?DESC("server")}, emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS). @@ -97,7 +97,7 @@ is_buffer_supported() -> false. on_start( InstanceId, - #{server := Server, topic := Topic} = Config1 + #{servers := BinServers, topic := Topic} = Config1 ) -> ?SLOG(info, #{ msg => "starting_rocketmq_connector", @@ -105,9 +105,8 @@ on_start( config => redact(Config1) }), Config = maps:merge(default_security_info(), Config1), - {Host, Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS), + Servers = emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS), - Server1 = [{Host, Port}], ClientId = client_id(InstanceId), ClientCfg = #{acl_info => #{}}, @@ -124,7 +123,7 @@ on_start( producers_opts => ProducerOpts }, - case rocketmq:ensure_supervised_client(ClientId, Server1, ClientCfg) of + case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of {ok, _Pid} -> {ok, State}; {error, _Reason} = Error ->