From 6f0f670550f392721e123c8992440d851d8d63d8 Mon Sep 17 00:00:00 2001 From: Rory Z Date: Fri, 25 Jun 2021 11:47:57 +0800 Subject: [PATCH] feat(redis connector): redis connector support cluster (#5074) --- apps/emqx_authz/etc/emqx_authz.conf | 2 +- apps/emqx_authz/src/emqx_authz.erl | 4 +- apps/emqx_authz/src/emqx_authz_schema.erl | 10 +- .../test/emqx_authz_redis_SUITE.erl | 7 +- .../emqx_connector/src/emqx_connector.app.src | 1 + .../src/emqx_connector_redis.erl | 104 ++++++++++++------ rebar.config | 2 +- 7 files changed, 89 insertions(+), 41 deletions(-) diff --git a/apps/emqx_authz/etc/emqx_authz.conf b/apps/emqx_authz/etc/emqx_authz.conf index b97bc12f0..ec8f642b1 100644 --- a/apps/emqx_authz/etc/emqx_authz.conf +++ b/apps/emqx_authz/etc/emqx_authz.conf @@ -29,7 +29,7 @@ authz:{ # { # type: redis # config: { - # servers: "127.0.0.1:6379" + # server: "127.0.0.1:6379" # database: 0 # pool_size: 1 # password: public diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 0880ef4ae..24393a4b0 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -70,14 +70,14 @@ create_resource(#{<<"type">> := DB, case emqx_resource:check_and_create( ResourceID, list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])), - Config) + #{<<"config">> => Config }) of {ok, _} -> Rule#{<<"resource_id">> => ResourceID}; {error, already_created} -> Rule#{<<"resource_id">> => ResourceID}; {error, Reason} -> - error({load_sql_error, Reason}) + error({load_config_error, Reason}) end. -spec(compile(rule()) -> rule()). diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index 1801583c8..04d1b2268 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -19,9 +19,15 @@ fields(authz) -> fields(redis_connector) -> [ {principal, principal()} , {type, #{type => hoconsc:enum([redis])}} - , {config, #{type => map()}} + , {config, #{type => hoconsc:union( + [ hoconsc:ref(emqx_connector_redis, cluster) + , hoconsc:ref(emqx_connector_redis, sentinel) + , hoconsc:ref(emqx_connector_redis, single) + ])} + } , {cmd, query()} ]; + fields(sql_connector) -> [ {principal, principal() } , {type, #{type => hoconsc:enum([mysql, pgsql])}} @@ -33,7 +39,7 @@ fields(simple_rule) -> , {action, #{type => action()}} , {topics, #{type => union_array( [ binary() - , hoconsc:ref(eq_topic) + , hoconsc:ref(eq_topic) ] )}} , {principal, principal()} diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl index e4b49dd5a..3f8bea166 100644 --- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl @@ -51,7 +51,12 @@ set_special_configs(emqx_authz) -> emqx_ct_helpers:deps_path(emqx_authz, "test")), Conf = #{<<"authz">> => #{<<"rules">> => - [#{<<"config">> =>#{<<"meck">> => <<"fake">>}, + [#{<<"config">> =>#{ + <<"server">> => <<"127.0.0.1:6379">>, + <<"password">> => <<"public">>, + <<"pool_size">> => 1, + <<"auto_reconnect">> => true + }, <<"principal">> => all, <<"cmd">> => <<"fake cmd">>, <<"type">> => redis} diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index a821b8f13..5ff7d0828 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -7,6 +7,7 @@ [kernel, stdlib, emqx_resource, + eredis_cluster, ecpool ]}, {env,[]}, diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 32ec81616..1aa6263b6 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -19,6 +19,11 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). +-type server() :: tuple(). +-reflect_type([server/0]). +-typerefl_from_string({server/0, ?MODULE, to_server}). +-export([to_server/1]). + -export([structs/0, fields/1]). %% callbacks of behaviour emqx_resource @@ -39,29 +44,67 @@ structs() -> [""]. fields("") -> + [ {config, #{type => hoconsc:union( + [ hoconsc:ref(cluster) + , hoconsc:ref(single) + , hoconsc:ref(sentinel) + ])} + } + ]; +fields(single) -> + [ {server, #{type => server()}} + , {redis_type, #{type => hoconsc:enum([single]), + default => single}} + ] ++ + redis_fields() ++ + emqx_connector_schema_lib:ssl_fields(); +fields(cluster) -> + [ {servers, #{type => hoconsc:array(server())}} + , {redis_type, #{type => hoconsc:enum([cluster]), + default => cluster}} + ] ++ + redis_fields() ++ + emqx_connector_schema_lib:ssl_fields(); +fields(sentinel) -> + [ {servers, #{type => hoconsc:array(server())}} + , {redis_type, #{type => hoconsc:enum([sentinel]), + default => sentinel}} + , {sentinel, #{type => string()}} + ] ++ redis_fields() ++ - redis_sentinel_fields() ++ emqx_connector_schema_lib:ssl_fields(). on_jsonify(Config) -> Config. %% =================================================================== -on_start(InstId, #{servers := Servers, - redis_type := Type, - database := Database, - pool_size := PoolSize, - auto_reconnect := AutoReconn} = Config) -> +on_start(InstId, #{config :=#{redis_type := Type, + database := Database, + pool_size := PoolSize, + auto_reconnect := AutoReconn} = Config}) -> logger:info("starting redis connector: ~p, config: ~p", [InstId, Config]), - SslOpts = init_ssl_opts(Config, InstId), + Servers = case Type of + single -> [{servers, [maps:get(server, Config)]}]; + _ ->[{servers, maps:get(servers, Config)}] + end, Opts = [{pool_size, PoolSize}, {database, Database}, {password, maps:get(password, Config, "")}, - {auto_reconnect, reconn_interval(AutoReconn)}, - {servers, Servers}], - Options = [{options, SslOpts}, {sentinel, maps:get(sentinel, Config, undefined)}], + {auto_reconnect, reconn_interval(AutoReconn)} + ] ++ Servers, + Options = init_ssl_opts(Config, InstId) ++ + [{sentinel, maps:get(sentinel, Config, undefined)}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), - _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ Options), + case Type of + cluster -> + case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of + {ok, _} -> ok; + {ok, _, _} -> ok; + {error, Reason} -> error(connect_redis_cluster_failed, Reason) + end; + _ -> + _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) + end, {ok, #{poolname => PoolName, type => Type}}. on_stop(InstId, #{poolname := PoolName}) -> @@ -70,7 +113,11 @@ on_stop(InstId, #{poolname := PoolName}) -> on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) -> logger:debug("redis connector ~p received cmd query: ~p, at state: ~p", [InstId, Command, State]), - case Result = ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover) of + Result = case Type of + cluster -> eredis_cluster:q(PoolName, Command); + _ -> ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover) + end, + case Result of {error, Reason} -> logger:debug("redis connector ~p do cmd query failed, cmd: ~p, reason: ~p", [InstId, Command, Reason]), emqx_resource:query_failed(AfterCommand); @@ -79,15 +126,15 @@ on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := T end, Result. -on_health_check(_InstId, #{type := cluster, poolname := PoolName}) -> +on_health_check(_InstId, #{type := cluster, poolname := PoolName} = State) -> Workers = lists:flatten([gen_server:call(PoolPid, get_all_workers) || PoolPid <- eredis_cluster_monitor:get_all_pools(PoolName)]), case length(Workers) > 0 andalso lists:all( fun({_, Pid, _, _}) -> eredis_cluster_pool_worker:is_connected(Pid) =:= true end, Workers) of - true -> {ok, true}; - false -> {error, false} + true -> {ok, State}; + false -> {error, test_query_failed, State} end; on_health_check(_InstId, #{poolname := PoolName} = State) -> emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). @@ -118,26 +165,15 @@ init_ssl_opts(_Config, _InstId) -> [{ssl, false}]. redis_fields() -> - [ {redis_type, fun redis_type/1} - , {servers, fun emqx_connector_schema_lib:servers/1} - , {pool_size, fun emqx_connector_schema_lib:pool_size/1} + [ {pool_size, fun emqx_connector_schema_lib:pool_size/1} , {password, fun emqx_connector_schema_lib:password/1} - , {database, fun database/1} + , {database, #{type => integer(), + default => 0}} , {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} ]. -redis_sentinel_fields() -> - [ {sentinel, fun sentinel_name/1} - ]. - -sentinel_name(type) -> binary(); -sentinel_name(nullable) -> true; -sentinel_name(_) -> undefined. - -redis_type(type) -> hoconsc:enum([single, sentinel, cluster]); -redis_type(default) -> single; -redis_type(_) -> undefined. - -database(type) -> integer(); -database(default) -> 0; -database(_) -> undefined. +to_server(Server) -> + case string:tokens(Server, ":") of + [Host, Port] -> {ok, {Host, list_to_integer(Port)}}; + _ -> {error, Server} + end. diff --git a/rebar.config b/rebar.config index 257674431..e12ca02ba 100644 --- a/rebar.config +++ b/rebar.config @@ -35,7 +35,7 @@ {deps, [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.6"}}} - , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.6"}}} + , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.7"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}