refactor: split `parse_server` into smaller functions, improve return type to use map

This commit is contained in:
Thales Macedo Garitezi 2023-04-24 11:01:33 -03:00
parent cb149ac345
commit 377b143325
19 changed files with 220 additions and 110 deletions

View File

@ -66,6 +66,12 @@
-typerefl_from_string({url/0, emqx_schema, to_url}).
-typerefl_from_string({json_binary/0, emqx_schema, to_json_binary}).
-type parsed_server() :: #{
hostname := string(),
port => port_number(),
scheme => string()
}.
-export([
validate_heap_size/1,
user_lookup_fun_tr/2,
@ -2901,10 +2907,7 @@ servers_validator(Opts, Required) ->
%% `no_port': by default it's `false', when set to `true',
%% a `throw' exception is raised if the port is found.
-spec parse_server(undefined | string() | binary(), server_parse_option()) ->
string()
| {string(), port_number()}
| {string(), string()}
| {string(), string(), port_number()}.
undefined | parsed_server().
parse_server(Str, Opts) ->
case parse_servers(Str, Opts) of
undefined ->
@ -2918,12 +2921,7 @@ parse_server(Str, Opts) ->
%% @doc Parse comma separated `host[:port][,host[:port]]' endpoints
%% into a list of `{Host, Port}' tuples or just `Host' string.
-spec parse_servers(undefined | string() | binary(), server_parse_option()) ->
[
string()
| {string(), port_number()}
| {string(), string()}
| {string(), string(), port_number()}
].
undefined | [parsed_server()].
parse_servers(undefined, _Opts) ->
%% should not parse 'undefined' as string,
%% not to throw exception either,
@ -2988,55 +2986,112 @@ do_parse_server(Str, Opts) ->
ok
end,
%% do not split with space, there should be no space allowed between host and port
case string:tokens(Str, ":") of
[Scheme, "//" ++ Hostname, Port] ->
NotExpectingPort andalso throw("not_expecting_port_number"),
NotExpectingScheme andalso throw("not_expecting_scheme"),
{check_scheme(Scheme, Opts), check_hostname(Hostname), parse_port(Port)};
[Scheme, "//" ++ Hostname] ->
NotExpectingScheme andalso throw("not_expecting_scheme"),
case is_integer(DefaultPort) of
true ->
{check_scheme(Scheme, Opts), check_hostname(Hostname), DefaultPort};
false when NotExpectingPort ->
{check_scheme(Scheme, Opts), check_hostname(Hostname)};
false ->
throw("missing_port_number")
end;
[Hostname, Port] ->
NotExpectingPort andalso throw("not_expecting_port_number"),
case is_list(DefaultScheme) of
false ->
{check_hostname(Hostname), parse_port(Port)};
true ->
{DefaultScheme, check_hostname(Hostname), parse_port(Port)}
end;
[Hostname] ->
case is_integer(DefaultPort) orelse NotExpectingPort of
true ->
ok;
false ->
throw("missing_port_number")
end,
case is_list(DefaultScheme) orelse NotExpectingScheme of
true ->
ok;
false ->
throw("missing_scheme")
end,
case {is_integer(DefaultPort), is_list(DefaultScheme)} of
{true, true} ->
{DefaultScheme, check_hostname(Hostname), DefaultPort};
{true, false} ->
{check_hostname(Hostname), DefaultPort};
{false, true} ->
{DefaultScheme, check_hostname(Hostname)};
{false, false} ->
check_hostname(Hostname)
end;
_ ->
throw("bad_host_port")
end.
Tokens = string:tokens(Str, ":"),
Context = #{
not_expecting_port => NotExpectingPort,
not_expecting_scheme => NotExpectingScheme,
default_port => DefaultPort,
default_scheme => DefaultScheme,
opts => Opts
},
check_server_parts(Tokens, Context).
check_server_parts([Scheme, "//" ++ Hostname, Port], Context) ->
#{
not_expecting_scheme := NotExpectingScheme,
not_expecting_port := NotExpectingPort,
opts := Opts
} = Context,
NotExpectingPort andalso throw("not_expecting_port_number"),
NotExpectingScheme andalso throw("not_expecting_scheme"),
#{
scheme => check_scheme(Scheme, Opts),
hostname => check_hostname(Hostname),
port => parse_port(Port)
};
check_server_parts([Scheme, "//" ++ Hostname], Context) ->
#{
not_expecting_scheme := NotExpectingScheme,
not_expecting_port := NotExpectingPort,
default_port := DefaultPort,
opts := Opts
} = Context,
NotExpectingScheme andalso throw("not_expecting_scheme"),
case is_integer(DefaultPort) of
true ->
#{
scheme => check_scheme(Scheme, Opts),
hostname => check_hostname(Hostname),
port => DefaultPort
};
false when NotExpectingPort ->
#{
scheme => check_scheme(Scheme, Opts),
hostname => check_hostname(Hostname)
};
false ->
throw("missing_port_number")
end;
check_server_parts([Hostname, Port], Context) ->
#{
not_expecting_port := NotExpectingPort,
default_scheme := DefaultScheme
} = Context,
NotExpectingPort andalso throw("not_expecting_port_number"),
case is_list(DefaultScheme) of
false ->
#{
hostname => check_hostname(Hostname),
port => parse_port(Port)
};
true ->
#{
scheme => DefaultScheme,
hostname => check_hostname(Hostname),
port => parse_port(Port)
}
end;
check_server_parts([Hostname], Context) ->
#{
not_expecting_scheme := NotExpectingScheme,
not_expecting_port := NotExpectingPort,
default_port := DefaultPort,
default_scheme := DefaultScheme
} = Context,
case is_integer(DefaultPort) orelse NotExpectingPort of
true ->
ok;
false ->
throw("missing_port_number")
end,
case is_list(DefaultScheme) orelse NotExpectingScheme of
true ->
ok;
false ->
throw("missing_scheme")
end,
case {is_integer(DefaultPort), is_list(DefaultScheme)} of
{true, true} ->
#{
scheme => DefaultScheme,
hostname => check_hostname(Hostname),
port => DefaultPort
};
{true, false} ->
#{
hostname => check_hostname(Hostname),
port => DefaultPort
};
{false, true} ->
#{
scheme => DefaultScheme,
hostname => check_hostname(Hostname)
};
{false, false} ->
#{hostname => check_hostname(Hostname)}
end;
check_server_parts(_Tokens, _Context) ->
throw("bad_host_port").
check_scheme(Str, Opts) ->
SupportedSchemes = maps:get(supported_schemes, Opts, []),

View File

@ -219,112 +219,124 @@ parse_server_test_() ->
?T(
"single server, binary, no port",
?assertEqual(
[{"localhost", DefaultPort}],
[#{hostname => "localhost", port => DefaultPort}],
Parse(<<"localhost">>)
)
),
?T(
"single server, string, no port",
?assertEqual(
[{"localhost", DefaultPort}],
[#{hostname => "localhost", port => DefaultPort}],
Parse("localhost")
)
),
?T(
"single server, list(string), no port",
?assertEqual(
[{"localhost", DefaultPort}],
[#{hostname => "localhost", port => DefaultPort}],
Parse(["localhost"])
)
),
?T(
"single server, list(binary), no port",
?assertEqual(
[{"localhost", DefaultPort}],
[#{hostname => "localhost", port => DefaultPort}],
Parse([<<"localhost">>])
)
),
?T(
"single server, binary, with port",
?assertEqual(
[{"localhost", 9999}],
[#{hostname => "localhost", port => 9999}],
Parse(<<"localhost:9999">>)
)
),
?T(
"single server, list(string), with port",
?assertEqual(
[{"localhost", 9999}],
[#{hostname => "localhost", port => 9999}],
Parse(["localhost:9999"])
)
),
?T(
"single server, string, with port",
?assertEqual(
[{"localhost", 9999}],
[#{hostname => "localhost", port => 9999}],
Parse("localhost:9999")
)
),
?T(
"single server, list(binary), with port",
?assertEqual(
[{"localhost", 9999}],
[#{hostname => "localhost", port => 9999}],
Parse([<<"localhost:9999">>])
)
),
?T(
"multiple servers, string, no port",
?assertEqual(
[{"host1", DefaultPort}, {"host2", DefaultPort}],
[
#{hostname => "host1", port => DefaultPort},
#{hostname => "host2", port => DefaultPort}
],
Parse("host1, host2")
)
),
?T(
"multiple servers, binary, no port",
?assertEqual(
[{"host1", DefaultPort}, {"host2", DefaultPort}],
[
#{hostname => "host1", port => DefaultPort},
#{hostname => "host2", port => DefaultPort}
],
Parse(<<"host1, host2,,,">>)
)
),
?T(
"multiple servers, list(string), no port",
?assertEqual(
[{"host1", DefaultPort}, {"host2", DefaultPort}],
[
#{hostname => "host1", port => DefaultPort},
#{hostname => "host2", port => DefaultPort}
],
Parse(["host1", "host2"])
)
),
?T(
"multiple servers, list(binary), no port",
?assertEqual(
[{"host1", DefaultPort}, {"host2", DefaultPort}],
[
#{hostname => "host1", port => DefaultPort},
#{hostname => "host2", port => DefaultPort}
],
Parse([<<"host1">>, <<"host2">>])
)
),
?T(
"multiple servers, string, with port",
?assertEqual(
[{"host1", 1234}, {"host2", 2345}],
[#{hostname => "host1", port => 1234}, #{hostname => "host2", port => 2345}],
Parse("host1:1234, host2:2345")
)
),
?T(
"multiple servers, binary, with port",
?assertEqual(
[{"host1", 1234}, {"host2", 2345}],
[#{hostname => "host1", port => 1234}, #{hostname => "host2", port => 2345}],
Parse(<<"host1:1234, host2:2345, ">>)
)
),
?T(
"multiple servers, list(string), with port",
?assertEqual(
[{"host1", 1234}, {"host2", 2345}],
[#{hostname => "host1", port => 1234}, #{hostname => "host2", port => 2345}],
Parse([" host1:1234 ", "host2:2345"])
)
),
?T(
"multiple servers, list(binary), with port",
?assertEqual(
[{"host1", 1234}, {"host2", 2345}],
[#{hostname => "host1", port => 1234}, #{hostname => "host2", port => 2345}],
Parse([<<"host1:1234">>, <<"host2:2345">>])
)
),
@ -352,7 +364,7 @@ parse_server_test_() ->
?T(
"multiple servers without port, mixed list(binary|string)",
?assertEqual(
["host1", "host2"],
[#{hostname => "host1"}, #{hostname => "host2"}],
Parse2([<<"host1">>, "host2"], #{no_port => true})
)
),
@ -394,14 +406,18 @@ parse_server_test_() ->
?T(
"single server map",
?assertEqual(
[{"host1.domain", 1234}],
[#{hostname => "host1.domain", port => 1234}],
HoconParse("host1.domain:1234")
)
),
?T(
"multiple servers map",
?assertEqual(
[{"host1.domain", 1234}, {"host2.domain", 2345}, {"host3.domain", 3456}],
[
#{hostname => "host1.domain", port => 1234},
#{hostname => "host2.domain", port => 2345},
#{hostname => "host3.domain", port => 3456}
],
HoconParse("host1.domain:1234,host2.domain:2345,host3.domain:3456")
)
),
@ -451,7 +467,7 @@ parse_server_test_() ->
?T(
"scheme, hostname and port",
?assertEqual(
{"pulsar+ssl", "host", 6651},
#{scheme => "pulsar+ssl", hostname => "host", port => 6651},
emqx_schema:parse_server(
"pulsar+ssl://host:6651",
#{
@ -464,7 +480,7 @@ parse_server_test_() ->
?T(
"scheme and hostname, default port",
?assertEqual(
{"pulsar", "host", 6650},
#{scheme => "pulsar", hostname => "host", port => 6650},
emqx_schema:parse_server(
"pulsar://host",
#{
@ -477,7 +493,7 @@ parse_server_test_() ->
?T(
"scheme and hostname, no port",
?assertEqual(
{"pulsar", "host"},
#{scheme => "pulsar", hostname => "host"},
emqx_schema:parse_server(
"pulsar://host",
#{
@ -503,7 +519,7 @@ parse_server_test_() ->
?T(
"hostname, default scheme, no default port",
?assertEqual(
{"pulsar", "host"},
#{scheme => "pulsar", hostname => "host"},
emqx_schema:parse_server(
"host",
#{
@ -517,7 +533,7 @@ parse_server_test_() ->
?T(
"hostname, default scheme, default port",
?assertEqual(
{"pulsar", "host", 6650},
#{scheme => "pulsar", hostname => "host", port => 6650},
emqx_schema:parse_server(
"host",
#{
@ -544,7 +560,7 @@ parse_server_test_() ->
?T(
"hostname, default scheme, defined port",
?assertEqual(
{"pulsar", "host", 6651},
#{scheme => "pulsar", hostname => "host", port => 6651},
emqx_schema:parse_server(
"host:6651",
#{
@ -572,7 +588,7 @@ parse_server_test_() ->
?T(
"hostname, default scheme, defined port",
?assertEqual(
{"pulsar", "host", 6651},
#{scheme => "pulsar", hostname => "host", port => 6651},
emqx_schema:parse_server(
"host:6651",
#{
@ -600,9 +616,9 @@ parse_server_test_() ->
"multiple hostnames with schemes (1)",
?assertEqual(
[
{"pulsar", "host", 6649},
{"pulsar+ssl", "other.host", 6651},
{"pulsar", "yet.another", 6650}
#{scheme => "pulsar", hostname => "host", port => 6649},
#{scheme => "pulsar+ssl", hostname => "other.host", port => 6651},
#{scheme => "pulsar", hostname => "yet.another", port => 6650}
],
emqx_schema:parse_servers(
"pulsar://host:6649, pulsar+ssl://other.host:6651,pulsar://yet.another",

View File

@ -92,7 +92,7 @@ callback_mode() -> async_if_possible.
on_start(
InstId,
#{
servers := Servers,
servers := Servers0,
keyspace := Keyspace,
username := Username,
pool_size := PoolSize,
@ -104,9 +104,16 @@ on_start(
connector => InstId,
config => emqx_utils:redact(Config)
}),
Servers =
lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(Servers0, ?DEFAULT_SERVER_OPTION)
),
Options = [
{nodes, emqx_schema:parse_servers(Servers, ?DEFAULT_SERVER_OPTION)},
{nodes, Servers},
{username, Username},
{password, emqx_secret:wrap(maps:get(password, Config, ""))},
{keyspace, Keyspace},

View File

@ -38,9 +38,14 @@ groups() ->
[].
cassandra_servers() ->
emqx_schema:parse_servers(
iolist_to_binary([?CASSANDRA_HOST, ":", erlang:integer_to_list(?CASSANDRA_DEFAULT_PORT)]),
#{default_port => ?CASSANDRA_DEFAULT_PORT}
lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(
iolist_to_binary([?CASSANDRA_HOST, ":", erlang:integer_to_list(?CASSANDRA_DEFAULT_PORT)]),
#{default_port => ?CASSANDRA_DEFAULT_PORT}
)
).
init_per_suite(Config) ->

View File

@ -81,7 +81,7 @@ on_start(
%% emulating the emulator behavior
%% https://cloud.google.com/pubsub/docs/emulator
HostPort = os:getenv("PUBSUB_EMULATOR_HOST", "pubsub.googleapis.com:443"),
{Host, Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}),
#{hostname := Host, port := Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}),
PoolType = random,
Transport = tls,
TransportOpts = emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none}),

View File

@ -180,7 +180,7 @@ to_bin(B) when is_binary(B) ->
format_servers(Servers0) ->
Servers1 = emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS),
lists:map(
fun({Scheme, Host, Port}) ->
fun(#{scheme := Scheme, hostname := Host, port := Port}) ->
Scheme ++ "://" ++ Host ++ ":" ++ integer_to_list(Port)
end,
Servers1

View File

@ -67,7 +67,17 @@ on_start(
connector => InstId,
config => emqx_utils:redact(Config)
}),
Servers = emqx_schema:parse_servers(Servers0, ?LDAP_HOST_OPTIONS),
Servers1 = emqx_schema:parse_servers(Servers0, ?LDAP_HOST_OPTIONS),
Servers =
lists:map(
fun
(#{hostname := Host, port := Port0}) ->
{Host, Port0};
(#{hostname := Host}) ->
Host
end,
Servers1
),
SslOpts =
case maps:get(enable, SSL) of
true ->

View File

@ -537,4 +537,9 @@ format_hosts(Hosts) ->
lists:map(fun format_host/1, Hosts).
parse_servers(HoconValue) ->
emqx_schema:parse_servers(HoconValue, ?MONGO_HOST_OPTIONS).
lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(HoconValue, ?MONGO_HOST_OPTIONS)
).

View File

@ -98,7 +98,7 @@ on_start(
ssl := SSL
} = Config
) ->
{Host, Port} = emqx_schema:parse_server(Server, ?MYSQL_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MYSQL_HOST_OPTIONS),
?SLOG(info, #{
msg => "starting_mysql_connector",
connector => InstId,

View File

@ -91,7 +91,7 @@ on_start(
ssl := SSL
} = Config
) ->
{Host, Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
?SLOG(info, #{
msg => "starting_postgresql_connector",
connector => InstId,

View File

@ -131,7 +131,13 @@ on_start(
_ -> servers
end,
Servers0 = maps:get(ConfKey, Config),
Servers = [{servers, emqx_schema:parse_servers(Servers0, ?REDIS_HOST_OPTIONS)}],
Servers1 = lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(Servers0, ?REDIS_HOST_OPTIONS)
),
Servers = [{servers, Servers1}],
Database =
case Type of
cluster -> [];

View File

@ -293,4 +293,5 @@ qos() ->
hoconsc:union([emqx_schema:qos(), binary()]).
parse_server(Str) ->
emqx_schema:parse_server(Str, ?MQTT_HOST_OPTS).
#{hostname := Host, port := Port} = emqx_schema:parse_server(Str, ?MQTT_HOST_OPTS),
{Host, Port}.

View File

@ -80,7 +80,7 @@ init(Conf) ->
flush_time_interval := FlushTimeInterval
} = Conf,
FlushTimeInterval1 = flush_interval(FlushTimeInterval, SampleTimeInterval),
{Host, Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS),
Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw),
Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}],
{ok, Pid} = estatsd:start_link(Opts),

View File

@ -449,9 +449,14 @@ all_test_hosts() ->
).
parse_servers(Servers) ->
emqx_schema:parse_servers(Servers, #{
default_port => 6379
}).
lists:map(
fun(#{hostname := Host, port := Port}) ->
{Host, Port}
end,
emqx_schema:parse_servers(Servers, #{
default_port => 6379
})
).
redis_connect_ssl_opts(Type) ->
maps:merge(

View File

@ -92,7 +92,7 @@ on_start(
}),
{Schema, Server} = get_host_schema(to_str(Url)),
{Host, Port} = emqx_schema:parse_server(Server, ?DYNAMO_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?DYNAMO_HOST_OPTIONS),
Options = [
{config, #{

View File

@ -294,7 +294,7 @@ client_config(
server := Server
}
) ->
{Host, Port} = emqx_schema:parse_server(Server, ?INFLUXDB_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?INFLUXDB_HOST_OPTIONS),
[
{host, str(Host)},
{port, Port},

View File

@ -105,7 +105,7 @@ on_start(
config => redact(Config1)
}),
Config = maps:merge(default_security_info(), Config1),
{Host, Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS),
Server1 = [{Host, Port}],
ClientId = client_id(InstanceId),

View File

@ -355,7 +355,7 @@ conn_str([], Acc) ->
conn_str([{driver, Driver} | Opts], Acc) ->
conn_str(Opts, ["Driver=" ++ str(Driver) | Acc]);
conn_str([{server, Server} | Opts], Acc) ->
{Host, Port} = emqx_schema:parse_server(Server, ?SQLSERVER_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?SQLSERVER_HOST_OPTIONS),
conn_str(Opts, ["Server=" ++ str(Host) ++ "," ++ str(Port) | Acc]);
conn_str([{database, Database} | Opts], Acc) ->
conn_str(Opts, ["Database=" ++ str(Database) | Acc]);

View File

@ -96,7 +96,7 @@ on_start(
config => emqx_utils:redact(Config)
}),
{Host, Port} = emqx_schema:parse_server(Server, ?TD_HOST_OPTIONS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?TD_HOST_OPTIONS),
Options = [
{host, to_bin(Host)},
{port, Port},