fix(connector): redis cluster `servers` field

This commit is contained in:
JimMoen 2021-12-29 16:12:14 +08:00
parent c6cc92c608
commit 39c29b2396
2 changed files with 33 additions and 8 deletions

View File

@ -20,12 +20,19 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-type server() :: tuple(). -type server() :: tuple().
%% {"127.0.0.1", 7000}
%% For eredis:start_link/1~7
-reflect_type([server/0]). -reflect_type([server/0]).
-typerefl_from_string({server/0, ?MODULE, to_server}). -typerefl_from_string({server/0, ?MODULE, to_server}).
-export([to_server/1]). -type servers() :: list().
%% [{"127.0.0.1", 7000}, {"127.0.0.2", 7000}]
%% For eredis_cluster
-reflect_type([servers/0]).
-typerefl_from_string({servers/0, ?MODULE, to_servers}).
-export([ to_server/1
, to_servers/1]).
-export([roots/0, fields/1]). -export([roots/0, fields/1]).
@ -63,14 +70,14 @@ fields(single) ->
redis_fields() ++ redis_fields() ++
emqx_connector_schema_lib:ssl_fields(); emqx_connector_schema_lib:ssl_fields();
fields(cluster) -> fields(cluster) ->
[ {servers, #{type => hoconsc:array(server())}} [ {servers, #{type => servers()}}
, {redis_type, #{type => hoconsc:enum([cluster]), , {redis_type, #{type => hoconsc:enum([cluster]),
default => cluster}} default => cluster}}
] ++ ] ++
redis_fields() ++ redis_fields() ++
emqx_connector_schema_lib:ssl_fields(); emqx_connector_schema_lib:ssl_fields();
fields(sentinel) -> fields(sentinel) ->
[ {servers, #{type => hoconsc:array(server())}} [ {servers, #{type => servers()}}
, {redis_type, #{type => hoconsc:enum([sentinel]), , {redis_type, #{type => hoconsc:enum([sentinel]),
default => sentinel}} default => sentinel}}
, {sentinel, #{type => string()}} , {sentinel, #{type => string()}}
@ -181,7 +188,23 @@ redis_fields() ->
]. ].
to_server(Server) -> to_server(Server) ->
case string:tokens(Server, ":") of try {ok, parse_server(Server)}
[Host, Port] -> {ok, {Host, list_to_integer(Port)}}; catch
_ -> {error, Server} throw : Error ->
Error
end.
to_servers(Servers) ->
try {ok, lists:map(fun parse_server/1, string:tokens(Servers, ", "))}
catch
throw : _Reason ->
{error, Servers}
end.
parse_server(Server) ->
case string:tokens(Server, ": ") of
[Host, Port] ->
{Host, list_to_integer(Port)};
_ ->
throw({error, Server})
end. end.

View File

@ -423,8 +423,10 @@ typename_to_spec("duration_ms()", _Mod) -> #{type => string, example => <<"32s">
typename_to_spec("percent()", _Mod) -> #{type => number, example => <<"12%">>}; typename_to_spec("percent()", _Mod) -> #{type => number, example => <<"12%">>};
typename_to_spec("file()", _Mod) -> #{type => string, example => <<"/path/to/file">>}; typename_to_spec("file()", _Mod) -> #{type => string, example => <<"/path/to/file">>};
typename_to_spec("ip_port()", _Mod) -> #{type => string, example => <<"127.0.0.1:80">>}; typename_to_spec("ip_port()", _Mod) -> #{type => string, example => <<"127.0.0.1:80">>};
typename_to_spec("ip_ports()", _Mod) -> #{type => string, example => <<"127.0.0.1:80, 127.0.0.2:80">>};
typename_to_spec("url()", _Mod) -> #{type => string, example => <<"http://127.0.0.1">>}; typename_to_spec("url()", _Mod) -> #{type => string, example => <<"http://127.0.0.1">>};
typename_to_spec("server()", Mod) -> typename_to_spec("ip_port()", Mod); typename_to_spec("server()", Mod) -> typename_to_spec("ip_port()", Mod);
typename_to_spec("servers()", Mod) -> typename_to_spec("ip_ports()", Mod);
typename_to_spec("connect_timeout()", Mod) -> typename_to_spec("timeout()", Mod); typename_to_spec("connect_timeout()", Mod) -> typename_to_spec("timeout()", Mod);
typename_to_spec("timeout()", _Mod) -> #{<<"oneOf">> => [#{type => string, example => infinity}, typename_to_spec("timeout()", _Mod) -> #{<<"oneOf">> => [#{type => string, example => infinity},
#{type => integer, example => 100}], example => infinity}; #{type => integer, example => 100}], example => infinity};