diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 248fdad7f..69f234e47 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -66,6 +66,12 @@ -typerefl_from_string({url/0, emqx_schema, to_url}). -typerefl_from_string({json_binary/0, emqx_schema, to_json_binary}). +-type parsed_server() :: #{ + hostname := string(), + port => port_number(), + scheme => string() +}. + -export([ validate_heap_size/1, user_lookup_fun_tr/2, @@ -2901,10 +2907,7 @@ servers_validator(Opts, Required) -> %% `no_port': by default it's `false', when set to `true', %% a `throw' exception is raised if the port is found. -spec parse_server(undefined | string() | binary(), server_parse_option()) -> - string() - | {string(), port_number()} - | {string(), string()} - | {string(), string(), port_number()}. + undefined | parsed_server(). parse_server(Str, Opts) -> case parse_servers(Str, Opts) of undefined -> @@ -2918,12 +2921,7 @@ parse_server(Str, Opts) -> %% @doc Parse comma separated `host[:port][,host[:port]]' endpoints %% into a list of `{Host, Port}' tuples or just `Host' string. -spec parse_servers(undefined | string() | binary(), server_parse_option()) -> - [ - string() - | {string(), port_number()} - | {string(), string()} - | {string(), string(), port_number()} - ]. + undefined | [parsed_server()]. parse_servers(undefined, _Opts) -> %% should not parse 'undefined' as string, %% not to throw exception either, @@ -2988,55 +2986,112 @@ do_parse_server(Str, Opts) -> ok end, %% do not split with space, there should be no space allowed between host and port - case string:tokens(Str, ":") of - [Scheme, "//" ++ Hostname, Port] -> - NotExpectingPort andalso throw("not_expecting_port_number"), - NotExpectingScheme andalso throw("not_expecting_scheme"), - {check_scheme(Scheme, Opts), check_hostname(Hostname), parse_port(Port)}; - [Scheme, "//" ++ Hostname] -> - NotExpectingScheme andalso throw("not_expecting_scheme"), - case is_integer(DefaultPort) of - true -> - {check_scheme(Scheme, Opts), check_hostname(Hostname), DefaultPort}; - false when NotExpectingPort -> - {check_scheme(Scheme, Opts), check_hostname(Hostname)}; - false -> - throw("missing_port_number") - end; - [Hostname, Port] -> - NotExpectingPort andalso throw("not_expecting_port_number"), - case is_list(DefaultScheme) of - false -> - {check_hostname(Hostname), parse_port(Port)}; - true -> - {DefaultScheme, check_hostname(Hostname), parse_port(Port)} - end; - [Hostname] -> - case is_integer(DefaultPort) orelse NotExpectingPort of - true -> - ok; - false -> - throw("missing_port_number") - end, - case is_list(DefaultScheme) orelse NotExpectingScheme of - true -> - ok; - false -> - throw("missing_scheme") - end, - case {is_integer(DefaultPort), is_list(DefaultScheme)} of - {true, true} -> - {DefaultScheme, check_hostname(Hostname), DefaultPort}; - {true, false} -> - {check_hostname(Hostname), DefaultPort}; - {false, true} -> - {DefaultScheme, check_hostname(Hostname)}; - {false, false} -> - check_hostname(Hostname) - end; - _ -> - throw("bad_host_port") - end. + Tokens = string:tokens(Str, ":"), + Context = #{ + not_expecting_port => NotExpectingPort, + not_expecting_scheme => NotExpectingScheme, + default_port => DefaultPort, + default_scheme => DefaultScheme, + opts => Opts + }, + check_server_parts(Tokens, Context). + +check_server_parts([Scheme, "//" ++ Hostname, Port], Context) -> + #{ + not_expecting_scheme := NotExpectingScheme, + not_expecting_port := NotExpectingPort, + opts := Opts + } = Context, + NotExpectingPort andalso throw("not_expecting_port_number"), + NotExpectingScheme andalso throw("not_expecting_scheme"), + #{ + scheme => check_scheme(Scheme, Opts), + hostname => check_hostname(Hostname), + port => parse_port(Port) + }; +check_server_parts([Scheme, "//" ++ Hostname], Context) -> + #{ + not_expecting_scheme := NotExpectingScheme, + not_expecting_port := NotExpectingPort, + default_port := DefaultPort, + opts := Opts + } = Context, + NotExpectingScheme andalso throw("not_expecting_scheme"), + case is_integer(DefaultPort) of + true -> + #{ + scheme => check_scheme(Scheme, Opts), + hostname => check_hostname(Hostname), + port => DefaultPort + }; + false when NotExpectingPort -> + #{ + scheme => check_scheme(Scheme, Opts), + hostname => check_hostname(Hostname) + }; + false -> + throw("missing_port_number") + end; +check_server_parts([Hostname, Port], Context) -> + #{ + not_expecting_port := NotExpectingPort, + default_scheme := DefaultScheme + } = Context, + NotExpectingPort andalso throw("not_expecting_port_number"), + case is_list(DefaultScheme) of + false -> + #{ + hostname => check_hostname(Hostname), + port => parse_port(Port) + }; + true -> + #{ + scheme => DefaultScheme, + hostname => check_hostname(Hostname), + port => parse_port(Port) + } + end; +check_server_parts([Hostname], Context) -> + #{ + not_expecting_scheme := NotExpectingScheme, + not_expecting_port := NotExpectingPort, + default_port := DefaultPort, + default_scheme := DefaultScheme + } = Context, + case is_integer(DefaultPort) orelse NotExpectingPort of + true -> + ok; + false -> + throw("missing_port_number") + end, + case is_list(DefaultScheme) orelse NotExpectingScheme of + true -> + ok; + false -> + throw("missing_scheme") + end, + case {is_integer(DefaultPort), is_list(DefaultScheme)} of + {true, true} -> + #{ + scheme => DefaultScheme, + hostname => check_hostname(Hostname), + port => DefaultPort + }; + {true, false} -> + #{ + hostname => check_hostname(Hostname), + port => DefaultPort + }; + {false, true} -> + #{ + scheme => DefaultScheme, + hostname => check_hostname(Hostname) + }; + {false, false} -> + #{hostname => check_hostname(Hostname)} + end; +check_server_parts(_Tokens, _Context) -> + throw("bad_host_port"). check_scheme(Str, Opts) -> SupportedSchemes = maps:get(supported_schemes, Opts, []), diff --git a/apps/emqx/test/emqx_schema_tests.erl b/apps/emqx/test/emqx_schema_tests.erl index c13dc8055..cb51aca46 100644 --- a/apps/emqx/test/emqx_schema_tests.erl +++ b/apps/emqx/test/emqx_schema_tests.erl @@ -219,112 +219,124 @@ parse_server_test_() -> ?T( "single server, binary, no port", ?assertEqual( - [{"localhost", DefaultPort}], + [#{hostname => "localhost", port => DefaultPort}], Parse(<<"localhost">>) ) ), ?T( "single server, string, no port", ?assertEqual( - [{"localhost", DefaultPort}], + [#{hostname => "localhost", port => DefaultPort}], Parse("localhost") ) ), ?T( "single server, list(string), no port", ?assertEqual( - [{"localhost", DefaultPort}], + [#{hostname => "localhost", port => DefaultPort}], Parse(["localhost"]) ) ), ?T( "single server, list(binary), no port", ?assertEqual( - [{"localhost", DefaultPort}], + [#{hostname => "localhost", port => DefaultPort}], Parse([<<"localhost">>]) ) ), ?T( "single server, binary, with port", ?assertEqual( - [{"localhost", 9999}], + [#{hostname => "localhost", port => 9999}], Parse(<<"localhost:9999">>) ) ), ?T( "single server, list(string), with port", ?assertEqual( - [{"localhost", 9999}], + [#{hostname => "localhost", port => 9999}], Parse(["localhost:9999"]) ) ), ?T( "single server, string, with port", ?assertEqual( - [{"localhost", 9999}], + [#{hostname => "localhost", port => 9999}], Parse("localhost:9999") ) ), ?T( "single server, list(binary), with port", ?assertEqual( - [{"localhost", 9999}], + [#{hostname => "localhost", port => 9999}], Parse([<<"localhost:9999">>]) ) ), ?T( "multiple servers, string, no port", ?assertEqual( - [{"host1", DefaultPort}, {"host2", DefaultPort}], + [ + #{hostname => "host1", port => DefaultPort}, + #{hostname => "host2", port => DefaultPort} + ], Parse("host1, host2") ) ), ?T( "multiple servers, binary, no port", ?assertEqual( - [{"host1", DefaultPort}, {"host2", DefaultPort}], + [ + #{hostname => "host1", port => DefaultPort}, + #{hostname => "host2", port => DefaultPort} + ], Parse(<<"host1, host2,,,">>) ) ), ?T( "multiple servers, list(string), no port", ?assertEqual( - [{"host1", DefaultPort}, {"host2", DefaultPort}], + [ + #{hostname => "host1", port => DefaultPort}, + #{hostname => "host2", port => DefaultPort} + ], Parse(["host1", "host2"]) ) ), ?T( "multiple servers, list(binary), no port", ?assertEqual( - [{"host1", DefaultPort}, {"host2", DefaultPort}], + [ + #{hostname => "host1", port => DefaultPort}, + #{hostname => "host2", port => DefaultPort} + ], Parse([<<"host1">>, <<"host2">>]) ) ), ?T( "multiple servers, string, with port", ?assertEqual( - [{"host1", 1234}, {"host2", 2345}], + [#{hostname => "host1", port => 1234}, #{hostname => "host2", port => 2345}], Parse("host1:1234, host2:2345") ) ), ?T( "multiple servers, binary, with port", ?assertEqual( - [{"host1", 1234}, {"host2", 2345}], + [#{hostname => "host1", port => 1234}, #{hostname => "host2", port => 2345}], Parse(<<"host1:1234, host2:2345, ">>) ) ), ?T( "multiple servers, list(string), with port", ?assertEqual( - [{"host1", 1234}, {"host2", 2345}], + [#{hostname => "host1", port => 1234}, #{hostname => "host2", port => 2345}], Parse([" host1:1234 ", "host2:2345"]) ) ), ?T( "multiple servers, list(binary), with port", ?assertEqual( - [{"host1", 1234}, {"host2", 2345}], + [#{hostname => "host1", port => 1234}, #{hostname => "host2", port => 2345}], Parse([<<"host1:1234">>, <<"host2:2345">>]) ) ), @@ -352,7 +364,7 @@ parse_server_test_() -> ?T( "multiple servers without port, mixed list(binary|string)", ?assertEqual( - ["host1", "host2"], + [#{hostname => "host1"}, #{hostname => "host2"}], Parse2([<<"host1">>, "host2"], #{no_port => true}) ) ), @@ -394,14 +406,18 @@ parse_server_test_() -> ?T( "single server map", ?assertEqual( - [{"host1.domain", 1234}], + [#{hostname => "host1.domain", port => 1234}], HoconParse("host1.domain:1234") ) ), ?T( "multiple servers map", ?assertEqual( - [{"host1.domain", 1234}, {"host2.domain", 2345}, {"host3.domain", 3456}], + [ + #{hostname => "host1.domain", port => 1234}, + #{hostname => "host2.domain", port => 2345}, + #{hostname => "host3.domain", port => 3456} + ], HoconParse("host1.domain:1234,host2.domain:2345,host3.domain:3456") ) ), @@ -451,7 +467,7 @@ parse_server_test_() -> ?T( "scheme, hostname and port", ?assertEqual( - {"pulsar+ssl", "host", 6651}, + #{scheme => "pulsar+ssl", hostname => "host", port => 6651}, emqx_schema:parse_server( "pulsar+ssl://host:6651", #{ @@ -464,7 +480,7 @@ parse_server_test_() -> ?T( "scheme and hostname, default port", ?assertEqual( - {"pulsar", "host", 6650}, + #{scheme => "pulsar", hostname => "host", port => 6650}, emqx_schema:parse_server( "pulsar://host", #{ @@ -477,7 +493,7 @@ parse_server_test_() -> ?T( "scheme and hostname, no port", ?assertEqual( - {"pulsar", "host"}, + #{scheme => "pulsar", hostname => "host"}, emqx_schema:parse_server( "pulsar://host", #{ @@ -503,7 +519,7 @@ parse_server_test_() -> ?T( "hostname, default scheme, no default port", ?assertEqual( - {"pulsar", "host"}, + #{scheme => "pulsar", hostname => "host"}, emqx_schema:parse_server( "host", #{ @@ -517,7 +533,7 @@ parse_server_test_() -> ?T( "hostname, default scheme, default port", ?assertEqual( - {"pulsar", "host", 6650}, + #{scheme => "pulsar", hostname => "host", port => 6650}, emqx_schema:parse_server( "host", #{ @@ -544,7 +560,7 @@ parse_server_test_() -> ?T( "hostname, default scheme, defined port", ?assertEqual( - {"pulsar", "host", 6651}, + #{scheme => "pulsar", hostname => "host", port => 6651}, emqx_schema:parse_server( "host:6651", #{ @@ -572,7 +588,7 @@ parse_server_test_() -> ?T( "hostname, default scheme, defined port", ?assertEqual( - {"pulsar", "host", 6651}, + #{scheme => "pulsar", hostname => "host", port => 6651}, emqx_schema:parse_server( "host:6651", #{ @@ -600,9 +616,9 @@ parse_server_test_() -> "multiple hostnames with schemes (1)", ?assertEqual( [ - {"pulsar", "host", 6649}, - {"pulsar+ssl", "other.host", 6651}, - {"pulsar", "yet.another", 6650} + #{scheme => "pulsar", hostname => "host", port => 6649}, + #{scheme => "pulsar+ssl", hostname => "other.host", port => 6651}, + #{scheme => "pulsar", hostname => "yet.another", port => 6650} ], emqx_schema:parse_servers( "pulsar://host:6649, pulsar+ssl://other.host:6651,pulsar://yet.another", diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index cf6ddff9f..d0a1df7a8 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -92,7 +92,7 @@ callback_mode() -> async_if_possible. on_start( InstId, #{ - servers := Servers, + servers := Servers0, keyspace := Keyspace, username := Username, pool_size := PoolSize, @@ -104,9 +104,16 @@ on_start( connector => InstId, config => emqx_utils:redact(Config) }), + Servers = + lists:map( + fun(#{hostname := Host, port := Port}) -> + {Host, Port} + end, + emqx_schema:parse_servers(Servers0, ?DEFAULT_SERVER_OPTION) + ), Options = [ - {nodes, emqx_schema:parse_servers(Servers, ?DEFAULT_SERVER_OPTION)}, + {nodes, Servers}, {username, Username}, {password, emqx_secret:wrap(maps:get(password, Config, ""))}, {keyspace, Keyspace}, diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl index f419283a8..452db33a7 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl @@ -38,9 +38,14 @@ groups() -> []. cassandra_servers() -> - emqx_schema:parse_servers( - iolist_to_binary([?CASSANDRA_HOST, ":", erlang:integer_to_list(?CASSANDRA_DEFAULT_PORT)]), - #{default_port => ?CASSANDRA_DEFAULT_PORT} + lists:map( + fun(#{hostname := Host, port := Port}) -> + {Host, Port} + end, + emqx_schema:parse_servers( + iolist_to_binary([?CASSANDRA_HOST, ":", erlang:integer_to_list(?CASSANDRA_DEFAULT_PORT)]), + #{default_port => ?CASSANDRA_DEFAULT_PORT} + ) ). init_per_suite(Config) -> diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index a3f0ef36b..98f3e497d 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -81,7 +81,7 @@ on_start( %% emulating the emulator behavior %% https://cloud.google.com/pubsub/docs/emulator HostPort = os:getenv("PUBSUB_EMULATOR_HOST", "pubsub.googleapis.com:443"), - {Host, Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}), + #{hostname := Host, port := Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}), PoolType = random, Transport = tls, TransportOpts = emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none}), diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index b86124417..0b195df66 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -180,7 +180,7 @@ to_bin(B) when is_binary(B) -> format_servers(Servers0) -> Servers1 = emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS), lists:map( - fun({Scheme, Host, Port}) -> + fun(#{scheme := Scheme, hostname := Host, port := Port}) -> Scheme ++ "://" ++ Host ++ ":" ++ integer_to_list(Port) end, Servers1 diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index e2121de22..c3e1db7d3 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -67,7 +67,17 @@ on_start( connector => InstId, config => emqx_utils:redact(Config) }), - Servers = emqx_schema:parse_servers(Servers0, ?LDAP_HOST_OPTIONS), + Servers1 = emqx_schema:parse_servers(Servers0, ?LDAP_HOST_OPTIONS), + Servers = + lists:map( + fun + (#{hostname := Host, port := Port0}) -> + {Host, Port0}; + (#{hostname := Host}) -> + Host + end, + Servers1 + ), SslOpts = case maps:get(enable, SSL) of true -> diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index a65a32842..dde8652f0 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -537,4 +537,9 @@ format_hosts(Hosts) -> lists:map(fun format_host/1, Hosts). parse_servers(HoconValue) -> - emqx_schema:parse_servers(HoconValue, ?MONGO_HOST_OPTIONS). + lists:map( + fun(#{hostname := Host, port := Port}) -> + {Host, Port} + end, + emqx_schema:parse_servers(HoconValue, ?MONGO_HOST_OPTIONS) + ). diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 45d459e70..b8c1250fe 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -98,7 +98,7 @@ on_start( ssl := SSL } = Config ) -> - {Host, Port} = emqx_schema:parse_server(Server, ?MYSQL_HOST_OPTIONS), + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MYSQL_HOST_OPTIONS), ?SLOG(info, #{ msg => "starting_mysql_connector", connector => InstId, diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index ddbf9491d..3b2375d04 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -91,7 +91,7 @@ on_start( ssl := SSL } = Config ) -> - {Host, Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS), + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS), ?SLOG(info, #{ msg => "starting_postgresql_connector", connector => InstId, diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index e2155eb49..32ac77226 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -131,7 +131,13 @@ on_start( _ -> servers end, Servers0 = maps:get(ConfKey, Config), - Servers = [{servers, emqx_schema:parse_servers(Servers0, ?REDIS_HOST_OPTIONS)}], + Servers1 = lists:map( + fun(#{hostname := Host, port := Port}) -> + {Host, Port} + end, + emqx_schema:parse_servers(Servers0, ?REDIS_HOST_OPTIONS) + ), + Servers = [{servers, Servers1}], Database = case Type of cluster -> []; diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index e08804685..2a40980af 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -293,4 +293,5 @@ qos() -> hoconsc:union([emqx_schema:qos(), binary()]). parse_server(Str) -> - emqx_schema:parse_server(Str, ?MQTT_HOST_OPTS). + #{hostname := Host, port := Port} = emqx_schema:parse_server(Str, ?MQTT_HOST_OPTS), + {Host, Port}. diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index c5a7fc1c8..b2d726b07 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -80,7 +80,7 @@ init(Conf) -> flush_time_interval := FlushTimeInterval } = Conf, FlushTimeInterval1 = flush_interval(FlushTimeInterval, SampleTimeInterval), - {Host, Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS), + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS), Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw), Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}], {ok, Pid} = estatsd:start_link(Opts), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index f0b70d21b..56f932aba 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -449,9 +449,14 @@ all_test_hosts() -> ). parse_servers(Servers) -> - emqx_schema:parse_servers(Servers, #{ - default_port => 6379 - }). + lists:map( + fun(#{hostname := Host, port := Port}) -> + {Host, Port} + end, + emqx_schema:parse_servers(Servers, #{ + default_port => 6379 + }) + ). redis_connect_ssl_opts(Type) -> maps:merge( diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl index ebb86f577..f45f8ca2f 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl @@ -92,7 +92,7 @@ on_start( }), {Schema, Server} = get_host_schema(to_str(Url)), - {Host, Port} = emqx_schema:parse_server(Server, ?DYNAMO_HOST_OPTIONS), + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?DYNAMO_HOST_OPTIONS), Options = [ {config, #{ diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 700eb2a81..331577486 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -294,7 +294,7 @@ client_config( server := Server } ) -> - {Host, Port} = emqx_schema:parse_server(Server, ?INFLUXDB_HOST_OPTIONS), + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?INFLUXDB_HOST_OPTIONS), [ {host, str(Host)}, {port, Port}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index 205359bb8..74fb4eedd 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -105,7 +105,7 @@ on_start( config => redact(Config1) }), Config = maps:merge(default_security_info(), Config1), - {Host, Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS), + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS), Server1 = [{Host, Port}], ClientId = client_id(InstanceId), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl index 70bd76d14..8ea4429d0 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl @@ -355,7 +355,7 @@ conn_str([], Acc) -> conn_str([{driver, Driver} | Opts], Acc) -> conn_str(Opts, ["Driver=" ++ str(Driver) | Acc]); conn_str([{server, Server} | Opts], Acc) -> - {Host, Port} = emqx_schema:parse_server(Server, ?SQLSERVER_HOST_OPTIONS), + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?SQLSERVER_HOST_OPTIONS), conn_str(Opts, ["Server=" ++ str(Host) ++ "," ++ str(Port) | Acc]); conn_str([{database, Database} | Opts], Acc) -> conn_str(Opts, ["Database=" ++ str(Database) | Acc]); diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_tdengine.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_tdengine.erl index f9ca21ad7..09cbd8db8 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_tdengine.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_tdengine.erl @@ -96,7 +96,7 @@ on_start( config => emqx_utils:redact(Config) }), - {Host, Port} = emqx_schema:parse_server(Server, ?TD_HOST_OPTIONS), + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?TD_HOST_OPTIONS), Options = [ {host, to_bin(Host)}, {port, Port},