Merge pull request #6565 from JimMoen/fix-redis-servers-field

fix(connector): redis cluster `servers` field
This commit is contained in:
JimMoen 2021-12-30 19:33:12 +08:00 committed by GitHub
commit ff3707fbdc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 41 additions and 23 deletions

View File

@ -182,8 +182,7 @@ definitions() ->
mongo_type => #{type => string,
enum => [<<"rs">>],
example => <<"rs">>},
servers => #{type => array,
items => #{type => string,example => <<"127.0.0.1:27017">>}},
servers => #{type => string, example => <<"127.0.0.1:27017, 127.0.0.2:27017">>},
replica_set_name => #{type => string},
pool_size => #{type => integer},
username => #{type => string},
@ -240,8 +239,7 @@ definitions() ->
mongo_type => #{type => string,
enum => [<<"sharded">>],
example => <<"sharded">>},
servers => #{type => array,
items => #{type => string,example => <<"127.0.0.1:27017">>}},
servers => #{type => string,example => <<"127.0.0.1:27017, 127.0.0.2:27017">>},
pool_size => #{type => integer},
username => #{type => string},
password => #{type => string},
@ -401,8 +399,7 @@ definitions() ->
type => string,
example => <<"HGETALL mqtt_authz">>
},
servers => #{type => array,
items => #{type => string,example => <<"127.0.0.1:3306">>}},
servers => #{type => string, example => <<"127.0.0.1:6379, 127.0.0.2:6379">>},
redis_type => #{type => string,
enum => [<<"sentinel">>],
example => <<"sentinel">>},
@ -438,8 +435,7 @@ definitions() ->
type => string,
example => <<"HGETALL mqtt_authz">>
},
servers => #{type => array,
items => #{type => string, example => <<"127.0.0.1:3306">>}},
servers => #{type => string, example => <<"127.0.0.1:6379, 127.0.0.2:6379">>},
redis_type => #{type => string,
enum => [<<"cluster">>],
example => <<"cluster">>},

View File

@ -102,6 +102,7 @@ set_special_configs(_App) ->
<<"query">> => <<"abcb">>
}).
-define(SOURCE5, #{<<"type">> => <<"redis">>,
<<"redis_type">> => <<"single">>,
<<"enable">> => true,
<<"server">> => <<"127.0.0.1:27017">>,
<<"pool_size">> => 1,

View File

@ -37,9 +37,7 @@
-define(SOURCE2, #{<<"type">> => <<"mongodb">>,
<<"enable">> => true,
<<"mongo_type">> => <<"sharded">>,
<<"servers">> => [<<"127.0.0.1:27017">>,
<<"192.168.0.1:27017">>
],
<<"servers">> => <<"127.0.0.1:27017, 192.168.0.1:27017">>,
<<"pool_size">> => 1,
<<"database">> => <<"mqtt">>,
<<"ssl">> => #{<<"enable">> => false},
@ -70,9 +68,7 @@
}).
-define(SOURCE5, #{<<"type">> => <<"redis">>,
<<"enable">> => true,
<<"servers">> => [<<"127.0.0.1:6379">>,
<<"127.0.0.1:6380">>
],
<<"servers">> => <<"127.0.0.1:6379, 127.0.0.1:6380">>,
<<"pool_size">> => 1,
<<"database">> => 0,
<<"password">> => <<"ee">>,
@ -88,7 +84,7 @@
}).
all() ->
[]. %% Todo: Waiting for @terry-xiaoyu to fix the config_not_found error
[]. %% Todo: Waiting for fixing the ssl cert test.
% emqx_common_test_helpers:all(?MODULE).
groups() ->

View File

@ -20,12 +20,19 @@
-include_lib("emqx/include/logger.hrl").
-type server() :: tuple().
%% {"127.0.0.1", 7000}
%% For eredis:start_link/1~7
-reflect_type([server/0]).
-typerefl_from_string({server/0, ?MODULE, to_server}).
-export([to_server/1]).
-type servers() :: list().
%% [{"127.0.0.1", 7000}, {"127.0.0.2", 7000}]
%% For eredis_cluster
-reflect_type([servers/0]).
-typerefl_from_string({servers/0, ?MODULE, to_servers}).
-export([ to_server/1
, to_servers/1]).
-export([roots/0, fields/1]).
@ -63,14 +70,14 @@ fields(single) ->
redis_fields() ++
emqx_connector_schema_lib:ssl_fields();
fields(cluster) ->
[ {servers, #{type => hoconsc:array(server())}}
[ {servers, #{type => servers()}}
, {redis_type, #{type => hoconsc:enum([cluster]),
default => cluster}}
] ++
redis_fields() ++
emqx_connector_schema_lib:ssl_fields();
fields(sentinel) ->
[ {servers, #{type => hoconsc:array(server())}}
[ {servers, #{type => servers()}}
, {redis_type, #{type => hoconsc:enum([sentinel]),
default => sentinel}}
, {sentinel, #{type => string()}}
@ -181,7 +188,23 @@ redis_fields() ->
].
to_server(Server) ->
case string:tokens(Server, ":") of
[Host, Port] -> {ok, {Host, list_to_integer(Port)}};
_ -> {error, Server}
try {ok, parse_server(Server)}
catch
throw : Error ->
Error
end.
to_servers(Servers) ->
try {ok, lists:map(fun parse_server/1, string:tokens(Servers, ", "))}
catch
throw : _Reason ->
{error, Servers}
end.
parse_server(Server) ->
case string:tokens(Server, ": ") of
[Host, Port] ->
{Host, list_to_integer(Port)};
_ ->
throw({error, Server})
end.

View File

@ -423,8 +423,10 @@ typename_to_spec("duration_ms()", _Mod) -> #{type => string, example => <<"32s">
typename_to_spec("percent()", _Mod) -> #{type => number, example => <<"12%">>};
typename_to_spec("file()", _Mod) -> #{type => string, example => <<"/path/to/file">>};
typename_to_spec("ip_port()", _Mod) -> #{type => string, example => <<"127.0.0.1:80">>};
typename_to_spec("ip_ports()", _Mod) -> #{type => string, example => <<"127.0.0.1:80, 127.0.0.2:80">>};
typename_to_spec("url()", _Mod) -> #{type => string, example => <<"http://127.0.0.1">>};
typename_to_spec("server()", Mod) -> typename_to_spec("ip_port()", Mod);
typename_to_spec("servers()", Mod) -> typename_to_spec("ip_ports()", Mod);
typename_to_spec("connect_timeout()", Mod) -> typename_to_spec("timeout()", Mod);
typename_to_spec("timeout()", _Mod) -> #{<<"oneOf">> => [#{type => string, example => infinity},
#{type => integer, example => 100}], example => infinity};