From bc1bdae55d137cbd47a3b67570e47d7e72345320 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 26 Apr 2023 11:27:31 +0800 Subject: [PATCH] chore: reslove confilt for sync release-50 to master --- .../src/emqx_ee_connector_dynamo.erl | 12 ++++++++---- .../src/emqx_ee_connector_rocketmq.erl | 6 ++++-- .../src/emqx_ee_connector_sqlserver.erl | 5 ++--- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl index b6270b1b6..5eee882ce 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl @@ -80,8 +80,10 @@ on_start( config => redact(Config) }), - {Schema, Server} = get_host_schema(to_str(Url)), - #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?DYNAMO_HOST_OPTIONS), + {Schema, Server, DefaultPort} = get_host_info(to_str(Url)), + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, #{ + default_port => DefaultPort + }), Options = [ {config, #{ @@ -142,8 +144,10 @@ on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, ReplyCtx, Stat on_batch_query_async(_InstanceId, Query, _Reply, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. -on_get_status(_InstanceId, #{pool_name := PoolName}) -> - Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1), +on_get_status(_InstanceId, #{pool_name := Pool}) -> + Health = emqx_resource_pool:health_check_workers( + Pool, {emqx_ee_connector_dynamo_client, is_connected, []} + ), status_result(Health). status_result(_Status = true) -> connected; diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index e831b4f2f..2e1730b52 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -96,8 +96,10 @@ on_start( connector => InstanceId, config => redact(Config) }), - Config = maps:merge(default_security_info(), Config1), - #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS), + Servers = lists:map( + fun(#{hostname := Host, port := Port}) -> {Host, Port} end, + emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS) + ), ClientId = client_id(InstanceId), TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl index 180d1271c..90d90cb36 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl @@ -304,11 +304,10 @@ on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) -> ), do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). -on_get_status(_InstanceId, #{pool_name := PoolName, resource_opts := ResourceOpts} = _State) -> - RequestTimeout = ?REQUEST_TIMEOUT(ResourceOpts), +on_get_status(_InstanceId, #{pool_name := PoolName} = _State) -> Health = emqx_resource_pool:health_check_workers( PoolName, - {?MODULE, do_get_status, []}, + {?MODULE, do_get_status, []} ), status_result(Health).