feat(redis connector): redis connector support cluster (#5074)

This commit is contained in:
Rory Z 2021-06-25 11:47:57 +08:00 committed by zhanghongtong
parent 36c7785fd0
commit 6f0f670550
7 changed files with 89 additions and 41 deletions

View File

@ -29,7 +29,7 @@ authz:{
# { # {
# type: redis # type: redis
# config: { # config: {
# servers: "127.0.0.1:6379" # server: "127.0.0.1:6379"
# database: 0 # database: 0
# pool_size: 1 # pool_size: 1
# password: public # password: public

View File

@ -70,14 +70,14 @@ create_resource(#{<<"type">> := DB,
case emqx_resource:check_and_create( case emqx_resource:check_and_create(
ResourceID, ResourceID,
list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])), list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
Config) #{<<"config">> => Config })
of of
{ok, _} -> {ok, _} ->
Rule#{<<"resource_id">> => ResourceID}; Rule#{<<"resource_id">> => ResourceID};
{error, already_created} -> {error, already_created} ->
Rule#{<<"resource_id">> => ResourceID}; Rule#{<<"resource_id">> => ResourceID};
{error, Reason} -> {error, Reason} ->
error({load_sql_error, Reason}) error({load_config_error, Reason})
end. end.
-spec(compile(rule()) -> rule()). -spec(compile(rule()) -> rule()).

View File

@ -19,9 +19,15 @@ fields(authz) ->
fields(redis_connector) -> fields(redis_connector) ->
[ {principal, principal()} [ {principal, principal()}
, {type, #{type => hoconsc:enum([redis])}} , {type, #{type => hoconsc:enum([redis])}}
, {config, #{type => map()}} , {config, #{type => hoconsc:union(
[ hoconsc:ref(emqx_connector_redis, cluster)
, hoconsc:ref(emqx_connector_redis, sentinel)
, hoconsc:ref(emqx_connector_redis, single)
])}
}
, {cmd, query()} , {cmd, query()}
]; ];
fields(sql_connector) -> fields(sql_connector) ->
[ {principal, principal() } [ {principal, principal() }
, {type, #{type => hoconsc:enum([mysql, pgsql])}} , {type, #{type => hoconsc:enum([mysql, pgsql])}}

View File

@ -51,7 +51,12 @@ set_special_configs(emqx_authz) ->
emqx_ct_helpers:deps_path(emqx_authz, "test")), emqx_ct_helpers:deps_path(emqx_authz, "test")),
Conf = #{<<"authz">> => Conf = #{<<"authz">> =>
#{<<"rules">> => #{<<"rules">> =>
[#{<<"config">> =>#{<<"meck">> => <<"fake">>}, [#{<<"config">> =>#{
<<"server">> => <<"127.0.0.1:6379">>,
<<"password">> => <<"public">>,
<<"pool_size">> => 1,
<<"auto_reconnect">> => true
},
<<"principal">> => all, <<"principal">> => all,
<<"cmd">> => <<"fake cmd">>, <<"cmd">> => <<"fake cmd">>,
<<"type">> => redis} <<"type">> => redis}

View File

@ -7,6 +7,7 @@
[kernel, [kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
eredis_cluster,
ecpool ecpool
]}, ]},
{env,[]}, {env,[]},

View File

@ -19,6 +19,11 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-type server() :: tuple().
-reflect_type([server/0]).
-typerefl_from_string({server/0, ?MODULE, to_server}).
-export([to_server/1]).
-export([structs/0, fields/1]). -export([structs/0, fields/1]).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
@ -39,29 +44,67 @@
structs() -> [""]. structs() -> [""].
fields("") -> fields("") ->
[ {config, #{type => hoconsc:union(
[ hoconsc:ref(cluster)
, hoconsc:ref(single)
, hoconsc:ref(sentinel)
])}
}
];
fields(single) ->
[ {server, #{type => server()}}
, {redis_type, #{type => hoconsc:enum([single]),
default => single}}
] ++
redis_fields() ++
emqx_connector_schema_lib:ssl_fields();
fields(cluster) ->
[ {servers, #{type => hoconsc:array(server())}}
, {redis_type, #{type => hoconsc:enum([cluster]),
default => cluster}}
] ++
redis_fields() ++
emqx_connector_schema_lib:ssl_fields();
fields(sentinel) ->
[ {servers, #{type => hoconsc:array(server())}}
, {redis_type, #{type => hoconsc:enum([sentinel]),
default => sentinel}}
, {sentinel, #{type => string()}}
] ++
redis_fields() ++ redis_fields() ++
redis_sentinel_fields() ++
emqx_connector_schema_lib:ssl_fields(). emqx_connector_schema_lib:ssl_fields().
on_jsonify(Config) -> on_jsonify(Config) ->
Config. Config.
%% =================================================================== %% ===================================================================
on_start(InstId, #{servers := Servers, on_start(InstId, #{config :=#{redis_type := Type,
redis_type := Type,
database := Database, database := Database,
pool_size := PoolSize, pool_size := PoolSize,
auto_reconnect := AutoReconn} = Config) -> auto_reconnect := AutoReconn} = Config}) ->
logger:info("starting redis connector: ~p, config: ~p", [InstId, Config]), logger:info("starting redis connector: ~p, config: ~p", [InstId, Config]),
SslOpts = init_ssl_opts(Config, InstId), Servers = case Type of
single -> [{servers, [maps:get(server, Config)]}];
_ ->[{servers, maps:get(servers, Config)}]
end,
Opts = [{pool_size, PoolSize}, Opts = [{pool_size, PoolSize},
{database, Database}, {database, Database},
{password, maps:get(password, Config, "")}, {password, maps:get(password, Config, "")},
{auto_reconnect, reconn_interval(AutoReconn)}, {auto_reconnect, reconn_interval(AutoReconn)}
{servers, Servers}], ] ++ Servers,
Options = [{options, SslOpts}, {sentinel, maps:get(sentinel, Config, undefined)}], Options = init_ssl_opts(Config, InstId) ++
[{sentinel, maps:get(sentinel, Config, undefined)}],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
_ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ Options), case Type of
cluster ->
case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of
{ok, _} -> ok;
{ok, _, _} -> ok;
{error, Reason} -> error(connect_redis_cluster_failed, Reason)
end;
_ ->
_ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}])
end,
{ok, #{poolname => PoolName, type => Type}}. {ok, #{poolname => PoolName, type => Type}}.
on_stop(InstId, #{poolname := PoolName}) -> on_stop(InstId, #{poolname := PoolName}) ->
@ -70,7 +113,11 @@ on_stop(InstId, #{poolname := PoolName}) ->
on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) -> on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) ->
logger:debug("redis connector ~p received cmd query: ~p, at state: ~p", [InstId, Command, State]), logger:debug("redis connector ~p received cmd query: ~p, at state: ~p", [InstId, Command, State]),
case Result = ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover) of Result = case Type of
cluster -> eredis_cluster:q(PoolName, Command);
_ -> ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover)
end,
case Result of
{error, Reason} -> {error, Reason} ->
logger:debug("redis connector ~p do cmd query failed, cmd: ~p, reason: ~p", [InstId, Command, Reason]), logger:debug("redis connector ~p do cmd query failed, cmd: ~p, reason: ~p", [InstId, Command, Reason]),
emqx_resource:query_failed(AfterCommand); emqx_resource:query_failed(AfterCommand);
@ -79,15 +126,15 @@ on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := T
end, end,
Result. Result.
on_health_check(_InstId, #{type := cluster, poolname := PoolName}) -> on_health_check(_InstId, #{type := cluster, poolname := PoolName} = State) ->
Workers = lists:flatten([gen_server:call(PoolPid, get_all_workers) || Workers = lists:flatten([gen_server:call(PoolPid, get_all_workers) ||
PoolPid <- eredis_cluster_monitor:get_all_pools(PoolName)]), PoolPid <- eredis_cluster_monitor:get_all_pools(PoolName)]),
case length(Workers) > 0 andalso lists:all( case length(Workers) > 0 andalso lists:all(
fun({_, Pid, _, _}) -> fun({_, Pid, _, _}) ->
eredis_cluster_pool_worker:is_connected(Pid) =:= true eredis_cluster_pool_worker:is_connected(Pid) =:= true
end, Workers) of end, Workers) of
true -> {ok, true}; true -> {ok, State};
false -> {error, false} false -> {error, test_query_failed, State}
end; end;
on_health_check(_InstId, #{poolname := PoolName} = State) -> on_health_check(_InstId, #{poolname := PoolName} = State) ->
emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State).
@ -118,26 +165,15 @@ init_ssl_opts(_Config, _InstId) ->
[{ssl, false}]. [{ssl, false}].
redis_fields() -> redis_fields() ->
[ {redis_type, fun redis_type/1} [ {pool_size, fun emqx_connector_schema_lib:pool_size/1}
, {servers, fun emqx_connector_schema_lib:servers/1}
, {pool_size, fun emqx_connector_schema_lib:pool_size/1}
, {password, fun emqx_connector_schema_lib:password/1} , {password, fun emqx_connector_schema_lib:password/1}
, {database, fun database/1} , {database, #{type => integer(),
default => 0}}
, {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} , {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
]. ].
redis_sentinel_fields() -> to_server(Server) ->
[ {sentinel, fun sentinel_name/1} case string:tokens(Server, ":") of
]. [Host, Port] -> {ok, {Host, list_to_integer(Port)}};
_ -> {error, Server}
sentinel_name(type) -> binary(); end.
sentinel_name(nullable) -> true;
sentinel_name(_) -> undefined.
redis_type(type) -> hoconsc:enum([single, sentinel, cluster]);
redis_type(default) -> single;
redis_type(_) -> undefined.
database(type) -> integer();
database(default) -> 0;
database(_) -> undefined.

View File

@ -35,7 +35,7 @@
{deps, {deps,
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.6"}}} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.6"}}}
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.6"}}} , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.7"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}